云数据架构:大数据与AI平台设计
引言
在数字化转型的浪潮中,数据已成为企业最宝贵的资产。云数据架构为组织提供了处理海量数据、构建智能应用的强大平台。本文将深入探讨如何在云环境中设计和实施大数据与AI平台,涵盖从数据采集、存储、处理到分析的完整生命周期。
现代云数据架构需要支持多样化的数据源、实时和批处理工作负载、机器学习工作流,同时确保数据安全、治理和合规性。通过合理的架构设计,企业可以构建敏捷、可扩展且成本效益高的数据平台。
数据架构概述
现代数据架构模式
graph TB
subgraph "数据源层"
A[业务系统] --> D[数据采集层]
B[IoT设备] --> D
C[外部API] --> D
E[日志文件] --> D
F[流数据] --> D
end
subgraph "数据采集层"
D --> G[批量ETL]
D --> H[流处理]
D --> I[CDC变更捕获]
end
subgraph "数据存储层"
G --> J[数据湖]
H --> J
I --> J
J --> K[数据仓库]
J --> L[特征存储]
end
subgraph "数据处理层"
K --> M[批处理引擎]
J --> N[流处理引擎]
L --> O[ML训练]
end
subgraph "数据服务层"
M --> P[数据API]
N --> P
O --> Q[模型服务]
P --> R[BI工具]
Q --> S[AI应用]
end
subgraph "数据治理层"
T[元数据管理] -.-> J
T -.-> K
T -.-> L
U[数据质量] -.-> M
U -.-> N
V[数据安全] -.-> P
V -.-> Q
end
数据架构分析器
import json
import time
import uuid
from typing import Dict, Any, List, Optional, Union, Tuple
from dataclasses import dataclass, asdict
from datetime import datetime, timedelta
from enum import Enum
import boto3
import pandas as pd
import numpy as np
from abc import ABC, abstractmethod
import logging
# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("data-architecture")
class DataSourceType(Enum):
"""数据源类型"""
RELATIONAL_DB = "relational_db"
NOSQL_DB = "nosql_db"
FILE_SYSTEM = "file_system"
STREAMING = "streaming"
API = "api"
IOT = "iot"
LOG = "log"
class DataFormat(Enum):
"""数据格式"""
JSON = "json"
CSV = "csv"
PARQUET = "parquet"
AVRO = "avro"
ORC = "orc"
XML = "xml"
BINARY = "binary"
class ProcessingType(Enum):
"""处理类型"""
BATCH = "batch"
STREAMING = "streaming"
MICRO_BATCH = "micro_batch"
REAL_TIME = "real_time"
@dataclass
class DataSource:
"""数据源定义"""
id: str
name: str
type: DataSourceType
format: DataFormat
location: str
schema: Dict[str, Any]
volume_gb_per_day: float
velocity_records_per_second: int
variety_score: int # 1-10, 数据多样性评分
created_at: datetime
updated_at: datetime
@dataclass
class DataPipeline:
"""数据管道定义"""
id: str
name: str
source_ids: List[str]
processing_type: ProcessingType
transformations: List[Dict[str, Any]]
destination: str
schedule: Optional[str]
sla_minutes: int
created_at: datetime
updated_at: datetime
@dataclass
class DataAsset:
"""数据资产定义"""
id: str
name: str
description: str
owner: str
tags: List[str]
schema: Dict[str, Any]
quality_score: float
usage_frequency: str
business_value: str
created_at: datetime
updated_at: datetime
class DataArchitectureAnalyzer:
"""数据架构分析器"""
def __init__(self, aws_region: str = "us-east-1"):
self.aws_region = aws_region
self.data_sources: Dict[str, DataSource] = {}
self.data_pipelines: Dict[str, DataPipeline] = {}
self.data_assets: Dict[str, DataAsset] = {}
# AWS服务客户端
self.s3 = boto3.client('s3', region_name=aws_region)
self.glue = boto3.client('glue', region_name=aws_region)
self.athena = boto3.client('athena', region_name=aws_region)
self.redshift = boto3.client('redshift', region_name=aws_region)
self.kinesis = boto3.client('kinesis', region_name=aws_region)
def add_data_source(self, data_source: DataSource):
"""添加数据源"""
self.data_sources[data_source.id] = data_source
logger.info(f"Added data source: {data_source.name}")
def add_data_pipeline(self, pipeline: DataPipeline):
"""添加数据管道"""
self.data_pipelines[pipeline.id] = pipeline
logger.info(f"Added data pipeline: {pipeline.name}")
def add_data_asset(self, asset: DataAsset):
"""添加数据资产"""
self.data_assets[asset.id] = asset
logger.info(f"Added data asset: {asset.name}")
def analyze_data_volume(self) -> Dict[str, Any]:
"""分析数据量"""
total_volume = sum(source.volume_gb_per_day for source in self.data_sources.values())
volume_by_type = {}
for source in self.data_sources.values():
source_type = source.type.value
if source_type not in volume_by_type:
volume_by_type[source_type] = 0
volume_by_type[source_type] += source.volume_gb_per_day
return {
"total_volume_gb_per_day": total_volume,
"volume_by_type": volume_by_type,
"projected_monthly_volume_gb": total_volume * 30,
"projected_yearly_volume_tb": (total_volume * 365) / 1024
}
def analyze_data_velocity(self) -> Dict[str, Any]:
"""分析数据速度"""
total_velocity = sum(source.velocity_records_per_second for source in self.data_sources.values())
velocity_by_type = {}
for source in self.data_sources.values():
source_type = source.type.value
if source_type not in velocity_by_type:
velocity_by_type[source_type] = 0
velocity_by_type[source_type] += source.velocity_records_per_second
# 分类速度级别
speed_categories = {
"low": 0, # < 100 records/sec
"medium": 0, # 100-1000 records/sec
"high": 0, # 1000-10000 records/sec
"very_high": 0 # > 10000 records/sec
}
for source in self.data_sources.values():
velocity = source.velocity_records_per_second
if velocity < 100:
speed_categories["low"] += 1
elif velocity < 1000:
speed_categories["medium"] += 1
elif velocity < 10000:
speed_categories["high"] += 1
else:
speed_categories["very_high"] += 1
return {
"total_velocity_records_per_second": total_velocity,
"velocity_by_type": velocity_by_type,
"speed_distribution": speed_categories,
"peak_daily_records": total_velocity * 86400
}
def analyze_data_variety(self) -> Dict[str, Any]:
"""分析数据多样性"""
format_distribution = {}
type_distribution = {}
variety_scores = []
for source in self.data_sources.values():
# 格式分布
format_name = source.format.value
if format_name not in format_distribution:
format_distribution[format_name] = 0
format_distribution[format_name] += 1
# 类型分布
type_name = source.type.value
if type_name not in type_distribution:
type_distribution[type_name] = 0
type_distribution[type_name] += 1
# 多样性评分
variety_scores.append(source.variety_score)
avg_variety_score = np.mean(variety_scores) if variety_scores else 0
return {
"format_distribution": format_distribution,
"type_distribution": type_distribution,
"average_variety_score": avg_variety_score,
"total_data_sources": len(self.data_sources),
"unique_formats": len(format_distribution),
"unique_types": len(type_distribution)
}
def analyze_pipeline_complexity(self) -> Dict[str, Any]:
"""分析管道复杂性"""
processing_type_distribution = {}
transformation_complexity = []
sla_distribution = {"under_1h": 0, "1h_to_4h": 0, "4h_to_24h": 0, "over_24h": 0}
for pipeline in self.data_pipelines.values():
# 处理类型分布
proc_type = pipeline.processing_type.value
if proc_type not in processing_type_distribution:
processing_type_distribution[proc_type] = 0
processing_type_distribution[proc_type] += 1
# 转换复杂性
transformation_complexity.append(len(pipeline.transformations))
# SLA分布
sla_hours = pipeline.sla_minutes / 60
if sla_hours < 1:
sla_distribution["under_1h"] += 1
elif sla_hours < 4:
sla_distribution["1h_to_4h"] += 1
elif sla_hours < 24:
sla_distribution["4h_to_24h"] += 1
else:
sla_distribution["over_24h"] += 1
avg_transformations = np.mean(transformation_complexity) if transformation_complexity else 0
return {
"total_pipelines": len(self.data_pipelines),
"processing_type_distribution": processing_type_distribution,
"average_transformations_per_pipeline": avg_transformations,
"sla_distribution": sla_distribution,
"complex_pipelines": len([p for p in self.data_pipelines.values()
if len(p.transformations) > 5])
}
def generate_architecture_recommendations(self) -> Dict[str, Any]:
"""生成架构建议"""
volume_analysis = self.analyze_data_volume()
velocity_analysis = self.analyze_data_velocity()
variety_analysis = self.analyze_data_variety()
pipeline_analysis = self.analyze_pipeline_complexity()
recommendations = {
"storage_recommendations": [],
"processing_recommendations": [],
"architecture_patterns": [],
"technology_stack": [],
"cost_optimization": [],
"performance_optimization": []
}
# 存储建议
total_volume = volume_analysis["total_volume_gb_per_day"]
if total_volume > 1000: # > 1TB/day
recommendations["storage_recommendations"].append({
"type": "data_lake",
"reason": "大数据量需要可扩展的数据湖架构",
"technology": ["Amazon S3", "Azure Data Lake", "Google Cloud Storage"]
})
if variety_analysis["unique_formats"] > 3:
recommendations["storage_recommendations"].append({
"type": "multi_format_support",
"reason": "多种数据格式需要灵活的存储方案",
"technology": ["Apache Parquet", "Delta Lake", "Apache Iceberg"]
})
# 处理建议
total_velocity = velocity_analysis["total_velocity_records_per_second"]
if total_velocity > 1000:
recommendations["processing_recommendations"].append({
"type": "stream_processing",
"reason": "高速数据流需要实时处理能力",
"technology": ["Apache Kafka", "Amazon Kinesis", "Apache Flink"]
})
if pipeline_analysis["complex_pipelines"] > 0:
recommendations["processing_recommendations"].append({
"type": "workflow_orchestration",
"reason": "复杂管道需要工作流编排",
"technology": ["Apache Airflow", "AWS Step Functions", "Azure Data Factory"]
})
# 架构模式建议
if (total_volume > 100 and total_velocity > 100 and
variety_analysis["average_variety_score"] > 5):
recommendations["architecture_patterns"].append({
"pattern": "lambda_architecture",
"description": "批处理和流处理混合架构",
"use_case": "处理大量、高速、多样化数据"
})
if variety_analysis["unique_types"] > 5:
recommendations["architecture_patterns"].append({
"pattern": "data_mesh",
"description": "去中心化数据架构",
"use_case": "多域数据管理和治理"
})
return recommendations
def generate_cost_estimation(self) -> Dict[str, Any]:
"""生成成本估算"""
volume_analysis = self.analyze_data_volume()
velocity_analysis = self.analyze_data_velocity()
# 存储成本估算(基于AWS S3定价)
monthly_volume_gb = volume_analysis["projected_monthly_volume_gb"]
storage_cost_per_gb = 0.023 # AWS S3标准存储
monthly_storage_cost = monthly_volume_gb * storage_cost_per_gb
# 计算成本估算(基于数据处理量)
daily_records = velocity_analysis["peak_daily_records"]
processing_cost_per_million_records = 1.0 # 假设成本
monthly_processing_cost = (daily_records * 30 / 1000000) * processing_cost_per_million_records
# 传输成本估算
monthly_transfer_cost = monthly_volume_gb * 0.09 # 数据传输成本
total_monthly_cost = monthly_storage_cost + monthly_processing_cost + monthly_transfer_cost
return {
"monthly_costs": {
"storage": monthly_storage_cost,
"processing": monthly_processing_cost,
"transfer": monthly_transfer_cost,
"total": total_monthly_cost
},
"yearly_projection": total_monthly_cost * 12,
"cost_breakdown": {
"storage_percentage": (monthly_storage_cost / total_monthly_cost) * 100,
"processing_percentage": (monthly_processing_cost / total_monthly_cost) * 100,
"transfer_percentage": (monthly_transfer_cost / total_monthly_cost) * 100
}
}
def generate_architecture_report(self) -> Dict[str, Any]:
"""生成架构报告"""
return {
"timestamp": datetime.utcnow().isoformat(),
"summary": {
"total_data_sources": len(self.data_sources),
"total_pipelines": len(self.data_pipelines),
"total_assets": len(self.data_assets)
},
"volume_analysis": self.analyze_data_volume(),
"velocity_analysis": self.analyze_data_velocity(),
"variety_analysis": self.analyze_data_variety(),
"pipeline_analysis": self.analyze_pipeline_complexity(),
"recommendations": self.generate_architecture_recommendations(),
"cost_estimation": self.generate_cost_estimation()
}
# 使用示例
def data_architecture_example():
"""数据架构示例"""
analyzer = DataArchitectureAnalyzer()
# 添加数据源
sources = [
DataSource(
id="src-001",
name="用户行为数据库",
type=DataSourceType.RELATIONAL_DB,
format=DataFormat.JSON,
location="mysql://prod-db/user_events",
schema={"user_id": "string", "event_type": "string", "timestamp": "datetime"},
volume_gb_per_day=50.0,
velocity_records_per_second=500,
variety_score=6,
created_at=datetime.utcnow(),
updated_at=datetime.utcnow()
),
DataSource(
id="src-002",
name="IoT传感器数据",
type=DataSourceType.STREAMING,
format=DataFormat.AVRO,
location="kafka://iot-cluster/sensor-data",
schema={"device_id": "string", "temperature": "float", "humidity": "float"},
volume_gb_per_day=200.0,
velocity_records_per_second=2000,
variety_score=8,
created_at=datetime.utcnow(),
updated_at=datetime.utcnow()
),
DataSource(
id="src-003",
name="应用日志",
type=DataSourceType.LOG,
format=DataFormat.JSON,
location="s3://logs-bucket/app-logs/",
schema={"timestamp": "datetime", "level": "string", "message": "string"},
volume_gb_per_day=100.0,
velocity_records_per_second=1000,
variety_score=4,
created_at=datetime.utcnow(),
updated_at=datetime.utcnow()
)
]
for source in sources:
analyzer.add_data_source(source)
# 添加数据管道
pipelines = [
DataPipeline(
id="pipe-001",
name="用户行为ETL",
source_ids=["src-001"],
processing_type=ProcessingType.BATCH,
transformations=[
{"type": "filter", "condition": "event_type != 'test'"},
{"type": "aggregate", "group_by": "user_id", "metrics": ["count", "avg"]},
{"type": "join", "table": "user_profiles", "key": "user_id"}
],
destination="s3://data-lake/user-behavior/",
schedule="0 2 * * *", # 每天凌晨2点
sla_minutes=120,
created_at=datetime.utcnow(),
updated_at=datetime.utcnow()
),
DataPipeline(
id="pipe-002",
name="IoT实时处理",
source_ids=["src-002"],
processing_type=ProcessingType.STREAMING,
transformations=[
{"type": "validate", "schema": "iot_schema"},
{"type": "enrich", "lookup": "device_metadata"},
{"type": "window", "type": "tumbling", "duration": "5m"},
{"type": "aggregate", "metrics": ["avg", "max", "min"]}
],
destination="kinesis://processed-iot-stream",
schedule=None,
sla_minutes=5,
created_at=datetime.utcnow(),
updated_at=datetime.utcnow()
)
]
for pipeline in pipelines:
analyzer.add_data_pipeline(pipeline)
# 添加数据资产
assets = [
DataAsset(
id="asset-001",
name="用户行为分析表",
description="用户行为数据的聚合分析结果",
owner="数据团队",
tags=["用户分析", "行为数据", "营销"],
schema={"user_id": "string", "total_events": "int", "avg_session_duration": "float"},
quality_score=8.5,
usage_frequency="daily",
business_value="high",
created_at=datetime.utcnow(),
updated_at=datetime.utcnow()
),
DataAsset(
id="asset-002",
name="IoT设备监控仪表板",
description="实时IoT设备状态监控数据",
owner="运维团队",
tags=["IoT", "监控", "实时"],
schema={"device_id": "string", "status": "string", "last_update": "datetime"},
quality_score=9.2,
usage_frequency="real-time",
business_value="critical",
created_at=datetime.utcnow(),
updated_at=datetime.utcnow()
)
]
for asset in assets:
analyzer.add_data_asset(asset)
# 生成架构报告
report = analyzer.generate_architecture_report()
print("数据架构分析报告")
print("=" * 50)
print(f"数据源总数: {report['summary']['total_data_sources']}")
print(f"数据管道总数: {report['summary']['total_pipelines']}")
print(f"数据资产总数: {report['summary']['total_assets']}")
print("\n数据量分析:")
volume = report['volume_analysis']
print(f" 每日数据量: {volume['total_volume_gb_per_day']:.2f} GB")
print(f" 年度预计数据量: {volume['projected_yearly_volume_tb']:.2f} TB")
print("\n数据速度分析:")
velocity = report['velocity_analysis']
print(f" 总记录速度: {velocity['total_velocity_records_per_second']:,} records/sec")
print(f" 每日峰值记录: {velocity['peak_daily_records']:,} records")
print("\n架构建议:")
recommendations = report['recommendations']
for category, items in recommendations.items():
if items:
print(f" {category}:")
for item in items:
if isinstance(item, dict):
print(f" - {item.get('type', item.get('pattern', 'N/A'))}: {item.get('reason', item.get('description', 'N/A'))}")
print("\n成本估算:")
costs = report['cost_estimation']
print(f" 月度总成本: ${costs['monthly_costs']['total']:.2f}")
print(f" 年度预计成本: ${costs['yearly_projection']:.2f}")
print(f" 存储成本占比: {costs['cost_breakdown']['storage_percentage']:.1f}%")
print(f" 处理成本占比: {costs['cost_breakdown']['processing_percentage']:.1f}%")
if __name__ == "__main__":
data_architecture_example()
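分析器生成的报告是一个普通的Python字典,除了打印之外通常还会归档下来供后续对比。下面是一个简化示意(存储桶名称为假设示例,需替换为实际环境),展示如何把报告以JSON形式写入S3,便于版本化和团队共享:
import json
import boto3
from datetime import datetime

def archive_report_to_s3(report: dict, bucket: str = "architecture-reports-demo") -> str:
    """把架构分析报告归档到S3(bucket为示例假设)"""
    s3 = boto3.client("s3")
    key = f"reports/architecture-report-{datetime.utcnow():%Y%m%d-%H%M%S}.json"
    s3.put_object(
        Bucket=bucket,
        Key=key,
        Body=json.dumps(report, ensure_ascii=False, default=str).encode("utf-8"),
        ContentType="application/json",
    )
    return f"s3://{bucket}/{key}"

# 用法示意:report_uri = archive_report_to_s3(report)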
数据湖架构设计
分层数据湖架构
数据湖采用分层架构,确保数据的有序管理和高效访问:
import boto3
import json
import yaml
from typing import Dict, Any, List, Optional
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
import os
class DataLakeLayer(Enum):
"""数据湖层级"""
RAW = "raw" # 原始数据层
BRONZE = "bronze" # 青铜层(清洗后)
SILVER = "silver" # 银层(结构化)
GOLD = "gold" # 金层(业务就绪)
class DataFormat(Enum):
"""数据格式"""
PARQUET = "parquet"
DELTA = "delta"
ICEBERG = "iceberg"
JSON = "json"
CSV = "csv"
AVRO = "avro"
@dataclass
class DataLakeConfig:
"""数据湖配置"""
bucket_name: str
region: str
encryption_key: str
lifecycle_policies: Dict[str, Any]
access_policies: Dict[str, Any]
metadata_catalog: str
class DataLakeManager:
"""数据湖管理器"""
def __init__(self, config: DataLakeConfig):
self.config = config
self.s3 = boto3.client('s3', region_name=config.region)
self.glue = boto3.client('glue', region_name=config.region)
self.lakeformation = boto3.client('lakeformation', region_name=config.region)
def create_data_lake_structure(self) -> Dict[str, Any]:
"""创建数据湖结构"""
bucket_name = self.config.bucket_name
# 创建S3存储桶
try:
if self.config.region != 'us-east-1':
self.s3.create_bucket(
Bucket=bucket_name,
CreateBucketConfiguration={'LocationConstraint': self.config.region}
)
else:
self.s3.create_bucket(Bucket=bucket_name)
# 启用版本控制
self.s3.put_bucket_versioning(
Bucket=bucket_name,
VersioningConfiguration={'Status': 'Enabled'}
)
# 设置加密
self.s3.put_bucket_encryption(
Bucket=bucket_name,
ServerSideEncryptionConfiguration={
'Rules': [{
'ApplyServerSideEncryptionByDefault': {
'SSEAlgorithm': 'aws:kms',
'KMSMasterKeyID': self.config.encryption_key
}
}]
}
)
# 创建分层目录结构
layers = [
"raw/",
"bronze/",
"silver/",
"gold/",
"metadata/",
"scripts/",
"temp/"
]
for layer in layers:
self.s3.put_object(
Bucket=bucket_name,
Key=layer,
Body=b''
)
# 设置生命周期策略
self._set_lifecycle_policies(bucket_name)
# 设置访问策略
self._set_bucket_policies(bucket_name)
return {
"status": "success",
"bucket_name": bucket_name,
"layers_created": layers,
"encryption_enabled": True,
"versioning_enabled": True
}
except Exception as e:
return {
"status": "error",
"error": str(e)
}
def _set_lifecycle_policies(self, bucket_name: str):
"""设置生命周期策略"""
lifecycle_config = {
'Rules': [
{
'ID': 'RawDataTransition',
'Status': 'Enabled',
'Filter': {'Prefix': 'raw/'},
'Transitions': [
{
'Days': 30,
'StorageClass': 'STANDARD_IA'
},
{
'Days': 90,
'StorageClass': 'GLACIER'
},
{
'Days': 365,
'StorageClass': 'DEEP_ARCHIVE'
}
]
},
{
'ID': 'TempDataCleanup',
'Status': 'Enabled',
'Filter': {'Prefix': 'temp/'},
'Expiration': {'Days': 7}
},
{
'ID': 'IncompleteMultipartUploads',
'Status': 'Enabled',
'Filter': {},
'AbortIncompleteMultipartUpload': {'DaysAfterInitiation': 1}
}
]
}
self.s3.put_bucket_lifecycle_configuration(
Bucket=bucket_name,
LifecycleConfiguration=lifecycle_config
)
def _set_bucket_policies(self, bucket_name: str):
"""设置存储桶策略"""
policy = {
"Version": "2012-10-17",
"Statement": [
{
"Sid": "DenyInsecureConnections",
"Effect": "Deny",
"Principal": "*",
"Action": "s3:*",
"Resource": [
f"arn:aws:s3:::{bucket_name}",
f"arn:aws:s3:::{bucket_name}/*"
],
"Condition": {
"Bool": {
"aws:SecureTransport": "false"
}
}
},
{
"Sid": "RequireSSEKMS",
"Effect": "Deny",
"Principal": "*",
"Action": "s3:PutObject",
"Resource": f"arn:aws:s3:::{bucket_name}/*",
"Condition": {
"StringNotEquals": {
"s3:x-amz-server-side-encryption": "aws:kms"
}
}
}
]
}
self.s3.put_bucket_policy(
Bucket=bucket_name,
Policy=json.dumps(policy)
)
def create_data_catalog(self, database_name: str) -> Dict[str, Any]:
"""创建数据目录"""
try:
# 创建Glue数据库
self.glue.create_database(
DatabaseInput={
'Name': database_name,
'Description': f'Data Lake catalog for {self.config.bucket_name}',
'LocationUri': f's3://{self.config.bucket_name}/',
'Parameters': {
'classification': 'data-lake',
'created_by': 'data-lake-manager',
'created_at': datetime.utcnow().isoformat()
}
}
)
# 为每个层级创建表
layers = ['raw', 'bronze', 'silver', 'gold']
tables_created = []
for layer in layers:
table_name = f"{layer}_data"
try:
self.glue.create_table(
DatabaseName=database_name,
TableInput={
'Name': table_name,
'Description': f'{layer.title()} layer data',
'StorageDescriptor': {
'Columns': [
{'Name': 'partition_date', 'Type': 'string'},
{'Name': 'data_source', 'Type': 'string'},
{'Name': 'record_count', 'Type': 'bigint'}
],
'Location': f's3://{self.config.bucket_name}/{layer}/',
'InputFormat': 'org.apache.hadoop.mapred.TextInputFormat',
'OutputFormat': 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat',
'SerdeInfo': {
'SerializationLibrary': 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
}
},
'PartitionKeys': [
{'Name': 'year', 'Type': 'string'},
{'Name': 'month', 'Type': 'string'},
{'Name': 'day', 'Type': 'string'}
],
'Parameters': {
'classification': 'parquet',
'layer': layer
}
}
)
tables_created.append(table_name)
except Exception as e:
print(f"Warning: Could not create table {table_name}: {e}")
return {
"status": "success",
"database_name": database_name,
"tables_created": tables_created
}
except Exception as e:
return {
"status": "error",
"error": str(e)
}
def setup_lake_formation_permissions(self, database_name: str,
principals: List[str]) -> Dict[str, Any]:
"""设置Lake Formation权限"""
try:
permissions_granted = []
for principal in principals:
# 授予数据库权限
self.lakeformation.grant_permissions(
Principal={'DataLakePrincipalIdentifier': principal},
Resource={'Database': {'Name': database_name}},
Permissions=['ALL'],
PermissionsWithGrantOption=['ALL']
)
# 授予表权限
self.lakeformation.grant_permissions(
Principal={'DataLakePrincipalIdentifier': principal},
Resource={
'Table': {
'DatabaseName': database_name,
'TableWildcard': {}
}
},
Permissions=['ALL'],
PermissionsWithGrantOption=['ALL']
)
permissions_granted.append(principal)
return {
"status": "success",
"permissions_granted": permissions_granted
}
except Exception as e:
return {
"status": "error",
"error": str(e)
}
def create_data_quality_framework(self) -> str:
"""创建数据质量框架"""
dq_script = """
import boto3
import pandas as pd
import numpy as np
from typing import Dict, Any, List
import json
from datetime import datetime
class DataQualityChecker:
def __init__(self, s3_bucket: str, glue_database: str):
self.s3_bucket = s3_bucket
self.glue_database = glue_database
self.s3 = boto3.client('s3')
self.athena = boto3.client('athena')
def check_completeness(self, table_name: str, required_columns: List[str]) -> Dict[str, Any]:
\"\"\"检查数据完整性\"\"\"
query = f\"\"\"
SELECT
{', '.join([f'COUNT({col}) as {col}_count' for col in required_columns])},
COUNT(*) as total_rows
FROM {self.glue_database}.{table_name}
\"\"\"
result = self._execute_athena_query(query)
completeness_scores = {}
if result and len(result) > 0:
row = result[0]
total_rows = row.get('total_rows', 0)
for col in required_columns:
col_count = row.get(f'{col}_count', 0)
completeness_scores[col] = (col_count / total_rows) * 100 if total_rows > 0 else 0
return {
'table': table_name,
'completeness_scores': completeness_scores,
'overall_score': np.mean(list(completeness_scores.values())) if completeness_scores else 0,
'timestamp': datetime.utcnow().isoformat()
}
def check_uniqueness(self, table_name: str, unique_columns: List[str]) -> Dict[str, Any]:
\"\"\"检查数据唯一性\"\"\"
uniqueness_scores = {}
for col in unique_columns:
query = f\"\"\"
SELECT
COUNT(*) as total_rows,
COUNT(DISTINCT {col}) as unique_values
FROM {self.glue_database}.{table_name}
\"\"\"
result = self._execute_athena_query(query)
if result and len(result) > 0:
row = result[0]
total_rows = row.get('total_rows', 0)
unique_values = row.get('unique_values', 0)
uniqueness_scores[col] = (unique_values / total_rows) * 100 if total_rows > 0 else 0
return {
'table': table_name,
'uniqueness_scores': uniqueness_scores,
'overall_score': np.mean(list(uniqueness_scores.values())) if uniqueness_scores else 0,
'timestamp': datetime.utcnow().isoformat()
}
def check_validity(self, table_name: str, validation_rules: Dict[str, str]) -> Dict[str, Any]:
\"\"\"检查数据有效性\"\"\"
validity_scores = {}
for col, rule in validation_rules.items():
query = f\"\"\"
SELECT
COUNT(*) as total_rows,
COUNT(CASE WHEN {rule} THEN 1 END) as valid_rows
FROM {self.glue_database}.{table_name}
\"\"\"
result = self._execute_athena_query(query)
if result and len(result) > 0:
row = result[0]
total_rows = row.get('total_rows', 0)
valid_rows = row.get('valid_rows', 0)
validity_scores[col] = (valid_rows / total_rows) * 100 if total_rows > 0 else 0
return {
'table': table_name,
'validity_scores': validity_scores,
'overall_score': np.mean(list(validity_scores.values())) if validity_scores else 0,
'timestamp': datetime.utcnow().isoformat()
}
def _execute_athena_query(self, query: str) -> List[Dict[str, Any]]:
\"\"\"执行Athena查询\"\"\"
# 这里应该实现实际的Athena查询逻辑
# 为了示例,返回模拟数据
return [{'total_rows': 1000, 'unique_values': 950}]
def generate_quality_report(self, table_name: str,
required_columns: List[str],
unique_columns: List[str],
validation_rules: Dict[str, str]) -> Dict[str, Any]:
\"\"\"生成数据质量报告\"\"\"
completeness = self.check_completeness(table_name, required_columns)
uniqueness = self.check_uniqueness(table_name, unique_columns)
validity = self.check_validity(table_name, validation_rules)
overall_score = np.mean([
completeness['overall_score'],
uniqueness['overall_score'],
validity['overall_score']
])
return {
'table': table_name,
'overall_quality_score': overall_score,
'completeness': completeness,
'uniqueness': uniqueness,
'validity': validity,
'quality_grade': self._get_quality_grade(overall_score),
'timestamp': datetime.utcnow().isoformat()
}
def _get_quality_grade(self, score: float) -> str:
\"\"\"获取质量等级\"\"\"
if score >= 95:
return 'A'
elif score >= 85:
return 'B'
elif score >= 75:
return 'C'
elif score >= 65:
return 'D'
else:
return 'F'
"""
# 将脚本保存到S3
script_key = "scripts/data_quality_checker.py"
self.s3.put_object(
Bucket=self.config.bucket_name,
Key=script_key,
Body=dq_script.encode('utf-8'),
ContentType='text/x-python'
)
return f"s3://{self.config.bucket_name}/{script_key}"
# 使用示例
def data_lake_example():
"""数据湖示例"""
config = DataLakeConfig(
bucket_name="my-enterprise-data-lake",
region="us-west-2",
encryption_key="arn:aws:kms:us-west-2:123456789012:key/12345678-1234-1234-1234-123456789012",
lifecycle_policies={},
access_policies={},
metadata_catalog="glue"
)
manager = DataLakeManager(config)
# 创建数据湖结构
print("创建数据湖结构...")
structure_result = manager.create_data_lake_structure()
print(f"结果: {structure_result}")
# 创建数据目录
print("\n创建数据目录...")
catalog_result = manager.create_data_catalog("enterprise_data_lake")
print(f"结果: {catalog_result}")
# 设置权限
print("\n设置Lake Formation权限...")
principals = [
"arn:aws:iam::123456789012:role/DataAnalystRole",
"arn:aws:iam::123456789012:role/DataScientistRole"
]
permissions_result = manager.setup_lake_formation_permissions("enterprise_data_lake", principals)
print(f"结果: {permissions_result}")
# 创建数据质量框架
print("\n创建数据质量框架...")
dq_script_location = manager.create_data_quality_framework()
print(f"数据质量脚本位置: {dq_script_location}")
if __name__ == "__main__":
data_lake_example()
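数据目录建好之后,分析人员通常直接用Athena查询数据湖中的表。下面是一个基于boto3的简化示意(数据库名、SQL和结果输出路径均为示例假设),展示提交查询、轮询状态并读取结果的基本流程:
import time
import boto3

def query_data_lake(sql: str,
                    database: str = "enterprise_data_lake",
                    output_location: str = "s3://my-enterprise-data-lake/temp/athena-results/") -> list:
    """通过Athena查询数据湖表的简化示意(参数均为示例假设)"""
    athena = boto3.client("athena")
    execution = athena.start_query_execution(
        QueryString=sql,
        QueryExecutionContext={"Database": database},
        ResultConfiguration={"OutputLocation": output_location},
    )
    query_id = execution["QueryExecutionId"]
    # 轮询查询状态直到结束
    while True:
        state = athena.get_query_execution(QueryExecutionId=query_id)["QueryExecution"]["Status"]["State"]
        if state in ("SUCCEEDED", "FAILED", "CANCELLED"):
            break
        time.sleep(1)
    if state != "SUCCEEDED":
        raise RuntimeError(f"Athena query ended with state: {state}")
    rows = athena.get_query_results(QueryExecutionId=query_id)["ResultSet"]["Rows"]
    # 第一行是表头,其余为数据行
    return [[col.get("VarCharValue") for col in row["Data"]] for row in rows]

# 用法示意:query_data_lake("SELECT * FROM gold_data LIMIT 10")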
数据仓库与数据集市
现代数据仓库架构
import boto3
import json
import psycopg2
from typing import Dict, Any, List, Optional, Tuple
from dataclasses import dataclass
from datetime import datetime, timedelta
from enum import Enum
import pandas as pd
import numpy as np
class DimensionType(Enum):
"""维度类型"""
SCD_TYPE_1 = "scd_type_1" # 覆盖更新
SCD_TYPE_2 = "scd_type_2" # 历史追踪
SCD_TYPE_3 = "scd_type_3" # 部分历史
class FactType(Enum):
"""事实表类型"""
TRANSACTION = "transaction" # 事务事实表
SNAPSHOT = "snapshot" # 快照事实表
ACCUMULATING = "accumulating" # 累积快照事实表
@dataclass
class DimensionTable:
"""维度表定义"""
name: str
type: DimensionType
business_key: str
attributes: List[str]
source_system: str
update_frequency: str
retention_days: int
@dataclass
class FactTable:
"""事实表定义"""
name: str
type: FactType
grain: str
dimensions: List[str]
measures: List[str]
source_system: str
update_frequency: str
partition_key: str
class DataWarehouseManager:
"""数据仓库管理器"""
def __init__(self, cluster_identifier: str, region: str = "us-east-1"):
self.cluster_identifier = cluster_identifier
self.region = region
self.redshift = boto3.client('redshift', region_name=region)
self.s3 = boto3.client('s3', region_name=region)
self.connection = None
def create_redshift_cluster(self, config: Dict[str, Any]) -> Dict[str, Any]:
"""创建Redshift集群"""
try:
response = self.redshift.create_cluster(
ClusterIdentifier=self.cluster_identifier,
NodeType=config.get('node_type', 'dc2.large'),
MasterUsername=config['master_username'],
MasterUserPassword=config['master_password'],
DBName=config.get('db_name', 'datawarehouse'),
ClusterType=config.get('cluster_type', 'multi-node'),
NumberOfNodes=config.get('number_of_nodes', 2),
VpcSecurityGroupIds=config.get('vpc_security_group_ids', []),
ClusterSubnetGroupName=config.get('cluster_subnet_group_name'),
PubliclyAccessible=config.get('publicly_accessible', False),
Encrypted=config.get('encrypted', True),
KmsKeyId=config.get('kms_key_id'),
EnhancedVpcRouting=config.get('enhanced_vpc_routing', True),
ClusterParameterGroupName=config.get('parameter_group_name'),
Tags=[
{'Key': 'Environment', 'Value': config.get('environment', 'production')},
{'Key': 'Purpose', 'Value': 'DataWarehouse'},
{'Key': 'CreatedBy', 'Value': 'DataWarehouseManager'}
]
)
return {
"status": "success",
"cluster_identifier": self.cluster_identifier,
"endpoint": response['Cluster']['Endpoint']['Address'],
"port": response['Cluster']['Endpoint']['Port']
}
except Exception as e:
return {
"status": "error",
"error": str(e)
}
def connect_to_cluster(self, host: str, port: int, database: str,
username: str, password: str):
"""连接到Redshift集群"""
try:
self.connection = psycopg2.connect(
host=host,
port=port,
database=database,
user=username,
password=password
)
return True
except Exception as e:
print(f"连接失败: {e}")
return False
def create_schema_structure(self) -> Dict[str, Any]:
"""创建模式结构"""
if not self.connection:
return {"status": "error", "error": "No database connection"}
schemas = [
"staging", # 临时数据区
"ods", # 操作数据存储
"dw", # 数据仓库核心层
"dm", # 数据集市
"metadata", # 元数据管理
"audit" # 审计日志
]
created_schemas = []
cursor = self.connection.cursor()
try:
for schema in schemas:
cursor.execute(f"CREATE SCHEMA IF NOT EXISTS {schema}")
created_schemas.append(schema)
self.connection.commit()
return {
"status": "success",
"schemas_created": created_schemas
}
except Exception as e:
self.connection.rollback()
return {
"status": "error",
"error": str(e)
}
finally:
cursor.close()
def create_dimension_table(self, dimension: DimensionTable) -> str:
"""创建维度表"""
if dimension.type == DimensionType.SCD_TYPE_2:
return self._create_scd_type2_dimension(dimension)
else:
return self._create_standard_dimension(dimension)
def _create_scd_type2_dimension(self, dimension: DimensionTable) -> str:
"""创建SCD Type 2维度表"""
attributes_sql = ",\n ".join([f"{attr} VARCHAR(255)" for attr in dimension.attributes])
sql = f"""
CREATE TABLE IF NOT EXISTS dw.{dimension.name} (
{dimension.name}_sk BIGINT IDENTITY(1,1) PRIMARY KEY,
{dimension.business_key} VARCHAR(255) NOT NULL,
{attributes_sql},
effective_date DATE NOT NULL,
expiry_date DATE,
is_current BOOLEAN DEFAULT TRUE,
created_at TIMESTAMP DEFAULT GETDATE(),
updated_at TIMESTAMP DEFAULT GETDATE(),
source_system VARCHAR(100) DEFAULT '{dimension.source_system}'
)
DISTSTYLE KEY
DISTKEY ({dimension.business_key})
SORTKEY (effective_date, {dimension.business_key});
        -- 说明:Redshift不支持CREATE INDEX,点查和范围扫描的性能依赖上面的DISTKEY/SORTKEY设计
"""
return sql
def _create_standard_dimension(self, dimension: DimensionTable) -> str:
"""创建标准维度表"""
attributes_sql = ",\n ".join([f"{attr} VARCHAR(255)" for attr in dimension.attributes])
sql = f"""
CREATE TABLE IF NOT EXISTS dw.{dimension.name} (
{dimension.name}_sk BIGINT IDENTITY(1,1) PRIMARY KEY,
{dimension.business_key} VARCHAR(255) UNIQUE NOT NULL,
{attributes_sql},
created_at TIMESTAMP DEFAULT GETDATE(),
updated_at TIMESTAMP DEFAULT GETDATE(),
source_system VARCHAR(100) DEFAULT '{dimension.source_system}'
)
DISTSTYLE KEY
DISTKEY ({dimension.business_key})
SORTKEY ({dimension.business_key});
"""
return sql
def create_fact_table(self, fact: FactTable) -> str:
"""创建事实表"""
# 维度外键
dimension_fks = ",\n ".join([f"{dim}_sk BIGINT NOT NULL" for dim in fact.dimensions])
# 度量字段
measures_sql = ",\n ".join([f"{measure} DECIMAL(18,4)" for measure in fact.measures])
# 外键约束
foreign_keys = "\n".join([
f"ALTER TABLE dw.{fact.name} ADD CONSTRAINT fk_{fact.name}_{dim} "
f"FOREIGN KEY ({dim}_sk) REFERENCES dw.{dim}({dim}_sk);"
for dim in fact.dimensions
])
sql = f"""
CREATE TABLE IF NOT EXISTS dw.{fact.name} (
{fact.name}_sk BIGINT IDENTITY(1,1) PRIMARY KEY,
{dimension_fks},
{measures_sql},
{fact.partition_key} DATE NOT NULL,
created_at TIMESTAMP DEFAULT GETDATE(),
source_system VARCHAR(100)
)
DISTSTYLE KEY
DISTKEY ({fact.dimensions[0]}_sk)
SORTKEY ({fact.partition_key});
{foreign_keys}
"""
return sql
def create_etl_procedures(self) -> Dict[str, str]:
"""创建ETL存储过程"""
procedures = {}
# 维度表加载过程(SCD Type 2)
procedures["load_scd_type2_dimension"] = """
CREATE OR REPLACE PROCEDURE dw.load_scd_type2_dimension(
p_table_name VARCHAR(255),
p_source_query VARCHAR(65535)
)
AS $$
BEGIN
-- 创建临时表
EXECUTE 'CREATE TEMP TABLE temp_' || p_table_name || ' AS ' || p_source_query;
-- 更新过期记录
EXECUTE '
UPDATE dw.' || p_table_name || '
SET expiry_date = CURRENT_DATE - 1,
is_current = FALSE,
updated_at = GETDATE()
WHERE business_key IN (
SELECT business_key FROM temp_' || p_table_name || '
WHERE business_key IN (
SELECT business_key FROM dw.' || p_table_name || ' WHERE is_current = TRUE
)
) AND is_current = TRUE';
-- 插入新记录
EXECUTE '
INSERT INTO dw.' || p_table_name || '
SELECT * FROM temp_' || p_table_name;
-- 清理临时表
EXECUTE 'DROP TABLE temp_' || p_table_name;
END;
$$ LANGUAGE plpgsql;
"""
# 事实表增量加载过程
procedures["load_fact_incremental"] = """
CREATE OR REPLACE PROCEDURE dw.load_fact_incremental(
p_fact_table VARCHAR(255),
p_source_query VARCHAR(65535),
p_partition_date DATE
)
AS $$
BEGIN
-- 删除当日数据(重新加载)
EXECUTE 'DELETE FROM dw.' || p_fact_table ||
' WHERE partition_date = ''' || p_partition_date || '''';
-- 插入新数据
EXECUTE 'INSERT INTO dw.' || p_fact_table || ' ' || p_source_query;
-- 更新统计信息
EXECUTE 'ANALYZE dw.' || p_fact_table;
END;
$$ LANGUAGE plpgsql;
"""
# 数据质量检查过程
procedures["data_quality_check"] = """
CREATE OR REPLACE PROCEDURE audit.data_quality_check(
p_table_name VARCHAR(255),
p_check_date DATE
)
AS $$
DECLARE
v_row_count BIGINT;
v_null_count BIGINT;
v_duplicate_count BIGINT;
BEGIN
-- 行数检查
EXECUTE 'SELECT COUNT(*) FROM ' || p_table_name
|| ' WHERE partition_date = ''' || p_check_date || ''''
INTO v_row_count;
-- 空值检查
EXECUTE 'SELECT COUNT(*) FROM ' || p_table_name
|| ' WHERE partition_date = ''' || p_check_date || ''' AND business_key IS NULL'
INTO v_null_count;
-- 重复值检查
EXECUTE 'SELECT COUNT(*) - COUNT(DISTINCT business_key) FROM ' || p_table_name
|| ' WHERE partition_date = ''' || p_check_date || ''''
INTO v_duplicate_count;
-- 记录检查结果
INSERT INTO audit.data_quality_log (
table_name, check_date, row_count, null_count, duplicate_count, check_timestamp
) VALUES (
p_table_name, p_check_date, v_row_count, v_null_count, v_duplicate_count, GETDATE()
);
END;
$$ LANGUAGE plpgsql;
"""
return procedures
def create_data_marts(self, business_areas: List[str]) -> Dict[str, List[str]]:
"""创建数据集市"""
data_marts = {}
for area in business_areas:
mart_tables = []
# 销售数据集市示例
if area == "sales":
mart_tables = [
f"""
CREATE TABLE dm.sales_summary AS
SELECT
d.date_key,
p.product_category,
c.customer_segment,
g.geography_region,
SUM(f.sales_amount) as total_sales,
SUM(f.quantity) as total_quantity,
COUNT(DISTINCT f.order_id) as order_count,
AVG(f.sales_amount) as avg_order_value
FROM dw.fact_sales f
JOIN dw.dim_date d ON f.date_sk = d.date_sk
JOIN dw.dim_product p ON f.product_sk = p.product_sk
JOIN dw.dim_customer c ON f.customer_sk = c.customer_sk
JOIN dw.dim_geography g ON f.geography_sk = g.geography_sk
GROUP BY d.date_key, p.product_category, c.customer_segment, g.geography_region;
""",
f"""
CREATE TABLE dm.sales_trends AS
SELECT
date_key,
product_category,
total_sales,
LAG(total_sales, 1) OVER (PARTITION BY product_category ORDER BY date_key) as prev_sales,
(total_sales - LAG(total_sales, 1) OVER (PARTITION BY product_category ORDER BY date_key)) /
LAG(total_sales, 1) OVER (PARTITION BY product_category ORDER BY date_key) * 100 as growth_rate
FROM dm.sales_summary;
"""
]
# 客户数据集市示例
elif area == "customer":
mart_tables = [
f"""
CREATE TABLE dm.customer_360 AS
SELECT
c.customer_id,
c.customer_name,
c.customer_segment,
c.registration_date,
COUNT(DISTINCT f.order_id) as total_orders,
SUM(f.sales_amount) as lifetime_value,
AVG(f.sales_amount) as avg_order_value,
MAX(f.order_date) as last_order_date,
DATEDIFF(day, MAX(f.order_date), CURRENT_DATE) as days_since_last_order
FROM dw.dim_customer c
LEFT JOIN dw.fact_sales f ON c.customer_sk = f.customer_sk
GROUP BY c.customer_id, c.customer_name, c.customer_segment, c.registration_date;
"""
]
data_marts[area] = mart_tables
return data_marts
def setup_monitoring_and_alerts(self) -> Dict[str, str]:
"""设置监控和告警"""
monitoring_queries = {
"cluster_performance": """
                SELECT
                    query AS query_id,
                    userid,
                    TRIM(querytxt) AS query_text,
                    starttime,
                    endtime,
                    DATEDIFF(seconds, starttime, endtime) AS duration_seconds,
                    aborted
                FROM stl_query
                WHERE starttime >= DATEADD(hour, -1, GETDATE())
                  AND DATEDIFF(seconds, starttime, endtime) > 300
                ORDER BY duration_seconds DESC;
""",
"storage_usage": """
                SELECT
                    "schema" AS schema_name,
                    "table" AS table_name,
                    size AS size_mb,
                    pct_used
                FROM svv_table_info
                WHERE size > 1000
                ORDER BY size DESC;
""",
"data_freshness": """
SELECT
                    ti."schema" AS schema_name,
                    ti."table" AS table_name,
                    MAX(si.endtime) AS last_update,
                    DATEDIFF(hour, MAX(si.endtime), GETDATE()) AS hours_since_update
                FROM stl_insert si
                JOIN svv_table_info ti ON si.tbl = ti.table_id
                WHERE ti."schema" IN ('dw', 'dm')
                GROUP BY ti."schema", ti."table"
                HAVING DATEDIFF(hour, MAX(si.endtime), GETDATE()) > 24;
"""
}
return monitoring_queries
# 使用示例
def data_warehouse_example():
"""数据仓库示例"""
manager = DataWarehouseManager("my-dw-cluster")
# 创建集群配置
cluster_config = {
"node_type": "dc2.large",
"master_username": "dwadmin",
"master_password": "SecurePassword123!",
"db_name": "datawarehouse",
"cluster_type": "multi-node",
"number_of_nodes": 3,
"encrypted": True,
"environment": "production"
}
# 创建Redshift集群
print("创建Redshift集群...")
cluster_result = manager.create_redshift_cluster(cluster_config)
print(f"结果: {cluster_result}")
# 创建模式结构
print("\n创建模式结构...")
schema_result = manager.create_schema_structure()
print(f"结果: {schema_result}")
# 定义维度表
customer_dim = DimensionTable(
name="dim_customer",
type=DimensionType.SCD_TYPE_2,
business_key="customer_id",
attributes=["customer_name", "email", "phone", "address", "customer_segment"],
source_system="CRM",
update_frequency="daily",
retention_days=2555 # 7年
)
product_dim = DimensionTable(
name="dim_product",
type=DimensionType.SCD_TYPE_2,
business_key="product_id",
attributes=["product_name", "category", "brand", "price", "status"],
source_system="ERP",
update_frequency="daily",
retention_days=2555
)
# 定义事实表
sales_fact = FactTable(
name="fact_sales",
type=FactType.TRANSACTION,
grain="每个销售交易",
dimensions=["dim_customer", "dim_product", "dim_date", "dim_geography"],
measures=["sales_amount", "quantity", "discount_amount", "tax_amount"],
source_system="POS",
update_frequency="hourly",
partition_key="order_date"
)
# 创建表结构
print("\n创建维度表...")
customer_sql = manager.create_dimension_table(customer_dim)
product_sql = manager.create_dimension_table(product_dim)
print("\n创建事实表...")
sales_sql = manager.create_fact_table(sales_fact)
# 创建ETL过程
print("\n创建ETL存储过程...")
procedures = manager.create_etl_procedures()
# 创建数据集市
print("\n创建数据集市...")
marts = manager.create_data_marts(["sales", "customer"])
# 设置监控
print("\n设置监控查询...")
monitoring = manager.setup_monitoring_and_alerts()
print("\n数据仓库设置完成!")
if __name__ == "__main__":
data_warehouse_example()
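上面的示例只生成了DDL字符串,并没有真正提交到集群执行。下面是一个简化示意,假设已经通过connect_to_cluster建立了连接,演示如何把生成的维度表和事实表DDL提交到集群:
def apply_warehouse_ddl(manager: DataWarehouseManager, ddl_statements: List[str]) -> None:
    """执行由create_dimension_table/create_fact_table生成的DDL(简化示意)"""
    if not manager.connection:
        raise RuntimeError("请先调用connect_to_cluster建立连接")
    cursor = manager.connection.cursor()
    try:
        for ddl in ddl_statements:
            # psycopg2允许一次执行包含多条以分号分隔语句的字符串
            cursor.execute(ddl)
        manager.connection.commit()
    except Exception:
        manager.connection.rollback()
        raise
    finally:
        cursor.close()

# 用法示意:apply_warehouse_ddl(manager, [customer_sql, product_sql, sales_sql])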
实时数据处理
流处理架构
现代企业需要实时处理和分析数据流,以支持实时决策和响应。以下是一个基于Amazon Kinesis的流处理架构示例实现:
import json
import time
import asyncio
from typing import Dict, Any, List, Optional, Callable
from dataclasses import dataclass, asdict
from datetime import datetime, timedelta
from enum import Enum
import boto3
from abc import ABC, abstractmethod
import logging
import uuid
# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("stream-processing")
class StreamType(Enum):
"""流类型"""
KINESIS = "kinesis"
KAFKA = "kafka"
PULSAR = "pulsar"
EVENTHUB = "eventhub"
class ProcessingMode(Enum):
"""处理模式"""
AT_LEAST_ONCE = "at_least_once"
AT_MOST_ONCE = "at_most_once"
EXACTLY_ONCE = "exactly_once"
@dataclass
class StreamEvent:
"""流事件"""
event_id: str
event_type: str
timestamp: datetime
data: Dict[str, Any]
source: str
partition_key: str
@dataclass
class ProcessingWindow:
"""处理窗口"""
window_type: str # tumbling, sliding, session
size_seconds: int
slide_seconds: Optional[int] = None
gap_seconds: Optional[int] = None
class StreamProcessor(ABC):
"""流处理器抽象基类"""
@abstractmethod
async def process_event(self, event: StreamEvent) -> Optional[Dict[str, Any]]:
"""处理单个事件"""
pass
@abstractmethod
async def process_window(self, events: List[StreamEvent],
window: ProcessingWindow) -> Optional[Dict[str, Any]]:
"""处理窗口事件"""
pass
class RealTimeAnalyticsProcessor(StreamProcessor):
"""实时分析处理器"""
def __init__(self):
self.event_counts = {}
self.metrics_cache = {}
async def process_event(self, event: StreamEvent) -> Optional[Dict[str, Any]]:
"""处理单个事件"""
event_type = event.event_type
# 更新事件计数
if event_type not in self.event_counts:
self.event_counts[event_type] = 0
self.event_counts[event_type] += 1
# 实时指标计算
metrics = {
"event_id": event.event_id,
"event_type": event_type,
"timestamp": event.timestamp.isoformat(),
"total_count": self.event_counts[event_type],
"processing_latency_ms": (datetime.utcnow() - event.timestamp).total_seconds() * 1000
}
# 异常检测
if "error" in event.data or "exception" in event.data:
metrics["alert"] = {
"type": "error_detected",
"severity": "high",
"message": f"Error detected in {event_type} event"
}
return metrics
async def process_window(self, events: List[StreamEvent],
window: ProcessingWindow) -> Optional[Dict[str, Any]]:
"""处理窗口事件"""
if not events:
return None
window_start = min(event.timestamp for event in events)
window_end = max(event.timestamp for event in events)
# 按事件类型分组
events_by_type = {}
for event in events:
event_type = event.event_type
if event_type not in events_by_type:
events_by_type[event_type] = []
events_by_type[event_type].append(event)
# 计算窗口指标
window_metrics = {
"window_id": str(uuid.uuid4()),
"window_start": window_start.isoformat(),
"window_end": window_end.isoformat(),
"window_duration_seconds": (window_end - window_start).total_seconds(),
"total_events": len(events),
"events_by_type": {k: len(v) for k, v in events_by_type.items()},
"events_per_second": len(events) / window.size_seconds
}
# 计算业务指标
if "user_action" in events_by_type:
user_actions = events_by_type["user_action"]
unique_users = len(set(event.data.get("user_id") for event in user_actions))
window_metrics["unique_users"] = unique_users
window_metrics["actions_per_user"] = len(user_actions) / unique_users if unique_users > 0 else 0
return window_metrics
class StreamingPlatform:
"""流处理平台"""
def __init__(self, platform_type: StreamType, config: Dict[str, Any]):
self.platform_type = platform_type
self.config = config
self.processors: List[StreamProcessor] = []
self.running = False
# 初始化平台客户端
if platform_type == StreamType.KINESIS:
self.kinesis = boto3.client('kinesis', region_name=config.get('region', 'us-east-1'))
self.firehose = boto3.client('firehose', region_name=config.get('region', 'us-east-1'))
def add_processor(self, processor: StreamProcessor):
"""添加处理器"""
self.processors.append(processor)
logger.info(f"Added processor: {processor.__class__.__name__}")
async def create_kinesis_stream(self, stream_name: str, shard_count: int = 1) -> Dict[str, Any]:
"""创建Kinesis流"""
try:
response = self.kinesis.create_stream(
StreamName=stream_name,
ShardCount=shard_count
)
# 等待流变为活跃状态
waiter = self.kinesis.get_waiter('stream_exists')
waiter.wait(StreamName=stream_name)
return {
"status": "success",
"stream_name": stream_name,
"shard_count": shard_count
}
except Exception as e:
return {
"status": "error",
"error": str(e)
}
async def create_delivery_stream(self, delivery_stream_name: str,
destination_config: Dict[str, Any]) -> Dict[str, Any]:
"""创建Kinesis Firehose投递流"""
try:
# S3目标配置
if destination_config["type"] == "s3":
s3_config = {
'RoleARN': destination_config['role_arn'],
'BucketARN': destination_config['bucket_arn'],
'Prefix': destination_config.get('prefix', 'data/'),
'ErrorOutputPrefix': destination_config.get('error_prefix', 'errors/'),
'BufferingHints': {
'SizeInMBs': destination_config.get('buffer_size_mb', 5),
'IntervalInSeconds': destination_config.get('buffer_interval_seconds', 300)
},
'CompressionFormat': destination_config.get('compression', 'GZIP'),
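                    # 注意:启用格式转换时,Firehose还要求同时提供InputFormatConfiguration
                    # 和SchemaConfiguration(引用Glue表结构),此处为简化示意,省略了这些配置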
'DataFormatConversionConfiguration': {
'Enabled': True,
'OutputFormatConfiguration': {
'Serializer': {
'ParquetSerDe': {}
}
}
}
}
response = self.firehose.create_delivery_stream(
DeliveryStreamName=delivery_stream_name,
DeliveryStreamType='DirectPut',
ExtendedS3DestinationConfiguration=s3_config
)
return {
"status": "success",
"delivery_stream_name": delivery_stream_name,
"delivery_stream_arn": response['DeliveryStreamARN']
}
except Exception as e:
return {
"status": "error",
"error": str(e)
}
async def publish_event(self, stream_name: str, event: StreamEvent) -> Dict[str, Any]:
"""发布事件到流"""
try:
if self.platform_type == StreamType.KINESIS:
response = self.kinesis.put_record(
StreamName=stream_name,
Data=json.dumps(asdict(event), default=str),
PartitionKey=event.partition_key
)
return {
"status": "success",
"shard_id": response['ShardId'],
"sequence_number": response['SequenceNumber']
}
except Exception as e:
return {
"status": "error",
"error": str(e)
}
async def consume_stream(self, stream_name: str, shard_iterator_type: str = 'LATEST'):
"""消费流数据"""
if self.platform_type != StreamType.KINESIS:
logger.error("Only Kinesis consumption is implemented")
return
try:
# 获取分片信息
response = self.kinesis.describe_stream(StreamName=stream_name)
shards = response['StreamDescription']['Shards']
# 为每个分片创建消费任务
tasks = []
for shard in shards:
task = asyncio.create_task(
self._consume_shard(stream_name, shard['ShardId'], shard_iterator_type)
)
tasks.append(task)
# 等待所有任务完成
await asyncio.gather(*tasks)
except Exception as e:
logger.error(f"Error consuming stream: {e}")
async def _consume_shard(self, stream_name: str, shard_id: str,
shard_iterator_type: str):
"""消费单个分片"""
try:
# 获取分片迭代器
response = self.kinesis.get_shard_iterator(
StreamName=stream_name,
ShardId=shard_id,
ShardIteratorType=shard_iterator_type
)
shard_iterator = response['ShardIterator']
while self.running and shard_iterator:
# 获取记录
response = self.kinesis.get_records(
ShardIterator=shard_iterator,
Limit=100
)
records = response['Records']
shard_iterator = response.get('NextShardIterator')
# 处理记录
for record in records:
try:
                        # 解析事件数据(timestamp在发布时被序列化为ISO字符串,这里还原为datetime)
                        event_data = json.loads(record['Data'])
                        event_data['timestamp'] = datetime.fromisoformat(event_data['timestamp'])
                        event = StreamEvent(**event_data)
# 使用所有处理器处理事件
for processor in self.processors:
result = await processor.process_event(event)
if result:
logger.info(f"Processed event {event.event_id}: {result}")
except Exception as e:
logger.error(f"Error processing record: {e}")
# 避免过于频繁的API调用
if not records:
await asyncio.sleep(1)
except Exception as e:
logger.error(f"Error consuming shard {shard_id}: {e}")
async def start_processing(self, stream_name: str):
"""开始处理"""
self.running = True
logger.info(f"Starting stream processing for {stream_name}")
await self.consume_stream(stream_name)
def stop_processing(self):
"""停止处理"""
self.running = False
logger.info("Stopping stream processing")
class WindowManager:
"""窗口管理器"""
def __init__(self, window_config: ProcessingWindow):
self.window_config = window_config
self.current_window_events: List[StreamEvent] = []
self.window_start_time: Optional[datetime] = None
self.processors: List[StreamProcessor] = []
def add_processor(self, processor: StreamProcessor):
"""添加处理器"""
self.processors.append(processor)
async def add_event(self, event: StreamEvent):
"""添加事件到窗口"""
current_time = datetime.utcnow()
# 初始化窗口
if self.window_start_time is None:
self.window_start_time = current_time
# 检查是否需要触发窗口处理
window_duration = timedelta(seconds=self.window_config.size_seconds)
if current_time - self.window_start_time >= window_duration:
# 处理当前窗口
await self._process_current_window()
# 重置窗口
if self.window_config.window_type == "tumbling":
self.current_window_events = [event]
self.window_start_time = current_time
elif self.window_config.window_type == "sliding":
# 滑动窗口:移除过期事件
slide_duration = timedelta(seconds=self.window_config.slide_seconds or self.window_config.size_seconds)
cutoff_time = current_time - slide_duration
self.current_window_events = [
e for e in self.current_window_events
if e.timestamp >= cutoff_time
]
self.current_window_events.append(event)
self.window_start_time = current_time - slide_duration
else:
self.current_window_events.append(event)
async def _process_current_window(self):
"""处理当前窗口"""
if not self.current_window_events:
return
for processor in self.processors:
try:
result = await processor.process_window(
self.current_window_events,
self.window_config
)
if result:
logger.info(f"Window processed: {result}")
except Exception as e:
logger.error(f"Error processing window: {e}")
# 使用示例
async def stream_processing_example():
"""流处理示例"""
# 配置流处理平台
config = {
"region": "us-east-1",
"buffer_size": 1000,
"batch_timeout": 5
}
platform = StreamingPlatform(StreamType.KINESIS, config)
# 添加处理器
analytics_processor = RealTimeAnalyticsProcessor()
platform.add_processor(analytics_processor)
# 创建流
stream_name = "real-time-events"
stream_result = await platform.create_kinesis_stream(stream_name, shard_count=2)
print(f"Stream creation result: {stream_result}")
# 创建投递流
delivery_config = {
"type": "s3",
"role_arn": "arn:aws:iam::123456789012:role/firehose-delivery-role",
"bucket_arn": "arn:aws:s3:::my-data-lake",
"prefix": "real-time-data/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/",
"buffer_size_mb": 5,
"buffer_interval_seconds": 300
}
delivery_result = await platform.create_delivery_stream(
"real-time-delivery",
delivery_config
)
print(f"Delivery stream creation result: {delivery_result}")
# 设置窗口处理
window_config = ProcessingWindow(
window_type="tumbling",
size_seconds=60 # 1分钟窗口
)
window_manager = WindowManager(window_config)
window_manager.add_processor(analytics_processor)
# 模拟发布事件
events = [
StreamEvent(
event_id=str(uuid.uuid4()),
event_type="user_action",
timestamp=datetime.utcnow(),
data={"user_id": "user123", "action": "click", "page": "homepage"},
source="web_app",
partition_key="user123"
),
StreamEvent(
event_id=str(uuid.uuid4()),
event_type="system_metric",
timestamp=datetime.utcnow(),
data={"cpu_usage": 75.5, "memory_usage": 60.2, "server": "web01"},
source="monitoring",
partition_key="web01"
),
StreamEvent(
event_id=str(uuid.uuid4()),
event_type="error",
timestamp=datetime.utcnow(),
data={"error_code": "500", "message": "Internal server error", "service": "api"},
source="application",
partition_key="api"
)
]
# 发布事件
for event in events:
publish_result = await platform.publish_event(stream_name, event)
print(f"Event published: {publish_result}")
# 添加到窗口管理器
await window_manager.add_event(event)
# 模拟延迟
await asyncio.sleep(0.1)
print("Stream processing example completed")
if __name__ == "__main__":
asyncio.run(stream_processing_example())
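上面的WindowManager按处理时间触发窗口,窗口边界取决于事件到达的时刻。作为补充,下面是一个基于事件时间、把事件归入对齐滚动窗口的简化示意(纯Python计算,假设时间戳为UTC的naive datetime,不依赖任何外部服务):
from datetime import datetime, timezone

def tumbling_window_bounds(event_time: datetime, size_seconds: int) -> tuple:
    """计算事件时间所属的对齐滚动窗口 [start, end),假设event_time为UTC naive时间"""
    epoch = int(event_time.replace(tzinfo=timezone.utc).timestamp())
    start = epoch - (epoch % size_seconds)
    return (
        datetime.fromtimestamp(start, tz=timezone.utc),
        datetime.fromtimestamp(start + size_seconds, tz=timezone.utc),
    )

# 用法示意:按窗口起点把事件分桶,再对每个桶做聚合
# window_start, window_end = tumbling_window_bounds(event.timestamp, 60)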
AI平台架构设计
机器学习平台
构建企业级机器学习平台需要考虑数据管道、模型训练、部署和监控的完整生命周期:
import json
import pickle
import joblib
import numpy as np
import pandas as pd
from typing import Dict, Any, List, Optional, Union, Tuple
from dataclasses import dataclass, asdict
from datetime import datetime, timedelta
from enum import Enum
import boto3
import mlflow
import mlflow.sklearn
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
import logging
# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("ml-platform")
class ModelType(Enum):
"""模型类型"""
CLASSIFICATION = "classification"
REGRESSION = "regression"
CLUSTERING = "clustering"
DEEP_LEARNING = "deep_learning"
class DeploymentTarget(Enum):
"""部署目标"""
SAGEMAKER = "sagemaker"
LAMBDA = "lambda"
ECS = "ecs"
KUBERNETES = "kubernetes"
@dataclass
class ModelMetadata:
"""模型元数据"""
model_id: str
model_name: str
model_type: ModelType
version: str
created_at: datetime
created_by: str
description: str
tags: Dict[str, str]
metrics: Dict[str, float]
parameters: Dict[str, Any]
@dataclass
class TrainingJob:
"""训练任务"""
job_id: str
model_name: str
dataset_path: str
algorithm: str
hyperparameters: Dict[str, Any]
instance_type: str
instance_count: int
max_runtime_seconds: int
output_path: str
status: str = "CREATED"
class MLPlatform:
"""机器学习平台"""
def __init__(self, config: Dict[str, Any]):
self.config = config
self.sagemaker = boto3.client('sagemaker', region_name=config.get('region', 'us-east-1'))
self.s3 = boto3.client('s3', region_name=config.get('region', 'us-east-1'))
self.lambda_client = boto3.client('lambda', region_name=config.get('region', 'us-east-1'))
# 初始化MLflow
mlflow.set_tracking_uri(config.get('mlflow_uri', 'http://localhost:5000'))
self.models_registry = {}
self.training_jobs = {}
def create_training_job(self, job_config: TrainingJob) -> Dict[str, Any]:
"""创建训练任务"""
try:
# 准备训练任务配置
training_job_config = {
'TrainingJobName': job_config.job_id,
'AlgorithmSpecification': {
'TrainingImage': self._get_algorithm_image(job_config.algorithm),
'TrainingInputMode': 'File'
},
'RoleArn': self.config['sagemaker_role'],
'InputDataConfig': [
{
'ChannelName': 'training',
'DataSource': {
'S3DataSource': {
'S3DataType': 'S3Prefix',
'S3Uri': job_config.dataset_path,
'S3DataDistributionType': 'FullyReplicated'
}
},
'ContentType': 'text/csv',
'CompressionType': 'None'
}
],
'OutputDataConfig': {
'S3OutputPath': job_config.output_path
},
'ResourceConfig': {
'InstanceType': job_config.instance_type,
'InstanceCount': job_config.instance_count,
'VolumeSizeInGB': 30
},
'StoppingCondition': {
'MaxRuntimeInSeconds': job_config.max_runtime_seconds
},
'HyperParameters': {k: str(v) for k, v in job_config.hyperparameters.items()}
}
# 启动训练任务
response = self.sagemaker.create_training_job(**training_job_config)
# 更新任务状态
job_config.status = "IN_PROGRESS"
self.training_jobs[job_config.job_id] = job_config
return {
"status": "success",
"job_id": job_config.job_id,
"training_job_arn": response['TrainingJobArn']
}
except Exception as e:
return {
"status": "error",
"error": str(e)
}
def _get_algorithm_image(self, algorithm: str) -> str:
"""获取算法镜像"""
algorithm_images = {
"xgboost": "246618743249.dkr.ecr.us-west-2.amazonaws.com/xgboost:latest",
"sklearn": "246618743249.dkr.ecr.us-west-2.amazonaws.com/scikit-learn:0.23-1-cpu-py3",
"tensorflow": "246618743249.dkr.ecr.us-west-2.amazonaws.com/tensorflow-training:2.4.1-cpu-py37",
"pytorch": "246618743249.dkr.ecr.us-west-2.amazonaws.com/pytorch-training:1.8.1-cpu-py36"
}
return algorithm_images.get(algorithm, algorithm_images["sklearn"])
def get_training_job_status(self, job_id: str) -> Dict[str, Any]:
"""获取训练任务状态"""
try:
response = self.sagemaker.describe_training_job(TrainingJobName=job_id)
status_info = {
"job_id": job_id,
"status": response['TrainingJobStatus'],
"creation_time": response['CreationTime'],
"training_start_time": response.get('TrainingStartTime'),
"training_end_time": response.get('TrainingEndTime'),
"model_artifacts": response.get('ModelArtifacts', {}).get('S3ModelArtifacts'),
"final_metric_data_list": response.get('FinalMetricDataList', [])
}
# 更新本地状态
if job_id in self.training_jobs:
self.training_jobs[job_id].status = response['TrainingJobStatus']
return status_info
except Exception as e:
return {
"status": "error",
"error": str(e)
}
def register_model(self, model_metadata: ModelMetadata, model_path: str) -> Dict[str, Any]:
"""注册模型"""
try:
# 使用MLflow注册模型
with mlflow.start_run():
# 记录模型参数和指标
mlflow.log_params(model_metadata.parameters)
mlflow.log_metrics(model_metadata.metrics)
# 记录模型
if model_metadata.model_type == ModelType.CLASSIFICATION:
# 假设是sklearn模型
model = joblib.load(model_path)
mlflow.sklearn.log_model(model, "model")
# 注册模型版本
model_uri = f"runs:/{mlflow.active_run().info.run_id}/model"
mlflow.register_model(model_uri, model_metadata.model_name)
# 存储到本地注册表
self.models_registry[model_metadata.model_id] = model_metadata
return {
"status": "success",
"model_id": model_metadata.model_id,
"model_version": model_metadata.version
}
except Exception as e:
return {
"status": "error",
"error": str(e)
}
def create_model_endpoint(self, model_id: str, endpoint_config: Dict[str, Any]) -> Dict[str, Any]:
"""创建模型端点"""
try:
model_metadata = self.models_registry.get(model_id)
if not model_metadata:
return {"status": "error", "error": "Model not found"}
# 创建SageMaker模型
model_name = f"{model_metadata.model_name}-{model_metadata.version}"
create_model_response = self.sagemaker.create_model(
ModelName=model_name,
PrimaryContainer={
'Image': endpoint_config['inference_image'],
'ModelDataUrl': endpoint_config['model_data_url'],
'Environment': endpoint_config.get('environment', {})
},
ExecutionRoleArn=self.config['sagemaker_role']
)
# 创建端点配置
endpoint_config_name = f"{model_name}-config"
create_endpoint_config_response = self.sagemaker.create_endpoint_config(
EndpointConfigName=endpoint_config_name,
ProductionVariants=[
{
'VariantName': 'primary',
'ModelName': model_name,
'InitialInstanceCount': endpoint_config.get('instance_count', 1),
'InstanceType': endpoint_config.get('instance_type', 'ml.t2.medium'),
'InitialVariantWeight': 1
}
]
)
# 创建端点
endpoint_name = f"{model_name}-endpoint"
create_endpoint_response = self.sagemaker.create_endpoint(
EndpointName=endpoint_name,
EndpointConfigName=endpoint_config_name
)
return {
"status": "success",
"endpoint_name": endpoint_name,
"endpoint_arn": create_endpoint_response['EndpointArn']
}
except Exception as e:
return {
"status": "error",
"error": str(e)
}
def invoke_endpoint(self, endpoint_name: str, input_data: Any) -> Dict[str, Any]:
"""调用模型端点"""
try:
runtime = boto3.client('sagemaker-runtime', region_name=self.config.get('region', 'us-east-1'))
# 准备输入数据
if isinstance(input_data, (dict, list)):
payload = json.dumps(input_data)
content_type = 'application/json'
else:
payload = str(input_data)
content_type = 'text/csv'
# 调用端点
response = runtime.invoke_endpoint(
EndpointName=endpoint_name,
ContentType=content_type,
Body=payload
)
# 解析响应
result = json.loads(response['Body'].read().decode())
return {
"status": "success",
"prediction": result,
"invoked_variant": response.get('InvokedProductionVariant')
}
except Exception as e:
return {
"status": "error",
"error": str(e)
}
def create_batch_transform_job(self, job_config: Dict[str, Any]) -> Dict[str, Any]:
"""创建批量转换任务"""
try:
transform_job_config = {
'TransformJobName': job_config['job_name'],
'ModelName': job_config['model_name'],
'TransformInput': {
'DataSource': {
'S3DataSource': {
'S3DataType': 'S3Prefix',
'S3Uri': job_config['input_data_path']
}
},
'ContentType': job_config.get('content_type', 'text/csv'),
'SplitType': job_config.get('split_type', 'Line')
},
'TransformOutput': {
'S3OutputPath': job_config['output_data_path'],
'Accept': job_config.get('accept', 'text/csv')
},
'TransformResources': {
'InstanceType': job_config.get('instance_type', 'ml.m4.xlarge'),
'InstanceCount': job_config.get('instance_count', 1)
}
}
response = self.sagemaker.create_transform_job(**transform_job_config)
return {
"status": "success",
"job_name": job_config['job_name'],
"transform_job_arn": response['TransformJobArn']
}
except Exception as e:
return {
"status": "error",
"error": str(e)
}
def setup_model_monitoring(self, endpoint_name: str, monitoring_config: Dict[str, Any]) -> Dict[str, Any]:
"""设置模型监控"""
try:
# 创建数据质量监控
monitoring_schedule_name = f"{endpoint_name}-monitoring"
create_monitoring_schedule_config = {
'MonitoringScheduleName': monitoring_schedule_name,
'MonitoringScheduleConfig': {
'ScheduleConfig': {
'ScheduleExpression': monitoring_config.get('schedule', 'cron(0 * * * ? *)') # 每小时
},
'MonitoringJobDefinition': {
'MonitoringInputs': [
{
'EndpointInput': {
'EndpointName': endpoint_name,
'LocalPath': '/opt/ml/processing/input/endpoint'
}
}
],
'MonitoringOutputConfig': {
'MonitoringOutputs': [
{
'S3Output': {
'S3Uri': monitoring_config['output_s3_uri'],
'LocalPath': '/opt/ml/processing/output'
}
}
]
},
'MonitoringResources': {
'ClusterConfig': {
'InstanceCount': 1,
'InstanceType': 'ml.t3.medium',
'VolumeSizeInGB': 20
}
},
# 仅传入已提供的可选参数,避免把None传给API导致参数校验失败
'MonitoringAppSpecification': {
k: v for k, v in {
'ImageUri': monitoring_config['monitoring_image'],
'RecordPreprocessorSourceUri': monitoring_config.get('preprocessor_uri'),
'PostAnalyticsProcessorSourceUri': monitoring_config.get('postprocessor_uri')
}.items() if v is not None
},
'RoleArn': self.config['sagemaker_role']
}
}
}
response = self.sagemaker.create_monitoring_schedule(**create_monitoring_schedule_config)
return {
"status": "success",
"monitoring_schedule_name": monitoring_schedule_name,
"monitoring_schedule_arn": response['MonitoringScheduleArn']
}
except Exception as e:
return {
"status": "error",
"error": str(e)
}
class AutoMLPipeline:
"""自动机器学习管道"""
def __init__(self, ml_platform: MLPlatform):
self.ml_platform = ml_platform
self.experiments = {}
def create_automl_experiment(self, experiment_config: Dict[str, Any]) -> Dict[str, Any]:
"""创建AutoML实验"""
try:
experiment_name = experiment_config['experiment_name']
# 创建SageMaker AutoML任务
automl_job_config = {
'AutoMLJobName': experiment_name,
'InputDataConfig': [
{
'DataSource': {
'S3DataSource': {
'S3DataType': 'S3Prefix',
'S3Uri': experiment_config['training_data_s3_uri']
}
},
'TargetAttributeName': experiment_config['target_column']
}
],
'OutputDataConfig': {
'S3OutputPath': experiment_config['output_s3_uri']
},
'ProblemType': experiment_config.get('problem_type', 'BinaryClassification'),
'AutoMLJobObjective': {
'MetricName': experiment_config.get('objective_metric', 'F1')
},
'RoleArn': self.ml_platform.config['sagemaker_role'],
'GenerateCandidateDefinitionsOnly': False,
'AutoMLJobConfig': {
'CompletionCriteria': {
'MaxCandidates': experiment_config.get('max_candidates', 250),
'MaxRuntimePerTrainingJobInSeconds': experiment_config.get('max_runtime_per_job', 3600),
'MaxAutoMLJobRuntimeInSeconds': experiment_config.get('max_total_runtime', 86400)
}
}
}
response = self.ml_platform.sagemaker.create_auto_ml_job(**automl_job_config)
# 存储实验信息
self.experiments[experiment_name] = {
'config': experiment_config,
'job_arn': response['AutoMLJobArn'],
'status': 'InProgress',
'created_at': datetime.utcnow()
}
return {
"status": "success",
"experiment_name": experiment_name,
"job_arn": response['AutoMLJobArn']
}
except Exception as e:
return {
"status": "error",
"error": str(e)
}
def get_automl_experiment_status(self, experiment_name: str) -> Dict[str, Any]:
"""获取AutoML实验状态"""
try:
response = self.ml_platform.sagemaker.describe_auto_ml_job(AutoMLJobName=experiment_name)
status_info = {
"experiment_name": experiment_name,
"status": response['AutoMLJobStatus'],
"creation_time": response['CreationTime'],
"end_time": response.get('EndTime'),
"best_candidate": response.get('BestCandidate'),
"auto_ml_job_secondary_status": response.get('AutoMLJobSecondaryStatus'),
"failure_reason": response.get('FailureReason')
}
# 更新本地状态
if experiment_name in self.experiments:
self.experiments[experiment_name]['status'] = response['AutoMLJobStatus']
return status_info
except Exception as e:
return {
"status": "error",
"error": str(e)
}
def get_automl_candidates(self, experiment_name: str) -> Dict[str, Any]:
"""获取AutoML候选模型"""
try:
response = self.ml_platform.sagemaker.list_candidates_for_auto_ml_job(
AutoMLJobName=experiment_name,
MaxResults=50
)
candidates = []
for candidate in response['Candidates']:
candidate_info = {
'candidate_name': candidate['CandidateName'],
'objective_score': candidate.get('FinalAutoMLJobObjectiveMetric', {}).get('Value'),
'candidate_status': candidate['CandidateStatus'],
'creation_time': candidate['CreationTime'],
'inference_containers': candidate.get('InferenceContainers', [])
}
candidates.append(candidate_info)
return {
"status": "success",
"candidates": candidates,
"total_candidates": len(candidates)
}
except Exception as e:
return {
"status": "error",
"error": str(e)
}
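# 补充示例(假设性扩展,非原有方法):读取AutoML任务的最佳候选模型,
# 使用其推理容器定义注册为SageMaker模型,便于后续通过create_endpoint_config/create_endpoint部署。
def register_best_candidate(self, experiment_name: str, model_name: str) -> Dict[str, Any]:
    """将AutoML最佳候选注册为SageMaker模型(示例)"""
    try:
        response = self.ml_platform.sagemaker.describe_auto_ml_job(AutoMLJobName=experiment_name)
        best_candidate = response.get('BestCandidate')
        if not best_candidate:
            return {"status": "error", "error": "Best candidate not available yet"}
        self.ml_platform.sagemaker.create_model(
            ModelName=model_name,
            Containers=best_candidate['InferenceContainers'],
            ExecutionRoleArn=self.ml_platform.config['sagemaker_role']
        )
        return {"status": "success", "model_name": model_name}
    except Exception as e:
        return {"status": "error", "error": str(e)}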
# 使用示例
def ml_platform_example():
"""机器学习平台示例"""
# 配置平台
config = {
"region": "us-east-1",
"sagemaker_role": "arn:aws:iam::123456789012:role/SageMakerExecutionRole",
"mlflow_uri": "http://mlflow.example.com:5000"
}
platform = MLPlatform(config)
# 创建训练任务
training_job = TrainingJob(
job_id="customer-churn-training-001",
model_name="customer-churn-model",
dataset_path="s3://my-ml-bucket/datasets/customer-churn/train.csv",
algorithm="xgboost",
hyperparameters={
"max_depth": 5,
"eta": 0.2,
"gamma": 4,
"min_child_weight": 6,
"subsample": 0.8,
"objective": "binary:logistic",
"num_round": 100
},
instance_type="ml.m4.xlarge",
instance_count=1,
max_runtime_seconds=3600,
output_path="s3://my-ml-bucket/models/customer-churn/"
)
# 启动训练
print("启动训练任务...")
training_result = platform.create_training_job(training_job)
print(f"训练结果: {training_result}")
# 检查训练状态
print("\n检查训练状态...")
status = platform.get_training_job_status(training_job.job_id)
print(f"训练状态: {status}")
# 注册模型
model_metadata = ModelMetadata(
model_id="customer-churn-v1",
model_name="customer-churn-model",
model_type=ModelType.CLASSIFICATION,
version="1.0",
created_at=datetime.utcnow(),
created_by="data-scientist@company.com",
description="Customer churn prediction model using XGBoost",
tags={"team": "data-science", "use-case": "churn-prediction"},
metrics={"accuracy": 0.85, "precision": 0.82, "recall": 0.88, "f1": 0.85},
parameters=training_job.hyperparameters
)
print("\n注册模型...")
register_result = platform.register_model(
model_metadata,
"s3://my-ml-bucket/models/customer-churn/model.tar.gz"
)
print(f"注册结果: {register_result}")
# 创建模型端点
endpoint_config = {
"inference_image": "246618743249.dkr.ecr.us-west-2.amazonaws.com/xgboost:latest",
"model_data_url": "s3://my-ml-bucket/models/customer-churn/model.tar.gz",
"instance_type": "ml.t2.medium",
"instance_count": 1
}
print("\n创建模型端点...")
endpoint_result = platform.create_model_endpoint("customer-churn-v1", endpoint_config)
print(f"端点结果: {endpoint_result}")
# 设置模型监控
monitoring_config = {
"schedule": "cron(0 * * * ? *)", # 每小时
"output_s3_uri": "s3://my-ml-bucket/monitoring/customer-churn/",
"monitoring_image": "246618743249.dkr.ecr.us-west-2.amazonaws.com/sagemaker-model-monitor-analyzer"
}
print("\n设置模型监控...")
monitoring_result = platform.setup_model_monitoring(
endpoint_result.get("endpoint_name", ""),
monitoring_config
)
print(f"监控结果: {monitoring_result}")
# AutoML示例
automl = AutoMLPipeline(platform)
automl_config = {
"experiment_name": "customer-churn-automl",
"training_data_s3_uri": "s3://my-ml-bucket/datasets/customer-churn/automl-train.csv",
"target_column": "churn",
"output_s3_uri": "s3://my-ml-bucket/automl-output/",
"problem_type": "BinaryClassification",
"objective_metric": "F1",
"max_candidates": 50,
"max_runtime_per_job": 1800,
"max_total_runtime": 7200
}
print("\n创建AutoML实验...")
automl_result = automl.create_automl_experiment(automl_config)
print(f"AutoML结果: {automl_result}")
print("\nML平台示例完成!")
if __name__ == "__main__":
ml_platform_example()
数据治理与安全
数据治理框架
企业级数据治理需要建立完整的数据管理、质量控制和合规性框架:
import json
import hashlib
import uuid
from typing import Dict, Any, List, Optional, Set
from dataclasses import dataclass, asdict
from datetime import datetime, timedelta
from enum import Enum
import boto3
from abc import ABC, abstractmethod
import logging
import re
# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("data-governance")
class DataClassification(Enum):
"""数据分类"""
PUBLIC = "public"
INTERNAL = "internal"
CONFIDENTIAL = "confidential"
RESTRICTED = "restricted"
class DataSensitivity(Enum):
"""数据敏感性"""
LOW = "low"
MEDIUM = "medium"
HIGH = "high"
CRITICAL = "critical"
class ComplianceFramework(Enum):
"""合规框架"""
GDPR = "gdpr"
CCPA = "ccpa"
HIPAA = "hipaa"
SOX = "sox"
PCI_DSS = "pci_dss"
@dataclass
class DataAsset:
"""数据资产"""
asset_id: str
name: str
description: str
owner: str
steward: str
classification: DataClassification
sensitivity: DataSensitivity
location: str
schema: Dict[str, Any]
tags: Dict[str, str]
created_at: datetime
updated_at: datetime
retention_period_days: int
compliance_frameworks: List[ComplianceFramework]
@dataclass
class DataLineage:
"""数据血缘"""
lineage_id: str
source_asset_id: str
target_asset_id: str
transformation_logic: str
created_at: datetime
created_by: str
@dataclass
class DataQualityRule:
"""数据质量规则"""
rule_id: str
name: str
description: str
rule_type: str # completeness, accuracy, consistency, validity, uniqueness
column_name: str
condition: str
threshold: float
severity: str # warning, error, critical
@dataclass
class DataQualityResult:
"""数据质量结果"""
result_id: str
rule_id: str
asset_id: str
execution_time: datetime
passed: bool
score: float
details: Dict[str, Any]
class DataCatalog:
"""数据目录"""
def __init__(self, config: Dict[str, Any]):
self.config = config
self.glue = boto3.client('glue', region_name=config.get('region', 'us-east-1'))
self.assets: Dict[str, DataAsset] = {}
self.lineage: List[DataLineage] = []
def register_data_asset(self, asset: DataAsset) -> Dict[str, Any]:
"""注册数据资产"""
try:
# 存储到本地目录
self.assets[asset.asset_id] = asset
# 在AWS Glue中创建表
database_name = self.config.get('catalog_database', 'data_catalog')
table_name = asset.name.replace('-', '_').lower()
# 构建表结构
columns = []
for column_name, column_info in asset.schema.items():
columns.append({
'Name': column_name,
'Type': column_info.get('type', 'string'),
'Comment': column_info.get('description', '')
})
table_input = {
'Name': table_name,
'Description': asset.description,
'Owner': asset.owner,
'StorageDescriptor': {
'Columns': columns,
'Location': asset.location,
'InputFormat': 'org.apache.hadoop.mapred.TextInputFormat',
'OutputFormat': 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat',
'SerdeInfo': {
'SerializationLibrary': 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
}
},
'Parameters': {
'classification': asset.classification.value,
'sensitivity': asset.sensitivity.value,
'owner': asset.owner,
'steward': asset.steward,
'retention_days': str(asset.retention_period_days),
'compliance_frameworks': ','.join([f.value for f in asset.compliance_frameworks])
}
}
# 添加标签
for key, value in asset.tags.items():
table_input['Parameters'][f'tag_{key}'] = value
# 创建表
self.glue.create_table(
DatabaseName=database_name,
TableInput=table_input
)
logger.info(f"Data asset {asset.asset_id} registered successfully")
return {
"status": "success",
"asset_id": asset.asset_id,
"catalog_table": f"{database_name}.{table_name}"
}
except Exception as e:
logger.error(f"Error registering data asset: {e}")
return {
"status": "error",
"error": str(e)
}
def search_assets(self, query: str, filters: Optional[Dict[str, Any]] = None) -> List[DataAsset]:
"""搜索数据资产"""
results = []
for asset in self.assets.values():
# 文本搜索
if query.lower() in asset.name.lower() or query.lower() in asset.description.lower():
match = True
# 应用过滤器
if filters:
if 'classification' in filters and asset.classification != filters['classification']:
match = False
if 'owner' in filters and asset.owner != filters['owner']:
match = False
if 'tags' in filters:
for tag_key, tag_value in filters['tags'].items():
if asset.tags.get(tag_key) != tag_value:
match = False
break
if match:
results.append(asset)
return results
def add_lineage(self, lineage: DataLineage) -> Dict[str, Any]:
"""添加数据血缘"""
try:
self.lineage.append(lineage)
logger.info(f"Data lineage {lineage.lineage_id} added successfully")
return {
"status": "success",
"lineage_id": lineage.lineage_id
}
except Exception as e:
return {
"status": "error",
"error": str(e)
}
def get_lineage_graph(self, asset_id: str, direction: str = "both") -> Dict[str, Any]:
"""获取数据血缘图"""
upstream = []
downstream = []
for lineage in self.lineage:
if direction in ["upstream", "both"] and lineage.target_asset_id == asset_id:
upstream.append({
"source_asset_id": lineage.source_asset_id,
"transformation": lineage.transformation_logic,
"created_by": lineage.created_by
})
if direction in ["downstream", "both"] and lineage.source_asset_id == asset_id:
downstream.append({
"target_asset_id": lineage.target_asset_id,
"transformation": lineage.transformation_logic,
"created_by": lineage.created_by
})
return {
"asset_id": asset_id,
"upstream": upstream,
"downstream": downstream
}
class DataQualityEngine:
"""数据质量引擎"""
def __init__(self, config: Dict[str, Any]):
self.config = config
self.rules: Dict[str, DataQualityRule] = {}
self.results: List[DataQualityResult] = []
def add_quality_rule(self, rule: DataQualityRule) -> Dict[str, Any]:
"""添加数据质量规则"""
try:
self.rules[rule.rule_id] = rule
logger.info(f"Data quality rule {rule.rule_id} added successfully")
return {
"status": "success",
"rule_id": rule.rule_id
}
except Exception as e:
return {
"status": "error",
"error": str(e)
}
def execute_quality_checks(self, asset_id: str, data_sample: Any) -> List[DataQualityResult]:
"""执行数据质量检查"""
results = []
for rule in self.rules.values():
try:
result = self._execute_single_rule(rule, asset_id, data_sample)
results.append(result)
self.results.append(result)
except Exception as e:
logger.error(f"Error executing rule {rule.rule_id}: {e}")
# 创建失败结果
error_result = DataQualityResult(
result_id=str(uuid.uuid4()),
rule_id=rule.rule_id,
asset_id=asset_id,
execution_time=datetime.utcnow(),
passed=False,
score=0.0,
details={"error": str(e)}
)
results.append(error_result)
self.results.append(error_result)
return results
def _execute_single_rule(self, rule: DataQualityRule, asset_id: str, data_sample: Any) -> DataQualityResult:
"""执行单个质量规则"""
import pandas as pd
# 假设data_sample是pandas DataFrame
if not isinstance(data_sample, pd.DataFrame):
raise ValueError("Data sample must be a pandas DataFrame")
result_id = str(uuid.uuid4())
execution_time = datetime.utcnow()
if rule.rule_type == "completeness":
# 完整性检查
total_rows = len(data_sample)
non_null_rows = data_sample[rule.column_name].notna().sum()
score = non_null_rows / total_rows if total_rows > 0 else 0
passed = score >= rule.threshold
details = {
"total_rows": total_rows,
"non_null_rows": int(non_null_rows),
"null_rows": total_rows - int(non_null_rows),
"completeness_rate": score
}
elif rule.rule_type == "uniqueness":
# 唯一性检查
total_rows = len(data_sample)
unique_rows = data_sample[rule.column_name].nunique()
score = unique_rows / total_rows if total_rows > 0 else 0
passed = score >= rule.threshold
details = {
"total_rows": total_rows,
"unique_values": unique_rows,
"duplicate_rows": total_rows - unique_rows,
"uniqueness_rate": score
}
elif rule.rule_type == "validity":
# 有效性检查(基于正则表达式)
if rule.condition.startswith("regex:"):
pattern = rule.condition[6:] # 移除"regex:"前缀
valid_rows = data_sample[rule.column_name].astype(str).str.match(pattern).sum()
total_rows = len(data_sample)
score = valid_rows / total_rows if total_rows > 0 else 0
passed = score >= rule.threshold
details = {
"total_rows": total_rows,
"valid_rows": int(valid_rows),
"invalid_rows": total_rows - int(valid_rows),
"validity_rate": score,
"pattern": pattern
}
else:
raise ValueError(f"Unsupported validity condition: {rule.condition}")
elif rule.rule_type == "range":
# 范围检查
if rule.condition.startswith("between:"):
range_str = rule.condition[8:] # 移除"between:"前缀
min_val, max_val = map(float, range_str.split(","))
numeric_data = pd.to_numeric(data_sample[rule.column_name], errors='coerce')
valid_rows = ((numeric_data >= min_val) & (numeric_data <= max_val)).sum()
total_rows = len(data_sample)
score = valid_rows / total_rows if total_rows > 0 else 0
passed = score >= rule.threshold
details = {
"total_rows": total_rows,
"valid_rows": int(valid_rows),
"out_of_range_rows": total_rows - int(valid_rows),
"range_compliance_rate": score,
"min_value": min_val,
"max_value": max_val
}
else:
raise ValueError(f"Unsupported range condition: {rule.condition}")
else:
raise ValueError(f"Unsupported rule type: {rule.rule_type}")
return DataQualityResult(
result_id=result_id,
rule_id=rule.rule_id,
asset_id=asset_id,
execution_time=execution_time,
passed=passed,
score=score,
details=details
)
def get_quality_report(self, asset_id: Optional[str] = None,
start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None) -> Dict[str, Any]:
"""生成数据质量报告"""
filtered_results = self.results
# 应用过滤器
if asset_id:
filtered_results = [r for r in filtered_results if r.asset_id == asset_id]
if start_date:
filtered_results = [r for r in filtered_results if r.execution_time >= start_date]
if end_date:
filtered_results = [r for r in filtered_results if r.execution_time <= end_date]
if not filtered_results:
return {
"total_checks": 0,
"passed_checks": 0,
"failed_checks": 0,
"overall_score": 0.0,
"results_by_rule": {},
"trends": []
}
# 计算统计信息
total_checks = len(filtered_results)
passed_checks = sum(1 for r in filtered_results if r.passed)
failed_checks = total_checks - passed_checks
overall_score = sum(r.score for r in filtered_results) / total_checks
# 按规则分组结果
results_by_rule = {}
for result in filtered_results:
rule_id = result.rule_id
if rule_id not in results_by_rule:
results_by_rule[rule_id] = {
"rule_name": self.rules[rule_id].name if rule_id in self.rules else "Unknown",
"total_executions": 0,
"passed_executions": 0,
"average_score": 0.0,
"latest_result": None
}
rule_stats = results_by_rule[rule_id]
rule_stats["total_executions"] += 1
if result.passed:
rule_stats["passed_executions"] += 1
# 更新平均分数
current_total = rule_stats["average_score"] * (rule_stats["total_executions"] - 1)
rule_stats["average_score"] = (current_total + result.score) / rule_stats["total_executions"]
# 更新最新结果
if (rule_stats["latest_result"] is None or
result.execution_time > rule_stats["latest_result"]["execution_time"]):
rule_stats["latest_result"] = {
"execution_time": result.execution_time,
"passed": result.passed,
"score": result.score
}
return {
"total_checks": total_checks,
"passed_checks": passed_checks,
"failed_checks": failed_checks,
"overall_score": overall_score,
"pass_rate": passed_checks / total_checks,
"results_by_rule": results_by_rule,
"execution_period": {
"start": min(r.execution_time for r in filtered_results),
"end": max(r.execution_time for r in filtered_results)
}
}
class DataPrivacyManager:
"""数据隐私管理器"""
def __init__(self, config: Dict[str, Any]):
self.config = config
self.privacy_policies: Dict[str, Dict[str, Any]] = {}
self.anonymization_rules: Dict[str, Dict[str, Any]] = {}
def create_privacy_policy(self, policy_id: str, policy_config: Dict[str, Any]) -> Dict[str, Any]:
"""创建隐私政策"""
try:
policy = {
"policy_id": policy_id,
"name": policy_config["name"],
"description": policy_config["description"],
"applicable_data_types": policy_config["applicable_data_types"],
"retention_period_days": policy_config["retention_period_days"],
"anonymization_required": policy_config.get("anonymization_required", False),
"encryption_required": policy_config.get("encryption_required", True),
"access_controls": policy_config.get("access_controls", []),
"compliance_frameworks": policy_config.get("compliance_frameworks", []),
"created_at": datetime.utcnow(),
"created_by": policy_config.get("created_by", "system")
}
self.privacy_policies[policy_id] = policy
logger.info(f"Privacy policy {policy_id} created successfully")
return {
"status": "success",
"policy_id": policy_id
}
except Exception as e:
return {
"status": "error",
"error": str(e)
}
def anonymize_data(self, data: Any, anonymization_config: Dict[str, Any]) -> Dict[str, Any]:
"""数据匿名化"""
try:
import pandas as pd
import hashlib
import random
if not isinstance(data, pd.DataFrame):
raise ValueError("Data must be a pandas DataFrame")
anonymized_data = data.copy()
for column, method in anonymization_config.items():
if column not in anonymized_data.columns:
continue
if method["type"] == "hash":
# 哈希匿名化
salt = method.get("salt", "default_salt")
anonymized_data[column] = anonymized_data[column].apply(
lambda x: hashlib.sha256(f"{x}{salt}".encode()).hexdigest()[:10]
)
elif method["type"] == "mask":
# 掩码匿名化
mask_char = method.get("mask_char", "*")
preserve_chars = method.get("preserve_chars", 2)
def mask_value(value):
if pd.isna(value):
return value
str_value = str(value)
if len(str_value) <= preserve_chars:
return mask_char * len(str_value)
return str_value[:preserve_chars] + mask_char * (len(str_value) - preserve_chars)
anonymized_data[column] = anonymized_data[column].apply(mask_value)
elif method["type"] == "generalize":
# 泛化匿名化
if method.get("age_ranges"):
# 年龄范围泛化
def generalize_age(age):
if pd.isna(age):
return age
age = int(age)
for range_def in method["age_ranges"]:
if range_def["min"] <= age <= range_def["max"]:
return range_def["label"]
return "Unknown"
anonymized_data[column] = anonymized_data[column].apply(generalize_age)
elif method["type"] == "noise":
# 添加噪声
noise_level = method.get("noise_level", 0.1)
numeric_data = pd.to_numeric(anonymized_data[column], errors='coerce')
# 随机噪声与原数据索引对齐,避免索引不一致时产生NaN
noise = numeric_data * noise_level * (2 * pd.Series([random.random() for _ in range(len(numeric_data))], index=numeric_data.index) - 1)
anonymized_data[column] = numeric_data + noise
elif method["type"] == "remove":
# 移除敏感列
anonymized_data = anonymized_data.drop(columns=[column])
return {
"status": "success",
"anonymized_data": anonymized_data,
"original_rows": len(data),
"anonymized_rows": len(anonymized_data),
"anonymization_summary": {
column: method["type"] for column, method in anonymization_config.items()
}
}
except Exception as e:
return {
"status": "error",
"error": str(e)
}
def check_compliance(self, asset: DataAsset, framework: ComplianceFramework) -> Dict[str, Any]:
"""检查合规性"""
compliance_checks = []
if framework == ComplianceFramework.GDPR:
# GDPR合规检查
checks = [
{
"check": "data_classification",
# 检查逻辑与描述保持一致:资产必须被赋予有效的分类级别
"passed": isinstance(asset.classification, DataClassification),
"description": "Data must be properly classified"
},
{
"check": "retention_policy",
"passed": asset.retention_period_days > 0,
"description": "Data retention period must be defined"
},
{
"check": "data_owner",
"passed": bool(asset.owner),
"description": "Data owner must be assigned"
},
{
"check": "sensitive_data_protection",
"passed": asset.sensitivity != DataSensitivity.CRITICAL or asset.classification == DataClassification.RESTRICTED,
"description": "Critical sensitive data must be restricted"
}
]
compliance_checks.extend(checks)
elif framework == ComplianceFramework.HIPAA:
# HIPAA合规检查
checks = [
{
"check": "phi_identification",
"passed": "phi" in asset.tags,
"description": "PHI data must be properly tagged"
},
{
"check": "access_controls",
"passed": asset.classification in [DataClassification.CONFIDENTIAL, DataClassification.RESTRICTED],
"description": "PHI must have appropriate access controls"
},
{
"check": "encryption_required",
"passed": "encrypted" in asset.tags and asset.tags["encrypted"] == "true",
"description": "PHI must be encrypted"
}
]
compliance_checks.extend(checks)
# 计算合规分数
total_checks = len(compliance_checks)
passed_checks = sum(1 for check in compliance_checks if check["passed"])
compliance_score = passed_checks / total_checks if total_checks > 0 else 0
return {
"framework": framework.value,
"asset_id": asset.asset_id,
"compliance_score": compliance_score,
"total_checks": total_checks,
"passed_checks": passed_checks,
"failed_checks": total_checks - passed_checks,
"checks": compliance_checks,
"overall_status": "compliant" if compliance_score >= 0.8 else "non_compliant"
}
# 使用示例
def data_governance_example():
"""数据治理示例"""
import pandas as pd
# 配置
config = {
"region": "us-east-1",
"catalog_database": "enterprise_catalog"
}
# 初始化组件
catalog = DataCatalog(config)
quality_engine = DataQualityEngine(config)
privacy_manager = DataPrivacyManager(config)
# 注册数据资产
customer_asset = DataAsset(
asset_id="customer-data-001",
name="customer-master-data",
description="Customer master data including personal information",
owner="data-team@company.com",
steward="john.doe@company.com",
classification=DataClassification.CONFIDENTIAL,
sensitivity=DataSensitivity.HIGH,
location="s3://data-lake/customer/",
schema={
"customer_id": {"type": "string", "description": "Unique customer identifier"},
"first_name": {"type": "string", "description": "Customer first name"},
"last_name": {"type": "string", "description": "Customer last name"},
"email": {"type": "string", "description": "Customer email address"},
"phone": {"type": "string", "description": "Customer phone number"},
"birth_date": {"type": "date", "description": "Customer birth date"},
"registration_date": {"type": "timestamp", "description": "Account registration date"}
},
tags={"team": "customer-analytics", "pii": "true", "encrypted": "true"},
created_at=datetime.utcnow(),
updated_at=datetime.utcnow(),
retention_period_days=2555, # 7年
compliance_frameworks=[ComplianceFramework.GDPR, ComplianceFramework.CCPA]
)
print("注册数据资产...")
register_result = catalog.register_data_asset(customer_asset)
print(f"注册结果: {register_result}")
# 添加数据质量规则
quality_rules = [
DataQualityRule(
rule_id="email-completeness",
name="Email Completeness",
description="Email field should be complete",
rule_type="completeness",
column_name="email",
condition="not_null",
threshold=0.95,
severity="error"
),
DataQualityRule(
rule_id="email-validity",
name="Email Validity",
description="Email should be in valid format",
rule_type="validity",
column_name="email",
condition="regex:^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\\.[a-zA-Z]{2,}$",
threshold=0.98,
severity="error"
),
DataQualityRule(
rule_id="customer-id-uniqueness",
name="Customer ID Uniqueness",
description="Customer ID should be unique",
rule_type="uniqueness",
column_name="customer_id",
condition="unique",
threshold=1.0,
severity="critical"
)
]
print("\n添加数据质量规则...")
for rule in quality_rules:
rule_result = quality_engine.add_quality_rule(rule)
print(f"规则 {rule.name}: {rule_result}")
# 模拟数据样本
sample_data = pd.DataFrame({
'customer_id': ['C001', 'C002', 'C003', 'C004', 'C005'],
'first_name': ['John', 'Jane', 'Bob', 'Alice', 'Charlie'],
'last_name': ['Doe', 'Smith', 'Johnson', 'Brown', 'Wilson'],
'email': ['john@example.com', 'jane@example.com', 'invalid-email', 'alice@example.com', None],
'phone': ['+1234567890', '+1234567891', '+1234567892', '+1234567893', '+1234567894'],
'birth_date': ['1990-01-01', '1985-05-15', '1992-12-25', '1988-07-30', '1995-03-10']
})
# 执行数据质量检查
print("\n执行数据质量检查...")
quality_results = quality_engine.execute_quality_checks("customer-data-001", sample_data)
for result in quality_results:
print(f"规则 {result.rule_id}: {'通过' if result.passed else '失败'} (分数: {result.score:.2f})")
print(f" 详情: {result.details}")
# 生成质量报告
print("\n生成数据质量报告...")
quality_report = quality_engine.get_quality_report("customer-data-001")
print(f"总体质量分数: {quality_report['overall_score']:.2f}")
print(f"通过率: {quality_report['pass_rate']:.2%}")
# 数据匿名化
anonymization_config = {
"first_name": {"type": "mask", "mask_char": "*", "preserve_chars": 1},
"last_name": {"type": "mask", "mask_char": "*", "preserve_chars": 1},
"email": {"type": "hash", "salt": "customer_salt_2024"},
"phone": {"type": "mask", "mask_char": "X", "preserve_chars": 3}
}
print("\n执行数据匿名化...")
anonymization_result = privacy_manager.anonymize_data(sample_data, anonymization_config)
if anonymization_result["status"] == "success":
print("匿名化成功")
print("匿名化后的数据:")
print(anonymization_result["anonymized_data"])
# 合规性检查
print("\n执行GDPR合规性检查...")
gdpr_compliance = privacy_manager.check_compliance(customer_asset, ComplianceFramework.GDPR)
print(f"GDPR合规分数: {gdpr_compliance['compliance_score']:.2f}")
print(f"合规状态: {gdpr_compliance['overall_status']}")
for check in gdpr_compliance['checks']:
status = "✓" if check['passed'] else "✗"
print(f" {status} {check['check']}: {check['description']}")
print("\n数据治理示例完成!")
if __name__ == "__main__":
data_governance_example()
最佳实践与建议
架构设计原则
1. 分层架构设计
  - 数据采集层:统一数据接入标准
  - 数据存储层:湖仓一体化架构
  - 数据处理层:批流一体化处理
  - 数据服务层:统一数据服务接口(示意代码见本列表后)
  - 数据应用层:面向业务的数据产品
2. 数据治理优先
  - 建立数据标准和规范
  - 实施数据质量管控
  - 确保数据安全和隐私保护
  - 建立数据血缘和影响分析
3. 技术选型策略
  - 优先选择云原生服务
  - 考虑成本效益和性能平衡
  - 确保技术栈的兼容性
  - 关注长期维护和扩展性
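下面是“统一数据服务接口”思路的一个最小化示意(仅为草图,其中的数据库名、结果桶路径和区域均为假设):通过Athena在数据湖上执行SQL查询,并以统一的JSON结构返回结果,批处理与流处理落地的数据都可以经由同一接口对外提供。
import time
import boto3

def query_data_lake(sql: str, database: str = "analytics_db",
                    output_s3: str = "s3://my-athena-results/") -> dict:
    """提交Athena查询并轮询结果,返回统一结构(示例)"""
    athena = boto3.client('athena', region_name='us-east-1')
    execution = athena.start_query_execution(
        QueryString=sql,
        QueryExecutionContext={'Database': database},
        ResultConfiguration={'OutputLocation': output_s3}
    )
    query_id = execution['QueryExecutionId']
    # 轮询查询状态,直至结束
    while True:
        state = athena.get_query_execution(QueryExecutionId=query_id)['QueryExecution']['Status']['State']
        if state in ('SUCCEEDED', 'FAILED', 'CANCELLED'):
            break
        time.sleep(2)
    if state != 'SUCCEEDED':
        return {"status": "error", "state": state}
    rows = athena.get_query_results(QueryExecutionId=query_id)['ResultSet']['Rows']
    return {"status": "success", "rows": rows}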
实施建议
1. 分阶段实施
  - 第一阶段:建立数据湖基础设施
  - 第二阶段:实现核心数据管道
  - 第三阶段:构建AI/ML平台
  - 第四阶段:完善数据治理体系
2. 团队建设
  - 数据工程师:负责数据管道开发
  - 数据科学家:负责模型开发和分析
  - 数据治理专员:负责数据质量和合规
  - 平台工程师:负责基础设施维护
3. 监控和运维
  - 建立全面的监控体系(示意代码见本列表后)
  - 实施自动化运维流程
  - 定期进行性能优化
  - 建立故障应急响应机制
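针对“建立全面的监控体系”一项,下面给出一个最小化的告警配置示意(仅为草图,端点名与SNS主题ARN均为假设):为SageMaker推理端点的ModelLatency指标创建CloudWatch告警,延迟超过阈值时通知运维团队。
import boto3

def create_latency_alarm(endpoint_name: str, variant_name: str = "primary",
                         threshold_ms: float = 500.0,
                         sns_topic_arn: str = "arn:aws:sns:us-east-1:123456789012:ml-ops-alerts"):
    """为模型端点创建延迟告警(示例)"""
    cloudwatch = boto3.client('cloudwatch', region_name='us-east-1')
    cloudwatch.put_metric_alarm(
        AlarmName=f"{endpoint_name}-model-latency",
        Namespace='AWS/SageMaker',
        MetricName='ModelLatency',  # 该指标单位为微秒
        Dimensions=[
            {'Name': 'EndpointName', 'Value': endpoint_name},
            {'Name': 'VariantName', 'Value': variant_name}
        ],
        Statistic='Average',
        Period=300,
        EvaluationPeriods=1,
        Threshold=threshold_ms * 1000,  # 毫秒阈值换算为微秒
        ComparisonOperator='GreaterThanThreshold',
        AlarmActions=[sns_topic_arn]
    )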
总结
云数据架构的设计需要综合考虑数据湖、数据仓库、实时处理、AI平台和数据治理等多个方面。通过采用现代化的技术栈和最佳实践,企业可以构建一个高效、安全、可扩展的大数据和AI平台,为业务创新提供强有力的数据支撑。
成功的云数据架构应该具备以下特征:
- 统一性:统一的数据标准和接口
- 灵活性:支持多种数据类型和处理模式
- 可扩展性:能够随业务增长而扩展
- 安全性:全面的数据安全和隐私保护
- 智能化:内置AI/ML能力支持智能决策
随着技术的不断发展,云数据架构也将持续演进,企业需要保持技术敏感性,及时采用新技术来提升数据处理能力和业务价值。