Cloud Data Architecture: Designing Big Data and AI Platforms


Table of Contents

  1. Introduction
  2. Data Architecture Overview
  3. Data Lake Architecture Design
  4. Data Warehouses and Data Marts
  5. Real-Time Data Processing
  6. Machine Learning Platform
  7. Data Governance and Quality
  8. Security and Compliance
  9. Performance Optimization
  10. Best Practices and Recommendations
  11. Summary

Introduction

In the wave of digital transformation, data has become an enterprise's most valuable asset. Cloud data architecture gives organizations a powerful platform for processing massive volumes of data and building intelligent applications. This article examines how to design and implement big data and AI platforms in a cloud environment, covering the complete lifecycle from data ingestion and storage through processing and analytics.

A modern cloud data architecture must support diverse data sources, real-time as well as batch workloads, and machine learning workflows, while ensuring data security, governance, and compliance. With a sound architecture design, enterprises can build data platforms that are agile, scalable, and cost-effective.
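
As a rough illustration of these requirements, the sketch below models the workload types such a platform must serve and maps each to candidate managed services. The mapping is an assumption made for the example, not a prescribed stack; the right choice depends on data volume, latency, and cost targets.

from enum import Enum
from typing import Dict, List

class Workload(Enum):
    BATCH = "batch"
    STREAMING = "streaming"
    MACHINE_LEARNING = "machine_learning"

# Illustrative candidates only; substitute the services your organization actually uses.
CANDIDATE_SERVICES: Dict[Workload, List[str]] = {
    Workload.BATCH: ["AWS Glue", "Amazon EMR"],
    Workload.STREAMING: ["Amazon Kinesis", "Amazon MSK (Kafka)"],
    Workload.MACHINE_LEARNING: ["Amazon SageMaker"],
}

def plan_platform(required: List[Workload]) -> Dict[str, List[str]]:
    """Map each required workload type to candidate managed services."""
    return {workload.value: CANDIDATE_SERVICES[workload] for workload in required}

if __name__ == "__main__":
    print(plan_platform([Workload.BATCH, Workload.STREAMING, Workload.MACHINE_LEARNING]))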

Data Architecture Overview

Modern Data Architecture Patterns

graph TB
    subgraph "Data Source Layer"
        A[Business systems] --> D[Data ingestion]
        B[IoT devices] --> D
        C[External APIs] --> D
        E[Log files] --> D
        F[Streaming data] --> D
    end

    subgraph "Data Ingestion Layer"
        D --> G[Batch ETL]
        D --> H[Stream processing]
        D --> I[CDC change capture]
    end

    subgraph "Data Storage Layer"
        G --> J[Data lake]
        H --> J
        I --> J
        J --> K[Data warehouse]
        J --> L[Feature store]
    end

    subgraph "Data Processing Layer"
        K --> M[Batch processing engine]
        J --> N[Stream processing engine]
        L --> O[ML training]
    end

    subgraph "Data Serving Layer"
        M --> P[Data API]
        N --> P
        O --> Q[Model serving]
        P --> R[BI tools]
        Q --> S[AI applications]
    end

    subgraph "Data Governance Layer"
        T[Metadata management] -.-> J
        T -.-> K
        T -.-> L
        U[Data quality] -.-> M
        U -.-> N
        V[Data security] -.-> P
        V -.-> Q
    end

Data Architecture Analyzer

import json
import time
import uuid
from typing import Dict, Any, List, Optional, Union, Tuple
from dataclasses import dataclass, asdict
from datetime import datetime, timedelta
from enum import Enum
import boto3
import pandas as pd
import numpy as np
from abc import ABC, abstractmethod
import logging

# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("data-architecture")

class DataSourceType(Enum):
    """数据源类型"""
    RELATIONAL_DB = "relational_db"
    NOSQL_DB = "nosql_db"
    FILE_SYSTEM = "file_system"
    STREAMING = "streaming"
    API = "api"
    IOT = "iot"
    LOG = "log"

class DataFormat(Enum):
    """数据格式"""
    JSON = "json"
    CSV = "csv"
    PARQUET = "parquet"
    AVRO = "avro"
    ORC = "orc"
    XML = "xml"
    BINARY = "binary"

class ProcessingType(Enum):
    """处理类型"""
    BATCH = "batch"
    STREAMING = "streaming"
    MICRO_BATCH = "micro_batch"
    REAL_TIME = "real_time"

@dataclass
class DataSource:
    """数据源定义"""
    id: str
    name: str
    type: DataSourceType
    format: DataFormat
    location: str
    schema: Dict[str, Any]
    volume_gb_per_day: float
    velocity_records_per_second: int
    variety_score: int  # 1-10, 数据多样性评分
    created_at: datetime
    updated_at: datetime

@dataclass
class DataPipeline:
    """数据管道定义"""
    id: str
    name: str
    source_ids: List[str]
    processing_type: ProcessingType
    transformations: List[Dict[str, Any]]
    destination: str
    schedule: Optional[str]
    sla_minutes: int
    created_at: datetime
    updated_at: datetime

@dataclass
class DataAsset:
    """数据资产定义"""
    id: str
    name: str
    description: str
    owner: str
    tags: List[str]
    schema: Dict[str, Any]
    quality_score: float
    usage_frequency: str
    business_value: str
    created_at: datetime
    updated_at: datetime

class DataArchitectureAnalyzer:
    """数据架构分析器"""
    
    def __init__(self, aws_region: str = "us-east-1"):
        self.aws_region = aws_region
        self.data_sources: Dict[str, DataSource] = {}
        self.data_pipelines: Dict[str, DataPipeline] = {}
        self.data_assets: Dict[str, DataAsset] = {}
        
        # AWS服务客户端
        self.s3 = boto3.client('s3', region_name=aws_region)
        self.glue = boto3.client('glue', region_name=aws_region)
        self.athena = boto3.client('athena', region_name=aws_region)
        self.redshift = boto3.client('redshift', region_name=aws_region)
        self.kinesis = boto3.client('kinesis', region_name=aws_region)
    
    def add_data_source(self, data_source: DataSource):
        """添加数据源"""
        self.data_sources[data_source.id] = data_source
        logger.info(f"Added data source: {data_source.name}")
    
    def add_data_pipeline(self, pipeline: DataPipeline):
        """添加数据管道"""
        self.data_pipelines[pipeline.id] = pipeline
        logger.info(f"Added data pipeline: {pipeline.name}")
    
    def add_data_asset(self, asset: DataAsset):
        """添加数据资产"""
        self.data_assets[asset.id] = asset
        logger.info(f"Added data asset: {asset.name}")
    
    def analyze_data_volume(self) -> Dict[str, Any]:
        """分析数据量"""
        total_volume = sum(source.volume_gb_per_day for source in self.data_sources.values())
        volume_by_type = {}
        
        for source in self.data_sources.values():
            source_type = source.type.value
            if source_type not in volume_by_type:
                volume_by_type[source_type] = 0
            volume_by_type[source_type] += source.volume_gb_per_day
        
        return {
            "total_volume_gb_per_day": total_volume,
            "volume_by_type": volume_by_type,
            "projected_monthly_volume_gb": total_volume * 30,
            "projected_yearly_volume_tb": (total_volume * 365) / 1024
        }
    
    def analyze_data_velocity(self) -> Dict[str, Any]:
        """分析数据速度"""
        total_velocity = sum(source.velocity_records_per_second for source in self.data_sources.values())
        velocity_by_type = {}
        
        for source in self.data_sources.values():
            source_type = source.type.value
            if source_type not in velocity_by_type:
                velocity_by_type[source_type] = 0
            velocity_by_type[source_type] += source.velocity_records_per_second
        
        # 分类速度级别
        speed_categories = {
            "low": 0,      # < 100 records/sec
            "medium": 0,   # 100-1000 records/sec
            "high": 0,     # 1000-10000 records/sec
            "very_high": 0 # > 10000 records/sec
        }
        
        for source in self.data_sources.values():
            velocity = source.velocity_records_per_second
            if velocity < 100:
                speed_categories["low"] += 1
            elif velocity < 1000:
                speed_categories["medium"] += 1
            elif velocity < 10000:
                speed_categories["high"] += 1
            else:
                speed_categories["very_high"] += 1
        
        return {
            "total_velocity_records_per_second": total_velocity,
            "velocity_by_type": velocity_by_type,
            "speed_distribution": speed_categories,
            "peak_daily_records": total_velocity * 86400
        }
    
    def analyze_data_variety(self) -> Dict[str, Any]:
        """分析数据多样性"""
        format_distribution = {}
        type_distribution = {}
        variety_scores = []
        
        for source in self.data_sources.values():
            # 格式分布
            format_name = source.format.value
            if format_name not in format_distribution:
                format_distribution[format_name] = 0
            format_distribution[format_name] += 1
            
            # 类型分布
            type_name = source.type.value
            if type_name not in type_distribution:
                type_distribution[type_name] = 0
            type_distribution[type_name] += 1
            
            # 多样性评分
            variety_scores.append(source.variety_score)
        
        avg_variety_score = np.mean(variety_scores) if variety_scores else 0
        
        return {
            "format_distribution": format_distribution,
            "type_distribution": type_distribution,
            "average_variety_score": avg_variety_score,
            "total_data_sources": len(self.data_sources),
            "unique_formats": len(format_distribution),
            "unique_types": len(type_distribution)
        }
    
    def analyze_pipeline_complexity(self) -> Dict[str, Any]:
        """分析管道复杂性"""
        processing_type_distribution = {}
        transformation_complexity = []
        sla_distribution = {"under_1h": 0, "1h_to_4h": 0, "4h_to_24h": 0, "over_24h": 0}
        
        for pipeline in self.data_pipelines.values():
            # 处理类型分布
            proc_type = pipeline.processing_type.value
            if proc_type not in processing_type_distribution:
                processing_type_distribution[proc_type] = 0
            processing_type_distribution[proc_type] += 1
            
            # 转换复杂性
            transformation_complexity.append(len(pipeline.transformations))
            
            # SLA分布
            sla_hours = pipeline.sla_minutes / 60
            if sla_hours < 1:
                sla_distribution["under_1h"] += 1
            elif sla_hours < 4:
                sla_distribution["1h_to_4h"] += 1
            elif sla_hours < 24:
                sla_distribution["4h_to_24h"] += 1
            else:
                sla_distribution["over_24h"] += 1
        
        avg_transformations = np.mean(transformation_complexity) if transformation_complexity else 0
        
        return {
            "total_pipelines": len(self.data_pipelines),
            "processing_type_distribution": processing_type_distribution,
            "average_transformations_per_pipeline": avg_transformations,
            "sla_distribution": sla_distribution,
            "complex_pipelines": len([p for p in self.data_pipelines.values() 
                                    if len(p.transformations) > 5])
        }
    
    def generate_architecture_recommendations(self) -> Dict[str, Any]:
        """生成架构建议"""
        volume_analysis = self.analyze_data_volume()
        velocity_analysis = self.analyze_data_velocity()
        variety_analysis = self.analyze_data_variety()
        pipeline_analysis = self.analyze_pipeline_complexity()
        
        recommendations = {
            "storage_recommendations": [],
            "processing_recommendations": [],
            "architecture_patterns": [],
            "technology_stack": [],
            "cost_optimization": [],
            "performance_optimization": []
        }
        
        # 存储建议
        total_volume = volume_analysis["total_volume_gb_per_day"]
        if total_volume > 1000:  # > 1TB/day
            recommendations["storage_recommendations"].append({
                "type": "data_lake",
                "reason": "大数据量需要可扩展的数据湖架构",
                "technology": ["Amazon S3", "Azure Data Lake", "Google Cloud Storage"]
            })
        
        if variety_analysis["unique_formats"] > 3:
            recommendations["storage_recommendations"].append({
                "type": "multi_format_support",
                "reason": "多种数据格式需要灵活的存储方案",
                "technology": ["Apache Parquet", "Delta Lake", "Apache Iceberg"]
            })
        
        # 处理建议
        total_velocity = velocity_analysis["total_velocity_records_per_second"]
        if total_velocity > 1000:
            recommendations["processing_recommendations"].append({
                "type": "stream_processing",
                "reason": "高速数据流需要实时处理能力",
                "technology": ["Apache Kafka", "Amazon Kinesis", "Apache Flink"]
            })
        
        if pipeline_analysis["complex_pipelines"] > 0:
            recommendations["processing_recommendations"].append({
                "type": "workflow_orchestration",
                "reason": "复杂管道需要工作流编排",
                "technology": ["Apache Airflow", "AWS Step Functions", "Azure Data Factory"]
            })
        
        # 架构模式建议
        if (total_volume > 100 and total_velocity > 100 and 
            variety_analysis["average_variety_score"] > 5):
            recommendations["architecture_patterns"].append({
                "pattern": "lambda_architecture",
                "description": "批处理和流处理混合架构",
                "use_case": "处理大量、高速、多样化数据"
            })
        
        if variety_analysis["unique_types"] > 5:
            recommendations["architecture_patterns"].append({
                "pattern": "data_mesh",
                "description": "去中心化数据架构",
                "use_case": "多域数据管理和治理"
            })
        
        return recommendations
    
    def generate_cost_estimation(self) -> Dict[str, Any]:
        """生成成本估算"""
        volume_analysis = self.analyze_data_volume()
        velocity_analysis = self.analyze_data_velocity()
        
        # 存储成本估算(基于AWS S3定价)
        monthly_volume_gb = volume_analysis["projected_monthly_volume_gb"]
        storage_cost_per_gb = 0.023  # AWS S3标准存储
        monthly_storage_cost = monthly_volume_gb * storage_cost_per_gb
        
        # 计算成本估算(基于数据处理量)
        daily_records = velocity_analysis["peak_daily_records"]
        processing_cost_per_million_records = 1.0  # 假设成本
        monthly_processing_cost = (daily_records * 30 / 1000000) * processing_cost_per_million_records
        
        # 传输成本估算
        monthly_transfer_cost = monthly_volume_gb * 0.09  # 数据传输成本
        
        total_monthly_cost = monthly_storage_cost + monthly_processing_cost + monthly_transfer_cost
        
        return {
            "monthly_costs": {
                "storage": monthly_storage_cost,
                "processing": monthly_processing_cost,
                "transfer": monthly_transfer_cost,
                "total": total_monthly_cost
            },
            "yearly_projection": total_monthly_cost * 12,
            "cost_breakdown": {
                "storage_percentage": (monthly_storage_cost / total_monthly_cost) * 100,
                "processing_percentage": (monthly_processing_cost / total_monthly_cost) * 100,
                "transfer_percentage": (monthly_transfer_cost / total_monthly_cost) * 100
            }
        }
    
    def generate_architecture_report(self) -> Dict[str, Any]:
        """生成架构报告"""
        return {
            "timestamp": datetime.utcnow().isoformat(),
            "summary": {
                "total_data_sources": len(self.data_sources),
                "total_pipelines": len(self.data_pipelines),
                "total_assets": len(self.data_assets)
            },
            "volume_analysis": self.analyze_data_volume(),
            "velocity_analysis": self.analyze_data_velocity(),
            "variety_analysis": self.analyze_data_variety(),
            "pipeline_analysis": self.analyze_pipeline_complexity(),
            "recommendations": self.generate_architecture_recommendations(),
            "cost_estimation": self.generate_cost_estimation()
        }

# 使用示例
def data_architecture_example():
    """数据架构示例"""
    analyzer = DataArchitectureAnalyzer()
    
    # 添加数据源
    sources = [
        DataSource(
            id="src-001",
            name="用户行为数据库",
            type=DataSourceType.RELATIONAL_DB,
            format=DataFormat.JSON,
            location="mysql://prod-db/user_events",
            schema={"user_id": "string", "event_type": "string", "timestamp": "datetime"},
            volume_gb_per_day=50.0,
            velocity_records_per_second=500,
            variety_score=6,
            created_at=datetime.utcnow(),
            updated_at=datetime.utcnow()
        ),
        DataSource(
            id="src-002",
            name="IoT传感器数据",
            type=DataSourceType.STREAMING,
            format=DataFormat.AVRO,
            location="kafka://iot-cluster/sensor-data",
            schema={"device_id": "string", "temperature": "float", "humidity": "float"},
            volume_gb_per_day=200.0,
            velocity_records_per_second=2000,
            variety_score=8,
            created_at=datetime.utcnow(),
            updated_at=datetime.utcnow()
        ),
        DataSource(
            id="src-003",
            name="应用日志",
            type=DataSourceType.LOG,
            format=DataFormat.JSON,
            location="s3://logs-bucket/app-logs/",
            schema={"timestamp": "datetime", "level": "string", "message": "string"},
            volume_gb_per_day=100.0,
            velocity_records_per_second=1000,
            variety_score=4,
            created_at=datetime.utcnow(),
            updated_at=datetime.utcnow()
        )
    ]
    
    for source in sources:
        analyzer.add_data_source(source)
    
    # 添加数据管道
    pipelines = [
        DataPipeline(
            id="pipe-001",
            name="用户行为ETL",
            source_ids=["src-001"],
            processing_type=ProcessingType.BATCH,
            transformations=[
                {"type": "filter", "condition": "event_type != 'test'"},
                {"type": "aggregate", "group_by": "user_id", "metrics": ["count", "avg"]},
                {"type": "join", "table": "user_profiles", "key": "user_id"}
            ],
            destination="s3://data-lake/user-behavior/",
            schedule="0 2 * * *",  # 每天凌晨2点
            sla_minutes=120,
            created_at=datetime.utcnow(),
            updated_at=datetime.utcnow()
        ),
        DataPipeline(
            id="pipe-002",
            name="IoT实时处理",
            source_ids=["src-002"],
            processing_type=ProcessingType.STREAMING,
            transformations=[
                {"type": "validate", "schema": "iot_schema"},
                {"type": "enrich", "lookup": "device_metadata"},
                {"type": "window", "type": "tumbling", "duration": "5m"},
                {"type": "aggregate", "metrics": ["avg", "max", "min"]}
            ],
            destination="kinesis://processed-iot-stream",
            schedule=None,
            sla_minutes=5,
            created_at=datetime.utcnow(),
            updated_at=datetime.utcnow()
        )
    ]
    
    for pipeline in pipelines:
        analyzer.add_data_pipeline(pipeline)
    
    # 添加数据资产
    assets = [
        DataAsset(
            id="asset-001",
            name="用户行为分析表",
            description="用户行为数据的聚合分析结果",
            owner="数据团队",
            tags=["用户分析", "行为数据", "营销"],
            schema={"user_id": "string", "total_events": "int", "avg_session_duration": "float"},
            quality_score=8.5,
            usage_frequency="daily",
            business_value="high",
            created_at=datetime.utcnow(),
            updated_at=datetime.utcnow()
        ),
        DataAsset(
            id="asset-002",
            name="IoT设备监控仪表板",
            description="实时IoT设备状态监控数据",
            owner="运维团队",
            tags=["IoT", "监控", "实时"],
            schema={"device_id": "string", "status": "string", "last_update": "datetime"},
            quality_score=9.2,
            usage_frequency="real-time",
            business_value="critical",
            created_at=datetime.utcnow(),
            updated_at=datetime.utcnow()
        )
    ]
    
    for asset in assets:
        analyzer.add_data_asset(asset)
    
    # 生成架构报告
    report = analyzer.generate_architecture_report()
    
    print("数据架构分析报告")
    print("=" * 50)
    print(f"数据源总数: {report['summary']['total_data_sources']}")
    print(f"数据管道总数: {report['summary']['total_pipelines']}")
    print(f"数据资产总数: {report['summary']['total_assets']}")
    
    print("\n数据量分析:")
    volume = report['volume_analysis']
    print(f"  每日数据量: {volume['total_volume_gb_per_day']:.2f} GB")
    print(f"  年度预计数据量: {volume['projected_yearly_volume_tb']:.2f} TB")
    
    print("\n数据速度分析:")
    velocity = report['velocity_analysis']
    print(f"  总记录速度: {velocity['total_velocity_records_per_second']:,} records/sec")
    print(f"  每日峰值记录: {velocity['peak_daily_records']:,} records")
    
    print("\n架构建议:")
    recommendations = report['recommendations']
    for category, items in recommendations.items():
        if items:
            print(f"  {category}:")
            for item in items:
                if isinstance(item, dict):
                    print(f"    - {item.get('type', item.get('pattern', 'N/A'))}: {item.get('reason', item.get('description', 'N/A'))}")
    
    print("\n成本估算:")
    costs = report['cost_estimation']
    print(f"  月度总成本: ${costs['monthly_costs']['total']:.2f}")
    print(f"  年度预计成本: ${costs['yearly_projection']:.2f}")
    print(f"  存储成本占比: {costs['cost_breakdown']['storage_percentage']:.1f}%")
    print(f"  处理成本占比: {costs['cost_breakdown']['processing_percentage']:.1f}%")

if __name__ == "__main__":
    data_architecture_example()

Data Lake Architecture Design

Layered Data Lake Architecture

The data lake uses a layered architecture that keeps data well organized and efficiently accessible; the short sketch below illustrates the path convention behind these layers.
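
As a minimal sketch of this layering (the bucket layout and partition scheme here are assumptions for illustration), the snippet below builds S3 object keys that encode the layer, dataset, and date partition a record belongs to as it moves from raw toward gold:

from datetime import date

# Medallion-style layers: ingestion -> cleansed -> conformed -> business-ready
LAYERS = ("raw", "bronze", "silver", "gold")

def lake_key(layer: str, dataset: str, dt: date, file_name: str) -> str:
    """Build an S3 object key that encodes layer, dataset, and date partitions."""
    if layer not in LAYERS:
        raise ValueError(f"unknown layer: {layer}")
    return (f"{layer}/{dataset}/"
            f"year={dt.year:04d}/month={dt.month:02d}/day={dt.day:02d}/{file_name}")

# The same logical dataset at two stages of refinement
print(lake_key("raw", "user_events", date(2024, 1, 15), "events.json"))
print(lake_key("silver", "user_events", date(2024, 1, 15), "events.parquet"))

The complete DataLakeManager implementation that provisions and governs this structure follows.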

import boto3
import json
import yaml
from typing import Dict, Any, List, Optional
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
import os

class DataLakeLayer(Enum):
    """数据湖层级"""
    RAW = "raw"           # 原始数据层
    BRONZE = "bronze"     # 青铜层(清洗后)
    SILVER = "silver"     # 银层(结构化)
    GOLD = "gold"         # 金层(业务就绪)

class DataFormat(Enum):
    """数据格式"""
    PARQUET = "parquet"
    DELTA = "delta"
    ICEBERG = "iceberg"
    JSON = "json"
    CSV = "csv"
    AVRO = "avro"

@dataclass
class DataLakeConfig:
    """数据湖配置"""
    bucket_name: str
    region: str
    encryption_key: str
    lifecycle_policies: Dict[str, Any]
    access_policies: Dict[str, Any]
    metadata_catalog: str

class DataLakeManager:
    """数据湖管理器"""
    
    def __init__(self, config: DataLakeConfig):
        self.config = config
        self.s3 = boto3.client('s3', region_name=config.region)
        self.glue = boto3.client('glue', region_name=config.region)
        self.lakeformation = boto3.client('lakeformation', region_name=config.region)
        
    def create_data_lake_structure(self) -> Dict[str, Any]:
        """创建数据湖结构"""
        bucket_name = self.config.bucket_name
        
        # 创建S3存储桶
        try:
            if self.config.region != 'us-east-1':
                self.s3.create_bucket(
                    Bucket=bucket_name,
                    CreateBucketConfiguration={'LocationConstraint': self.config.region}
                )
            else:
                self.s3.create_bucket(Bucket=bucket_name)
            
            # 启用版本控制
            self.s3.put_bucket_versioning(
                Bucket=bucket_name,
                VersioningConfiguration={'Status': 'Enabled'}
            )
            
            # 设置加密
            self.s3.put_bucket_encryption(
                Bucket=bucket_name,
                ServerSideEncryptionConfiguration={
                    'Rules': [{
                        'ApplyServerSideEncryptionByDefault': {
                            'SSEAlgorithm': 'aws:kms',
                            'KMSMasterKeyID': self.config.encryption_key
                        }
                    }]
                }
            )
            
            # 创建分层目录结构
            layers = [
                "raw/",
                "bronze/",
                "silver/",
                "gold/",
                "metadata/",
                "scripts/",
                "temp/"
            ]
            
            for layer in layers:
                self.s3.put_object(
                    Bucket=bucket_name,
                    Key=layer,
                    Body=b''
                )
            
            # 设置生命周期策略
            self._set_lifecycle_policies(bucket_name)
            
            # 设置访问策略
            self._set_bucket_policies(bucket_name)
            
            return {
                "status": "success",
                "bucket_name": bucket_name,
                "layers_created": layers,
                "encryption_enabled": True,
                "versioning_enabled": True
            }
            
        except Exception as e:
            return {
                "status": "error",
                "error": str(e)
            }
    
    def _set_lifecycle_policies(self, bucket_name: str):
        """设置生命周期策略"""
        lifecycle_config = {
            'Rules': [
                {
                    'ID': 'RawDataTransition',
                    'Status': 'Enabled',
                    'Filter': {'Prefix': 'raw/'},
                    'Transitions': [
                        {
                            'Days': 30,
                            'StorageClass': 'STANDARD_IA'
                        },
                        {
                            'Days': 90,
                            'StorageClass': 'GLACIER'
                        },
                        {
                            'Days': 365,
                            'StorageClass': 'DEEP_ARCHIVE'
                        }
                    ]
                },
                {
                    'ID': 'TempDataCleanup',
                    'Status': 'Enabled',
                    'Filter': {'Prefix': 'temp/'},
                    'Expiration': {'Days': 7}
                },
                {
                    'ID': 'IncompleteMultipartUploads',
                    'Status': 'Enabled',
                    'Filter': {},
                    'AbortIncompleteMultipartUpload': {'DaysAfterInitiation': 1}
                }
            ]
        }
        
        self.s3.put_bucket_lifecycle_configuration(
            Bucket=bucket_name,
            LifecycleConfiguration=lifecycle_config
        )
    
    def _set_bucket_policies(self, bucket_name: str):
        """设置存储桶策略"""
        policy = {
            "Version": "2012-10-17",
            "Statement": [
                {
                    "Sid": "DenyInsecureConnections",
                    "Effect": "Deny",
                    "Principal": "*",
                    "Action": "s3:*",
                    "Resource": [
                        f"arn:aws:s3:::{bucket_name}",
                        f"arn:aws:s3:::{bucket_name}/*"
                    ],
                    "Condition": {
                        "Bool": {
                            "aws:SecureTransport": "false"
                        }
                    }
                },
                {
                    "Sid": "RequireSSEKMS",
                    "Effect": "Deny",
                    "Principal": "*",
                    "Action": "s3:PutObject",
                    "Resource": f"arn:aws:s3:::{bucket_name}/*",
                    "Condition": {
                        "StringNotEquals": {
                            "s3:x-amz-server-side-encryption": "aws:kms"
                        }
                    }
                }
            ]
        }
        
        self.s3.put_bucket_policy(
            Bucket=bucket_name,
            Policy=json.dumps(policy)
        )
    
    def create_data_catalog(self, database_name: str) -> Dict[str, Any]:
        """创建数据目录"""
        try:
            # 创建Glue数据库
            self.glue.create_database(
                DatabaseInput={
                    'Name': database_name,
                    'Description': f'Data Lake catalog for {self.config.bucket_name}',
                    'LocationUri': f's3://{self.config.bucket_name}/',
                    'Parameters': {
                        'classification': 'data-lake',
                        'created_by': 'data-lake-manager',
                        'created_at': datetime.utcnow().isoformat()
                    }
                }
            )
            
            # 为每个层级创建表
            layers = ['raw', 'bronze', 'silver', 'gold']
            tables_created = []
            
            for layer in layers:
                table_name = f"{layer}_data"
                try:
                    self.glue.create_table(
                        DatabaseName=database_name,
                        TableInput={
                            'Name': table_name,
                            'Description': f'{layer.title()} layer data',
                            'StorageDescriptor': {
                                'Columns': [
                                    {'Name': 'partition_date', 'Type': 'string'},
                                    {'Name': 'data_source', 'Type': 'string'},
                                    {'Name': 'record_count', 'Type': 'bigint'}
                                ],
                                'Location': f's3://{self.config.bucket_name}/{layer}/',
                                'InputFormat': 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat',
                                'OutputFormat': 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat',
                                'SerdeInfo': {
                                    'SerializationLibrary': 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
                                }
                            },
                            'PartitionKeys': [
                                {'Name': 'year', 'Type': 'string'},
                                {'Name': 'month', 'Type': 'string'},
                                {'Name': 'day', 'Type': 'string'}
                            ],
                            'Parameters': {
                                'classification': 'parquet',
                                'layer': layer
                            }
                        }
                    )
                    tables_created.append(table_name)
                except Exception as e:
                    print(f"Warning: Could not create table {table_name}: {e}")
            
            return {
                "status": "success",
                "database_name": database_name,
                "tables_created": tables_created
            }
            
        except Exception as e:
            return {
                "status": "error",
                "error": str(e)
            }
    
    def setup_lake_formation_permissions(self, database_name: str, 
                                       principals: List[str]) -> Dict[str, Any]:
        """设置Lake Formation权限"""
        try:
            permissions_granted = []
            
            for principal in principals:
                # 授予数据库权限
                self.lakeformation.grant_permissions(
                    Principal={'DataLakePrincipalIdentifier': principal},
                    Resource={'Database': {'Name': database_name}},
                    Permissions=['ALL'],
                    PermissionsWithGrantOption=['ALL']
                )
                
                # 授予表权限
                self.lakeformation.grant_permissions(
                    Principal={'DataLakePrincipalIdentifier': principal},
                    Resource={
                        'Table': {
                            'DatabaseName': database_name,
                            'TableWildcard': {}
                        }
                    },
                    Permissions=['ALL'],
                    PermissionsWithGrantOption=['ALL']
                )
                
                permissions_granted.append(principal)
            
            return {
                "status": "success",
                "permissions_granted": permissions_granted
            }
            
        except Exception as e:
            return {
                "status": "error",
                "error": str(e)
            }
    
    def create_data_quality_framework(self) -> str:
        """创建数据质量框架"""
        dq_script = """
import boto3
import pandas as pd
import numpy as np
from typing import Dict, Any, List
import json
from datetime import datetime

class DataQualityChecker:
    def __init__(self, s3_bucket: str, glue_database: str):
        self.s3_bucket = s3_bucket
        self.glue_database = glue_database
        self.s3 = boto3.client('s3')
        self.athena = boto3.client('athena')
    
    def check_completeness(self, table_name: str, required_columns: List[str]) -> Dict[str, Any]:
        \"\"\"检查数据完整性\"\"\"
        query = f\"\"\"
        SELECT 
            {', '.join([f'COUNT({col}) as {col}_count' for col in required_columns])},
            COUNT(*) as total_rows
        FROM {self.glue_database}.{table_name}
        \"\"\"
        
        result = self._execute_athena_query(query)
        
        completeness_scores = {}
        if result and len(result) > 0:
            row = result[0]
            total_rows = row.get('total_rows', 0)
            
            for col in required_columns:
                col_count = row.get(f'{col}_count', 0)
                completeness_scores[col] = (col_count / total_rows) * 100 if total_rows > 0 else 0
        
        return {
            'table': table_name,
            'completeness_scores': completeness_scores,
            'overall_score': np.mean(list(completeness_scores.values())) if completeness_scores else 0,
            'timestamp': datetime.utcnow().isoformat()
        }
    
    def check_uniqueness(self, table_name: str, unique_columns: List[str]) -> Dict[str, Any]:
        \"\"\"检查数据唯一性\"\"\"
        uniqueness_scores = {}
        
        for col in unique_columns:
            query = f\"\"\"
            SELECT 
                COUNT(*) as total_rows,
                COUNT(DISTINCT {col}) as unique_values
            FROM {self.glue_database}.{table_name}
            \"\"\"
            
            result = self._execute_athena_query(query)
            
            if result and len(result) > 0:
                row = result[0]
                total_rows = row.get('total_rows', 0)
                unique_values = row.get('unique_values', 0)
                uniqueness_scores[col] = (unique_values / total_rows) * 100 if total_rows > 0 else 0
        
        return {
            'table': table_name,
            'uniqueness_scores': uniqueness_scores,
            'overall_score': np.mean(list(uniqueness_scores.values())) if uniqueness_scores else 0,
            'timestamp': datetime.utcnow().isoformat()
        }
    
    def check_validity(self, table_name: str, validation_rules: Dict[str, str]) -> Dict[str, Any]:
        \"\"\"检查数据有效性\"\"\"
        validity_scores = {}
        
        for col, rule in validation_rules.items():
            query = f\"\"\"
            SELECT 
                COUNT(*) as total_rows,
                COUNT(CASE WHEN {rule} THEN 1 END) as valid_rows
            FROM {self.glue_database}.{table_name}
            \"\"\"
            
            result = self._execute_athena_query(query)
            
            if result and len(result) > 0:
                row = result[0]
                total_rows = row.get('total_rows', 0)
                valid_rows = row.get('valid_rows', 0)
                validity_scores[col] = (valid_rows / total_rows) * 100 if total_rows > 0 else 0
        
        return {
            'table': table_name,
            'validity_scores': validity_scores,
            'overall_score': np.mean(list(validity_scores.values())) if validity_scores else 0,
            'timestamp': datetime.utcnow().isoformat()
        }
    
    def _execute_athena_query(self, query: str) -> List[Dict[str, Any]]:
        \"\"\"执行Athena查询\"\"\"
        # 这里应该实现实际的Athena查询逻辑
        # 为了示例,返回模拟数据
        return [{'total_rows': 1000, 'unique_values': 950}]
    
    def generate_quality_report(self, table_name: str, 
                              required_columns: List[str],
                              unique_columns: List[str],
                              validation_rules: Dict[str, str]) -> Dict[str, Any]:
        \"\"\"生成数据质量报告\"\"\"
        completeness = self.check_completeness(table_name, required_columns)
        uniqueness = self.check_uniqueness(table_name, unique_columns)
        validity = self.check_validity(table_name, validation_rules)
        
        overall_score = np.mean([
            completeness['overall_score'],
            uniqueness['overall_score'],
            validity['overall_score']
        ])
        
        return {
            'table': table_name,
            'overall_quality_score': overall_score,
            'completeness': completeness,
            'uniqueness': uniqueness,
            'validity': validity,
            'quality_grade': self._get_quality_grade(overall_score),
            'timestamp': datetime.utcnow().isoformat()
        }
    
    def _get_quality_grade(self, score: float) -> str:
        \"\"\"获取质量等级\"\"\"
        if score >= 95:
            return 'A'
        elif score >= 85:
            return 'B'
        elif score >= 75:
            return 'C'
        elif score >= 65:
            return 'D'
        else:
            return 'F'
"""
        
        # 将脚本保存到S3
        script_key = "scripts/data_quality_checker.py"
        self.s3.put_object(
            Bucket=self.config.bucket_name,
            Key=script_key,
            Body=dq_script.encode('utf-8'),
            ContentType='text/x-python'
        )
        
        return f"s3://{self.config.bucket_name}/{script_key}"

# 使用示例
def data_lake_example():
    """数据湖示例"""
    config = DataLakeConfig(
        bucket_name="my-enterprise-data-lake",
        region="us-west-2",
        encryption_key="arn:aws:kms:us-west-2:123456789012:key/12345678-1234-1234-1234-123456789012",
        lifecycle_policies={},
        access_policies={},
        metadata_catalog="glue"
    )
    
    manager = DataLakeManager(config)
    
    # 创建数据湖结构
    print("创建数据湖结构...")
    structure_result = manager.create_data_lake_structure()
    print(f"结果: {structure_result}")
    
    # 创建数据目录
    print("\n创建数据目录...")
    catalog_result = manager.create_data_catalog("enterprise_data_lake")
    print(f"结果: {catalog_result}")
    
    # 设置权限
    print("\n设置Lake Formation权限...")
    principals = [
        "arn:aws:iam::123456789012:role/DataAnalystRole",
        "arn:aws:iam::123456789012:role/DataScientistRole"
    ]
    permissions_result = manager.setup_lake_formation_permissions("enterprise_data_lake", principals)
    print(f"结果: {permissions_result}")
    
    # 创建数据质量框架
    print("\n创建数据质量框架...")
    dq_script_location = manager.create_data_quality_framework()
    print(f"数据质量脚本位置: {dq_script_location}")

if __name__ == "__main__":
    data_lake_example()

Data Warehouses and Data Marts

Modern Data Warehouse Architecture

import boto3
import json
import psycopg2
from typing import Dict, Any, List, Optional, Tuple
from dataclasses import dataclass
from datetime import datetime, timedelta
from enum import Enum
import pandas as pd
import numpy as np

class DimensionType(Enum):
    """维度类型"""
    SCD_TYPE_1 = "scd_type_1"  # 覆盖更新
    SCD_TYPE_2 = "scd_type_2"  # 历史追踪
    SCD_TYPE_3 = "scd_type_3"  # 部分历史

class FactType(Enum):
    """事实表类型"""
    TRANSACTION = "transaction"      # 事务事实表
    SNAPSHOT = "snapshot"           # 快照事实表
    ACCUMULATING = "accumulating"   # 累积快照事实表

@dataclass
class DimensionTable:
    """维度表定义"""
    name: str
    type: DimensionType
    business_key: str
    attributes: List[str]
    source_system: str
    update_frequency: str
    retention_days: int

@dataclass
class FactTable:
    """事实表定义"""
    name: str
    type: FactType
    grain: str
    dimensions: List[str]
    measures: List[str]
    source_system: str
    update_frequency: str
    partition_key: str

class DataWarehouseManager:
    """数据仓库管理器"""
    
    def __init__(self, cluster_identifier: str, region: str = "us-east-1"):
        self.cluster_identifier = cluster_identifier
        self.region = region
        self.redshift = boto3.client('redshift', region_name=region)
        self.s3 = boto3.client('s3', region_name=region)
        self.connection = None
    
    def create_redshift_cluster(self, config: Dict[str, Any]) -> Dict[str, Any]:
        """创建Redshift集群"""
        try:
            cluster_params = {
                'ClusterIdentifier': self.cluster_identifier,
                'NodeType': config.get('node_type', 'dc2.large'),
                'MasterUsername': config['master_username'],
                'MasterUserPassword': config['master_password'],
                'DBName': config.get('db_name', 'datawarehouse'),
                'ClusterType': config.get('cluster_type', 'multi-node'),
                'NumberOfNodes': config.get('number_of_nodes', 2),
                'VpcSecurityGroupIds': config.get('vpc_security_group_ids', []),
                'ClusterSubnetGroupName': config.get('cluster_subnet_group_name'),
                'PubliclyAccessible': config.get('publicly_accessible', False),
                'Encrypted': config.get('encrypted', True),
                'KmsKeyId': config.get('kms_key_id'),
                'EnhancedVpcRouting': config.get('enhanced_vpc_routing', True),
                'ClusterParameterGroupName': config.get('parameter_group_name'),
                'Tags': [
                    {'Key': 'Environment', 'Value': config.get('environment', 'production')},
                    {'Key': 'Purpose', 'Value': 'DataWarehouse'},
                    {'Key': 'CreatedBy', 'Value': 'DataWarehouseManager'}
                ]
            }
            # Drop optional parameters that were not provided so boto3 does not receive None values
            cluster_params = {k: v for k, v in cluster_params.items() if v is not None}
            response = self.redshift.create_cluster(**cluster_params)
            
            return {
                "status": "success",
                "cluster_identifier": self.cluster_identifier,
                "endpoint": response['Cluster']['Endpoint']['Address'],
                "port": response['Cluster']['Endpoint']['Port']
            }
            
        except Exception as e:
            return {
                "status": "error",
                "error": str(e)
            }
    
    def connect_to_cluster(self, host: str, port: int, database: str, 
                          username: str, password: str):
        """连接到Redshift集群"""
        try:
            self.connection = psycopg2.connect(
                host=host,
                port=port,
                database=database,
                user=username,
                password=password
            )
            return True
        except Exception as e:
            print(f"连接失败: {e}")
            return False
    
    def create_schema_structure(self) -> Dict[str, Any]:
        """创建模式结构"""
        if not self.connection:
            return {"status": "error", "error": "No database connection"}
        
        schemas = [
            "staging",      # 临时数据区
            "ods",         # 操作数据存储
            "dw",          # 数据仓库核心层
            "dm",          # 数据集市
            "metadata",    # 元数据管理
            "audit"        # 审计日志
        ]
        
        created_schemas = []
        cursor = self.connection.cursor()
        
        try:
            for schema in schemas:
                cursor.execute(f"CREATE SCHEMA IF NOT EXISTS {schema}")
                created_schemas.append(schema)
            
            self.connection.commit()
            
            return {
                "status": "success",
                "schemas_created": created_schemas
            }
            
        except Exception as e:
            self.connection.rollback()
            return {
                "status": "error",
                "error": str(e)
            }
        finally:
            cursor.close()
    
    def create_dimension_table(self, dimension: DimensionTable) -> str:
        """创建维度表"""
        if dimension.type == DimensionType.SCD_TYPE_2:
            return self._create_scd_type2_dimension(dimension)
        else:
            return self._create_standard_dimension(dimension)
    
    def _create_scd_type2_dimension(self, dimension: DimensionTable) -> str:
        """创建SCD Type 2维度表"""
        attributes_sql = ",\n    ".join([f"{attr} VARCHAR(255)" for attr in dimension.attributes])
        
        sql = f"""
        CREATE TABLE IF NOT EXISTS dw.{dimension.name} (
            {dimension.name}_sk BIGINT IDENTITY(1,1) PRIMARY KEY,
            {dimension.business_key} VARCHAR(255) NOT NULL,
            {attributes_sql},
            effective_date DATE NOT NULL,
            expiry_date DATE,
            is_current BOOLEAN DEFAULT TRUE,
            created_at TIMESTAMP DEFAULT GETDATE(),
            updated_at TIMESTAMP DEFAULT GETDATE(),
            source_system VARCHAR(100) DEFAULT '{dimension.source_system}'
        )
        DISTSTYLE KEY
        DISTKEY ({dimension.business_key})
        SORTKEY (effective_date, {dimension.business_key});
        
        -- Note: Redshift does not support CREATE INDEX; the DISTKEY/SORTKEY defined above
        -- serve the equivalent role for join and range-scan performance.
        """
        
        return sql
    
    def _create_standard_dimension(self, dimension: DimensionTable) -> str:
        """创建标准维度表"""
        attributes_sql = ",\n    ".join([f"{attr} VARCHAR(255)" for attr in dimension.attributes])
        
        sql = f"""
        CREATE TABLE IF NOT EXISTS dw.{dimension.name} (
            {dimension.name}_sk BIGINT IDENTITY(1,1) PRIMARY KEY,
            {dimension.business_key} VARCHAR(255) UNIQUE NOT NULL,
            {attributes_sql},
            created_at TIMESTAMP DEFAULT GETDATE(),
            updated_at TIMESTAMP DEFAULT GETDATE(),
            source_system VARCHAR(100) DEFAULT '{dimension.source_system}'
        )
        DISTSTYLE KEY
        DISTKEY ({dimension.business_key})
        SORTKEY ({dimension.business_key});
        """
        
        return sql
    
    def create_fact_table(self, fact: FactTable) -> str:
        """创建事实表"""
        # 维度外键
        dimension_fks = ",\n    ".join([f"{dim}_sk BIGINT NOT NULL" for dim in fact.dimensions])
        
        # 度量字段
        measures_sql = ",\n    ".join([f"{measure} DECIMAL(18,4)" for measure in fact.measures])
        
        # 外键约束
        foreign_keys = "\n".join([
            f"ALTER TABLE dw.{fact.name} ADD CONSTRAINT fk_{fact.name}_{dim} "
            f"FOREIGN KEY ({dim}_sk) REFERENCES dw.{dim}({dim}_sk);"
            for dim in fact.dimensions
        ])
        
        sql = f"""
        CREATE TABLE IF NOT EXISTS dw.{fact.name} (
            {fact.name}_sk BIGINT IDENTITY(1,1) PRIMARY KEY,
            {dimension_fks},
            {measures_sql},
            {fact.partition_key} DATE NOT NULL,
            created_at TIMESTAMP DEFAULT GETDATE(),
            source_system VARCHAR(100)
        )
        DISTSTYLE KEY
        DISTKEY ({fact.dimensions[0]}_sk)
        SORTKEY ({fact.partition_key});
        
        {foreign_keys}
        """
        
        return sql
    
    def create_etl_procedures(self) -> Dict[str, str]:
        """创建ETL存储过程"""
        procedures = {}
        
        # 维度表加载过程(SCD Type 2)
        procedures["load_scd_type2_dimension"] = """
        CREATE OR REPLACE PROCEDURE dw.load_scd_type2_dimension(
            p_table_name VARCHAR(255),
            p_source_query VARCHAR(65535)
        )
        AS $$
        BEGIN
            -- 创建临时表
            EXECUTE 'CREATE TEMP TABLE temp_' || p_table_name || ' AS ' || p_source_query;
            
            -- 更新过期记录
            EXECUTE '
            UPDATE dw.' || p_table_name || ' 
            SET expiry_date = CURRENT_DATE - 1, 
                is_current = FALSE,
                updated_at = GETDATE()
            WHERE business_key IN (
                SELECT business_key FROM temp_' || p_table_name || '
                WHERE business_key IN (
                    SELECT business_key FROM dw.' || p_table_name || ' WHERE is_current = TRUE
                )
            ) AND is_current = TRUE';
            
            -- 插入新记录
            EXECUTE '
            INSERT INTO dw.' || p_table_name || ' 
            SELECT * FROM temp_' || p_table_name;
            
            -- 清理临时表
            EXECUTE 'DROP TABLE temp_' || p_table_name;
            
        END;
        $$ LANGUAGE plpgsql;
        """
        
        # 事实表增量加载过程
        procedures["load_fact_incremental"] = """
        CREATE OR REPLACE PROCEDURE dw.load_fact_incremental(
            p_fact_table VARCHAR(255),
            p_source_query VARCHAR(65535),
            p_partition_date DATE
        )
        AS $$
        BEGIN
            -- 删除当日数据(重新加载)
            EXECUTE 'DELETE FROM dw.' || p_fact_table || 
                    ' WHERE partition_date = ''' || p_partition_date || '''';
            
            -- 插入新数据
            EXECUTE 'INSERT INTO dw.' || p_fact_table || ' ' || p_source_query;
            
            -- 更新统计信息
            EXECUTE 'ANALYZE dw.' || p_fact_table;
            
        END;
        $$ LANGUAGE plpgsql;
        """
        
        # 数据质量检查过程
        procedures["data_quality_check"] = """
        CREATE OR REPLACE PROCEDURE audit.data_quality_check(
            p_table_name VARCHAR(255),
            p_check_date DATE
        )
        AS $$
        DECLARE
            v_row_count BIGINT;
            v_null_count BIGINT;
            v_duplicate_count BIGINT;
        BEGIN
            -- 行数检查
            EXECUTE 'SELECT COUNT(*) FROM ' || p_table_name 
                    || ' WHERE partition_date = ''' || p_check_date || ''''
            INTO v_row_count;
            
            -- 空值检查
            EXECUTE 'SELECT COUNT(*) FROM ' || p_table_name 
                    || ' WHERE partition_date = ''' || p_check_date || ''' AND business_key IS NULL'
            INTO v_null_count;
            
            -- 重复值检查
            EXECUTE 'SELECT COUNT(*) - COUNT(DISTINCT business_key) FROM ' || p_table_name 
                    || ' WHERE partition_date = ''' || p_check_date || ''''
            INTO v_duplicate_count;
            
            -- 记录检查结果
            INSERT INTO audit.data_quality_log (
                table_name, check_date, row_count, null_count, duplicate_count, check_timestamp
            ) VALUES (
                p_table_name, p_check_date, v_row_count, v_null_count, v_duplicate_count, GETDATE()
            );
            
        END;
        $$ LANGUAGE plpgsql;
        """
        
        return procedures
    
    def create_data_marts(self, business_areas: List[str]) -> Dict[str, List[str]]:
        """创建数据集市"""
        data_marts = {}
        
        for area in business_areas:
            mart_tables = []
            
            # 销售数据集市示例
            if area == "sales":
                mart_tables = [
                    f"""
                    CREATE TABLE dm.sales_summary AS
                    SELECT 
                        d.date_key,
                        p.product_category,
                        c.customer_segment,
                        g.geography_region,
                        SUM(f.sales_amount) as total_sales,
                        SUM(f.quantity) as total_quantity,
                        COUNT(DISTINCT f.order_id) as order_count,
                        AVG(f.sales_amount) as avg_order_value
                    FROM dw.fact_sales f
                    JOIN dw.dim_date d ON f.date_sk = d.date_sk
                    JOIN dw.dim_product p ON f.product_sk = p.product_sk
                    JOIN dw.dim_customer c ON f.customer_sk = c.customer_sk
                    JOIN dw.dim_geography g ON f.geography_sk = g.geography_sk
                    GROUP BY d.date_key, p.product_category, c.customer_segment, g.geography_region;
                    """,
                    f"""
                    CREATE TABLE dm.sales_trends AS
                    SELECT 
                        date_key,
                        product_category,
                        total_sales,
                        LAG(total_sales, 1) OVER (PARTITION BY product_category ORDER BY date_key) as prev_sales,
                        (total_sales - LAG(total_sales, 1) OVER (PARTITION BY product_category ORDER BY date_key)) / 
                        LAG(total_sales, 1) OVER (PARTITION BY product_category ORDER BY date_key) * 100 as growth_rate
                    FROM dm.sales_summary;
                    """
                ]
            
            # 客户数据集市示例
            elif area == "customer":
                mart_tables = [
                    f"""
                    CREATE TABLE dm.customer_360 AS
                    SELECT 
                        c.customer_id,
                        c.customer_name,
                        c.customer_segment,
                        c.registration_date,
                        COUNT(DISTINCT f.order_id) as total_orders,
                        SUM(f.sales_amount) as lifetime_value,
                        AVG(f.sales_amount) as avg_order_value,
                        MAX(f.order_date) as last_order_date,
                        DATEDIFF(day, MAX(f.order_date), CURRENT_DATE) as days_since_last_order
                    FROM dw.dim_customer c
                    LEFT JOIN dw.fact_sales f ON c.customer_sk = f.customer_sk
                    GROUP BY c.customer_id, c.customer_name, c.customer_segment, c.registration_date;
                    """
                ]
            
            data_marts[area] = mart_tables
        
        return data_marts
    
    def setup_monitoring_and_alerts(self) -> Dict[str, str]:
        """设置监控和告警"""
        monitoring_queries = {
            "cluster_performance": """
            SELECT 
                query_id,
                userid,
                query,
                starttime,
                endtime,
                DATEDIFF(seconds, starttime, endtime) as duration_seconds,
                aborted
            FROM stl_query 
            WHERE starttime >= DATEADD(hour, -1, GETDATE())
            AND duration_seconds > 300
            ORDER BY duration_seconds DESC;
            """,
            
            "storage_usage": """
            SELECT 
                schema_name,
                table_name,
                size_mb,
                pct_used
            FROM svv_table_info
            WHERE size_mb > 1000
            ORDER BY size_mb DESC;
            """,
            
            "data_freshness": """
            SELECT 
                schemaname,
                tablename,
                MAX(created_at) as last_update,
                DATEDIFF(hour, MAX(created_at), GETDATE()) as hours_since_update
            FROM information_schema.tables t
            JOIN pg_stat_user_tables s ON t.table_name = s.relname
            WHERE schemaname IN ('dw', 'dm')
            GROUP BY schemaname, tablename
            HAVING hours_since_update > 24;
            """
        }
        
        return monitoring_queries

# 使用示例
def data_warehouse_example():
    """数据仓库示例"""
    manager = DataWarehouseManager("my-dw-cluster")
    
    # 创建集群配置
    cluster_config = {
        "node_type": "dc2.large",
        "master_username": "dwadmin",
        "master_password": "SecurePassword123!",
        "db_name": "datawarehouse",
        "cluster_type": "multi-node",
        "number_of_nodes": 3,
        "encrypted": True,
        "environment": "production"
    }
    
    # 创建Redshift集群
    print("创建Redshift集群...")
    cluster_result = manager.create_redshift_cluster(cluster_config)
    print(f"结果: {cluster_result}")
    
    # 创建模式结构
    print("\n创建模式结构...")
    schema_result = manager.create_schema_structure()
    print(f"结果: {schema_result}")
    
    # 定义维度表
    customer_dim = DimensionTable(
        name="dim_customer",
        type=DimensionType.SCD_TYPE_2,
        business_key="customer_id",
        attributes=["customer_name", "email", "phone", "address", "customer_segment"],
        source_system="CRM",
        update_frequency="daily",
        retention_days=2555  # 7年
    )
    
    product_dim = DimensionTable(
        name="dim_product",
        type=DimensionType.SCD_TYPE_2,
        business_key="product_id",
        attributes=["product_name", "category", "brand", "price", "status"],
        source_system="ERP",
        update_frequency="daily",
        retention_days=2555
    )
    
    # 定义事实表
    sales_fact = FactTable(
        name="fact_sales",
        type=FactType.TRANSACTION,
        grain="每个销售交易",
        dimensions=["dim_customer", "dim_product", "dim_date", "dim_geography"],
        measures=["sales_amount", "quantity", "discount_amount", "tax_amount"],
        source_system="POS",
        update_frequency="hourly",
        partition_key="order_date"
    )
    
    # 创建表结构
    print("\n创建维度表...")
    customer_sql = manager.create_dimension_table(customer_dim)
    product_sql = manager.create_dimension_table(product_dim)
    
    print("\n创建事实表...")
    sales_sql = manager.create_fact_table(sales_fact)
    
    # 创建ETL过程
    print("\n创建ETL存储过程...")
    procedures = manager.create_etl_procedures()
    
    # 创建数据集市
    print("\n创建数据集市...")
    marts = manager.create_data_marts(["sales", "customer"])
    
    # 设置监控
    print("\n设置监控查询...")
    monitoring = manager.setup_monitoring_and_alerts()
    
    print("\n数据仓库设置完成!")

if __name__ == "__main__":
    data_warehouse_example()

Real-Time Data Processing

Stream Processing Architecture

Modern enterprises need to process and analyze data streams in real time to support timely decisions and responses; the short producer sketch below shows how events might enter such a pipeline.
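
As a minimal producer sketch (the stream name, region, and event shape below are assumptions for the example), the snippet shows how an upstream service might push events into a Kinesis stream for this architecture to consume:

import json
import uuid
from datetime import datetime, timezone

import boto3

# Assumes a Kinesis stream named "iot-events" already exists in this region
kinesis = boto3.client("kinesis", region_name="us-east-1")

def publish_event(event_type: str, payload: dict, partition_key: str) -> dict:
    """Serialize one event and put it onto the stream."""
    record = {
        "event_id": str(uuid.uuid4()),
        "event_type": event_type,
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "data": payload,
    }
    # The partition key controls shard assignment; records sharing a key stay ordered.
    return kinesis.put_record(
        StreamName="iot-events",
        Data=json.dumps(record).encode("utf-8"),
        PartitionKey=partition_key,
    )

if __name__ == "__main__":
    publish_event("sensor_reading", {"device_id": "dev-42", "temperature": 21.7}, "dev-42")

The complete stream-processing architecture implementation follows.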

import json
import time
import asyncio
from typing import Dict, Any, List, Optional, Callable
from dataclasses import dataclass, asdict
from datetime import datetime, timedelta
from enum import Enum
import boto3
from abc import ABC, abstractmethod
import logging
import uuid

# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("stream-processing")

class StreamType(Enum):
    """流类型"""
    KINESIS = "kinesis"
    KAFKA = "kafka"
    PULSAR = "pulsar"
    EVENTHUB = "eventhub"

class ProcessingMode(Enum):
    """处理模式"""
    AT_LEAST_ONCE = "at_least_once"
    AT_MOST_ONCE = "at_most_once"
    EXACTLY_ONCE = "exactly_once"

@dataclass
class StreamEvent:
    """流事件"""
    event_id: str
    event_type: str
    timestamp: datetime
    data: Dict[str, Any]
    source: str
    partition_key: str

@dataclass
class ProcessingWindow:
    """处理窗口"""
    window_type: str  # tumbling, sliding, session
    size_seconds: int
    slide_seconds: Optional[int] = None
    gap_seconds: Optional[int] = None

class StreamProcessor(ABC):
    """流处理器抽象基类"""
    
    @abstractmethod
    async def process_event(self, event: StreamEvent) -> Optional[Dict[str, Any]]:
        """处理单个事件"""
        pass
    
    @abstractmethod
    async def process_window(self, events: List[StreamEvent], 
                           window: ProcessingWindow) -> Optional[Dict[str, Any]]:
        """处理窗口事件"""
        pass

class RealTimeAnalyticsProcessor(StreamProcessor):
    """实时分析处理器"""
    
    def __init__(self):
        self.event_counts = {}
        self.metrics_cache = {}
    
    async def process_event(self, event: StreamEvent) -> Optional[Dict[str, Any]]:
        """处理单个事件"""
        event_type = event.event_type
        
        # 更新事件计数
        if event_type not in self.event_counts:
            self.event_counts[event_type] = 0
        self.event_counts[event_type] += 1
        
        # 实时指标计算
        metrics = {
            "event_id": event.event_id,
            "event_type": event_type,
            "timestamp": event.timestamp.isoformat(),
            "total_count": self.event_counts[event_type],
            "processing_latency_ms": (datetime.utcnow() - event.timestamp).total_seconds() * 1000
        }
        
        # 异常检测
        if "error" in event.data or "exception" in event.data:
            metrics["alert"] = {
                "type": "error_detected",
                "severity": "high",
                "message": f"Error detected in {event_type} event"
            }
        
        return metrics
    
    async def process_window(self, events: List[StreamEvent], 
                           window: ProcessingWindow) -> Optional[Dict[str, Any]]:
        """处理窗口事件"""
        if not events:
            return None
        
        window_start = min(event.timestamp for event in events)
        window_end = max(event.timestamp for event in events)
        
        # 按事件类型分组
        events_by_type = {}
        for event in events:
            event_type = event.event_type
            if event_type not in events_by_type:
                events_by_type[event_type] = []
            events_by_type[event_type].append(event)
        
        # 计算窗口指标
        window_metrics = {
            "window_id": str(uuid.uuid4()),
            "window_start": window_start.isoformat(),
            "window_end": window_end.isoformat(),
            "window_duration_seconds": (window_end - window_start).total_seconds(),
            "total_events": len(events),
            "events_by_type": {k: len(v) for k, v in events_by_type.items()},
            "events_per_second": len(events) / window.size_seconds
        }
        
        # 计算业务指标
        if "user_action" in events_by_type:
            user_actions = events_by_type["user_action"]
            unique_users = len(set(event.data.get("user_id") for event in user_actions))
            window_metrics["unique_users"] = unique_users
            window_metrics["actions_per_user"] = len(user_actions) / unique_users if unique_users > 0 else 0
        
        return window_metrics

class StreamingPlatform:
    """流处理平台"""
    
    def __init__(self, platform_type: StreamType, config: Dict[str, Any]):
        self.platform_type = platform_type
        self.config = config
        self.processors: List[StreamProcessor] = []
        self.running = False
        
        # 初始化平台客户端
        if platform_type == StreamType.KINESIS:
            self.kinesis = boto3.client('kinesis', region_name=config.get('region', 'us-east-1'))
            self.firehose = boto3.client('firehose', region_name=config.get('region', 'us-east-1'))
    
    def add_processor(self, processor: StreamProcessor):
        """添加处理器"""
        self.processors.append(processor)
        logger.info(f"Added processor: {processor.__class__.__name__}")
    
    async def create_kinesis_stream(self, stream_name: str, shard_count: int = 1) -> Dict[str, Any]:
        """创建Kinesis流"""
        try:
            response = self.kinesis.create_stream(
                StreamName=stream_name,
                ShardCount=shard_count
            )
            
            # 等待流变为活跃状态
            waiter = self.kinesis.get_waiter('stream_exists')
            waiter.wait(StreamName=stream_name)
            
            return {
                "status": "success",
                "stream_name": stream_name,
                "shard_count": shard_count
            }
            
        except Exception as e:
            return {
                "status": "error",
                "error": str(e)
            }
    
    async def create_delivery_stream(self, delivery_stream_name: str, 
                                   destination_config: Dict[str, Any]) -> Dict[str, Any]:
        """创建Kinesis Firehose投递流"""
        try:
            # S3目标配置
            if destination_config["type"] == "s3":
                s3_config = {
                    'RoleARN': destination_config['role_arn'],
                    'BucketARN': destination_config['bucket_arn'],
                    'Prefix': destination_config.get('prefix', 'data/'),
                    'ErrorOutputPrefix': destination_config.get('error_prefix', 'errors/'),
                    'BufferingHints': {
                        'SizeInMBs': destination_config.get('buffer_size_mb', 5),
                        'IntervalInSeconds': destination_config.get('buffer_interval_seconds', 300)
                    },
                    'CompressionFormat': destination_config.get('compression', 'GZIP'),
                    # 注意:Firehose启用格式转换(输出Parquet)时,还必须提供
                    # InputFormatConfiguration(反序列化器)与SchemaConfiguration(Glue表结构),
                    # 此处为示例草图,实际部署时需按目标表补全这两项配置
                    'DataFormatConversionConfiguration': {
                        'Enabled': True,
                        'OutputFormatConfiguration': {
                            'Serializer': {
                                'ParquetSerDe': {}
                            }
                        }
                    }
                }
                
                response = self.firehose.create_delivery_stream(
                    DeliveryStreamName=delivery_stream_name,
                    DeliveryStreamType='DirectPut',
                    ExtendedS3DestinationConfiguration=s3_config
                )
            
            return {
                "status": "success",
                "delivery_stream_name": delivery_stream_name,
                "delivery_stream_arn": response['DeliveryStreamARN']
            }
            
        except Exception as e:
            return {
                "status": "error",
                "error": str(e)
            }
    
    async def publish_event(self, stream_name: str, event: StreamEvent) -> Dict[str, Any]:
        """发布事件到流"""
        try:
            if self.platform_type == StreamType.KINESIS:
                response = self.kinesis.put_record(
                    StreamName=stream_name,
                    Data=json.dumps(asdict(event), default=str),
                    PartitionKey=event.partition_key
                )
                
                return {
                    "status": "success",
                    "shard_id": response['ShardId'],
                    "sequence_number": response['SequenceNumber']
                }
            
        except Exception as e:
            return {
                "status": "error",
                "error": str(e)
            }
    
    async def consume_stream(self, stream_name: str, shard_iterator_type: str = 'LATEST'):
        """消费流数据"""
        if self.platform_type != StreamType.KINESIS:
            logger.error("Only Kinesis consumption is implemented")
            return
        
        try:
            # 获取分片信息
            response = self.kinesis.describe_stream(StreamName=stream_name)
            shards = response['StreamDescription']['Shards']
            
            # 为每个分片创建消费任务
            tasks = []
            for shard in shards:
                task = asyncio.create_task(
                    self._consume_shard(stream_name, shard['ShardId'], shard_iterator_type)
                )
                tasks.append(task)
            
            # 等待所有任务完成
            await asyncio.gather(*tasks)
            
        except Exception as e:
            logger.error(f"Error consuming stream: {e}")
    
    async def _consume_shard(self, stream_name: str, shard_id: str, 
                           shard_iterator_type: str):
        """消费单个分片"""
        try:
            # 获取分片迭代器
            response = self.kinesis.get_shard_iterator(
                StreamName=stream_name,
                ShardId=shard_id,
                ShardIteratorType=shard_iterator_type
            )
            
            shard_iterator = response['ShardIterator']
            
            while self.running and shard_iterator:
                # 获取记录
                response = self.kinesis.get_records(
                    ShardIterator=shard_iterator,
                    Limit=100
                )
                
                records = response['Records']
                shard_iterator = response.get('NextShardIterator')
                
                # 处理记录
                for record in records:
                    try:
                        # 解析事件数据(发布时datetime被序列化为字符串,这里还原为datetime对象)
                        event_data = json.loads(record['Data'])
                        event_data['timestamp'] = datetime.fromisoformat(str(event_data['timestamp']))
                        event = StreamEvent(**event_data)
                        
                        # 使用所有处理器处理事件
                        for processor in self.processors:
                            result = await processor.process_event(event)
                            if result:
                                logger.info(f"Processed event {event.event_id}: {result}")
                    
                    except Exception as e:
                        logger.error(f"Error processing record: {e}")
                
                # 避免过于频繁的API调用
                if not records:
                    await asyncio.sleep(1)
                    
        except Exception as e:
            logger.error(f"Error consuming shard {shard_id}: {e}")
    
    async def start_processing(self, stream_name: str):
        """开始处理"""
        self.running = True
        logger.info(f"Starting stream processing for {stream_name}")
        await self.consume_stream(stream_name)
    
    def stop_processing(self):
        """停止处理"""
        self.running = False
        logger.info("Stopping stream processing")

class WindowManager:
    """窗口管理器"""
    
    def __init__(self, window_config: ProcessingWindow):
        self.window_config = window_config
        self.current_window_events: List[StreamEvent] = []
        self.window_start_time: Optional[datetime] = None
        self.processors: List[StreamProcessor] = []
    
    def add_processor(self, processor: StreamProcessor):
        """添加处理器"""
        self.processors.append(processor)
    
    async def add_event(self, event: StreamEvent):
        """添加事件到窗口"""
        current_time = datetime.utcnow()
        
        # 初始化窗口
        if self.window_start_time is None:
            self.window_start_time = current_time
        
        # 检查是否需要触发窗口处理
        window_duration = timedelta(seconds=self.window_config.size_seconds)
        
        if current_time - self.window_start_time >= window_duration:
            # 处理当前窗口
            await self._process_current_window()
            
            # 重置窗口
            if self.window_config.window_type == "tumbling":
                self.current_window_events = [event]
                self.window_start_time = current_time
            elif self.window_config.window_type == "sliding":
                # 滑动窗口:移除过期事件
                slide_duration = timedelta(seconds=self.window_config.slide_seconds or self.window_config.size_seconds)
                cutoff_time = current_time - slide_duration
                self.current_window_events = [
                    e for e in self.current_window_events 
                    if e.timestamp >= cutoff_time
                ]
                self.current_window_events.append(event)
                self.window_start_time = current_time - slide_duration
        else:
            self.current_window_events.append(event)
    
    async def _process_current_window(self):
        """处理当前窗口"""
        if not self.current_window_events:
            return
        
        for processor in self.processors:
            try:
                result = await processor.process_window(
                    self.current_window_events, 
                    self.window_config
                )
                if result:
                    logger.info(f"Window processed: {result}")
            except Exception as e:
                logger.error(f"Error processing window: {e}")

# 使用示例
async def stream_processing_example():
    """流处理示例"""
    # 配置流处理平台
    config = {
        "region": "us-east-1",
        "buffer_size": 1000,
        "batch_timeout": 5
    }
    
    platform = StreamingPlatform(StreamType.KINESIS, config)
    
    # 添加处理器
    analytics_processor = RealTimeAnalyticsProcessor()
    platform.add_processor(analytics_processor)
    
    # 创建流
    stream_name = "real-time-events"
    stream_result = await platform.create_kinesis_stream(stream_name, shard_count=2)
    print(f"Stream creation result: {stream_result}")
    
    # 创建投递流
    delivery_config = {
        "type": "s3",
        "role_arn": "arn:aws:iam::123456789012:role/firehose-delivery-role",
        "bucket_arn": "arn:aws:s3:::my-data-lake",
        "prefix": "real-time-data/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/",
        "buffer_size_mb": 5,
        "buffer_interval_seconds": 300
    }
    
    delivery_result = await platform.create_delivery_stream(
        "real-time-delivery", 
        delivery_config
    )
    print(f"Delivery stream creation result: {delivery_result}")
    
    # 设置窗口处理
    window_config = ProcessingWindow(
        window_type="tumbling",
        size_seconds=60  # 1分钟窗口
    )
    
    window_manager = WindowManager(window_config)
    window_manager.add_processor(analytics_processor)
    
    # 模拟发布事件
    events = [
        StreamEvent(
            event_id=str(uuid.uuid4()),
            event_type="user_action",
            timestamp=datetime.utcnow(),
            data={"user_id": "user123", "action": "click", "page": "homepage"},
            source="web_app",
            partition_key="user123"
        ),
        StreamEvent(
            event_id=str(uuid.uuid4()),
            event_type="system_metric",
            timestamp=datetime.utcnow(),
            data={"cpu_usage": 75.5, "memory_usage": 60.2, "server": "web01"},
            source="monitoring",
            partition_key="web01"
        ),
        StreamEvent(
            event_id=str(uuid.uuid4()),
            event_type="error",
            timestamp=datetime.utcnow(),
            data={"error_code": "500", "message": "Internal server error", "service": "api"},
            source="application",
            partition_key="api"
        )
    ]
    
    # 发布事件
    for event in events:
        publish_result = await platform.publish_event(stream_name, event)
        print(f"Event published: {publish_result}")
        
        # 添加到窗口管理器
        await window_manager.add_event(event)
        
        # 模拟延迟
        await asyncio.sleep(0.1)
    
    print("Stream processing example completed")

if __name__ == "__main__":
    asyncio.run(stream_processing_example())
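
上面的 WindowManager 只实现了滚动与滑动窗口;ProcessingWindow 中的 gap_seconds 是为会话窗口预留的,但前文并未给出实现。下面是一个按空闲间隔切分会话的最小草图,复用上文定义的 StreamEvent、StreamProcessor 等类型,属补充示例而非前文实现的一部分:

class SessionWindowManager:
    """会话窗口管理器(最小草图):事件间隔超过gap_seconds即结算当前会话"""

    def __init__(self, window_config: ProcessingWindow):
        assert window_config.gap_seconds, "session窗口需要配置gap_seconds"
        self.window_config = window_config
        self.current_events: List[StreamEvent] = []
        self.last_event_time: Optional[datetime] = None
        self.processors: List[StreamProcessor] = []

    def add_processor(self, processor: StreamProcessor):
        self.processors.append(processor)

    async def add_event(self, event: StreamEvent):
        gap = timedelta(seconds=self.window_config.gap_seconds)

        # 与上一事件的间隔超过gap,则先结算当前会话,再开启新会话
        if self.last_event_time and event.timestamp - self.last_event_time > gap:
            await self._flush()

        self.current_events.append(event)
        self.last_event_time = event.timestamp

    async def _flush(self):
        """把当前会话内的事件交给所有处理器做窗口聚合"""
        if not self.current_events:
            return
        for processor in self.processors:
            result = await processor.process_window(self.current_events, self.window_config)
            if result:
                logger.info(f"Session window processed: {result}")
        self.current_events = []
        self.last_event_time = None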

AI平台架构设计

机器学习平台

构建企业级机器学习平台需要覆盖从模型训练、注册、部署到监控的完整生命周期。以下示例基于Amazon SageMaker与MLflow实现这些核心环节:

import json
import pickle
import joblib
import numpy as np
import pandas as pd
from typing import Dict, Any, List, Optional, Union, Tuple
from dataclasses import dataclass, asdict
from datetime import datetime, timedelta
from enum import Enum
import boto3
import mlflow
import mlflow.sklearn
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
import logging

# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("ml-platform")

class ModelType(Enum):
    """模型类型"""
    CLASSIFICATION = "classification"
    REGRESSION = "regression"
    CLUSTERING = "clustering"
    DEEP_LEARNING = "deep_learning"

class DeploymentTarget(Enum):
    """部署目标"""
    SAGEMAKER = "sagemaker"
    LAMBDA = "lambda"
    ECS = "ecs"
    KUBERNETES = "kubernetes"

@dataclass
class ModelMetadata:
    """模型元数据"""
    model_id: str
    model_name: str
    model_type: ModelType
    version: str
    created_at: datetime
    created_by: str
    description: str
    tags: Dict[str, str]
    metrics: Dict[str, float]
    parameters: Dict[str, Any]

@dataclass
class TrainingJob:
    """训练任务"""
    job_id: str
    model_name: str
    dataset_path: str
    algorithm: str
    hyperparameters: Dict[str, Any]
    instance_type: str
    instance_count: int
    max_runtime_seconds: int
    output_path: str
    status: str = "CREATED"

class MLPlatform:
    """机器学习平台"""
    
    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.sagemaker = boto3.client('sagemaker', region_name=config.get('region', 'us-east-1'))
        self.s3 = boto3.client('s3', region_name=config.get('region', 'us-east-1'))
        self.lambda_client = boto3.client('lambda', region_name=config.get('region', 'us-east-1'))
        
        # 初始化MLflow
        mlflow.set_tracking_uri(config.get('mlflow_uri', 'http://localhost:5000'))
        
        self.models_registry = {}
        self.training_jobs = {}
    
    def create_training_job(self, job_config: TrainingJob) -> Dict[str, Any]:
        """创建训练任务"""
        try:
            # 准备训练任务配置
            training_job_config = {
                'TrainingJobName': job_config.job_id,
                'AlgorithmSpecification': {
                    'TrainingImage': self._get_algorithm_image(job_config.algorithm),
                    'TrainingInputMode': 'File'
                },
                'RoleArn': self.config['sagemaker_role'],
                'InputDataConfig': [
                    {
                        'ChannelName': 'training',
                        'DataSource': {
                            'S3DataSource': {
                                'S3DataType': 'S3Prefix',
                                'S3Uri': job_config.dataset_path,
                                'S3DataDistributionType': 'FullyReplicated'
                            }
                        },
                        'ContentType': 'text/csv',
                        'CompressionType': 'None'
                    }
                ],
                'OutputDataConfig': {
                    'S3OutputPath': job_config.output_path
                },
                'ResourceConfig': {
                    'InstanceType': job_config.instance_type,
                    'InstanceCount': job_config.instance_count,
                    'VolumeSizeInGB': 30
                },
                'StoppingCondition': {
                    'MaxRuntimeInSeconds': job_config.max_runtime_seconds
                },
                'HyperParameters': {k: str(v) for k, v in job_config.hyperparameters.items()}
            }
            
            # 启动训练任务
            response = self.sagemaker.create_training_job(**training_job_config)
            
            # 更新任务状态
            job_config.status = "IN_PROGRESS"
            self.training_jobs[job_config.job_id] = job_config
            
            return {
                "status": "success",
                "job_id": job_config.job_id,
                "training_job_arn": response['TrainingJobArn']
            }
            
        except Exception as e:
            return {
                "status": "error",
                "error": str(e)
            }
    
    def _get_algorithm_image(self, algorithm: str) -> str:
        """获取算法镜像"""
        algorithm_images = {
            "xgboost": "246618743249.dkr.ecr.us-west-2.amazonaws.com/xgboost:latest",
            "sklearn": "246618743249.dkr.ecr.us-west-2.amazonaws.com/scikit-learn:0.23-1-cpu-py3",
            "tensorflow": "246618743249.dkr.ecr.us-west-2.amazonaws.com/tensorflow-training:2.4.1-cpu-py37",
            "pytorch": "246618743249.dkr.ecr.us-west-2.amazonaws.com/pytorch-training:1.8.1-cpu-py36"
        }
        return algorithm_images.get(algorithm, algorithm_images["sklearn"])
    
    def get_training_job_status(self, job_id: str) -> Dict[str, Any]:
        """获取训练任务状态"""
        try:
            response = self.sagemaker.describe_training_job(TrainingJobName=job_id)
            
            status_info = {
                "job_id": job_id,
                "status": response['TrainingJobStatus'],
                "creation_time": response['CreationTime'],
                "training_start_time": response.get('TrainingStartTime'),
                "training_end_time": response.get('TrainingEndTime'),
                "model_artifacts": response.get('ModelArtifacts', {}).get('S3ModelArtifacts'),
                "final_metric_data_list": response.get('FinalMetricDataList', [])
            }
            
            # 更新本地状态
            if job_id in self.training_jobs:
                self.training_jobs[job_id].status = response['TrainingJobStatus']
            
            return status_info
            
        except Exception as e:
            return {
                "status": "error",
                "error": str(e)
            }
    
    def register_model(self, model_metadata: ModelMetadata, model_path: str) -> Dict[str, Any]:
        """注册模型"""
        try:
            # 使用MLflow注册模型
            with mlflow.start_run():
                # 记录模型参数和指标
                mlflow.log_params(model_metadata.parameters)
                mlflow.log_metrics(model_metadata.metrics)
                
                # 记录模型
                if model_metadata.model_type == ModelType.CLASSIFICATION:
                    # 假设是sklearn模型
                    model = joblib.load(model_path)
                    mlflow.sklearn.log_model(model, "model")
                
                # 注册模型版本
                model_uri = f"runs:/{mlflow.active_run().info.run_id}/model"
                mlflow.register_model(model_uri, model_metadata.model_name)
            
            # 存储到本地注册表
            self.models_registry[model_metadata.model_id] = model_metadata
            
            return {
                "status": "success",
                "model_id": model_metadata.model_id,
                "model_version": model_metadata.version
            }
            
        except Exception as e:
            return {
                "status": "error",
                "error": str(e)
            }
    
    def create_model_endpoint(self, model_id: str, endpoint_config: Dict[str, Any]) -> Dict[str, Any]:
        """创建模型端点"""
        try:
            model_metadata = self.models_registry.get(model_id)
            if not model_metadata:
                return {"status": "error", "error": "Model not found"}
            
            # 创建SageMaker模型
            model_name = f"{model_metadata.model_name}-{model_metadata.version}"
            
            create_model_response = self.sagemaker.create_model(
                ModelName=model_name,
                PrimaryContainer={
                    'Image': endpoint_config['inference_image'],
                    'ModelDataUrl': endpoint_config['model_data_url'],
                    'Environment': endpoint_config.get('environment', {})
                },
                ExecutionRoleArn=self.config['sagemaker_role']
            )
            
            # 创建端点配置
            endpoint_config_name = f"{model_name}-config"
            create_endpoint_config_response = self.sagemaker.create_endpoint_config(
                EndpointConfigName=endpoint_config_name,
                ProductionVariants=[
                    {
                        'VariantName': 'primary',
                        'ModelName': model_name,
                        'InitialInstanceCount': endpoint_config.get('instance_count', 1),
                        'InstanceType': endpoint_config.get('instance_type', 'ml.t2.medium'),
                        'InitialVariantWeight': 1
                    }
                ]
            )
            
            # 创建端点
            endpoint_name = f"{model_name}-endpoint"
            create_endpoint_response = self.sagemaker.create_endpoint(
                EndpointName=endpoint_name,
                EndpointConfigName=endpoint_config_name
            )
            
            return {
                "status": "success",
                "endpoint_name": endpoint_name,
                "endpoint_arn": create_endpoint_response['EndpointArn']
            }
            
        except Exception as e:
            return {
                "status": "error",
                "error": str(e)
            }
    
    def invoke_endpoint(self, endpoint_name: str, input_data: Any) -> Dict[str, Any]:
        """调用模型端点"""
        try:
            runtime = boto3.client('sagemaker-runtime', region_name=self.config.get('region', 'us-east-1'))
            
            # 准备输入数据
            if isinstance(input_data, (dict, list)):
                payload = json.dumps(input_data)
                content_type = 'application/json'
            else:
                payload = str(input_data)
                content_type = 'text/csv'
            
            # 调用端点
            response = runtime.invoke_endpoint(
                EndpointName=endpoint_name,
                ContentType=content_type,
                Body=payload
            )
            
            # 解析响应
            result = json.loads(response['Body'].read().decode())
            
            return {
                "status": "success",
                "prediction": result,
                "model_latency_ms": response['ResponseMetadata']['HTTPHeaders'].get('x-amzn-invoked-production-variant-latency', 0)
            }
            
        except Exception as e:
            return {
                "status": "error",
                "error": str(e)
            }
    
    def create_batch_transform_job(self, job_config: Dict[str, Any]) -> Dict[str, Any]:
        """创建批量转换任务"""
        try:
            transform_job_config = {
                'TransformJobName': job_config['job_name'],
                'ModelName': job_config['model_name'],
                'TransformInput': {
                    'DataSource': {
                        'S3DataSource': {
                            'S3DataType': 'S3Prefix',
                            'S3Uri': job_config['input_data_path']
                        }
                    },
                    'ContentType': job_config.get('content_type', 'text/csv'),
                    'SplitType': job_config.get('split_type', 'Line')
                },
                'TransformOutput': {
                    'S3OutputPath': job_config['output_data_path'],
                    'Accept': job_config.get('accept', 'text/csv')
                },
                'TransformResources': {
                    'InstanceType': job_config.get('instance_type', 'ml.m4.xlarge'),
                    'InstanceCount': job_config.get('instance_count', 1)
                }
            }
            
            response = self.sagemaker.create_transform_job(**transform_job_config)
            
            return {
                "status": "success",
                "job_name": job_config['job_name'],
                "transform_job_arn": response['TransformJobArn']
            }
            
        except Exception as e:
            return {
                "status": "error",
                "error": str(e)
            }
    
    def setup_model_monitoring(self, endpoint_name: str, monitoring_config: Dict[str, Any]) -> Dict[str, Any]:
        """设置模型监控"""
        try:
            # 创建数据质量监控
            monitoring_schedule_name = f"{endpoint_name}-monitoring"
            
            create_monitoring_schedule_config = {
                'MonitoringScheduleName': monitoring_schedule_name,
                'MonitoringScheduleConfig': {
                    'ScheduleConfig': {
                        'ScheduleExpression': monitoring_config.get('schedule', 'cron(0 * * * ? *)')  # 每小时
                    },
                    'MonitoringJobDefinition': {
                        'MonitoringInputs': [
                            {
                                'EndpointInput': {
                                    'EndpointName': endpoint_name,
                                    'LocalPath': '/opt/ml/processing/input/endpoint'
                                }
                            }
                        ],
                        'MonitoringOutputConfig': {
                            'MonitoringOutputs': [
                                {
                                    'S3Output': {
                                        'S3Uri': monitoring_config['output_s3_uri'],
                                        'LocalPath': '/opt/ml/processing/output'
                                    }
                                }
                            ]
                        },
                        'MonitoringResources': {
                            'ClusterConfig': {
                                'InstanceCount': 1,
                                'InstanceType': 'ml.t3.medium',
                                'VolumeSizeInGB': 20
                            }
                        },
                        'MonitoringAppSpecification': {
                            'ImageUri': monitoring_config['monitoring_image'],
                            'RecordPreprocessorSourceUri': monitoring_config.get('preprocessor_uri'),
                            'PostAnalyticsProcessorSourceUri': monitoring_config.get('postprocessor_uri')
                        },
                        'RoleArn': self.config['sagemaker_role']
                    }
                }
            }
            
            response = self.sagemaker.create_monitoring_schedule(**create_monitoring_schedule_config)
            
            return {
                "status": "success",
                "monitoring_schedule_name": monitoring_schedule_name,
                "monitoring_schedule_arn": response['MonitoringScheduleArn']
            }
            
        except Exception as e:
            return {
                "status": "error",
                "error": str(e)
            }

class AutoMLPipeline:
    """自动机器学习管道"""
    
    def __init__(self, ml_platform: MLPlatform):
        self.ml_platform = ml_platform
        self.experiments = {}
    
    def create_automl_experiment(self, experiment_config: Dict[str, Any]) -> Dict[str, Any]:
        """创建AutoML实验"""
        try:
            experiment_name = experiment_config['experiment_name']
            
            # 创建SageMaker AutoML任务
            automl_job_config = {
                'AutoMLJobName': experiment_name,
                'InputDataConfig': [
                    {
                        'DataSource': {
                            'S3DataSource': {
                                'S3DataType': 'S3Prefix',
                                'S3Uri': experiment_config['training_data_s3_uri']
                            }
                        },
                        'TargetAttributeName': experiment_config['target_column']
                    }
                ],
                'OutputDataConfig': {
                    'S3OutputPath': experiment_config['output_s3_uri']
                },
                'ProblemType': experiment_config.get('problem_type', 'BinaryClassification'),
                'AutoMLJobObjective': {
                    'MetricName': experiment_config.get('objective_metric', 'F1')
                },
                'RoleArn': self.ml_platform.config['sagemaker_role'],
                'GenerateCandidateDefinitionsOnly': False,
                'AutoMLJobConfig': {
                    'CompletionCriteria': {
                        'MaxCandidates': experiment_config.get('max_candidates', 250),
                        'MaxRuntimePerTrainingJobInSeconds': experiment_config.get('max_runtime_per_job', 3600),
                        'MaxAutoMLJobRuntimeInSeconds': experiment_config.get('max_total_runtime', 86400)
                    }
                }
            }
            
            response = self.ml_platform.sagemaker.create_auto_ml_job(**automl_job_config)
            
            # 存储实验信息
            self.experiments[experiment_name] = {
                'config': experiment_config,
                'job_arn': response['AutoMLJobArn'],
                'status': 'InProgress',
                'created_at': datetime.utcnow()
            }
            
            return {
                "status": "success",
                "experiment_name": experiment_name,
                "job_arn": response['AutoMLJobArn']
            }
            
        except Exception as e:
            return {
                "status": "error",
                "error": str(e)
            }
    
    def get_automl_experiment_status(self, experiment_name: str) -> Dict[str, Any]:
        """获取AutoML实验状态"""
        try:
            response = self.ml_platform.sagemaker.describe_auto_ml_job(AutoMLJobName=experiment_name)
            
            status_info = {
                "experiment_name": experiment_name,
                "status": response['AutoMLJobStatus'],
                "creation_time": response['CreationTime'],
                "end_time": response.get('EndTime'),
                "best_candidate": response.get('BestCandidate'),
                "auto_ml_job_secondary_status": response.get('AutoMLJobSecondaryStatus'),
                "failure_reason": response.get('FailureReason')
            }
            
            # 更新本地状态
            if experiment_name in self.experiments:
                self.experiments[experiment_name]['status'] = response['AutoMLJobStatus']
            
            return status_info
            
        except Exception as e:
            return {
                "status": "error",
                "error": str(e)
            }
    
    def get_automl_candidates(self, experiment_name: str) -> Dict[str, Any]:
        """获取AutoML候选模型"""
        try:
            response = self.ml_platform.sagemaker.list_candidates_for_auto_ml_job(
                AutoMLJobName=experiment_name,
                MaxResults=50
            )
            
            candidates = []
            for candidate in response['Candidates']:
                candidate_info = {
                    'candidate_name': candidate['CandidateName'],
                    'objective_score': candidate.get('FinalAutoMLJobObjectiveMetric', {}).get('Value'),
                    'candidate_status': candidate['CandidateStatus'],
                    'creation_time': candidate['CreationTime'],
                    'inference_containers': candidate.get('InferenceContainers', [])
                }
                candidates.append(candidate_info)
            
            return {
                "status": "success",
                "candidates": candidates,
                "total_candidates": len(candidates)
            }
            
        except Exception as e:
            return {
                "status": "error",
                "error": str(e)
            }

# 使用示例
def ml_platform_example():
    """机器学习平台示例"""
    # 配置平台
    config = {
        "region": "us-east-1",
        "sagemaker_role": "arn:aws:iam::123456789012:role/SageMakerExecutionRole",
        "mlflow_uri": "http://mlflow.example.com:5000"
    }
    
    platform = MLPlatform(config)
    
    # 创建训练任务
    training_job = TrainingJob(
        job_id="customer-churn-training-001",
        model_name="customer-churn-model",
        dataset_path="s3://my-ml-bucket/datasets/customer-churn/train.csv",
        algorithm="xgboost",
        hyperparameters={
            "max_depth": 5,
            "eta": 0.2,
            "gamma": 4,
            "min_child_weight": 6,
            "subsample": 0.8,
            "objective": "binary:logistic",
            "num_round": 100
        },
        instance_type="ml.m4.xlarge",
        instance_count=1,
        max_runtime_seconds=3600,
        output_path="s3://my-ml-bucket/models/customer-churn/"
    )
    
    # 启动训练
    print("启动训练任务...")
    training_result = platform.create_training_job(training_job)
    print(f"训练结果: {training_result}")
    
    # 检查训练状态
    print("\n检查训练状态...")
    status = platform.get_training_job_status(training_job.job_id)
    print(f"训练状态: {status}")
    
    # 注册模型
    model_metadata = ModelMetadata(
        model_id="customer-churn-v1",
        model_name="customer-churn-model",
        model_type=ModelType.CLASSIFICATION,
        version="1.0",
        created_at=datetime.utcnow(),
        created_by="data-scientist@company.com",
        description="Customer churn prediction model using XGBoost",
        tags={"team": "data-science", "use-case": "churn-prediction"},
        metrics={"accuracy": 0.85, "precision": 0.82, "recall": 0.88, "f1": 0.85},
        parameters=training_job.hyperparameters
    )
    
    print("\n注册模型...")
    register_result = platform.register_model(
        model_metadata, 
        "s3://my-ml-bucket/models/customer-churn/model.tar.gz"
    )
    print(f"注册结果: {register_result}")
    
    # 创建模型端点
    endpoint_config = {
        "inference_image": "246618743249.dkr.ecr.us-west-2.amazonaws.com/xgboost:latest",
        "model_data_url": "s3://my-ml-bucket/models/customer-churn/model.tar.gz",
        "instance_type": "ml.t2.medium",
        "instance_count": 1
    }
    
    print("\n创建模型端点...")
    endpoint_result = platform.create_model_endpoint("customer-churn-v1", endpoint_config)
    print(f"端点结果: {endpoint_result}")
    
    # 设置模型监控
    monitoring_config = {
        "schedule": "cron(0 * * * ? *)",  # 每小时
        "output_s3_uri": "s3://my-ml-bucket/monitoring/customer-churn/",
        "monitoring_image": "246618743249.dkr.ecr.us-west-2.amazonaws.com/sagemaker-model-monitor-analyzer"
    }
    
    print("\n设置模型监控...")
    monitoring_result = platform.setup_model_monitoring(
        endpoint_result.get("endpoint_name", ""), 
        monitoring_config
    )
    print(f"监控结果: {monitoring_result}")
    
    # AutoML示例
    automl = AutoMLPipeline(platform)
    
    automl_config = {
        "experiment_name": "customer-churn-automl",
        "training_data_s3_uri": "s3://my-ml-bucket/datasets/customer-churn/automl-train.csv",
        "target_column": "churn",
        "output_s3_uri": "s3://my-ml-bucket/automl-output/",
        "problem_type": "BinaryClassification",
        "objective_metric": "F1",
        "max_candidates": 50,
        "max_runtime_per_job": 1800,
        "max_total_runtime": 7200
    }
    
    print("\n创建AutoML实验...")
    automl_result = automl.create_automl_experiment(automl_config)
    print(f"AutoML结果: {automl_result}")
    
    print("\nML平台示例完成!")

if __name__ == "__main__":
    ml_platform_example()
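
端点创建完成后,可以用前面定义的 invoke_endpoint 做在线推理。下面是一个最小调用草图,其中端点名与特征取值均为示例假设,需在端点进入 InService 状态后才能真正调用:

def online_inference_example(platform: MLPlatform):
    """调用已部署端点进行在线推理(示例草图)"""
    # 端点名取自 create_model_endpoint 的返回值,此处为示例假设
    endpoint_name = "customer-churn-model-1.0-endpoint"

    # 单条CSV格式的特征样本(列顺序需与训练数据一致,取值为示例假设)
    sample = "34,12,0,1,89.5,3"

    result = platform.invoke_endpoint(endpoint_name, sample)
    if result["status"] == "success":
        print(f"流失概率预测: {result['prediction']}")
    else:
        print(f"推理失败: {result['error']}")

# online_inference_example(platform)  # 需在端点处于InService状态后调用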

数据治理与安全

数据治理框架

企业级数据治理需要建立数据目录与血缘、数据质量控制以及隐私合规三类核心能力。以下示例依次实现数据目录(DataCatalog)、数据质量引擎(DataQualityEngine)和数据隐私管理器(DataPrivacyManager):

import json
import hashlib
import uuid
from typing import Dict, Any, List, Optional, Set
from dataclasses import dataclass, asdict
from datetime import datetime, timedelta
from enum import Enum
import boto3
from abc import ABC, abstractmethod
import logging
import re

# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("data-governance")

class DataClassification(Enum):
    """数据分类"""
    PUBLIC = "public"
    INTERNAL = "internal"
    CONFIDENTIAL = "confidential"
    RESTRICTED = "restricted"

class DataSensitivity(Enum):
    """数据敏感性"""
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"

class ComplianceFramework(Enum):
    """合规框架"""
    GDPR = "gdpr"
    CCPA = "ccpa"
    HIPAA = "hipaa"
    SOX = "sox"
    PCI_DSS = "pci_dss"

@dataclass
class DataAsset:
    """数据资产"""
    asset_id: str
    name: str
    description: str
    owner: str
    steward: str
    classification: DataClassification
    sensitivity: DataSensitivity
    location: str
    schema: Dict[str, Any]
    tags: Dict[str, str]
    created_at: datetime
    updated_at: datetime
    retention_period_days: int
    compliance_frameworks: List[ComplianceFramework]

@dataclass
class DataLineage:
    """数据血缘"""
    lineage_id: str
    source_asset_id: str
    target_asset_id: str
    transformation_logic: str
    created_at: datetime
    created_by: str

@dataclass
class DataQualityRule:
    """数据质量规则"""
    rule_id: str
    name: str
    description: str
    rule_type: str  # completeness, accuracy, consistency, validity, uniqueness
    column_name: str
    condition: str
    threshold: float
    severity: str  # warning, error, critical

@dataclass
class DataQualityResult:
    """数据质量结果"""
    result_id: str
    rule_id: str
    asset_id: str
    execution_time: datetime
    passed: bool
    score: float
    details: Dict[str, Any]

class DataCatalog:
    """数据目录"""
    
    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.glue = boto3.client('glue', region_name=config.get('region', 'us-east-1'))
        self.assets: Dict[str, DataAsset] = {}
        self.lineage: List[DataLineage] = []
    
    def register_data_asset(self, asset: DataAsset) -> Dict[str, Any]:
        """注册数据资产"""
        try:
            # 存储到本地目录
            self.assets[asset.asset_id] = asset
            
            # 在AWS Glue中创建表
            database_name = self.config.get('catalog_database', 'data_catalog')
            table_name = asset.name.replace('-', '_').lower()
            
            # 构建表结构
            columns = []
            for column_name, column_info in asset.schema.items():
                columns.append({
                    'Name': column_name,
                    'Type': column_info.get('type', 'string'),
                    'Comment': column_info.get('description', '')
                })
            
            table_input = {
                'Name': table_name,
                'Description': asset.description,
                'Owner': asset.owner,
                'StorageDescriptor': {
                    'Columns': columns,
                    'Location': asset.location,
                    'InputFormat': 'org.apache.hadoop.mapred.TextInputFormat',
                    'OutputFormat': 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat',
                    'SerdeInfo': {
                        'SerializationLibrary': 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
                    }
                },
                'Parameters': {
                    'classification': asset.classification.value,
                    'sensitivity': asset.sensitivity.value,
                    'owner': asset.owner,
                    'steward': asset.steward,
                    'retention_days': str(asset.retention_period_days),
                    'compliance_frameworks': ','.join([f.value for f in asset.compliance_frameworks])
                }
            }
            
            # 添加标签
            for key, value in asset.tags.items():
                table_input['Parameters'][f'tag_{key}'] = value
            
            # 创建表
            self.glue.create_table(
                DatabaseName=database_name,
                TableInput=table_input
            )
            
            logger.info(f"Data asset {asset.asset_id} registered successfully")
            
            return {
                "status": "success",
                "asset_id": asset.asset_id,
                "catalog_table": f"{database_name}.{table_name}"
            }
            
        except Exception as e:
            logger.error(f"Error registering data asset: {e}")
            return {
                "status": "error",
                "error": str(e)
            }
    
    def search_assets(self, query: str, filters: Optional[Dict[str, Any]] = None) -> List[DataAsset]:
        """搜索数据资产"""
        results = []
        
        for asset in self.assets.values():
            # 文本搜索
            if query.lower() in asset.name.lower() or query.lower() in asset.description.lower():
                match = True
                
                # 应用过滤器
                if filters:
                    if 'classification' in filters and asset.classification != filters['classification']:
                        match = False
                    if 'owner' in filters and asset.owner != filters['owner']:
                        match = False
                    if 'tags' in filters:
                        for tag_key, tag_value in filters['tags'].items():
                            if asset.tags.get(tag_key) != tag_value:
                                match = False
                                break
                
                if match:
                    results.append(asset)
        
        return results
    
    def add_lineage(self, lineage: DataLineage) -> Dict[str, Any]:
        """添加数据血缘"""
        try:
            self.lineage.append(lineage)
            
            logger.info(f"Data lineage {lineage.lineage_id} added successfully")
            
            return {
                "status": "success",
                "lineage_id": lineage.lineage_id
            }
            
        except Exception as e:
            return {
                "status": "error",
                "error": str(e)
            }
    
    def get_lineage_graph(self, asset_id: str, direction: str = "both") -> Dict[str, Any]:
        """获取数据血缘图"""
        upstream = []
        downstream = []
        
        for lineage in self.lineage:
            if direction in ["upstream", "both"] and lineage.target_asset_id == asset_id:
                upstream.append({
                    "source_asset_id": lineage.source_asset_id,
                    "transformation": lineage.transformation_logic,
                    "created_by": lineage.created_by
                })
            
            if direction in ["downstream", "both"] and lineage.source_asset_id == asset_id:
                downstream.append({
                    "target_asset_id": lineage.target_asset_id,
                    "transformation": lineage.transformation_logic,
                    "created_by": lineage.created_by
                })
        
        return {
            "asset_id": asset_id,
            "upstream": upstream,
            "downstream": downstream
        }
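
# DataCatalog 使用草图(非完整示例;资产ID、路径与字段均为示例假设):
def data_catalog_example():
    """注册一个数据资产并记录其上游血缘"""
    catalog = DataCatalog({"region": "us-east-1", "catalog_database": "data_catalog"})

    order_asset = DataAsset(
        asset_id="asset-orders",
        name="dw_orders",
        description="订单明细数据",
        owner="bi-team",
        steward="data-steward",
        classification=DataClassification.INTERNAL,
        sensitivity=DataSensitivity.MEDIUM,
        location="s3://my-data-lake/dw/orders/",
        schema={"order_id": {"type": "string"}, "amount": {"type": "double"}},
        tags={"domain": "sales"},
        created_at=datetime.utcnow(),
        updated_at=datetime.utcnow(),
        retention_period_days=365,
        compliance_frameworks=[ComplianceFramework.GDPR]
    )
    print(catalog.register_data_asset(order_asset))

    # 记录"原始订单 -> 清洗后订单"的血缘关系
    print(catalog.add_lineage(DataLineage(
        lineage_id=str(uuid.uuid4()),
        source_asset_id="asset-raw-orders",
        target_asset_id="asset-orders",
        transformation_logic="清洗、去重并转换为Parquet",
        created_at=datetime.utcnow(),
        created_by="etl-pipeline"
    )))
    print(catalog.get_lineage_graph("asset-orders"))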

class DataQualityEngine:
    """数据质量引擎"""
    
    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.rules: Dict[str, DataQualityRule] = {}
        self.results: List[DataQualityResult] = []
    
    def add_quality_rule(self, rule: DataQualityRule) -> Dict[str, Any]:
        """添加数据质量规则"""
        try:
            self.rules[rule.rule_id] = rule
            
            logger.info(f"Data quality rule {rule.rule_id} added successfully")
            
            return {
                "status": "success",
                "rule_id": rule.rule_id
            }
            
        except Exception as e:
            return {
                "status": "error",
                "error": str(e)
            }
    
    def execute_quality_checks(self, asset_id: str, data_sample: Any) -> List[DataQualityResult]:
        """执行数据质量检查"""
        results = []
        
        for rule in self.rules.values():
            try:
                result = self._execute_single_rule(rule, asset_id, data_sample)
                results.append(result)
                self.results.append(result)
                
            except Exception as e:
                logger.error(f"Error executing rule {rule.rule_id}: {e}")
                
                # 创建失败结果
                error_result = DataQualityResult(
                    result_id=str(uuid.uuid4()),
                    rule_id=rule.rule_id,
                    asset_id=asset_id,
                    execution_time=datetime.utcnow(),
                    passed=False,
                    score=0.0,
                    details={"error": str(e)}
                )
                results.append(error_result)
                self.results.append(error_result)
        
        return results
    
    def _execute_single_rule(self, rule: DataQualityRule, asset_id: str, data_sample: Any) -> DataQualityResult:
        """执行单个质量规则"""
        import pandas as pd
        
        # 假设data_sample是pandas DataFrame
        if not isinstance(data_sample, pd.DataFrame):
            raise ValueError("Data sample must be a pandas DataFrame")
        
        result_id = str(uuid.uuid4())
        execution_time = datetime.utcnow()
        
        if rule.rule_type == "completeness":
            # 完整性检查
            total_rows = len(data_sample)
            non_null_rows = data_sample[rule.column_name].notna().sum()
            score = non_null_rows / total_rows if total_rows > 0 else 0
            passed = score >= rule.threshold
            
            details = {
                "total_rows": total_rows,
                "non_null_rows": int(non_null_rows),
                "null_rows": total_rows - int(non_null_rows),
                "completeness_rate": score
            }
            
        elif rule.rule_type == "uniqueness":
            # 唯一性检查
            total_rows = len(data_sample)
            unique_rows = data_sample[rule.column_name].nunique()
            score = unique_rows / total_rows if total_rows > 0 else 0
            passed = score >= rule.threshold
            
            details = {
                "total_rows": total_rows,
                "unique_values": unique_rows,
                "duplicate_rows": total_rows - unique_rows,
                "uniqueness_rate": score
            }
            
        elif rule.rule_type == "validity":
            # 有效性检查(基于正则表达式)
            if rule.condition.startswith("regex:"):
                pattern = rule.condition[6:]  # 移除"regex:"前缀
                valid_rows = data_sample[rule.column_name].astype(str).str.match(pattern).sum()
                total_rows = len(data_sample)
                score = valid_rows / total_rows if total_rows > 0 else 0
                passed = score >= rule.threshold
                
                details = {
                    "total_rows": total_rows,
                    "valid_rows": int(valid_rows),
                    "invalid_rows": total_rows - int(valid_rows),
                    "validity_rate": score,
                    "pattern": pattern
                }
            else:
                raise ValueError(f"Unsupported validity condition: {rule.condition}")
                
        elif rule.rule_type == "range":
            # 范围检查
            if rule.condition.startswith("between:"):
                range_str = rule.condition[8:]  # 移除"between:"前缀
                min_val, max_val = map(float, range_str.split(","))
                
                numeric_data = pd.to_numeric(data_sample[rule.column_name], errors='coerce')
                valid_rows = ((numeric_data >= min_val) & (numeric_data <= max_val)).sum()
                total_rows = len(data_sample)
                score = valid_rows / total_rows if total_rows > 0 else 0
                passed = score >= rule.threshold
                
                details = {
                    "total_rows": total_rows,
                    "valid_rows": int(valid_rows),
                    "out_of_range_rows": total_rows - int(valid_rows),
                    "range_compliance_rate": score,
                    "min_value": min_val,
                    "max_value": max_val
                }
            else:
                raise ValueError(f"Unsupported range condition: {rule.condition}")
                
        else:
            raise ValueError(f"Unsupported rule type: {rule.rule_type}")
        
        return DataQualityResult(
            result_id=result_id,
            rule_id=rule.rule_id,
            asset_id=asset_id,
            execution_time=execution_time,
            passed=passed,
            score=score,
            details=details
        )
    
    def get_quality_report(self, asset_id: Optional[str] = None, 
                          start_date: Optional[datetime] = None,
                          end_date: Optional[datetime] = None) -> Dict[str, Any]:
        """生成数据质量报告"""
        filtered_results = self.results
        
        # 应用过滤器
        if asset_id:
            filtered_results = [r for r in filtered_results if r.asset_id == asset_id]
        
        if start_date:
            filtered_results = [r for r in filtered_results if r.execution_time >= start_date]
        
        if end_date:
            filtered_results = [r for r in filtered_results if r.execution_time <= end_date]
        
        if not filtered_results:
            return {
                "total_checks": 0,
                "passed_checks": 0,
                "failed_checks": 0,
                "overall_score": 0.0,
                "results_by_rule": {},
                "trends": []
            }
        
        # 计算统计信息
        total_checks = len(filtered_results)
        passed_checks = sum(1 for r in filtered_results if r.passed)
        failed_checks = total_checks - passed_checks
        overall_score = sum(r.score for r in filtered_results) / total_checks
        
        # 按规则分组结果
        results_by_rule = {}
        for result in filtered_results:
            rule_id = result.rule_id
            if rule_id not in results_by_rule:
                results_by_rule[rule_id] = {
                    "rule_name": self.rules[rule_id].name if rule_id in self.rules else "Unknown",
                    "total_executions": 0,
                    "passed_executions": 0,
                    "average_score": 0.0,
                    "latest_result": None
                }
            
            rule_stats = results_by_rule[rule_id]
            rule_stats["total_executions"] += 1
            if result.passed:
                rule_stats["passed_executions"] += 1
            
            # 更新平均分数
            current_total = rule_stats["average_score"] * (rule_stats["total_executions"] - 1)
            rule_stats["average_score"] = (current_total + result.score) / rule_stats["total_executions"]
            
            # 更新最新结果
            if (rule_stats["latest_result"] is None or 
                result.execution_time > rule_stats["latest_result"]["execution_time"]):
                rule_stats["latest_result"] = {
                    "execution_time": result.execution_time,
                    "passed": result.passed,
                    "score": result.score
                }
        
        return {
            "total_checks": total_checks,
            "passed_checks": passed_checks,
            "failed_checks": failed_checks,
            "overall_score": overall_score,
            "pass_rate": passed_checks / total_checks,
            "results_by_rule": results_by_rule,
            "execution_period": {
                "start": min(r.execution_time for r in filtered_results),
                "end": max(r.execution_time for r in filtered_results)
            }
        }
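
# DataQualityEngine 使用草图(规则与样本数据均为示例假设):
def data_quality_example():
    """对一小份客户样本执行完整性与有效性检查并生成报告"""
    import pandas as pd

    engine = DataQualityEngine({"region": "us-east-1"})

    # 完整性规则:email列非空比例不低于95%
    engine.add_quality_rule(DataQualityRule(
        rule_id="rule-email-not-null",
        name="email完整性",
        description="email列非空比例不低于95%",
        rule_type="completeness",
        column_name="email",
        condition="not_null",
        threshold=0.95,
        severity="error"
    ))

    # 有效性规则:email需符合简单邮箱格式
    engine.add_quality_rule(DataQualityRule(
        rule_id="rule-email-format",
        name="email格式",
        description="email需符合简单邮箱格式",
        rule_type="validity",
        column_name="email",
        condition=r"regex:^[\w.+-]+@[\w-]+\.[\w.]+$",
        threshold=0.9,
        severity="warning"
    ))

    sample = pd.DataFrame({
        "customer_id": ["c1", "c2", "c3"],
        "email": ["a@example.com", None, "not-an-email"]
    })

    engine.execute_quality_checks("asset-customers", sample)
    print(engine.get_quality_report(asset_id="asset-customers"))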

class DataPrivacyManager:
    """数据隐私管理器"""
    
    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.privacy_policies: Dict[str, Dict[str, Any]] = {}
        self.anonymization_rules: Dict[str, Dict[str, Any]] = {}
    
    def create_privacy_policy(self, policy_id: str, policy_config: Dict[str, Any]) -> Dict[str, Any]:
        """创建隐私政策"""
        try:
            policy = {
                "policy_id": policy_id,
                "name": policy_config["name"],
                "description": policy_config["description"],
                "applicable_data_types": policy_config["applicable_data_types"],
                "retention_period_days": policy_config["retention_period_days"],
                "anonymization_required": policy_config.get("anonymization_required", False),
                "encryption_required": policy_config.get("encryption_required", True),
                "access_controls": policy_config.get("access_controls", []),
                "compliance_frameworks": policy_config.get("compliance_frameworks", []),
                "created_at": datetime.utcnow(),
                "created_by": policy_config.get("created_by", "system")
            }
            
            self.privacy_policies[policy_id] = policy
            
            logger.info(f"Privacy policy {policy_id} created successfully")
            
            return {
                "status": "success",
                "policy_id": policy_id
            }
            
        except Exception as e:
            return {
                "status": "error",
                "error": str(e)
            }
    
    def anonymize_data(self, data: Any, anonymization_config: Dict[str, Any]) -> Dict[str, Any]:
        """数据匿名化"""
        try:
            import pandas as pd
            import hashlib
            import random
            
            if not isinstance(data, pd.DataFrame):
                raise ValueError("Data must be a pandas DataFrame")
            
            anonymized_data = data.copy()
            
            for column, method in anonymization_config.items():
                if column not in anonymized_data.columns:
                    continue
                
                if method["type"] == "hash":
                    # Hash-based anonymization: salted SHA-256, truncated to 10 hex characters
                    salt = method.get("salt", "default_salt")
                    anonymized_data[column] = anonymized_data[column].apply(
                        lambda x: hashlib.sha256(f"{x}{salt}".encode()).hexdigest()[:10]
                    )
                
                elif method["type"] == "mask":
                    # Mask-based anonymization: keep the leading characters, mask the rest
                    mask_char = method.get("mask_char", "*")
                    preserve_chars = method.get("preserve_chars", 2)
                    
                    def mask_value(value):
                        if pd.isna(value):
                            return value
                        str_value = str(value)
                        if len(str_value) <= preserve_chars:
                            return mask_char * len(str_value)
                        return str_value[:preserve_chars] + mask_char * (len(str_value) - preserve_chars)
                    
                    anonymized_data[column] = anonymized_data[column].apply(mask_value)
                
                elif method["type"] == "generalize":
                    # Generalization: replace precise values with coarser categories
                    if method.get("age_ranges"):
                        # Bucket numeric ages into labeled ranges
                        def generalize_age(age):
                            if pd.isna(age):
                                return age
                            age = int(age)
                            for range_def in method["age_ranges"]:
                                if range_def["min"] <= age <= range_def["max"]:
                                    return range_def["label"]
                            return "Unknown"
                        
                        anonymized_data[column] = anonymized_data[column].apply(generalize_age)
                
                elif method["type"] == "noise":
                    # Add multiplicative random noise to numeric values
                    noise_level = method.get("noise_level", 0.1)
                    numeric_data = pd.to_numeric(anonymized_data[column], errors='coerce')
                    # Align the noise Series with the data's index to avoid misalignment on non-default indexes
                    noise = numeric_data * noise_level * (2 * pd.Series([random.random() for _ in range(len(numeric_data))], index=numeric_data.index) - 1)
                    anonymized_data[column] = numeric_data + noise
                
                elif method["type"] == "remove":
                    # Drop the sensitive column entirely
                    anonymized_data = anonymized_data.drop(columns=[column])
            
            return {
                "status": "success",
                "anonymized_data": anonymized_data,
                "original_rows": len(data),
                "anonymized_rows": len(anonymized_data),
                "anonymization_summary": {
                    column: method["type"] for column, method in anonymization_config.items()
                }
            }
            
        except Exception as e:
            return {
                "status": "error",
                "error": str(e)
            }
    
    def check_compliance(self, asset: DataAsset, framework: ComplianceFramework) -> Dict[str, Any]:
        """检查合规性"""
        compliance_checks = []
        
        if framework == ComplianceFramework.GDPR:
            # GDPR compliance checks
            checks = [
                {
                    "check": "data_classification",
                    "passed": asset.classification in [DataClassification.PUBLIC, DataClassification.INTERNAL],
                    "description": "Data must be properly classified"
                },
                {
                    "check": "retention_policy",
                    "passed": asset.retention_period_days > 0,
                    "description": "Data retention period must be defined"
                },
                {
                    "check": "data_owner",
                    "passed": bool(asset.owner),
                    "description": "Data owner must be assigned"
                },
                {
                    "check": "sensitive_data_protection",
                    "passed": asset.sensitivity != DataSensitivity.CRITICAL or asset.classification == DataClassification.RESTRICTED,
                    "description": "Critical sensitive data must be restricted"
                }
            ]
            compliance_checks.extend(checks)
        
        elif framework == ComplianceFramework.HIPAA:
            # HIPAA compliance checks
            checks = [
                {
                    "check": "phi_identification",
                    "passed": "phi" in asset.tags,
                    "description": "PHI data must be properly tagged"
                },
                {
                    "check": "access_controls",
                    "passed": asset.classification in [DataClassification.CONFIDENTIAL, DataClassification.RESTRICTED],
                    "description": "PHI must have appropriate access controls"
                },
                {
                    "check": "encryption_required",
                    "passed": "encrypted" in asset.tags and asset.tags["encrypted"] == "true",
                    "description": "PHI must be encrypted"
                }
            ]
            compliance_checks.extend(checks)
        
        # Compute the overall compliance score
        total_checks = len(compliance_checks)
        passed_checks = sum(1 for check in compliance_checks if check["passed"])
        compliance_score = passed_checks / total_checks if total_checks > 0 else 0
        
        return {
            "framework": framework.value,
            "asset_id": asset.asset_id,
            "compliance_score": compliance_score,
            "total_checks": total_checks,
            "passed_checks": passed_checks,
            "failed_checks": total_checks - passed_checks,
            "checks": compliance_checks,
            "overall_status": "compliant" if compliance_score >= 0.8 else "non_compliant"
        }

# Usage example
def data_governance_example():
    """End-to-end data governance example"""
    import pandas as pd
    
    # Configuration
    config = {
        "region": "us-east-1",
        "catalog_database": "enterprise_catalog"
    }
    
    # Initialize governance components
    catalog = DataCatalog(config)
    quality_engine = DataQualityEngine(config)
    privacy_manager = DataPrivacyManager(config)
    
    # Register a data asset
    customer_asset = DataAsset(
        asset_id="customer-data-001",
        name="customer-master-data",
        description="Customer master data including personal information",
        owner="data-team@company.com",
        steward="john.doe@company.com",
        classification=DataClassification.CONFIDENTIAL,
        sensitivity=DataSensitivity.HIGH,
        location="s3://data-lake/customer/",
        schema={
            "customer_id": {"type": "string", "description": "Unique customer identifier"},
            "first_name": {"type": "string", "description": "Customer first name"},
            "last_name": {"type": "string", "description": "Customer last name"},
            "email": {"type": "string", "description": "Customer email address"},
            "phone": {"type": "string", "description": "Customer phone number"},
            "birth_date": {"type": "date", "description": "Customer birth date"},
            "registration_date": {"type": "timestamp", "description": "Account registration date"}
        },
        tags={"team": "customer-analytics", "pii": "true", "encrypted": "true"},
        created_at=datetime.utcnow(),
        updated_at=datetime.utcnow(),
        retention_period_days=2555,  # 7 years
        compliance_frameworks=[ComplianceFramework.GDPR, ComplianceFramework.CCPA]
    )
    
    print("注册数据资产...")
    register_result = catalog.register_data_asset(customer_asset)
    print(f"注册结果: {register_result}")
    
    # Add data quality rules
    quality_rules = [
        DataQualityRule(
            rule_id="email-completeness",
            name="Email Completeness",
            description="Email field should be complete",
            rule_type="completeness",
            column_name="email",
            condition="not_null",
            threshold=0.95,
            severity="error"
        ),
        DataQualityRule(
            rule_id="email-validity",
            name="Email Validity",
            description="Email should be in valid format",
            rule_type="validity",
            column_name="email",
            condition="regex:^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\\.[a-zA-Z]{2,}$",
            threshold=0.98,
            severity="error"
        ),
        DataQualityRule(
            rule_id="customer-id-uniqueness",
            name="Customer ID Uniqueness",
            description="Customer ID should be unique",
            rule_type="uniqueness",
            column_name="customer_id",
            condition="unique",
            threshold=1.0,
            severity="critical"
        )
    ]
    
    print("\n添加数据质量规则...")
    for rule in quality_rules:
        rule_result = quality_engine.add_quality_rule(rule)
        print(f"规则 {rule.name}: {rule_result}")
    
    # Simulated data sample
    sample_data = pd.DataFrame({
        'customer_id': ['C001', 'C002', 'C003', 'C004', 'C005'],
        'first_name': ['John', 'Jane', 'Bob', 'Alice', 'Charlie'],
        'last_name': ['Doe', 'Smith', 'Johnson', 'Brown', 'Wilson'],
        'email': ['john@example.com', 'jane@example.com', 'invalid-email', 'alice@example.com', None],
        'phone': ['+1234567890', '+1234567891', '+1234567892', '+1234567893', '+1234567894'],
        'birth_date': ['1990-01-01', '1985-05-15', '1992-12-25', '1988-07-30', '1995-03-10']
    })
    
    # Run the data quality checks
    print("\nRunning data quality checks...")
    quality_results = quality_engine.execute_quality_checks("customer-data-001", sample_data)
    
    for result in quality_results:
        print(f"规则 {result.rule_id}: {'通过' if result.passed else '失败'} (分数: {result.score:.2f})")
        print(f"  详情: {result.details}")
    
    # Generate a quality report
    print("\nGenerating data quality report...")
    quality_report = quality_engine.get_quality_report("customer-data-001")
    print(f"Overall quality score: {quality_report['overall_score']:.2f}")
    print(f"Pass rate: {quality_report['pass_rate']:.2%}")
    
    # Data anonymization
    anonymization_config = {
        "first_name": {"type": "mask", "mask_char": "*", "preserve_chars": 1},
        "last_name": {"type": "mask", "mask_char": "*", "preserve_chars": 1},
        "email": {"type": "hash", "salt": "customer_salt_2024"},
        "phone": {"type": "mask", "mask_char": "X", "preserve_chars": 3}
    }
    
    print("\n执行数据匿名化...")
    anonymization_result = privacy_manager.anonymize_data(sample_data, anonymization_config)
    if anonymization_result["status"] == "success":
        print("匿名化成功")
        print("匿名化后的数据:")
        print(anonymization_result["anonymized_data"])
    
    # Compliance check
    print("\nRunning GDPR compliance check...")
    gdpr_compliance = privacy_manager.check_compliance(customer_asset, ComplianceFramework.GDPR)
    print(f"GDPR compliance score: {gdpr_compliance['compliance_score']:.2f}")
    print(f"Compliance status: {gdpr_compliance['overall_status']}")
    
    for check in gdpr_compliance['checks']:
        status = "✓" if check['passed'] else "✗"
        print(f"  {status} {check['check']}: {check['description']}")
    
    print("\n数据治理示例完成!")

if __name__ == "__main__":
    data_governance_example()
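
The example above exercises the hash and mask anonymization methods; the short standalone snippet below is a sketch, assuming the same module context (DataPrivacyManager already defined), that shows the noise method on a purely numeric column:

# Quick check of the "noise" anonymization method on numeric data
import pandas as pd

pm = DataPrivacyManager({})
salary_df = pd.DataFrame({"salary": [52000, 61000, 70500]})
noisy = pm.anonymize_data(salary_df, {"salary": {"type": "noise", "noise_level": 0.05}})
print(noisy["anonymized_data"])  # each salary perturbed by up to roughly ±5%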

Best Practices and Recommendations

Architecture Design Principles

  1. Layered architecture design (see the sketch after this list)

    • Data ingestion layer: standardized data onboarding
    • Data storage layer: unified lakehouse architecture
    • Data processing layer: unified batch and stream processing
    • Data service layer: unified data service interfaces
    • Data application layer: business-facing data products
  2. Governance first

    • Establish data standards and conventions
    • Enforce data quality controls
    • Protect data security and privacy
    • Build data lineage and impact analysis
  3. Technology selection strategy

    • Prefer cloud-native managed services
    • Balance cost efficiency against performance
    • Ensure compatibility across the technology stack
    • Plan for long-term maintainability and extensibility
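
As a rough illustration of the layering principle, the sketch below models the five layers as an ordered enum and checks that a pipeline only hands data forward through them; all names here are hypothetical and not tied to any particular cloud service.

from enum import IntEnum
from typing import List

class Layer(IntEnum):
    """Hypothetical ordering of the five layers listed above."""
    INGESTION = 1
    STORAGE = 2
    PROCESSING = 3
    SERVICE = 4
    APPLICATION = 5

def validate_pipeline(stages: List[Layer]) -> bool:
    """Return True if the pipeline never hands data back to an earlier layer."""
    return all(a <= b for a, b in zip(stages, stages[1:]))

# Ingestion -> storage -> processing -> service respects the layering...
print(validate_pipeline([Layer.INGESTION, Layer.STORAGE, Layer.PROCESSING, Layer.SERVICE]))  # True
# ...while feeding service-layer output straight back into ingestion does not.
print(validate_pipeline([Layer.SERVICE, Layer.INGESTION]))  # False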

Implementation Recommendations

  1. Phased rollout

    • Phase 1: build the data lake foundation
    • Phase 2: implement the core data pipelines
    • Phase 3: build the AI/ML platform
    • Phase 4: mature the data governance program
  2. Team building

    • Data engineers: data pipeline development
    • Data scientists: model development and analysis
    • Data governance stewards: data quality and compliance
    • Platform engineers: infrastructure operations and maintenance
  3. Monitoring and operations (see the metrics sketch after this list)

    • Build a comprehensive monitoring framework
    • Automate operational workflows
    • Tune performance on a regular cadence
    • Establish incident response procedures
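
As one deliberately minimal sketch of the monitoring point, the function below publishes a pipeline's quality score and SLA breach count as custom CloudWatch metrics via boto3; the DataPlatform/Quality namespace and the metric and dimension names are illustrative choices, not a prescribed standard.

import boto3

def publish_pipeline_metrics(pipeline_id: str, quality_score: float,
                             sla_breaches: int, region: str = "us-east-1") -> None:
    """Publish pipeline health indicators as custom CloudWatch metrics."""
    cloudwatch = boto3.client("cloudwatch", region_name=region)
    cloudwatch.put_metric_data(
        Namespace="DataPlatform/Quality",  # illustrative namespace
        MetricData=[
            {
                "MetricName": "QualityScore",
                "Dimensions": [{"Name": "PipelineId", "Value": pipeline_id}],
                "Value": quality_score,
                "Unit": "None"
            },
            {
                "MetricName": "SlaBreaches",
                "Dimensions": [{"Name": "PipelineId", "Value": pipeline_id}],
                "Value": float(sla_breaches),
                "Unit": "Count"
            }
        ]
    )

# CloudWatch alarms on these metrics can then drive the incident response process, e.g.:
# publish_pipeline_metrics("customer-ingest-daily", quality_score=0.97, sla_breaches=0)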

Summary

Designing a cloud data architecture means weaving together the data lake, the data warehouse, real-time processing, the AI platform, and data governance. With a modern technology stack and the practices described above, an enterprise can build an efficient, secure, and scalable big data and AI platform that gives business innovation a solid data foundation.

A successful cloud data architecture should have the following characteristics:

  • Consistency: unified data standards and interfaces
  • Flexibility: support for diverse data types and processing modes
  • Scalability: the ability to grow with the business
  • Security: comprehensive data security and privacy protection
  • Intelligence: built-in AI/ML capabilities for smarter decisions

As the underlying technology keeps advancing, cloud data architectures will continue to evolve; enterprises need to stay alert to new technology and adopt it in time to improve data processing capability and business value.
