
多云环境下的架构治理与成本优化:构建统一、高效的云管理平台


目录

  1. 多云架构概述
  2. 多云治理框架
  3. 统一身份与访问管理
  4. 资源管理与编排
  5. 成本监控与优化
  6. 合规性与风险管理
  7. 运维自动化
  8. 总结

多云架构概述

多云环境已成为现代企业IT架构的主流选择:通过组合使用多家云服务提供商,企业可以避免厂商锁定、提高业务连续性,并在不同服务之间选择更具成本效益的方案。然而,多云也带来了治理、安全与成本管理上的复杂挑战。

graph TB
    subgraph "多云架构全景图"
        subgraph "治理层"
            A[统一治理平台]
            B[策略管理]
            C[合规监控]
        end
        
        subgraph "管理层"
            D[身份管理]
            E[资源编排]
            F[成本管理]
            G[监控告警]
        end
        
        subgraph "云服务提供商"
            H[AWS]
            I[Azure]
            J[GCP]
            K[阿里云]
        end
        
        subgraph "应用层"
            L[Web应用]
            M[数据库]
            N[存储服务]
            O[AI/ML服务]
        end
    end
    
    A --> D
    A --> E
    A --> F
    A --> G
    
    D --> H
    D --> I
    D --> J
    D --> K
    
    E --> H
    E --> I
    E --> J
    E --> K
    
    H --> L
    H --> M
    I --> N
    J --> O

多云架构分析器

#!/usr/bin/env python3
"""
多云架构分析器
分析和评估多云环境的架构特征和优化建议
"""

import json
import boto3
import azure.identity
import azure.mgmt.resource
# 说明:GCP 相关的 SDK 导入在此省略——下文的 GCP 资源分析使用模拟数据,未实际调用 GCP API
from typing import Dict, List, Any, Optional
from dataclasses import dataclass, asdict
from datetime import datetime, timedelta
import logging

# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@dataclass
class CloudProvider:
    """云服务提供商配置"""
    name: str
    region: str
    credentials: Dict[str, str]
    services: List[str]
    cost_center: str
    compliance_requirements: List[str]

@dataclass
class ResourceInventory:
    """资源清单"""
    provider: str
    region: str
    resource_type: str
    resource_id: str
    name: str
    status: str
    cost_monthly: float
    tags: Dict[str, str]
    compliance_status: str

class MultiCloudAnalyzer:
    """多云架构分析器"""
    
    def __init__(self):
        self.providers = {}
        self.resource_inventory = []
        self.cost_data = {}
        self.compliance_rules = []
        
    def add_cloud_provider(self, provider: CloudProvider):
        """添加云服务提供商"""
        self.providers[provider.name] = provider
        logger.info(f"添加云服务提供商: {provider.name}")
    
    def analyze_aws_resources(self, aws_config: Dict[str, str]) -> List[ResourceInventory]:
        """分析AWS资源"""
        try:
            session = boto3.Session(
                aws_access_key_id=aws_config.get('access_key'),
                aws_secret_access_key=aws_config.get('secret_key'),
                region_name=aws_config.get('region', 'us-east-1')
            )
            
            resources = []
            
            # EC2实例
            ec2 = session.client('ec2')
            instances = ec2.describe_instances()
            
            for reservation in instances['Reservations']:
                for instance in reservation['Instances']:
                    tags = {tag['Key']: tag['Value'] for tag in instance.get('Tags', [])}
                    
                    resources.append(ResourceInventory(
                        provider="AWS",
                        region=instance['Placement']['AvailabilityZone'][:-1],
                        resource_type="EC2",
                        resource_id=instance['InstanceId'],
                        name=tags.get('Name', instance['InstanceId']),
                        status=instance['State']['Name'],
                        cost_monthly=self._estimate_ec2_cost(instance['InstanceType']),
                        tags=tags,
                        compliance_status=self._check_compliance(tags)
                    ))
            
            # RDS实例
            rds = session.client('rds')
            db_instances = rds.describe_db_instances()
            
            for db in db_instances['DBInstances']:
                tags = {tag['Key']: tag['Value'] for tag in db.get('TagList', [])}
                
                resources.append(ResourceInventory(
                    provider="AWS",
                    region=db['AvailabilityZone'][:-1] if db.get('AvailabilityZone') else aws_config.get('region'),
                    resource_type="RDS",
                    resource_id=db['DBInstanceIdentifier'],
                    name=db['DBInstanceIdentifier'],
                    status=db['DBInstanceStatus'],
                    cost_monthly=self._estimate_rds_cost(db['DBInstanceClass']),
                    tags=tags,
                    compliance_status=self._check_compliance(tags)
                ))
            
            # S3存储桶
            s3 = session.client('s3')
            buckets = s3.list_buckets()
            
            for bucket in buckets['Buckets']:
                try:
                    tags_response = s3.get_bucket_tagging(Bucket=bucket['Name'])
                    tags = {tag['Key']: tag['Value'] for tag in tags_response.get('TagSet', [])}
                except s3.exceptions.ClientError:
                    # 未配置标签的存储桶会抛出 NoSuchTagSet 错误
                    tags = {}
                
                resources.append(ResourceInventory(
                    provider="AWS",
                    region=aws_config.get('region'),
                    resource_type="S3",
                    resource_id=bucket['Name'],
                    name=bucket['Name'],
                    status="Active",
                    cost_monthly=self._estimate_s3_cost(bucket['Name'], session),
                    tags=tags,
                    compliance_status=self._check_compliance(tags)
                ))
            
            return resources
            
        except Exception as e:
            logger.error(f"分析AWS资源失败: {e}")
            return []
    
    def analyze_azure_resources(self, azure_config: Dict[str, str]) -> List[ResourceInventory]:
        """分析Azure资源"""
        try:
            credential = azure.identity.ClientSecretCredential(
                tenant_id=azure_config.get('tenant_id'),
                client_id=azure_config.get('client_id'),
                client_secret=azure_config.get('client_secret')
            )
            
            resource_client = azure.mgmt.resource.ResourceManagementClient(
                credential, azure_config.get('subscription_id')
            )
            
            resources = []
            
            # 获取所有资源组
            resource_groups = resource_client.resource_groups.list()
            
            for rg in resource_groups:
                # 获取资源组中的资源
                rg_resources = resource_client.resources.list_by_resource_group(rg.name)
                
                for resource in rg_resources:
                    tags = resource.tags or {}
                    
                    resources.append(ResourceInventory(
                        provider="Azure",
                        region=resource.location,
                        resource_type=resource.type.split('/')[-1],
                        resource_id=resource.id,
                        name=resource.name,
                        status="Active",
                        cost_monthly=self._estimate_azure_cost(resource.type),
                        tags=tags,
                        compliance_status=self._check_compliance(tags)
                    ))
            
            return resources
            
        except Exception as e:
            logger.error(f"分析Azure资源失败: {e}")
            return []
    
    def analyze_gcp_resources(self, gcp_config: Dict[str, str]) -> List[ResourceInventory]:
        """分析GCP资源"""
        try:
            # 这里简化处理,实际需要配置GCP认证
            resources = []
            
            # 模拟GCP资源分析
            sample_resources = [
                {
                    "name": "web-server-instance",
                    "type": "compute.instances",
                    "zone": "us-central1-a",
                    "status": "RUNNING",
                    "labels": {"env": "production", "team": "backend"}
                },
                {
                    "name": "database-instance",
                    "type": "sql.instances",
                    "region": "us-central1",
                    "status": "RUNNABLE",
                    "labels": {"env": "production", "team": "data"}
                }
            ]
            
            for resource in sample_resources:
                resources.append(ResourceInventory(
                    provider="GCP",
                    region=resource.get('zone', resource.get('region', 'us-central1')),
                    resource_type=resource['type'].split('.')[-1],
                    resource_id=resource['name'],
                    name=resource['name'],
                    status=resource['status'],
                    cost_monthly=self._estimate_gcp_cost(resource['type']),
                    tags=resource.get('labels', {}),
                    compliance_status=self._check_compliance(resource.get('labels', {}))
                ))
            
            return resources
            
        except Exception as e:
            logger.error(f"分析GCP资源失败: {e}")
            return []
    
    def _estimate_ec2_cost(self, instance_type: str) -> float:
        """估算EC2成本"""
        cost_map = {
            't3.micro': 8.5,
            't3.small': 17.0,
            't3.medium': 34.0,
            't3.large': 68.0,
            'm5.large': 88.0,
            'm5.xlarge': 176.0,
            'c5.large': 78.0,
            'r5.large': 115.0
        }
        return cost_map.get(instance_type, 50.0)
    
    def _estimate_rds_cost(self, instance_class: str) -> float:
        """估算RDS成本"""
        cost_map = {
            'db.t3.micro': 15.0,
            'db.t3.small': 30.0,
            'db.t3.medium': 60.0,
            'db.m5.large': 150.0,
            'db.r5.large': 200.0
        }
        return cost_map.get(instance_class, 75.0)
    
    def _estimate_s3_cost(self, bucket_name: str, session) -> float:
        """估算S3成本"""
        try:
            cloudwatch = session.client('cloudwatch')
            # 获取存储使用量指标
            response = cloudwatch.get_metric_statistics(
                Namespace='AWS/S3',
                MetricName='BucketSizeBytes',
                Dimensions=[
                    {'Name': 'BucketName', 'Value': bucket_name},
                    {'Name': 'StorageType', 'Value': 'StandardStorage'}
                ],
                StartTime=datetime.now() - timedelta(days=1),
                EndTime=datetime.now(),
                Period=86400,
                Statistics=['Average']
            )
            
            if response['Datapoints']:
                size_bytes = response['Datapoints'][0]['Average']
                size_gb = size_bytes / (1024 ** 3)
                return size_gb * 0.023  # $0.023 per GB
            
        except Exception:
            # CloudWatch 指标可能尚未生成(例如新建或空的存储桶),退回默认估算
            pass
        
        return 10.0  # 默认估算
    
    def _estimate_azure_cost(self, resource_type: str) -> float:
        """估算Azure成本"""
        cost_map = {
            'virtualMachines': 80.0,
            'sqlDatabases': 120.0,
            'storageAccounts': 15.0,
            'networkSecurityGroups': 0.0,
            'publicIPAddresses': 3.0
        }
        return cost_map.get(resource_type.split('/')[-1], 25.0)
    
    def _estimate_gcp_cost(self, resource_type: str) -> float:
        """估算GCP成本"""
        cost_map = {
            'instances': 75.0,
            'sql.instances': 110.0,
            'storage.buckets': 12.0
        }
        return cost_map.get(resource_type, 30.0)
    
    def _check_compliance(self, tags: Dict[str, str]) -> str:
        """检查合规性"""
        required_tags = ['Environment', 'Owner', 'Project']
        
        missing_tags = [tag for tag in required_tags if tag not in tags]
        
        if not missing_tags:
            return "Compliant"
        elif len(missing_tags) <= 1:
            return "Warning"
        else:
            return "Non-Compliant"
    
    def generate_multi_cloud_report(self) -> Dict[str, Any]:
        """生成多云分析报告"""
        try:
            # 按云服务提供商分组
            provider_summary = {}
            total_cost = 0
            compliance_summary = {"Compliant": 0, "Warning": 0, "Non-Compliant": 0}
            
            for resource in self.resource_inventory:
                provider = resource.provider
                
                if provider not in provider_summary:
                    provider_summary[provider] = {
                        "resource_count": 0,
                        "total_cost": 0,
                        "resource_types": set(),
                        "regions": set()
                    }
                
                provider_summary[provider]["resource_count"] += 1
                provider_summary[provider]["total_cost"] += resource.cost_monthly
                provider_summary[provider]["resource_types"].add(resource.resource_type)
                provider_summary[provider]["regions"].add(resource.region)
                
                total_cost += resource.cost_monthly
                compliance_summary[resource.compliance_status] += 1
            
            # 转换set为list以便JSON序列化
            for provider in provider_summary:
                provider_summary[provider]["resource_types"] = list(provider_summary[provider]["resource_types"])
                provider_summary[provider]["regions"] = list(provider_summary[provider]["regions"])
            
            # 成本分析
            cost_analysis = {
                "total_monthly_cost": total_cost,
                "cost_by_provider": {
                    provider: data["total_cost"] 
                    for provider, data in provider_summary.items()
                },
                "cost_by_resource_type": {},
                "cost_optimization_opportunities": []
            }
            
            # 按资源类型统计成本
            for resource in self.resource_inventory:
                resource_type = resource.resource_type
                if resource_type not in cost_analysis["cost_by_resource_type"]:
                    cost_analysis["cost_by_resource_type"][resource_type] = 0
                cost_analysis["cost_by_resource_type"][resource_type] += resource.cost_monthly
            
            # 识别成本优化机会
            for resource in self.resource_inventory:
                if resource.status in ['stopped', 'deallocated'] and resource.cost_monthly > 0:
                    cost_analysis["cost_optimization_opportunities"].append({
                        "type": "unused_resource",
                        "resource": f"{resource.provider}:{resource.resource_id}",
                        "potential_savings": resource.cost_monthly,
                        "recommendation": "Consider terminating unused resource"
                    })
            
            return {
                "timestamp": datetime.now().isoformat(),
                "summary": {
                    "total_resources": len(self.resource_inventory),
                    "total_providers": len(provider_summary),
                    "total_monthly_cost": total_cost,
                    "compliance_score": compliance_summary["Compliant"] / len(self.resource_inventory) * 100 if self.resource_inventory else 0
                },
                "provider_summary": provider_summary,
                "cost_analysis": cost_analysis,
                "compliance_summary": compliance_summary,
                "recommendations": self._generate_recommendations()
            }
            
        except Exception as e:
            logger.error(f"生成多云分析报告失败: {e}")
            return {}
    
    def _generate_recommendations(self) -> List[Dict[str, str]]:
        """生成优化建议"""
        recommendations = []
        
        # 成本优化建议
        if len(self.providers) > 1:
            recommendations.append({
                "category": "Cost Optimization",
                "priority": "High",
                "title": "实施多云成本比较",
                "description": "定期比较不同云服务提供商的价格,选择最具成本效益的服务"
            })
        
        # 合规性建议
        non_compliant_count = sum(1 for r in self.resource_inventory if r.compliance_status == "Non-Compliant")
        if non_compliant_count > 0:
            recommendations.append({
                "category": "Compliance",
                "priority": "Critical",
                "title": "修复合规性问题",
                "description": f"有{non_compliant_count}个资源不符合标签要求,需要立即修复"
            })
        
        # 架构优化建议
        recommendations.append({
            "category": "Architecture",
            "priority": "Medium",
            "title": "实施统一监控",
            "description": "建立跨云的统一监控和告警系统"
        })
        
        recommendations.append({
            "category": "Security",
            "priority": "High",
            "title": "统一身份管理",
            "description": "实施跨云的统一身份和访问管理(IAM)系统"
        })
        
        return recommendations

# 使用示例
def main():
    """主函数 - 多云架构分析示例"""
    analyzer = MultiCloudAnalyzer()
    
    print("=== 多云架构分析示例 ===")
    
    # 添加云服务提供商
    aws_provider = CloudProvider(
        name="AWS",
        region="us-east-1",
        credentials={"access_key": "AKIA...", "secret_key": "xxx"},
        services=["EC2", "RDS", "S3", "Lambda"],
        cost_center="IT-Infrastructure",
        compliance_requirements=["SOC2", "ISO27001"]
    )
    
    azure_provider = CloudProvider(
        name="Azure",
        region="East US",
        credentials={"tenant_id": "xxx", "client_id": "xxx", "client_secret": "xxx"},
        services=["VirtualMachines", "SQLDatabase", "Storage"],
        cost_center="IT-Infrastructure",
        compliance_requirements=["SOC2", "GDPR"]
    )
    
    analyzer.add_cloud_provider(aws_provider)
    analyzer.add_cloud_provider(azure_provider)
    
    # 模拟资源清单
    analyzer.resource_inventory = [
        ResourceInventory(
            provider="AWS",
            region="us-east-1",
            resource_type="EC2",
            resource_id="i-1234567890abcdef0",
            name="web-server-1",
            status="running",
            cost_monthly=88.0,
            tags={"Environment": "production", "Owner": "team-a", "Project": "web-app"},
            compliance_status="Compliant"
        ),
        ResourceInventory(
            provider="Azure",
            region="East US",
            resource_type="VirtualMachine",
            resource_id="/subscriptions/.../vm-web-2",
            name="web-server-2",
            status="running",
            cost_monthly=95.0,
            tags={"Environment": "production", "Project": "web-app"},
            compliance_status="Warning"
        ),
        ResourceInventory(
            provider="AWS",
            region="us-west-2",
            resource_type="RDS",
            resource_id="database-prod",
            name="main-database",
            status="available",
            cost_monthly=150.0,
            tags={"Environment": "production", "Owner": "team-b"},
            compliance_status="Warning"
        )
    ]
    
    # 生成分析报告
    report = analyzer.generate_multi_cloud_report()
    print("\n多云架构分析报告:")
    print(json.dumps(report, indent=2, ensure_ascii=False))

if __name__ == "__main__":
    main()
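
需要说明的是,上面的示例把 AWS 凭证直接写在了代码里,仅用于演示。下面给出一个最小示意(假设该片段与上文脚本位于同一模块,且凭证已通过环境变量 AWS_ACCESS_KEY_ID、AWS_SECRET_ACCESS_KEY、AWS_DEFAULT_REGION 注入),展示如何避免在源码中硬编码密钥:

import os

# 示意:从环境变量读取 AWS 凭证,避免把密钥硬编码在源码中
aws_config = {
    "access_key": os.environ.get("AWS_ACCESS_KEY_ID", ""),
    "secret_key": os.environ.get("AWS_SECRET_ACCESS_KEY", ""),
    "region": os.environ.get("AWS_DEFAULT_REGION", "us-east-1"),
}

analyzer = MultiCloudAnalyzer()
analyzer.resource_inventory = analyzer.analyze_aws_resources(aws_config)
report = analyzer.generate_multi_cloud_report()
print(json.dumps(report.get("summary", {}), indent=2, ensure_ascii=False))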

多云治理框架

建立统一的多云治理框架是管理复杂多云环境的关键。

治理策略管理器

#!/usr/bin/env python3
"""
多云治理策略管理器
提供统一的策略定义、执行和监控能力
"""

import json
import yaml
import re
from typing import Dict, List, Any, Optional, Union
from dataclasses import dataclass, asdict
from datetime import datetime, timedelta
from enum import Enum
import logging

# ResourceInventory 数据类来自上文"多云架构分析器"脚本(此处的模块名仅为示意)
from multicloud_analyzer import ResourceInventory

# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class PolicyType(Enum):
    """策略类型"""
    SECURITY = "security"
    COST = "cost"
    COMPLIANCE = "compliance"
    RESOURCE = "resource"
    NETWORK = "network"

class PolicySeverity(Enum):
    """策略严重性"""
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"

@dataclass
class GovernancePolicy:
    """治理策略"""
    id: str
    name: str
    description: str
    policy_type: PolicyType
    severity: PolicySeverity
    conditions: List[Dict[str, Any]]
    actions: List[Dict[str, Any]]
    enabled: bool = True
    created_at: Optional[datetime] = None
    updated_at: Optional[datetime] = None

@dataclass
class PolicyViolation:
    """策略违规"""
    policy_id: str
    resource_id: str
    provider: str
    violation_type: str
    description: str
    severity: PolicySeverity
    detected_at: datetime
    resolved: bool = False

class MultiCloudGovernanceManager:
    """多云治理管理器"""
    
    def __init__(self):
        self.policies = {}
        self.violations = []
        self.policy_templates = self._load_policy_templates()
        
    def _load_policy_templates(self) -> Dict[str, GovernancePolicy]:
        """加载策略模板"""
        templates = {}
        
        # 安全策略模板
        templates['security_sg_open'] = GovernancePolicy(
            id="security_sg_open",
            name="禁止开放安全组",
            description="检测并阻止创建允许0.0.0.0/0访问的安全组",
            policy_type=PolicyType.SECURITY,
            severity=PolicySeverity.CRITICAL,
            conditions=[
                {
                    "resource_type": "SecurityGroup",
                    "field": "ingress_rules",
                    "operator": "contains",
                    "value": "0.0.0.0/0"
                }
            ],
            actions=[
                {
                    "type": "block",
                    "message": "不允许创建开放的安全组规则"
                },
                {
                    "type": "notify",
                    "channels": ["email", "slack"],
                    "recipients": ["security-team@company.com"]
                }
            ]
        )
        
        # 成本策略模板
        templates['cost_large_instance'] = GovernancePolicy(
            id="cost_large_instance",
            name="大型实例成本控制",
            description="监控和控制大型实例的创建",
            policy_type=PolicyType.COST,
            severity=PolicySeverity.HIGH,
            conditions=[
                {
                    "resource_type": "VirtualMachine",
                    "field": "instance_size",
                    "operator": "in",
                    "value": ["xlarge", "2xlarge", "4xlarge"]
                }
            ],
            actions=[
                {
                    "type": "require_approval",
                    "approvers": ["cost-center-manager"]
                },
                {
                    "type": "notify",
                    "channels": ["email"],
                    "recipients": ["finance-team@company.com"]
                }
            ]
        )
        
        # 合规策略模板
        templates['compliance_tagging'] = GovernancePolicy(
            id="compliance_tagging",
            name="强制资源标签",
            description="确保所有资源都有必需的标签",
            policy_type=PolicyType.COMPLIANCE,
            severity=PolicySeverity.MEDIUM,
            conditions=[
                {
                    "resource_type": "*",
                    "field": "tags",
                    "operator": "missing",
                    "value": ["Environment", "Owner", "Project", "CostCenter"]
                }
            ],
            actions=[
                {
                    "type": "auto_tag",
                    "default_tags": {
                        "Environment": "unspecified",
                        "Owner": "unknown",
                        "Project": "unassigned",
                        "CostCenter": "default"
                    }
                },
                {
                    "type": "notify",
                    "channels": ["email"],
                    "recipients": ["governance-team@company.com"]
                }
            ]
        )
        
        # 资源策略模板
        templates['resource_lifecycle'] = GovernancePolicy(
            id="resource_lifecycle",
            name="资源生命周期管理",
            description="自动管理资源的生命周期",
            policy_type=PolicyType.RESOURCE,
            severity=PolicySeverity.MEDIUM,
            conditions=[
                {
                    "resource_type": "*",
                    "field": "age_days",
                    "operator": "greater_than",
                    "value": 90
                },
                {
                    "resource_type": "*",
                    "field": "utilization",
                    "operator": "less_than",
                    "value": 10
                }
            ],
            actions=[
                {
                    "type": "schedule_deletion",
                    "grace_period_days": 7
                },
                {
                    "type": "notify",
                    "channels": ["email"],
                    "recipients": ["resource-owners"]
                }
            ]
        )
        
        return templates
    
    def create_policy(self, policy: GovernancePolicy) -> bool:
        """创建治理策略"""
        try:
            policy.created_at = datetime.now()
            policy.updated_at = datetime.now()
            self.policies[policy.id] = policy
            
            logger.info(f"策略创建成功: {policy.name}")
            return True
            
        except Exception as e:
            logger.error(f"创建策略失败: {e}")
            return False
    
    def update_policy(self, policy_id: str, updates: Dict[str, Any]) -> bool:
        """更新治理策略"""
        try:
            if policy_id not in self.policies:
                logger.error(f"策略不存在: {policy_id}")
                return False
            
            policy = self.policies[policy_id]
            
            for key, value in updates.items():
                if hasattr(policy, key):
                    setattr(policy, key, value)
            
            policy.updated_at = datetime.now()
            
            logger.info(f"策略更新成功: {policy.name}")
            return True
            
        except Exception as e:
            logger.error(f"更新策略失败: {e}")
            return False
    
    def evaluate_resource(self, resource: ResourceInventory) -> List[PolicyViolation]:
        """评估资源是否违反策略"""
        violations = []
        
        try:
            for policy in self.policies.values():
                if not policy.enabled:
                    continue
                
                if self._check_policy_conditions(resource, policy):
                    violation = PolicyViolation(
                        policy_id=policy.id,
                        resource_id=resource.resource_id,
                        provider=resource.provider,
                        violation_type=policy.policy_type.value,
                        description=f"资源违反策略: {policy.name}",
                        severity=policy.severity,
                        detected_at=datetime.now()
                    )
                    violations.append(violation)
                    
                    # 执行策略动作
                    self._execute_policy_actions(resource, policy, violation)
            
            return violations
            
        except Exception as e:
            logger.error(f"评估资源策略失败: {e}")
            return []
    
    def _check_policy_conditions(self, resource: ResourceInventory, 
                                policy: GovernancePolicy) -> bool:
        """检查策略条件"""
        try:
            for condition in policy.conditions:
                resource_type = condition.get('resource_type', '*')
                
                # 检查资源类型匹配
                if resource_type != '*' and resource_type != resource.resource_type:
                    continue
                
                field = condition.get('field')
                operator = condition.get('operator')
                value = condition.get('value')
                
                # 获取资源字段值
                resource_value = self._get_resource_field_value(resource, field)
                
                # 执行条件检查
                if not self._evaluate_condition(resource_value, operator, value):
                    return False
            
            return True
            
        except Exception as e:
            logger.error(f"检查策略条件失败: {e}")
            return False
    
    def _get_resource_field_value(self, resource: ResourceInventory, field: str) -> Any:
        """获取资源字段值"""
        if field == "tags":
            return resource.tags
        elif field == "status":
            return resource.status
        elif field == "cost_monthly":
            return resource.cost_monthly
        elif field == "age_days":
            # 模拟计算资源年龄
            return 30  # 默认30天
        elif field == "utilization":
            # 模拟资源利用率
            return 15  # 默认15%
        else:
            return getattr(resource, field, None)
    
    def _evaluate_condition(self, resource_value: Any, operator: str, condition_value: Any) -> bool:
        """评估条件"""
        try:
            if operator == "equals":
                return resource_value == condition_value
            elif operator == "not_equals":
                return resource_value != condition_value
            elif operator == "contains":
                return condition_value in str(resource_value)
            elif operator == "missing":
                if isinstance(resource_value, dict) and isinstance(condition_value, list):
                    return any(tag not in resource_value for tag in condition_value)
                return resource_value is None
            elif operator == "greater_than":
                return float(resource_value) > float(condition_value)
            elif operator == "less_than":
                return float(resource_value) < float(condition_value)
            elif operator == "in":
                return resource_value in condition_value
            else:
                return False
                
        except Exception as e:
            logger.error(f"评估条件失败: {e}")
            return False
    
    def _execute_policy_actions(self, resource: ResourceInventory, 
                               policy: GovernancePolicy, violation: PolicyViolation):
        """执行策略动作"""
        try:
            for action in policy.actions:
                action_type = action.get('type')
                
                if action_type == "block":
                    logger.warning(f"阻止操作: {action.get('message', '策略违规')}")
                
                elif action_type == "notify":
                    self._send_notification(action, violation)
                
                elif action_type == "auto_tag":
                    self._auto_tag_resource(resource, action.get('default_tags', {}))
                
                elif action_type == "require_approval":
                    self._require_approval(resource, action.get('approvers', []))
                
                elif action_type == "schedule_deletion":
                    grace_period = action.get('grace_period_days', 7)
                    self._schedule_resource_deletion(resource, grace_period)
                
        except Exception as e:
            logger.error(f"执行策略动作失败: {e}")
    
    def _send_notification(self, action: Dict[str, Any], violation: PolicyViolation):
        """发送通知"""
        channels = action.get('channels', [])
        recipients = action.get('recipients', [])
        
        message = f"策略违规检测: {violation.description}"
        
        for channel in channels:
            for recipient in recipients:
                logger.info(f"发送{channel}通知到{recipient}: {message}")
    
    def _auto_tag_resource(self, resource: ResourceInventory, default_tags: Dict[str, str]):
        """自动标签资源"""
        for tag_key, tag_value in default_tags.items():
            if tag_key not in resource.tags:
                resource.tags[tag_key] = tag_value
                logger.info(f"自动添加标签 {tag_key}={tag_value} 到资源 {resource.resource_id}")
    
    def _require_approval(self, resource: ResourceInventory, approvers: List[str]):
        """需要审批"""
        logger.info(f"资源 {resource.resource_id} 需要以下人员审批: {', '.join(approvers)}")
    
    def _schedule_resource_deletion(self, resource: ResourceInventory, grace_period_days: int):
        """计划删除资源"""
        deletion_date = datetime.now() + timedelta(days=grace_period_days)
        logger.info(f"资源 {resource.resource_id} 计划于 {deletion_date} 删除")
    
    def generate_governance_report(self) -> Dict[str, Any]:
        """生成治理报告"""
        try:
            # 策略统计
            policy_stats = {
                "total_policies": len(self.policies),
                "enabled_policies": len([p for p in self.policies.values() if p.enabled]),
                "policies_by_type": {},
                "policies_by_severity": {}
            }
            
            for policy in self.policies.values():
                policy_type = policy.policy_type.value
                severity = policy.severity.value
                
                policy_stats["policies_by_type"][policy_type] = policy_stats["policies_by_type"].get(policy_type, 0) + 1
                policy_stats["policies_by_severity"][severity] = policy_stats["policies_by_severity"].get(severity, 0) + 1
            
            # 违规统计
            violation_stats = {
                "total_violations": len(self.violations),
                "unresolved_violations": len([v for v in self.violations if not v.resolved]),
                "violations_by_type": {},
                "violations_by_severity": {},
                "violations_by_provider": {}
            }
            
            for violation in self.violations:
                violation_type = violation.violation_type
                severity = violation.severity.value
                provider = violation.provider
                
                violation_stats["violations_by_type"][violation_type] = violation_stats["violations_by_type"].get(violation_type, 0) + 1
                violation_stats["violations_by_severity"][severity] = violation_stats["violations_by_severity"].get(severity, 0) + 1
                violation_stats["violations_by_provider"][provider] = violation_stats["violations_by_provider"].get(provider, 0) + 1
            
            return {
                "timestamp": datetime.now().isoformat(),
                "policy_statistics": policy_stats,
                "violation_statistics": violation_stats,
                "compliance_score": self._calculate_compliance_score(),
                "recommendations": self._generate_governance_recommendations()
            }
            
        except Exception as e:
            logger.error(f"生成治理报告失败: {e}")
            return {}
    
    def _calculate_compliance_score(self) -> float:
        """计算合规分数"""
        if not self.violations:
            return 100.0
        
        total_violations = len(self.violations)
        resolved_violations = len([v for v in self.violations if v.resolved])
        
        return (resolved_violations / total_violations) * 100
    
    def _generate_governance_recommendations(self) -> List[Dict[str, str]]:
        """生成治理建议"""
        recommendations = []
        
        unresolved_critical = len([v for v in self.violations 
                                 if not v.resolved and v.severity == PolicySeverity.CRITICAL])
        
        if unresolved_critical > 0:
            recommendations.append({
                "priority": "Critical",
                "title": "立即处理关键违规",
                "description": f"有{unresolved_critical}个关键违规需要立即处理"
            })
        
        if len(self.policies) < 5:
            recommendations.append({
                "priority": "Medium",
                "title": "扩展策略覆盖",
                "description": "建议添加更多治理策略以提高覆盖率"
            })
        
        return recommendations

# 使用示例
def main():
    """主函数 - 多云治理管理示例"""
    governance = MultiCloudGovernanceManager()
    
    print("=== 多云治理管理示例 ===")
    
    # 创建策略
    for template_id, template in governance.policy_templates.items():
        governance.create_policy(template)
    
    # 模拟策略违规
    governance.violations = [
        PolicyViolation(
            policy_id="security_sg_open",
            resource_id="sg-12345",
            provider="AWS",
            violation_type="security",
            description="安全组允许0.0.0.0/0访问",
            severity=PolicySeverity.CRITICAL,
            detected_at=datetime.now() - timedelta(hours=2)
        ),
        PolicyViolation(
            policy_id="compliance_tagging",
            resource_id="vm-67890",
            provider="Azure",
            violation_type="compliance",
            description="缺少必需标签",
            severity=PolicySeverity.MEDIUM,
            detected_at=datetime.now() - timedelta(hours=1)
        )
    ]
    
    # 生成治理报告
    report = governance.generate_governance_report()
    print("\n治理报告:")
    print(json.dumps(report, indent=2, ensure_ascii=False))

if __name__ == "__main__":
    main()
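
上面的策略模板硬编码在 _load_policy_templates 中,而脚本开头导入的 yaml 模块并未用到。作为补充示意(policies.yaml 的文件名与字段结构均为假设,且假设该片段与上文治理脚本位于同一模块),可以把策略定义外置到 YAML 配置文件,让治理团队无需修改代码即可维护策略:

import yaml
from typing import List

# 示意:从 YAML 文件加载治理策略(文件路径与字段结构为假设)
def load_policies_from_yaml(path: str) -> List[GovernancePolicy]:
    with open(path, "r", encoding="utf-8") as f:
        raw_policies = yaml.safe_load(f) or []

    policies = []
    for item in raw_policies:
        policies.append(GovernancePolicy(
            id=item["id"],
            name=item["name"],
            description=item.get("description", ""),
            policy_type=PolicyType(item["policy_type"]),          # 例如 "security"
            severity=PolicySeverity(item.get("severity", "medium")),
            conditions=item.get("conditions", []),
            actions=item.get("actions", []),
            enabled=item.get("enabled", True),
        ))
    return policies

# 用法示意
# governance = MultiCloudGovernanceManager()
# for policy in load_policies_from_yaml("policies.yaml"):
#     governance.create_policy(policy)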

统一身份与访问管理

多云环境需要统一的身份和访问管理系统来确保安全性和一致性。

多云IAM管理器

#!/usr/bin/env python3
"""
多云统一身份与访问管理器
提供跨云平台的统一身份认证和权限管理
"""

import json
import jwt
import hashlib
import secrets
from typing import Dict, List, Any, Optional
from dataclasses import dataclass, asdict
from datetime import datetime, timedelta
from enum import Enum
import logging

# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class PermissionLevel(Enum):
    """权限级别"""
    READ = "read"
    WRITE = "write"
    ADMIN = "admin"
    OWNER = "owner"

class ResourceType(Enum):
    """资源类型"""
    COMPUTE = "compute"
    STORAGE = "storage"
    DATABASE = "database"
    NETWORK = "network"
    SECURITY = "security"
    ALL = "*"

@dataclass
class User:
    """用户"""
    id: str
    username: str
    email: str
    full_name: str
    department: str
    role: str
    active: bool = True
    created_at: Optional[datetime] = None
    last_login: Optional[datetime] = None

@dataclass
class Role:
    """角色"""
    id: str
    name: str
    description: str
    permissions: List[Dict[str, Any]]
    cloud_providers: List[str]
    created_at: Optional[datetime] = None

@dataclass
class Permission:
    """权限"""
    resource_type: ResourceType
    permission_level: PermissionLevel
    cloud_provider: str
    region: Optional[str] = None
    resource_filter: Optional[Dict[str, str]] = None

@dataclass
class AccessToken:
    """访问令牌"""
    token: str
    user_id: str
    expires_at: datetime
    scopes: List[str]
    cloud_providers: List[str]

class MultiCloudIAMManager:
    """多云IAM管理器"""
    
    def __init__(self, secret_key: str):
        self.secret_key = secret_key
        self.users = {}
        self.roles = {}
        self.user_roles = {}  # user_id -> [role_ids]
        self.active_tokens = {}
        self.audit_logs = []
        
        # 初始化默认角色
        self._initialize_default_roles()
    
    def _initialize_default_roles(self):
        """初始化默认角色"""
        # 只读角色
        readonly_role = Role(
            id="readonly",
            name="只读用户",
            description="只能查看资源,无法修改",
            permissions=[
                {
                    "resource_type": ResourceType.ALL.value,
                    "permission_level": PermissionLevel.READ.value,
                    "cloud_providers": ["AWS", "Azure", "GCP"]
                }
            ],
            cloud_providers=["AWS", "Azure", "GCP"],
            created_at=datetime.now()
        )
        
        # 开发者角色
        developer_role = Role(
            id="developer",
            name="开发者",
            description="可以管理开发环境资源",
            permissions=[
                {
                    "resource_type": ResourceType.COMPUTE.value,
                    "permission_level": PermissionLevel.WRITE.value,
                    "cloud_providers": ["AWS", "Azure", "GCP"],
                    "resource_filter": {"Environment": "development"}
                },
                {
                    "resource_type": ResourceType.STORAGE.value,
                    "permission_level": PermissionLevel.WRITE.value,
                    "cloud_providers": ["AWS", "Azure", "GCP"],
                    "resource_filter": {"Environment": "development"}
                }
            ],
            cloud_providers=["AWS", "Azure", "GCP"],
            created_at=datetime.now()
        )
        
        # 运维角色
        ops_role = Role(
            id="ops",
            name="运维工程师",
            description="可以管理生产环境资源",
            permissions=[
                {
                    "resource_type": ResourceType.ALL.value,
                    "permission_level": PermissionLevel.ADMIN.value,
                    "cloud_providers": ["AWS", "Azure", "GCP"]
                }
            ],
            cloud_providers=["AWS", "Azure", "GCP"],
            created_at=datetime.now()
        )
        
        # 安全管理员角色
        security_role = Role(
            id="security_admin",
            name="安全管理员",
            description="管理安全相关资源和策略",
            permissions=[
                {
                    "resource_type": ResourceType.SECURITY.value,
                    "permission_level": PermissionLevel.OWNER.value,
                    "cloud_providers": ["AWS", "Azure", "GCP"]
                },
                {
                    "resource_type": ResourceType.NETWORK.value,
                    "permission_level": PermissionLevel.ADMIN.value,
                    "cloud_providers": ["AWS", "Azure", "GCP"]
                }
            ],
            cloud_providers=["AWS", "Azure", "GCP"],
            created_at=datetime.now()
        )
        
        self.roles = {
            "readonly": readonly_role,
            "developer": developer_role,
            "ops": ops_role,
            "security_admin": security_role
        }
    
    def create_user(self, user: User) -> bool:
        """创建用户"""
        try:
            user.created_at = datetime.now()
            self.users[user.id] = user
            self.user_roles[user.id] = []
            
            self._log_audit_event("USER_CREATED", user.id, {"username": user.username})
            logger.info(f"用户创建成功: {user.username}")
            return True
            
        except Exception as e:
            logger.error(f"创建用户失败: {e}")
            return False
    
    def assign_role(self, user_id: str, role_id: str) -> bool:
        """分配角色"""
        try:
            if user_id not in self.users:
                logger.error(f"用户不存在: {user_id}")
                return False
            
            if role_id not in self.roles:
                logger.error(f"角色不存在: {role_id}")
                return False
            
            if role_id not in self.user_roles[user_id]:
                self.user_roles[user_id].append(role_id)
                
                self._log_audit_event("ROLE_ASSIGNED", user_id, {
                    "role_id": role_id,
                    "role_name": self.roles[role_id].name
                })
                
                logger.info(f"角色分配成功: {user_id} -> {role_id}")
                return True
            
            return True
            
        except Exception as e:
            logger.error(f"分配角色失败: {e}")
            return False
    
    def authenticate_user(self, username: str, password: str) -> Optional[AccessToken]:
        """用户认证"""
        try:
            # 查找用户
            user = None
            for u in self.users.values():
                if u.username == username and u.active:
                    user = u
                    break
            
            if not user:
                logger.warning(f"用户认证失败: {username}")
                return None
            
            # 简化密码验证(实际应用中应使用安全的密码哈希)
            # 这里假设密码验证通过
            
            # 获取用户权限范围
            scopes = []
            cloud_providers = set()
            
            for role_id in self.user_roles[user.id]:
                role = self.roles[role_id]
                for permission in role.permissions:
                    scopes.append(f"{permission['resource_type']}:{permission['permission_level']}")
                cloud_providers.update(role.cloud_providers)
            
            # 生成JWT令牌
            payload = {
                "user_id": user.id,
                "username": user.username,
                "scopes": scopes,
                "cloud_providers": list(cloud_providers),
                "iat": datetime.now(),
                "exp": datetime.now() + timedelta(hours=8)
            }
            
            token = jwt.encode(payload, self.secret_key, algorithm="HS256")
            
            access_token = AccessToken(
                token=token,
                user_id=user.id,
                expires_at=payload["exp"],
                scopes=scopes,
                cloud_providers=list(cloud_providers)
            )
            
            self.active_tokens[token] = access_token
            user.last_login = datetime.now()
            
            self._log_audit_event("USER_LOGIN", user.id, {"username": username})
            logger.info(f"用户认证成功: {username}")
            
            return access_token
            
        except Exception as e:
            logger.error(f"用户认证失败: {e}")
            return None
    
    def validate_token(self, token: str) -> Optional[AccessToken]:
        """验证访问令牌"""
        try:
            if token not in self.active_tokens:
                return None
            
            access_token = self.active_tokens[token]
            
            # 检查令牌是否过期
            if datetime.now() > access_token.expires_at:
                del self.active_tokens[token]
                return None
            
            # 验证JWT签名
            payload = jwt.decode(token, self.secret_key, algorithms=["HS256"])
            
            return access_token
            
        except jwt.InvalidTokenError:
            logger.warning("无效的访问令牌")
            return None
        except Exception as e:
            logger.error(f"验证令牌失败: {e}")
            return None
    
    def check_permission(self, token: str, resource_type: str, 
                        permission_level: str, cloud_provider: str,
                        resource_tags: Optional[Dict[str, str]] = None) -> bool:
        """检查权限"""
        try:
            access_token = self.validate_token(token)
            if not access_token:
                return False
            
            # 检查云服务提供商权限
            if cloud_provider not in access_token.cloud_providers:
                return False
            
            # 检查资源类型和权限级别(通配资源的 admin/owner 权限同样视为满足)
            candidate_scopes = {
                f"{resource_type}:{permission_level}",
                f"*:{permission_level}",
                f"{resource_type}:admin",
                f"{resource_type}:owner",
                "*:admin",
                "*:owner"
            }
            
            if not candidate_scopes.intersection(access_token.scopes):
                return False
            
            # 检查资源过滤器:任一角色权限的过滤条件全部匹配即授权
            user_roles = self.user_roles.get(access_token.user_id, [])
            for role_id in user_roles:
                role = self.roles[role_id]
                for permission in role.permissions:
                    if (permission['resource_type'] in [resource_type, '*'] and
                        cloud_provider in permission.get('cloud_providers', [])):
                        
                        resource_filter = permission.get('resource_filter')
                        if not resource_filter:
                            # 该权限没有资源过滤条件,直接授权
                            return True
                        if resource_tags and all(
                            resource_tags.get(key) == value
                            for key, value in resource_filter.items()
                        ):
                            return True
            
            # 作用域匹配但没有任何角色权限适用于该资源,拒绝访问
            return False
            
        except Exception as e:
            logger.error(f"检查权限失败: {e}")
            return False
    
    def revoke_token(self, token: str) -> bool:
        """撤销访问令牌"""
        try:
            if token in self.active_tokens:
                access_token = self.active_tokens[token]
                del self.active_tokens[token]
                
                self._log_audit_event("TOKEN_REVOKED", access_token.user_id, {"token": token[:10] + "..."})
                logger.info("访问令牌已撤销")
                return True
            
            return False
            
        except Exception as e:
            logger.error(f"撤销令牌失败: {e}")
            return False
    
    def _log_audit_event(self, event_type: str, user_id: str, details: Dict[str, Any]):
        """记录审计事件"""
        audit_event = {
            "timestamp": datetime.now().isoformat(),
            "event_type": event_type,
            "user_id": user_id,
            "details": details
        }
        
        self.audit_logs.append(audit_event)
        logger.info(f"审计事件: {event_type} - {user_id}")
    
    def generate_access_report(self) -> Dict[str, Any]:
        """生成访问报告"""
        try:
            # 用户统计
            user_stats = {
                "total_users": len(self.users),
                "active_users": len([u for u in self.users.values() if u.active]),
                "users_by_department": {},
                "users_by_role": {}
            }
            
            for user in self.users.values():
                dept = user.department
                user_stats["users_by_department"][dept] = user_stats["users_by_department"].get(dept, 0) + 1
                
                for role_id in self.user_roles.get(user.id, []):
                    role_name = self.roles[role_id].name
                    user_stats["users_by_role"][role_name] = user_stats["users_by_role"].get(role_name, 0) + 1
            
            # 令牌统计
            token_stats = {
                "active_tokens": len(self.active_tokens),
                "expired_tokens": 0,
                "tokens_by_user": {}
            }
            
            for token, access_token in self.active_tokens.items():
                user_id = access_token.user_id
                token_stats["tokens_by_user"][user_id] = token_stats["tokens_by_user"].get(user_id, 0) + 1
            
            # 审计统计
            audit_stats = {
                "total_events": len(self.audit_logs),
                "events_by_type": {},
                "recent_events": self.audit_logs[-10:] if self.audit_logs else []
            }
            
            for event in self.audit_logs:
                event_type = event["event_type"]
                audit_stats["events_by_type"][event_type] = audit_stats["events_by_type"].get(event_type, 0) + 1
            
            return {
                "timestamp": datetime.now().isoformat(),
                "user_statistics": user_stats,
                "token_statistics": token_stats,
                "audit_statistics": audit_stats,
                "security_recommendations": self._generate_security_recommendations()
            }
            
        except Exception as e:
            logger.error(f"生成访问报告失败: {e}")
            return {}
    
    def _generate_security_recommendations(self) -> List[Dict[str, str]]:
        """生成安全建议"""
        recommendations = []
        
        # 检查长期未登录用户
        inactive_users = []
        for user in self.users.values():
            if user.last_login and (datetime.now() - user.last_login).days > 90:
                inactive_users.append(user.username)
        
        if inactive_users:
            recommendations.append({
                "priority": "Medium",
                "title": "清理非活跃用户",
                "description": f"有{len(inactive_users)}个用户超过90天未登录,建议审查并禁用"
            })
        
        # 检查过期令牌
        expired_tokens = [token for token, access_token in self.active_tokens.items() 
                         if datetime.now() > access_token.expires_at]
        
        if expired_tokens:
            recommendations.append({
                "priority": "High",
                "title": "清理过期令牌",
                "description": f"有{len(expired_tokens)}个过期令牌需要清理"
            })
        
        # 检查权限过度分配
        admin_users = []
        for user_id, role_ids in self.user_roles.items():
            for role_id in role_ids:
                role = self.roles[role_id]
                for permission in role.permissions:
                    if permission.get('permission_level') in ['admin', 'owner']:
                        admin_users.append(self.users[user_id].username)
                        break
        
        if len(admin_users) > len(self.users) * 0.2:  # 超过20%的用户有管理员权限
            recommendations.append({
                "priority": "High",
                "title": "审查管理员权限",
                "description": "管理员权限用户比例过高,建议审查权限分配"
            })
        
        return recommendations

# 使用示例
def main():
    """主函数 - 多云IAM管理示例"""
    iam_manager = MultiCloudIAMManager(secret_key="your-secret-key-here")
    
    print("=== 多云IAM管理示例 ===")
    
    # 创建用户
    users = [
        User(
            id="user1",
            username="john.doe",
            email="john.doe@company.com",
            full_name="John Doe",
            department="Engineering",
            role="Senior Developer"
        ),
        User(
            id="user2",
            username="jane.smith",
            email="jane.smith@company.com",
            full_name="Jane Smith",
            department="Operations",
            role="DevOps Engineer"
        ),
        User(
            id="user3",
            username="bob.wilson",
            email="bob.wilson@company.com",
            full_name="Bob Wilson",
            department="Security",
            role="Security Engineer"
        )
    ]
    
    for user in users:
        iam_manager.create_user(user)
    
    # 分配角色
    iam_manager.assign_role("user1", "developer")
    iam_manager.assign_role("user2", "ops")
    iam_manager.assign_role("user3", "security_admin")
    
    # 用户认证
    token = iam_manager.authenticate_user("john.doe", "password123")
    if token:
        print(f"\n认证成功,令牌: {token.token[:50]}...")
        
        # 权限检查
        has_permission = iam_manager.check_permission(
            token.token,
            "compute",
            "write",
            "AWS",
            {"Environment": "development"}
        )
        print(f"开发环境计算资源写权限: {has_permission}")
        
        has_permission = iam_manager.check_permission(
            token.token,
            "compute",
            "write",
            "AWS",
            {"Environment": "production"}
        )
        print(f"生产环境计算资源写权限: {has_permission}")
    
    # 生成访问报告
    report = iam_manager.generate_access_report()
    print("\n访问管理报告:")
    print(json.dumps(report, indent=2, ensure_ascii=False))

if __name__ == "__main__":
    main()
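
authenticate_user 中的密码校验被有意简化了(见代码注释)。下面给出一个最小的密码哈希示意,利用脚本中已导入但未使用的 hashlib 和 secrets 模块实现 PBKDF2 加盐哈希,其中迭代次数等参数仅为示例值,实际取值应结合组织的安全基线确定:

import hashlib
import secrets
from typing import Optional, Tuple

def hash_password(password: str, salt: Optional[str] = None) -> Tuple[str, str]:
    """基于 PBKDF2-HMAC-SHA256 生成加盐密码哈希(迭代次数为示例值)"""
    salt = salt or secrets.token_hex(16)
    digest = hashlib.pbkdf2_hmac(
        "sha256", password.encode("utf-8"), salt.encode("utf-8"), 200_000
    )
    return salt, digest.hex()

def verify_password(password: str, salt: str, expected_hash: str) -> bool:
    """校验密码,使用恒定时间比较防止时序攻击"""
    _, digest = hash_password(password, salt)
    return secrets.compare_digest(digest, expected_hash)

# 用法示意
salt, stored_hash = hash_password("password123")
print(verify_password("password123", salt, stored_hash))     # True
print(verify_password("wrong-password", salt, stored_hash))  # False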

资源管理与编排

多云环境需要统一的资源管理和编排能力,以实现跨云的资源协调和自动化部署。

多云资源编排器

#!/bin/bash
# 多云资源编排部署脚本
# 提供跨云平台的统一资源部署和管理能力

set -e

# 配置变量
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
CONFIG_DIR="${SCRIPT_DIR}/config"
TERRAFORM_DIR="${SCRIPT_DIR}/terraform"
ANSIBLE_DIR="${SCRIPT_DIR}/ansible"

# 颜色输出
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
BLUE='\033[0;34m'
NC='\033[0m' # No Color

# 日志函数
log_info() {
    echo -e "${GREEN}[INFO]${NC} $1"
}

log_warn() {
    echo -e "${YELLOW}[WARN]${NC} $1"
}

log_error() {
    echo -e "${RED}[ERROR]${NC} $1"
}

log_debug() {
    echo -e "${BLUE}[DEBUG]${NC} $1"
}

# 检查依赖
check_dependencies() {
    log_info "检查依赖工具..."
    
    local deps=("terraform" "ansible" "kubectl" "aws" "az" "gcloud")
    local missing_deps=()
    
    for dep in "${deps[@]}"; do
        if ! command -v "$dep" &> /dev/null; then
            missing_deps+=("$dep")
        fi
    done
    
    if [ ${#missing_deps[@]} -ne 0 ]; then
        log_error "缺少以下依赖工具: ${missing_deps[*]}"
        log_info "请安装缺少的工具后重试"
        exit 1
    fi
    
    log_info "所有依赖工具已安装"
}

# 初始化Terraform配置
init_terraform() {
    log_info "初始化Terraform配置..."
    
    mkdir -p "$TERRAFORM_DIR"
    
    # 创建主配置文件
    cat > "$TERRAFORM_DIR/main.tf" << 'EOF'
# 多云资源编排主配置
terraform {
  required_version = ">= 1.0"
  
  required_providers {
    aws = {
      source  = "hashicorp/aws"
      version = "~> 5.0"
    }
    azurerm = {
      source  = "hashicorp/azurerm"
      version = "~> 3.0"
    }
    google = {
      source  = "hashicorp/google"
      version = "~> 4.0"
    }
  }
  
  backend "s3" {
    # 配置远程状态存储
    bucket = "terraform-state-bucket"
    key    = "multi-cloud/terraform.tfstate"
    region = "us-east-1"
  }
}

# AWS Provider
provider "aws" {
  region = var.aws_region
  
  default_tags {
    tags = var.common_tags
  }
}

# Azure Provider
provider "azurerm" {
  features {}
  
  subscription_id = var.azure_subscription_id
}

# Google Cloud Provider
provider "google" {
  project = var.gcp_project_id
  region  = var.gcp_region
}

# 变量定义
variable "aws_region" {
  description = "AWS region"
  type        = string
  default     = "us-east-1"
}

variable "azure_subscription_id" {
  description = "Azure subscription ID"
  type        = string
}

variable "gcp_project_id" {
  description = "GCP project ID"
  type        = string
}

variable "gcp_region" {
  description = "GCP region"
  type        = string
  default     = "us-central1"
}

variable "common_tags" {
  description = "Common tags for all resources"
  type        = map(string)
  default = {
    Environment = "production"
    Project     = "multi-cloud"
    ManagedBy   = "terraform"
  }
}

variable "environment" {
  description = "Environment name"
  type        = string
  default     = "production"
}
EOF

    # 创建AWS资源配置
    cat > "$TERRAFORM_DIR/aws.tf" << 'EOF'
# AWS 资源配置

# VPC
resource "aws_vpc" "main" {
  cidr_block           = "10.0.0.0/16"
  enable_dns_hostnames = true
  enable_dns_support   = true
  
  tags = merge(var.common_tags, {
    Name = "${var.environment}-vpc"
  })
}

# 子网
resource "aws_subnet" "public" {
  count = 2
  
  vpc_id                  = aws_vpc.main.id
  cidr_block              = "10.0.${count.index + 1}.0/24"
  availability_zone       = data.aws_availability_zones.available.names[count.index]
  map_public_ip_on_launch = true
  
  tags = merge(var.common_tags, {
    Name = "${var.environment}-public-subnet-${count.index + 1}"
    Type = "public"
  })
}

resource "aws_subnet" "private" {
  count = 2
  
  vpc_id            = aws_vpc.main.id
  cidr_block        = "10.0.${count.index + 10}.0/24"
  availability_zone = data.aws_availability_zones.available.names[count.index]
  
  tags = merge(var.common_tags, {
    Name = "${var.environment}-private-subnet-${count.index + 1}"
    Type = "private"
  })
}

# 互联网网关
resource "aws_internet_gateway" "main" {
  vpc_id = aws_vpc.main.id
  
  tags = merge(var.common_tags, {
    Name = "${var.environment}-igw"
  })
}

# NAT网关
resource "aws_eip" "nat" {
  count = 2
  
  domain = "vpc"
  
  tags = merge(var.common_tags, {
    Name = "${var.environment}-nat-eip-${count.index + 1}"
  })
}

resource "aws_nat_gateway" "main" {
  count = 2
  
  allocation_id = aws_eip.nat[count.index].id
  subnet_id     = aws_subnet.public[count.index].id
  
  tags = merge(var.common_tags, {
    Name = "${var.environment}-nat-${count.index + 1}"
  })
  
  depends_on = [aws_internet_gateway.main]
}

# 路由表
resource "aws_route_table" "public" {
  vpc_id = aws_vpc.main.id
  
  route {
    cidr_block = "0.0.0.0/0"
    gateway_id = aws_internet_gateway.main.id
  }
  
  tags = merge(var.common_tags, {
    Name = "${var.environment}-public-rt"
  })
}

resource "aws_route_table" "private" {
  count = 2
  
  vpc_id = aws_vpc.main.id
  
  route {
    cidr_block     = "0.0.0.0/0"
    nat_gateway_id = aws_nat_gateway.main[count.index].id
  }
  
  tags = merge(var.common_tags, {
    Name = "${var.environment}-private-rt-${count.index + 1}"
  })
}

# 路由表关联
resource "aws_route_table_association" "public" {
  count = 2
  
  subnet_id      = aws_subnet.public[count.index].id
  route_table_id = aws_route_table.public.id
}

resource "aws_route_table_association" "private" {
  count = 2
  
  subnet_id      = aws_subnet.private[count.index].id
  route_table_id = aws_route_table.private[count.index].id
}

# 安全组
resource "aws_security_group" "web" {
  name_prefix = "${var.environment}-web-"
  vpc_id      = aws_vpc.main.id
  
  ingress {
    from_port   = 80
    to_port     = 80
    protocol    = "tcp"
    cidr_blocks = ["0.0.0.0/0"]
  }
  
  ingress {
    from_port   = 443
    to_port     = 443
    protocol    = "tcp"
    cidr_blocks = ["0.0.0.0/0"]
  }
  
  egress {
    from_port   = 0
    to_port     = 0
    protocol    = "-1"
    cidr_blocks = ["0.0.0.0/0"]
  }
  
  tags = merge(var.common_tags, {
    Name = "${var.environment}-web-sg"
  })
}

# EKS集群
resource "aws_eks_cluster" "main" {
  name     = "${var.environment}-eks-cluster"
  role_arn = aws_iam_role.eks_cluster.arn
  version  = "1.28"
  
  vpc_config {
    subnet_ids              = concat(aws_subnet.public[*].id, aws_subnet.private[*].id)
    endpoint_private_access = true
    endpoint_public_access  = true
  }
  
  enabled_cluster_log_types = ["api", "audit", "authenticator", "controllerManager", "scheduler"]
  
  tags = var.common_tags
  
  depends_on = [
    aws_iam_role_policy_attachment.eks_cluster_policy,
    aws_iam_role_policy_attachment.eks_vpc_resource_controller,
  ]
}

# EKS节点组
resource "aws_eks_node_group" "main" {
  cluster_name    = aws_eks_cluster.main.name
  node_group_name = "${var.environment}-eks-nodes"
  node_role_arn   = aws_iam_role.eks_node_group.arn
  subnet_ids      = aws_subnet.private[*].id
  
  instance_types = ["t3.medium"]
  
  scaling_config {
    desired_size = 2
    max_size     = 4
    min_size     = 1
  }
  
  update_config {
    max_unavailable = 1
  }
  
  tags = var.common_tags
  
  depends_on = [
    aws_iam_role_policy_attachment.eks_worker_node_policy,
    aws_iam_role_policy_attachment.eks_cni_policy,
    aws_iam_role_policy_attachment.eks_container_registry_policy,
  ]
}

# 数据源
data "aws_availability_zones" "available" {
  state = "available"
}

# IAM角色
resource "aws_iam_role" "eks_cluster" {
  name = "${var.environment}-eks-cluster-role"
  
  assume_role_policy = jsonencode({
    Statement = [{
      Action = "sts:AssumeRole"
      Effect = "Allow"
      Principal = {
        Service = "eks.amazonaws.com"
      }
    }]
    Version = "2012-10-17"
  })
}

resource "aws_iam_role_policy_attachment" "eks_cluster_policy" {
  policy_arn = "arn:aws:iam::aws:policy/AmazonEKSClusterPolicy"
  role       = aws_iam_role.eks_cluster.name
}

resource "aws_iam_role_policy_attachment" "eks_vpc_resource_controller" {
  policy_arn = "arn:aws:iam::aws:policy/AmazonEKSVPCResourceController"
  role       = aws_iam_role.eks_cluster.name
}

resource "aws_iam_role" "eks_node_group" {
  name = "${var.environment}-eks-node-group-role"
  
  assume_role_policy = jsonencode({
    Statement = [{
      Action = "sts:AssumeRole"
      Effect = "Allow"
      Principal = {
        Service = "ec2.amazonaws.com"
      }
    }]
    Version = "2012-10-17"
  })
}

resource "aws_iam_role_policy_attachment" "eks_worker_node_policy" {
  policy_arn = "arn:aws:iam::aws:policy/AmazonEKSWorkerNodePolicy"
  role       = aws_iam_role.eks_node_group.name
}

resource "aws_iam_role_policy_attachment" "eks_cni_policy" {
  policy_arn = "arn:aws:iam::aws:policy/AmazonEKS_CNI_Policy"
  role       = aws_iam_role.eks_node_group.name
}

resource "aws_iam_role_policy_attachment" "eks_container_registry_policy" {
  policy_arn = "arn:aws:iam::aws:policy/AmazonEC2ContainerRegistryReadOnly"
  role       = aws_iam_role.eks_node_group.name
}

# 输出
output "aws_vpc_id" {
  value = aws_vpc.main.id
}

output "aws_eks_cluster_name" {
  value = aws_eks_cluster.main.name
}

output "aws_eks_cluster_endpoint" {
  value = aws_eks_cluster.main.endpoint
}
EOF

    # 创建Azure资源配置
    cat > "$TERRAFORM_DIR/azure.tf" << 'EOF'
# Azure 资源配置

# 资源组
resource "azurerm_resource_group" "main" {
  name     = "${var.environment}-rg"
  location = "East US"
  
  tags = var.common_tags
}

# 虚拟网络
resource "azurerm_virtual_network" "main" {
  name                = "${var.environment}-vnet"
  address_space       = ["10.1.0.0/16"]
  location            = azurerm_resource_group.main.location
  resource_group_name = azurerm_resource_group.main.name
  
  tags = var.common_tags
}

# 子网
resource "azurerm_subnet" "public" {
  name                 = "${var.environment}-public-subnet"
  resource_group_name  = azurerm_resource_group.main.name
  virtual_network_name = azurerm_virtual_network.main.name
  address_prefixes     = ["10.1.1.0/24"]
}

resource "azurerm_subnet" "private" {
  name                 = "${var.environment}-private-subnet"
  resource_group_name  = azurerm_resource_group.main.name
  virtual_network_name = azurerm_virtual_network.main.name
  address_prefixes     = ["10.1.2.0/24"]
}

# 网络安全组
resource "azurerm_network_security_group" "web" {
  name                = "${var.environment}-web-nsg"
  location            = azurerm_resource_group.main.location
  resource_group_name = azurerm_resource_group.main.name
  
  security_rule {
    name                       = "HTTP"
    priority                   = 1001
    direction                  = "Inbound"
    access                     = "Allow"
    protocol                   = "Tcp"
    source_port_range          = "*"
    destination_port_range     = "80"
    source_address_prefix      = "*"
    destination_address_prefix = "*"
  }
  
  security_rule {
    name                       = "HTTPS"
    priority                   = 1002
    direction                  = "Inbound"
    access                     = "Allow"
    protocol                   = "Tcp"
    source_port_range          = "*"
    destination_port_range     = "443"
    source_address_prefix      = "*"
    destination_address_prefix = "*"
  }
  
  tags = var.common_tags
}

# AKS集群
resource "azurerm_kubernetes_cluster" "main" {
  name                = "${var.environment}-aks-cluster"
  location            = azurerm_resource_group.main.location
  resource_group_name = azurerm_resource_group.main.name
  dns_prefix          = "${var.environment}-aks"
  
  default_node_pool {
    name       = "default"
    node_count = 2
    vm_size    = "Standard_D2_v2"
    
    vnet_subnet_id = azurerm_subnet.private.id
  }
  
  identity {
    type = "SystemAssigned"
  }
  
  network_profile {
    network_plugin = "azure"
    network_policy = "azure"
  }
  
  tags = var.common_tags
}

# 输出
output "azure_resource_group_name" {
  value = azurerm_resource_group.main.name
}

output "azure_aks_cluster_name" {
  value = azurerm_kubernetes_cluster.main.name
}

output "azure_aks_cluster_fqdn" {
  value = azurerm_kubernetes_cluster.main.fqdn
}
EOF

    # 创建GCP资源配置
    cat > "$TERRAFORM_DIR/gcp.tf" << 'EOF'
# GCP 资源配置

# VPC网络
resource "google_compute_network" "main" {
  name                    = "${var.environment}-vpc"
  auto_create_subnetworks = false
}

# 子网
resource "google_compute_subnetwork" "public" {
  name          = "${var.environment}-public-subnet"
  ip_cidr_range = "10.2.1.0/24"
  region        = var.gcp_region
  network       = google_compute_network.main.id
  
  secondary_ip_range {
    range_name    = "pods"
    ip_cidr_range = "10.2.16.0/20"
  }
  
  secondary_ip_range {
    range_name    = "services"
    ip_cidr_range = "10.2.32.0/20"
  }
}

resource "google_compute_subnetwork" "private" {
  name          = "${var.environment}-private-subnet"
  ip_cidr_range = "10.2.2.0/24"
  region        = var.gcp_region
  network       = google_compute_network.main.id
}

# 防火墙规则
resource "google_compute_firewall" "web" {
  name    = "${var.environment}-web-firewall"
  network = google_compute_network.main.name
  
  allow {
    protocol = "tcp"
    ports    = ["80", "443"]
  }
  
  source_ranges = ["0.0.0.0/0"]
  target_tags   = ["web"]
}

# GKE集群
resource "google_container_cluster" "main" {
  name     = "${var.environment}-gke-cluster"
  location = var.gcp_region
  
  # 删除默认节点池
  remove_default_node_pool = true
  initial_node_count       = 1
  
  network    = google_compute_network.main.name
  subnetwork = google_compute_subnetwork.public.name
  
  ip_allocation_policy {
    cluster_secondary_range_name  = "pods"
    services_secondary_range_name = "services"
  }
  
  workload_identity_config {
    workload_pool = "${var.gcp_project_id}.svc.id.goog"
  }
}

# GKE节点池
resource "google_container_node_pool" "main" {
  name       = "${var.environment}-node-pool"
  location   = var.gcp_region
  cluster    = google_container_cluster.main.name
  node_count = 2
  
  node_config {
    preemptible  = false
    machine_type = "e2-medium"
    
    service_account = google_service_account.gke.email
    oauth_scopes = [
      "https://www.googleapis.com/auth/cloud-platform"
    ]
    
    workload_metadata_config {
      mode = "GKE_METADATA"
    }
  }
}

# 服务账号
resource "google_service_account" "gke" {
  account_id   = "${var.environment}-gke-sa"
  display_name = "GKE Service Account"
}

# 输出
output "gcp_network_name" {
  value = google_compute_network.main.name
}

output "gcp_gke_cluster_name" {
  value = google_container_cluster.main.name
}

output "gcp_gke_cluster_endpoint" {
  value = google_container_cluster.main.endpoint
}
EOF

    log_info "Terraform配置初始化完成"
}

# 初始化Ansible配置
init_ansible() {
    log_info "初始化Ansible配置..."
    
    mkdir -p "$ANSIBLE_DIR"/{playbooks,roles,inventory}
    
    # 创建Ansible配置文件
    cat > "$ANSIBLE_DIR/ansible.cfg" << 'EOF'
[defaults]
host_key_checking = False
inventory = inventory/
roles_path = roles/
remote_user = ubuntu
private_key_file = ~/.ssh/id_rsa

[inventory]
enable_plugins = aws_ec2, azure_rm, gcp_compute

[ssh_connection]
ssh_args = -o ControlMaster=auto -o ControlPersist=60s
pipelining = True
EOF

    # 创建动态清单配置
    cat > "$ANSIBLE_DIR/inventory/aws_ec2.yml" << 'EOF'
plugin: aws_ec2
regions:
  - us-east-1
  - us-west-2
keyed_groups:
  - key: tags
    prefix: tag
  - key: instance_type
    prefix: instance_type
  - key: placement.region
    prefix: aws_region
compose:
  ansible_host: public_ip_address
EOF

    cat > "$ANSIBLE_DIR/inventory/azure_rm.yml" << 'EOF'
plugin: azure_rm
include_vm_resource_groups:
  - "*"
auth_source: auto
keyed_groups:
  - key: tags
    prefix: tag
  - key: location
    prefix: azure_location
compose:
  ansible_host: public_ipv4_addresses[0] | default(private_ipv4_addresses[0])
EOF

    cat > "$ANSIBLE_DIR/inventory/gcp_compute.yml" << 'EOF'
plugin: gcp_compute
projects:
  - your-gcp-project-id
auth_kind: serviceaccount
service_account_file: ~/.gcp/credentials.json
keyed_groups:
  - key: labels
    prefix: label
  - key: zone
    prefix: gcp_zone
compose:
  ansible_host: networkInterfaces[0].accessConfigs[0].natIP | default(networkInterfaces[0].networkIP)
EOF

    # 创建多云部署playbook
    cat > "$ANSIBLE_DIR/playbooks/multi-cloud-deploy.yml" << 'EOF'
---
- name: 多云应用部署
  hosts: all
  become: yes
  vars:
    app_name: "multi-cloud-app"
    app_version: "1.0.0"
    docker_image: "nginx:latest"
    
  tasks:
    - name: 更新包管理器
      package:
        update_cache: yes
      when: ansible_os_family == "Debian"
    
    - name: 安装Docker
      package:
        name: docker.io
        state: present
    
    - name: 启动Docker服务
      service:
        name: docker
        state: started
        enabled: yes
    
    - name: 拉取应用镜像
      docker_image:
        name: "{{ docker_image }}"
        source: pull
    
    - name: 运行应用容器
      docker_container:
        name: "{{ app_name }}"
        image: "{{ docker_image }}"
        state: started
        restart_policy: always
        ports:
          - "80:80"
        env:
          APP_ENV: "{{ ansible_hostname }}"
          CLOUD_PROVIDER: "{{ group_names[0] | regex_replace('^tag_', '') }}"
    
    - name: 配置防火墙
      ufw:
        rule: allow
        port: "80"
        proto: tcp
      when: ansible_os_family == "Debian"

- name: 配置监控
  hosts: all
  become: yes
  tasks:
    - name: 安装Node Exporter
      get_url:
        url: https://github.com/prometheus/node_exporter/releases/download/v1.6.1/node_exporter-1.6.1.linux-amd64.tar.gz
        dest: /tmp/node_exporter.tar.gz
    
    - name: 解压Node Exporter
      unarchive:
        src: /tmp/node_exporter.tar.gz
        dest: /tmp/
        remote_src: yes
    
    - name: 复制Node Exporter二进制文件
      copy:
        src: /tmp/node_exporter-1.6.1.linux-amd64/node_exporter
        dest: /usr/local/bin/node_exporter
        mode: '0755'
        remote_src: yes
    
    - name: 创建Node Exporter服务
      copy:
        content: |
          [Unit]
          Description=Node Exporter
          After=network.target
          
          [Service]
          Type=simple
          ExecStart=/usr/local/bin/node_exporter
          Restart=always
          
          [Install]
          WantedBy=multi-user.target
        dest: /etc/systemd/system/node_exporter.service
    
    - name: 启动Node Exporter服务
      systemd:
        name: node_exporter
        state: started
        enabled: yes
        daemon_reload: yes
EOF

    log_info "Ansible配置初始化完成"
}

# 部署多云基础设施
deploy_infrastructure() {
    local environment="${1:-production}"
    
    log_info "开始部署多云基础设施 (环境: $environment)..."
    
    cd "$TERRAFORM_DIR"
    
    # 初始化Terraform
    log_info "初始化Terraform..."
    terraform init
    
    # 创建terraform.tfvars文件
    cat > terraform.tfvars << EOF
environment = "$environment"
aws_region = "us-east-1"
azure_subscription_id = "your-azure-subscription-id"
gcp_project_id = "your-gcp-project-id"
gcp_region = "us-central1"

common_tags = {
  Environment = "$environment"
  Project     = "multi-cloud"
  ManagedBy   = "terraform"
  Owner       = "devops-team"
}
EOF
    
    # 验证配置
    log_info "验证Terraform配置..."
    terraform validate
    
    # 规划部署
    log_info "生成部署计划..."
    terraform plan -out=tfplan
    
    # 执行部署
    log_info "执行基础设施部署..."
    terraform apply tfplan
    
    # 获取输出
    log_info "获取部署输出..."
    terraform output -json > ../outputs.json
    
    log_info "多云基础设施部署完成"
}

# 部署应用
deploy_applications() {
    log_info "开始部署应用..."
    
    cd "$ANSIBLE_DIR"
    
    # 更新动态清单
    log_info "更新动态清单..."
    ansible-inventory --list > inventory/dynamic_inventory.json
    
    # 执行应用部署
    log_info "执行应用部署..."
    ansible-playbook -i inventory/ playbooks/multi-cloud-deploy.yml
    
    log_info "应用部署完成"
}

# 配置Kubernetes集群
configure_kubernetes() {
    log_info "配置Kubernetes集群..."
    
    # 配置AWS EKS
    if [ -f "$TERRAFORM_DIR/../outputs.json" ]; then
        local aws_cluster_name=$(jq -r '.aws_eks_cluster_name.value' "$TERRAFORM_DIR/../outputs.json")
        if [ "$aws_cluster_name" != "null" ]; then
            log_info "配置AWS EKS集群: $aws_cluster_name"
            aws eks update-kubeconfig --region us-east-1 --name "$aws_cluster_name" --alias aws-cluster
        fi
        
        # 配置Azure AKS
        local azure_cluster_name=$(jq -r '.azure_aks_cluster_name.value' "$TERRAFORM_DIR/../outputs.json")
        local azure_rg_name=$(jq -r '.azure_resource_group_name.value' "$TERRAFORM_DIR/../outputs.json")
        if [ "$azure_cluster_name" != "null" ] && [ "$azure_rg_name" != "null" ]; then
            log_info "配置Azure AKS集群: $azure_cluster_name"
            az aks get-credentials --resource-group "$azure_rg_name" --name "$azure_cluster_name" --admin --context azure-cluster
        fi
        
        # 配置GCP GKE
        local gcp_cluster_name=$(jq -r '.gcp_gke_cluster_name.value' "$TERRAFORM_DIR/../outputs.json")
        if [ "$gcp_cluster_name" != "null" ]; then
            log_info "配置GCP GKE集群: $gcp_cluster_name"
            gcloud container clusters get-credentials "$gcp_cluster_name" --region us-central1 --project your-gcp-project-id
            kubectl config rename-context "gke_your-gcp-project-id_us-central1_$gcp_cluster_name" gcp-cluster
        fi
    fi
    
    # 显示集群信息
    log_info "Kubernetes集群配置完成"
    kubectl config get-contexts
}

# 销毁基础设施
destroy_infrastructure() {
    log_warn "准备销毁多云基础设施..."
    
    read -p "确认要销毁所有资源吗?(yes/no): " confirm
    if [ "$confirm" != "yes" ]; then
        log_info "取消销毁操作"
        return 0
    fi
    
    cd "$TERRAFORM_DIR"
    
    log_info "开始销毁基础设施..."
    terraform destroy -auto-approve
    
    log_info "基础设施销毁完成"
}

# 显示帮助信息
show_help() {
    cat << EOF
多云资源编排部署脚本

用法: $0 [命令] [选项]

命令:
  init                    初始化配置文件
  deploy [环境名]         部署基础设施 (默认: production)
  deploy-apps            部署应用
  configure-k8s          配置Kubernetes集群
  destroy                销毁基础设施
  help                   显示帮助信息

示例:
  $0 init                 # 初始化配置
  $0 deploy staging       # 部署staging环境
  $0 deploy-apps          # 部署应用
  $0 configure-k8s        # 配置K8s集群
  $0 destroy              # 销毁基础设施

EOF
}

# 主函数
main() {
    case "${1:-help}" in
        "init")
            check_dependencies
            init_terraform
            init_ansible
            ;;
        "deploy")
            check_dependencies
            deploy_infrastructure "${2:-production}"
            ;;
        "deploy-apps")
            check_dependencies
            deploy_applications
            ;;
        "configure-k8s")
            check_dependencies
            configure_kubernetes
            ;;
        "destroy")
            check_dependencies
            destroy_infrastructure
            ;;
        "help"|*)
            show_help
            ;;
    esac
}

# 执行主函数
main "$@"

多云资源管理器

#!/usr/bin/env python3
"""
多云资源管理器
提供统一的多云资源管理和编排能力
"""

import json
import logging
import asyncio
from typing import Dict, List, Any, Optional
from datetime import datetime, timedelta
from dataclasses import dataclass, asdict
from enum import Enum
import boto3
from azure.identity import DefaultAzureCredential
from azure.mgmt.resource import ResourceManagementClient
from azure.mgmt.compute import ComputeManagementClient
from google.cloud import resource_manager
from google.cloud import compute_v1
import kubernetes
from kubernetes import client, config

# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class CloudProvider(Enum):
    """云服务提供商枚举"""
    AWS = "aws"
    AZURE = "azure"
    GCP = "gcp"

class ResourceType(Enum):
    """资源类型枚举"""
    COMPUTE = "compute"
    STORAGE = "storage"
    NETWORK = "network"
    DATABASE = "database"
    KUBERNETES = "kubernetes"

@dataclass
class CloudResource:
    """云资源数据类"""
    id: str
    name: str
    type: ResourceType
    provider: CloudProvider
    region: str
    status: str
    tags: Dict[str, str]
    metadata: Dict[str, Any]
    created_at: datetime
    cost_estimate: float = 0.0

@dataclass
class DeploymentTemplate:
    """部署模板数据类"""
    name: str
    description: str
    provider: CloudProvider
    resources: List[Dict[str, Any]]
    parameters: Dict[str, Any]
    dependencies: List[str]

class MultiCloudResourceManager:
    """多云资源管理器"""
    
    def __init__(self):
        self.aws_session = None
        self.azure_credential = None
        self.gcp_client = None
        self.k8s_clients = {}
        self.resources_cache = {}
        self.templates = {}
        
        self._initialize_clients()
    
    def _initialize_clients(self):
        """初始化云服务客户端"""
        try:
            # 初始化AWS客户端
            self.aws_session = boto3.Session()
            logger.info("AWS客户端初始化成功")
            
            # 初始化Azure客户端
            self.azure_credential = DefaultAzureCredential()
            logger.info("Azure客户端初始化成功")
            
            # 初始化GCP客户端
            self.gcp_client = resource_manager.Client()
            logger.info("GCP客户端初始化成功")
            
            # 初始化Kubernetes客户端
            try:
                config.load_kube_config()
                self.k8s_clients['default'] = client.ApiClient()
                logger.info("Kubernetes客户端初始化成功")
            except Exception as e:
                logger.warning(f"Kubernetes客户端初始化失败: {e}")
                
        except Exception as e:
            logger.error(f"客户端初始化失败: {e}")
    
    async def discover_resources(self, providers: List[CloudProvider] = None) -> Dict[CloudProvider, List[CloudResource]]:
        """发现多云资源"""
        if providers is None:
            providers = [CloudProvider.AWS, CloudProvider.AZURE, CloudProvider.GCP]
        
        discovered_resources = {}
        
        for provider in providers:
            try:
                if provider == CloudProvider.AWS:
                    resources = await self._discover_aws_resources()
                elif provider == CloudProvider.AZURE:
                    resources = await self._discover_azure_resources()
                elif provider == CloudProvider.GCP:
                    resources = await self._discover_gcp_resources()
                else:
                    resources = []
                
                discovered_resources[provider] = resources
                self.resources_cache[provider] = resources
                
                logger.info(f"发现 {provider.value} 资源: {len(resources)} 个")
                
            except Exception as e:
                logger.error(f"发现 {provider.value} 资源失败: {e}")
                discovered_resources[provider] = []
        
        return discovered_resources
    
    async def _discover_aws_resources(self) -> List[CloudResource]:
        """发现AWS资源"""
        resources = []
        
        try:
            # 发现EC2实例
            ec2 = self.aws_session.client('ec2')
            instances = ec2.describe_instances()
            
            for reservation in instances['Reservations']:
                for instance in reservation['Instances']:
                    resource = CloudResource(
                        id=instance['InstanceId'],
                        name=self._get_tag_value(instance.get('Tags', []), 'Name', instance['InstanceId']),
                        type=ResourceType.COMPUTE,
                        provider=CloudProvider.AWS,
                        region=instance['Placement']['AvailabilityZone'][:-1],
                        status=instance['State']['Name'],
                        tags=self._parse_aws_tags(instance.get('Tags', [])),
                        metadata={
                            'instance_type': instance['InstanceType'],
                            'vpc_id': instance.get('VpcId'),
                            'subnet_id': instance.get('SubnetId'),
                            'security_groups': [sg['GroupId'] for sg in instance.get('SecurityGroups', [])]
                        },
                        created_at=instance['LaunchTime']
                    )
                    resources.append(resource)
            
            # 发现RDS实例
            rds = self.aws_session.client('rds')
            db_instances = rds.describe_db_instances()
            
            for db_instance in db_instances['DBInstances']:
                resource = CloudResource(
                    id=db_instance['DBInstanceIdentifier'],
                    name=db_instance['DBInstanceIdentifier'],
                    type=ResourceType.DATABASE,
                    provider=CloudProvider.AWS,
                    region=db_instance['AvailabilityZone'][:-1] if db_instance.get('AvailabilityZone') else 'unknown',
                    status=db_instance['DBInstanceStatus'],
                    tags=self._parse_aws_tags(db_instance.get('TagList', [])),
                    metadata={
                        'engine': db_instance['Engine'],
                        'engine_version': db_instance['EngineVersion'],
                        'instance_class': db_instance['DBInstanceClass'],
                        'allocated_storage': db_instance['AllocatedStorage']
                    },
                    created_at=db_instance['InstanceCreateTime']
                )
                resources.append(resource)
            
            # 发现EKS集群
            eks = self.aws_session.client('eks')
            clusters = eks.list_clusters()
            
            for cluster_name in clusters['clusters']:
                cluster_info = eks.describe_cluster(name=cluster_name)['cluster']
                resource = CloudResource(
                    id=cluster_info['arn'],
                    name=cluster_name,
                    type=ResourceType.KUBERNETES,
                    provider=CloudProvider.AWS,
                    region=cluster_info['arn'].split(':')[3],
                    status=cluster_info['status'],
                    tags=cluster_info.get('tags', {}),
                    metadata={
                        'version': cluster_info['version'],
                        'platform_version': cluster_info['platformVersion'],
                        'endpoint': cluster_info['endpoint']
                    },
                    created_at=cluster_info['createdAt']
                )
                resources.append(resource)
                
        except Exception as e:
            logger.error(f"发现AWS资源失败: {e}")
        
        return resources
    
    async def _discover_azure_resources(self) -> List[CloudResource]:
        """发现Azure资源"""
        resources = []
        
        try:
            subscription_id = "your-subscription-id"  # 从配置获取
            
            # 发现虚拟机
            compute_client = ComputeManagementClient(self.azure_credential, subscription_id)
            vms = compute_client.virtual_machines.list_all()
            
            for vm in vms:
                resource = CloudResource(
                    id=vm.id,
                    name=vm.name,
                    type=ResourceType.COMPUTE,
                    provider=CloudProvider.AZURE,
                    region=vm.location,
                    status=vm.provisioning_state,
                    tags=vm.tags or {},
                    metadata={
                        'vm_size': vm.hardware_profile.vm_size,
                        'os_type': vm.storage_profile.os_disk.os_type.value if vm.storage_profile.os_disk.os_type else 'unknown',
                        'resource_group': vm.id.split('/')[4]
                    },
                    created_at=datetime.now()  # Azure API不直接提供创建时间
                )
                resources.append(resource)
            
            # 发现AKS集群
            # 这里需要使用Container Service客户端
            # 为简化示例,暂时跳过
                
        except Exception as e:
            logger.error(f"发现Azure资源失败: {e}")
        
        return resources
    
    async def _discover_gcp_resources(self) -> List[CloudResource]:
        """发现GCP资源"""
        resources = []
        
        try:
            project_id = "your-project-id"  # 从配置获取
            
            # 发现Compute Engine实例
            instances_client = compute_v1.InstancesClient()
            
            # 获取所有区域
            zones_client = compute_v1.ZonesClient()
            zones = zones_client.list(project=project_id)
            
            for zone in zones:
                instances = instances_client.list(project=project_id, zone=zone.name)
                
                for instance in instances:
                    resource = CloudResource(
                        id=str(instance.id),
                        name=instance.name,
                        type=ResourceType.COMPUTE,
                        provider=CloudProvider.GCP,
                        region=zone.region.split('/')[-1],
                        status=instance.status,
                        tags=dict(instance.labels) if instance.labels else {},
                        metadata={
                            'machine_type': instance.machine_type.split('/')[-1],
                            'zone': zone.name,
                            'network_interfaces': len(instance.network_interfaces)
                        },
                        created_at=datetime.fromisoformat(instance.creation_timestamp.rstrip('Z'))
                    )
                    resources.append(resource)
            
            # 发现GKE集群
            # 这里需要使用Container客户端
            # 为简化示例,暂时跳过
                
        except Exception as e:
            logger.error(f"发现GCP资源失败: {e}")
        
        return resources
    
    def create_deployment_template(self, template: DeploymentTemplate) -> bool:
        """创建部署模板"""
        try:
            self.templates[template.name] = template
            logger.info(f"创建部署模板: {template.name}")
            return True
        except Exception as e:
            logger.error(f"创建部署模板失败: {e}")
            return False
    
    async def deploy_from_template(self, template_name: str, parameters: Dict[str, Any] = None) -> Dict[str, Any]:
        """从模板部署资源"""
        if template_name not in self.templates:
            raise ValueError(f"模板 {template_name} 不存在")
        
        template = self.templates[template_name]
        deployment_params = {**template.parameters}
        if parameters:
            deployment_params.update(parameters)
        
        deployment_result = {
            'template_name': template_name,
            'provider': template.provider.value,
            'status': 'deploying',
            'resources': [],
            'errors': []
        }
        
        try:
            if template.provider == CloudProvider.AWS:
                result = await self._deploy_aws_template(template, deployment_params)
            elif template.provider == CloudProvider.AZURE:
                result = await self._deploy_azure_template(template, deployment_params)
            elif template.provider == CloudProvider.GCP:
                result = await self._deploy_gcp_template(template, deployment_params)
            else:
                raise ValueError(f"不支持的云提供商: {template.provider}")
            
            deployment_result.update(result)
            deployment_result['status'] = 'completed'
            
        except Exception as e:
            logger.error(f"模板部署失败: {e}")
            deployment_result['status'] = 'failed'
            deployment_result['errors'].append(str(e))
        
        return deployment_result
    
    async def _deploy_aws_template(self, template: DeploymentTemplate, parameters: Dict[str, Any]) -> Dict[str, Any]:
        """部署AWS模板"""
        # 这里实现AWS CloudFormation或Terraform部署逻辑
        # 为简化示例,返回模拟结果
        return {
            'resources': [f"aws-resource-{i}" for i in range(len(template.resources))],
            'deployment_id': f"aws-deployment-{datetime.now().strftime('%Y%m%d%H%M%S')}"
        }
    
    async def _deploy_azure_template(self, template: DeploymentTemplate, parameters: Dict[str, Any]) -> Dict[str, Any]:
        """部署Azure模板"""
        # 这里实现Azure ARM模板部署逻辑
        # 为简化示例,返回模拟结果
        return {
            'resources': [f"azure-resource-{i}" for i in range(len(template.resources))],
            'deployment_id': f"azure-deployment-{datetime.now().strftime('%Y%m%d%H%M%S')}"
        }
    
    async def _deploy_gcp_template(self, template: DeploymentTemplate, parameters: Dict[str, Any]) -> Dict[str, Any]:
        """部署GCP模板"""
        # 这里实现GCP Deployment Manager部署逻辑
        # 为简化示例,返回模拟结果
        return {
            'resources': [f"gcp-resource-{i}" for i in range(len(template.resources))],
            'deployment_id': f"gcp-deployment-{datetime.now().strftime('%Y%m%d%H%M%S')}"
        }
    
    def sync_kubernetes_resources(self, cluster_contexts: List[str] = None) -> Dict[str, List[Dict[str, Any]]]:
        """同步Kubernetes资源"""
        if cluster_contexts is None:
            cluster_contexts = ['aws-cluster', 'azure-cluster', 'gcp-cluster']
        
        k8s_resources = {}
        
        for context in cluster_contexts:
            try:
                # 切换到指定的集群上下文
                config.load_kube_config(context=context)
                v1 = client.CoreV1Api()
                apps_v1 = client.AppsV1Api()
                
                resources = {
                    'namespaces': [],
                    'deployments': [],
                    'services': [],
                    'pods': []
                }
                
                # 获取命名空间
                namespaces = v1.list_namespace()
                for ns in namespaces.items:
                    resources['namespaces'].append({
                        'name': ns.metadata.name,
                        'status': ns.status.phase,
                        'created_at': ns.metadata.creation_timestamp.isoformat()
                    })
                
                # 获取部署
                deployments = apps_v1.list_deployment_for_all_namespaces()
                for deploy in deployments.items:
                    resources['deployments'].append({
                        'name': deploy.metadata.name,
                        'namespace': deploy.metadata.namespace,
                        'replicas': deploy.spec.replicas,
                        'ready_replicas': deploy.status.ready_replicas or 0,
                        'created_at': deploy.metadata.creation_timestamp.isoformat()
                    })
                
                # 获取服务
                services = v1.list_service_for_all_namespaces()
                for svc in services.items:
                    resources['services'].append({
                        'name': svc.metadata.name,
                        'namespace': svc.metadata.namespace,
                        'type': svc.spec.type,
                        'cluster_ip': svc.spec.cluster_ip,
                        'created_at': svc.metadata.creation_timestamp.isoformat()
                    })
                
                k8s_resources[context] = resources
                logger.info(f"同步 {context} 集群资源完成")
                
            except Exception as e:
                logger.error(f"同步 {context} 集群资源失败: {e}")
                k8s_resources[context] = {}
        
        return k8s_resources
    
    def generate_resource_report(self) -> Dict[str, Any]:
        """生成资源报告"""
        report = {
            'generated_at': datetime.now().isoformat(),
            'summary': {
                'total_resources': 0,
                'by_provider': {},
                'by_type': {},
                'by_region': {}
            },
            'details': {},
            'recommendations': []
        }
        
        # 统计资源
        for provider, resources in self.resources_cache.items():
            provider_name = provider.value
            report['summary']['by_provider'][provider_name] = len(resources)
            report['summary']['total_resources'] += len(resources)
            report['details'][provider_name] = []
            
            for resource in resources:
                # 按类型统计
                resource_type = resource.type.value
                if resource_type not in report['summary']['by_type']:
                    report['summary']['by_type'][resource_type] = 0
                report['summary']['by_type'][resource_type] += 1
                
                # 按区域统计
                if resource.region not in report['summary']['by_region']:
                    report['summary']['by_region'][resource.region] = 0
                report['summary']['by_region'][resource.region] += 1
                
                # 添加详细信息
                report['details'][provider_name].append(asdict(resource))
        
        # 生成建议
        self._generate_recommendations(report)
        
        return report
    
    def _generate_recommendations(self, report: Dict[str, Any]):
        """生成优化建议"""
        recommendations = []
        
        # 检查资源分布
        provider_counts = report['summary']['by_provider']
        if len(provider_counts) > 1:
            max_provider = max(provider_counts, key=provider_counts.get)
            max_count = provider_counts[max_provider]
            total_count = report['summary']['total_resources']
            
            if max_count / total_count > 0.8:
                recommendations.append({
                    'type': 'distribution',
                    'priority': 'medium',
                    'message': f"资源过度集中在 {max_provider},建议考虑多云分布以提高可用性"
                })
        
        # 检查区域分布
        region_counts = report['summary']['by_region']
        if len(region_counts) == 1:
            recommendations.append({
                'type': 'availability',
                'priority': 'high',
                'message': "所有资源都在同一区域,建议部署到多个区域以提高容灾能力"
            })
        
        # 检查资源类型
        type_counts = report['summary']['by_type']
        if 'kubernetes' in type_counts and type_counts['kubernetes'] > 1:
            recommendations.append({
                'type': 'optimization',
                'priority': 'low',
                'message': "检测到多个Kubernetes集群,建议评估是否可以合并以降低管理复杂度"
            })
        
        report['recommendations'] = recommendations
    
    def _get_tag_value(self, tags: List[Dict], key: str, default: str = "") -> str:
        """获取标签值"""
        for tag in tags:
            if tag.get('Key') == key:
                return tag.get('Value', default)
        return default
    
    def _parse_aws_tags(self, tags: List[Dict]) -> Dict[str, str]:
        """解析AWS标签"""
        return {tag['Key']: tag['Value'] for tag in tags}

def main():
    """主函数"""
    # 创建多云资源管理器
    manager = MultiCloudResourceManager()
    
    async def run_demo():
        # 发现资源
        print("发现多云资源...")
        resources = await manager.discover_resources([CloudProvider.AWS])
        
        for provider, resource_list in resources.items():
            print(f"\n{provider.value} 资源:")
            for resource in resource_list[:3]:  # 只显示前3个
                print(f"  - {resource.name} ({resource.type.value}) - {resource.status}")
        
        # 创建部署模板
        print("\n创建部署模板...")
        template = DeploymentTemplate(
            name="web-app-template",
            description="Web应用部署模板",
            provider=CloudProvider.AWS,
            resources=[
                {"type": "compute", "count": 2},
                {"type": "load_balancer", "count": 1},
                {"type": "database", "count": 1}
            ],
            parameters={
                "instance_type": "t3.medium",
                "region": "us-east-1"
            },
            dependencies=[]
        )
        
        manager.create_deployment_template(template)
        
        # 从模板部署
        print("\n从模板部署资源...")
        deployment_result = await manager.deploy_from_template(
            "web-app-template",
            {"instance_type": "t3.large"}
        )
        print(f"部署状态: {deployment_result['status']}")
        print(f"部署ID: {deployment_result.get('deployment_id', 'N/A')}")
        
        # 同步Kubernetes资源
        print("\n同步Kubernetes资源...")
        k8s_resources = manager.sync_kubernetes_resources(['default'])
        for context, resources in k8s_resources.items():
            if resources:
                print(f"{context} 集群:")
                print(f"  命名空间: {len(resources.get('namespaces', []))}")
                print(f"  部署: {len(resources.get('deployments', []))}")
                print(f"  服务: {len(resources.get('services', []))}")
        
        # 生成资源报告
        print("\n生成资源报告...")
        report = manager.generate_resource_report()
        print(f"总资源数: {report['summary']['total_resources']}")
        print(f"云提供商分布: {report['summary']['by_provider']}")
        print(f"资源类型分布: {report['summary']['by_type']}")
        
        if report['recommendations']:
            print("\n优化建议:")
            for rec in report['recommendations']:
                print(f"  [{rec['priority']}] {rec['message']}")
    
    # 运行演示
    asyncio.run(run_demo())

if __name__ == "__main__":
    main()
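
上面的 _deploy_aws_template、_deploy_azure_template 等方法为简化示例返回模拟结果。若要落地 AWS 部署,一种常见做法是调用 CloudFormation;下面是一个基于 boto3 的示意实现,其中栈名、模板体与 ImageId 占位符均为假设值,仅用于说明调用方式:

#!/usr/bin/env python3
"""
_deploy_aws_template 的一种可行实现思路(示意)
假设:使用 CloudFormation 部署,模板体、栈名与参数均为演示用的假设值
"""

import json
import boto3

def deploy_aws_stack(stack_name: str, instance_type: str = "t3.medium",
                     region: str = "us-east-1") -> str:
    """通过 CloudFormation 创建一个最小化演示栈,返回 StackId"""
    # 演示用的最小模板:仅包含一个参数化的 EC2 实例,ImageId 需替换为有效 AMI
    template_body = {
        "AWSTemplateFormatVersion": "2010-09-09",
        "Parameters": {
            "InstanceType": {"Type": "String", "Default": "t3.medium"}
        },
        "Resources": {
            "DemoInstance": {
                "Type": "AWS::EC2::Instance",
                "Properties": {
                    "InstanceType": {"Ref": "InstanceType"},
                    "ImageId": "ami-00000000000000000"  # 占位符,需替换为真实 AMI ID
                }
            }
        }
    }

    cfn = boto3.client("cloudformation", region_name=region)
    response = cfn.create_stack(
        StackName=stack_name,
        TemplateBody=json.dumps(template_body),
        Parameters=[{"ParameterKey": "InstanceType", "ParameterValue": instance_type}],
        Tags=[{"Key": "ManagedBy", "Value": "multi-cloud-manager"}]
    )

    # 等待栈创建完成(生产环境中建议改为异步轮询并设置超时)
    waiter = cfn.get_waiter("stack_create_complete")
    waiter.wait(StackName=stack_name)

    return response["StackId"]

if __name__ == "__main__":
    print(deploy_aws_stack("web-app-template-demo", instance_type="t3.large"))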

成本优化与监控

多云环境的成本优化需要综合考虑资源使用效率(如实例规格调整)、定价策略(如预留实例与 Spot 实例)以及自动化的监控与治理。
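
在动手优化之前,可以先用一个简单的测算说明定价策略的影响:对于长期稳定运行的实例,预留实例通常比按需计费更划算,但需要结合实际利用率判断盈亏平衡点。下面是一个最小化的 Python 测算示例,其中价格均为假设值(以 t3.medium 常见挂牌价为例,实际价格请以各云厂商官网为准):

#!/usr/bin/env python3
"""
预留实例(RI)收益测算示例(示意)
假设:按需价 0.0416 美元/小时,1 年期无预付 RI 价格按约 37% 折扣估算,均为示例数字
"""

def reserved_instance_savings(on_demand_hourly: float,
                              ri_hourly: float,
                              utilization: float = 1.0,
                              hours_per_year: int = 8760) -> dict:
    """比较按需与预留实例的年度成本,返回节省金额与盈亏平衡利用率"""
    on_demand_cost = on_demand_hourly * hours_per_year * utilization
    ri_cost = ri_hourly * hours_per_year  # RI 按全年计费,与实际利用率无关
    breakeven_utilization = ri_hourly / on_demand_hourly  # 利用率高于该值时 RI 更划算
    return {
        "on_demand_annual": round(on_demand_cost, 2),
        "ri_annual": round(ri_cost, 2),
        "annual_savings": round(on_demand_cost - ri_cost, 2),
        "breakeven_utilization": round(breakeven_utilization, 2),
    }

if __name__ == "__main__":
    # 示例价格均为假设值
    result = reserved_instance_savings(on_demand_hourly=0.0416, ri_hourly=0.0262)
    print(result)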

成本优化分析器

#!/usr/bin/env python3
"""
多云成本优化分析器
提供跨云平台的成本分析和优化建议
"""

import json
import logging
from typing import Dict, List, Any, Optional, Tuple
from datetime import datetime, timedelta
from dataclasses import dataclass, asdict
from enum import Enum
import pandas as pd
import numpy as np
from collections import defaultdict
import boto3
from azure.identity import DefaultAzureCredential
from azure.mgmt.consumption import ConsumptionManagementClient
from google.cloud import billing

# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class OptimizationType(Enum):
    """优化类型枚举"""
    RIGHT_SIZING = "right_sizing"
    RESERVED_INSTANCES = "reserved_instances"
    SPOT_INSTANCES = "spot_instances"
    STORAGE_OPTIMIZATION = "storage_optimization"
    NETWORK_OPTIMIZATION = "network_optimization"
    SCHEDULING = "scheduling"

@dataclass
class CostData:
    """成本数据类"""
    provider: str
    service: str
    resource_id: str
    resource_name: str
    region: str
    cost: float
    currency: str
    period_start: datetime
    period_end: datetime
    usage_quantity: float
    usage_unit: str
    tags: Dict[str, str]

@dataclass
class OptimizationRecommendation:
    """优化建议数据类"""
    type: OptimizationType
    resource_id: str
    resource_name: str
    current_cost: float
    potential_savings: float
    confidence: float
    description: str
    implementation_effort: str
    risk_level: str
    details: Dict[str, Any]

class MultiCloudCostOptimizer:
    """多云成本优化器"""
    
    def __init__(self):
        self.aws_session = None
        self.azure_credential = None
        self.gcp_billing_client = None
        self.cost_data = []
        self.recommendations = []
        
        self._initialize_clients()
    
    def _initialize_clients(self):
        """初始化云服务客户端"""
        try:
            # 初始化AWS客户端
            self.aws_session = boto3.Session()
            logger.info("AWS成本管理客户端初始化成功")
            
            # 初始化Azure客户端
            self.azure_credential = DefaultAzureCredential()
            logger.info("Azure成本管理客户端初始化成功")
            
            # 初始化GCP客户端
            self.gcp_billing_client = billing.CloudBillingClient()
            logger.info("GCP计费客户端初始化成功")
            
        except Exception as e:
            logger.error(f"客户端初始化失败: {e}")
    
    def collect_cost_data(self, start_date: datetime, end_date: datetime) -> List[CostData]:
        """收集成本数据"""
        all_cost_data = []
        
        # 收集AWS成本数据
        aws_costs = self._collect_aws_costs(start_date, end_date)
        all_cost_data.extend(aws_costs)
        
        # 收集Azure成本数据
        azure_costs = self._collect_azure_costs(start_date, end_date)
        all_cost_data.extend(azure_costs)
        
        # 收集GCP成本数据
        gcp_costs = self._collect_gcp_costs(start_date, end_date)
        all_cost_data.extend(gcp_costs)
        
        self.cost_data = all_cost_data
        logger.info(f"收集到 {len(all_cost_data)} 条成本数据")
        
        return all_cost_data
    
    def _collect_aws_costs(self, start_date: datetime, end_date: datetime) -> List[CostData]:
        """收集AWS成本数据"""
        cost_data = []
        
        try:
            ce_client = self.aws_session.client('ce')
            
            response = ce_client.get_cost_and_usage(
                TimePeriod={
                    'Start': start_date.strftime('%Y-%m-%d'),
                    'End': end_date.strftime('%Y-%m-%d')
                },
                Granularity='DAILY',
                Metrics=['BlendedCost', 'UsageQuantity'],
                GroupBy=[
                    {'Type': 'DIMENSION', 'Key': 'SERVICE'},
                    {'Type': 'DIMENSION', 'Key': 'REGION'}
                ]
            )
            
            for result in response['ResultsByTime']:
                period_start = datetime.strptime(result['TimePeriod']['Start'], '%Y-%m-%d')
                period_end = datetime.strptime(result['TimePeriod']['End'], '%Y-%m-%d')
                
                for group in result['Groups']:
                    service = group['Keys'][0] if len(group['Keys']) > 0 else 'Unknown'
                    region = group['Keys'][1] if len(group['Keys']) > 1 else 'Unknown'
                    
                    cost = float(group['Metrics']['BlendedCost']['Amount'])
                    usage = float(group['Metrics']['UsageQuantity']['Amount'])
                    
                    cost_data.append(CostData(
                        provider='AWS',
                        service=service,
                        resource_id=f"aws-{service}-{region}",
                        resource_name=f"{service} in {region}",
                        region=region,
                        cost=cost,
                        currency=group['Metrics']['BlendedCost']['Unit'],
                        period_start=period_start,
                        period_end=period_end,
                        usage_quantity=usage,
                        usage_unit=group['Metrics']['UsageQuantity']['Unit'],
                        tags={}
                    ))
            
        except Exception as e:
            logger.error(f"收集AWS成本数据失败: {e}")
        
        return cost_data
    
    def _collect_azure_costs(self, start_date: datetime, end_date: datetime) -> List[CostData]:
        """收集Azure成本数据"""
        cost_data = []
        
        try:
            subscription_id = "your-subscription-id"  # 从配置获取
            consumption_client = ConsumptionManagementClient(
                self.azure_credential, 
                subscription_id
            )
            
            # 获取使用详情
            usage_details = consumption_client.usage_details.list(
                scope=f"/subscriptions/{subscription_id}",
                filter=f"properties/usageStart ge '{start_date.strftime('%Y-%m-%d')}' and properties/usageEnd le '{end_date.strftime('%Y-%m-%d')}'"
            )
            
            for usage in usage_details:
                cost_data.append(CostData(
                    provider='Azure',
                    service=usage.consumed_service,
                    resource_id=usage.instance_id or 'unknown',
                    resource_name=usage.instance_name or usage.consumed_service,
                    region=usage.resource_location or 'unknown',
                    cost=float(usage.cost),
                    currency=usage.billing_currency,
                    period_start=usage.usage_start,
                    period_end=usage.usage_end,
                    usage_quantity=float(usage.usage_quantity),
                    usage_unit=usage.unit_of_measure,
                    tags=usage.tags or {}
                ))
            
        except Exception as e:
            logger.error(f"收集Azure成本数据失败: {e}")
        
        return cost_data
    
    def _collect_gcp_costs(self, start_date: datetime, end_date: datetime) -> List[CostData]:
        """收集GCP成本数据"""
        cost_data = []
        
        try:
            # GCP BigQuery导出的计费数据查询
            # 这里需要配置BigQuery客户端和查询
            # 为简化示例,返回模拟数据
            
            services = ['Compute Engine', 'Cloud Storage', 'BigQuery', 'Cloud SQL']
            regions = ['us-central1', 'us-east1', 'europe-west1']
            
            current_date = start_date
            while current_date < end_date:
                for service in services:
                    for region in regions:
                        cost_data.append(CostData(
                            provider='GCP',
                            service=service,
                            resource_id=f"gcp-{service.lower().replace(' ', '-')}-{region}",
                            resource_name=f"{service} in {region}",
                            region=region,
                            cost=np.random.uniform(10, 1000),
                            currency='USD',
                            period_start=current_date,
                            period_end=current_date + timedelta(days=1),
                            usage_quantity=np.random.uniform(1, 100),
                            usage_unit='hours',
                            tags={}
                        ))
                
                current_date += timedelta(days=1)
            
        except Exception as e:
            logger.error(f"收集GCP成本数据失败: {e}")
        
        return cost_data
    
    def analyze_cost_trends(self) -> Dict[str, Any]:
        """分析成本趋势"""
        if not self.cost_data:
            return {}
        
        # 转换为DataFrame进行分析
        df = pd.DataFrame([asdict(cost) for cost in self.cost_data])
        df['period_start'] = pd.to_datetime(df['period_start'])
        
        analysis = {
            'total_cost': df['cost'].sum(),
            'daily_average': df.groupby('period_start')['cost'].sum().mean(),
            'cost_by_provider': df.groupby('provider')['cost'].sum().to_dict(),
            'cost_by_service': df.groupby('service')['cost'].sum().to_dict(),
            'cost_by_region': df.groupby('region')['cost'].sum().to_dict(),
            'trend_analysis': {},
            'growth_rate': {}
        }
        
        # 计算趋势
        daily_costs = df.groupby('period_start')['cost'].sum().sort_index()
        if len(daily_costs) > 1:
            # 计算增长率
            first_week = daily_costs.head(7).mean()
            last_week = daily_costs.tail(7).mean()
            
            if first_week > 0:
                growth_rate = ((last_week - first_week) / first_week) * 100
                analysis['growth_rate']['weekly'] = growth_rate
            
            # 趋势分析
            x = np.arange(len(daily_costs))
            y = daily_costs.values
            
            if len(x) > 1:
                slope, intercept = np.polyfit(x, y, 1)
                analysis['trend_analysis']['slope'] = slope
                analysis['trend_analysis']['direction'] = 'increasing' if slope > 0 else 'decreasing'
        
        return analysis
    
    def generate_optimization_recommendations(self) -> List[OptimizationRecommendation]:
        """生成优化建议"""
        recommendations = []
        
        if not self.cost_data:
            return recommendations
        
        # 转换为DataFrame
        df = pd.DataFrame([asdict(cost) for cost in self.cost_data])
        
        # 实例规格(Right Sizing)优化建议
        recommendations.extend(self._analyze_right_sizing(df))
        
        # 预留实例建议
        recommendations.extend(self._analyze_reserved_instances(df))
        
        # Spot实例建议
        recommendations.extend(self._analyze_spot_instances(df))
        
        # 存储优化建议
        recommendations.extend(self._analyze_storage_optimization(df))
        
        # 网络优化建议
        recommendations.extend(self._analyze_network_optimization(df))
        
        # 调度优化建议
        recommendations.extend(self._analyze_scheduling_optimization(df))
        
        # 按潜在节省排序
        recommendations.sort(key=lambda x: x.potential_savings, reverse=True)
        
        self.recommendations = recommendations
        return recommendations
    
    def _analyze_right_sizing(self, df: pd.DataFrame) -> List[OptimizationRecommendation]:
        """分析右尺寸优化"""
        recommendations = []
        
        # 查找高成本计算资源
        compute_services = df[df['service'].str.contains('Compute|EC2|Virtual Machines', case=False, na=False)]
        
        if not compute_services.empty:
            high_cost_resources = compute_services.groupby(['resource_id', 'resource_name'])['cost'].sum()
            high_cost_resources = high_cost_resources[high_cost_resources > high_cost_resources.quantile(0.8)]
            
            for resource_id, cost in high_cost_resources.items():
                recommendations.append(OptimizationRecommendation(
                    type=OptimizationType.RIGHT_SIZING,
                    resource_id=resource_id[0],
                    resource_name=resource_id[1],
                    current_cost=cost,
                    potential_savings=cost * 0.3,  # 假设可节省30%
                    confidence=0.7,
                    description=f"资源 {resource_id[1]} 的成本较高,建议评估是否可以降低配置",
                    implementation_effort="Medium",
                    risk_level="Low",
                    details={
                        'current_monthly_cost': cost,
                        'recommended_action': 'Downsize instance type',
                        'monitoring_period': '2 weeks'
                    }
                ))
        
        return recommendations
    
    def _analyze_reserved_instances(self, df: pd.DataFrame) -> List[OptimizationRecommendation]:
        """分析预留实例优化"""
        recommendations = []
        
        # 查找稳定运行的计算资源
        compute_services = df[df['service'].str.contains('Compute|EC2|Virtual Machines', case=False, na=False)]
        
        if not compute_services.empty:
            # 按资源分组,计算使用稳定性
            resource_usage = compute_services.groupby(['resource_id', 'resource_name']).agg({
                'cost': 'sum',
                'usage_quantity': ['mean', 'std']
            }).reset_index()
            
            # 扁平化列名
            resource_usage.columns = ['resource_id', 'resource_name', 'total_cost', 'avg_usage', 'usage_std']
            
            # 查找使用稳定且成本较高的资源
            stable_resources = resource_usage[
                (resource_usage['usage_std'] / resource_usage['avg_usage'] < 0.2) &  # 使用量变化小于20%
                (resource_usage['total_cost'] > resource_usage['total_cost'].quantile(0.6))  # 成本较高
            ]
            
            for _, resource in stable_resources.iterrows():
                savings = resource['total_cost'] * 0.4  # 预留实例可节省40%
                
                recommendations.append(OptimizationRecommendation(
                    type=OptimizationType.RESERVED_INSTANCES,
                    resource_id=resource['resource_id'],
                    resource_name=resource['resource_name'],
                    current_cost=resource['total_cost'],
                    potential_savings=savings,
                    confidence=0.8,
                    description=f"资源 {resource['resource_name']} 使用稳定,建议购买预留实例",
                    implementation_effort="Low",
                    risk_level="Low",
                    details={
                        'usage_stability': resource['usage_std'] / resource['avg_usage'],
                        'recommended_term': '1 year',
                        'payment_option': 'Partial Upfront'
                    }
                ))
        
        return recommendations
    
    def _analyze_spot_instances(self, df: pd.DataFrame) -> List[OptimizationRecommendation]:
        """分析Spot实例优化"""
        recommendations = []
        
        # 查找可容错的工作负载
        batch_services = df[df['service'].str.contains('Batch|Lambda|Functions', case=False, na=False)]
        
        if not batch_services.empty:
            high_cost_batch = batch_services.groupby(['resource_id', 'resource_name'])['cost'].sum()
            high_cost_batch = high_cost_batch[high_cost_batch > high_cost_batch.quantile(0.7)]
            
            for resource_id, cost in high_cost_batch.items():
                recommendations.append(OptimizationRecommendation(
                    type=OptimizationType.SPOT_INSTANCES,
                    resource_id=resource_id[0],
                    resource_name=resource_id[1],
                    current_cost=cost,
                    potential_savings=cost * 0.6,  # Spot实例可节省60%
                    confidence=0.6,
                    description=f"批处理工作负载 {resource_id[1]} 建议使用Spot实例",
                    implementation_effort="Medium",
                    risk_level="Medium",
                    details={
                        'workload_type': 'Batch processing',
                        'fault_tolerance': 'Required',
                        'recommended_strategy': 'Mixed instance types'
                    }
                ))
        
        return recommendations
    
    def _analyze_storage_optimization(self, df: pd.DataFrame) -> List[OptimizationRecommendation]:
        """分析存储优化"""
        recommendations = []
        
        # 查找存储服务
        storage_services = df[df['service'].str.contains('Storage|S3|Blob|Cloud Storage', case=False, na=False)]
        
        if not storage_services.empty:
            high_cost_storage = storage_services.groupby(['resource_id', 'resource_name'])['cost'].sum()
            high_cost_storage = high_cost_storage[high_cost_storage > high_cost_storage.quantile(0.7)]
            
            for resource_id, cost in high_cost_storage.items():
                recommendations.append(OptimizationRecommendation(
                    type=OptimizationType.STORAGE_OPTIMIZATION,
                    resource_id=resource_id[0],
                    resource_name=resource_id[1],
                    current_cost=cost,
                    potential_savings=cost * 0.25,  # 存储优化可节省25%
                    confidence=0.8,
                    description=f"存储资源 {resource_id[1]} 建议优化存储类别和生命周期策略",
                    implementation_effort="Low",
                    risk_level="Low",
                    details={
                        'optimization_type': 'Storage class transition',
                        'lifecycle_policy': 'Recommended',
                        'compression': 'Evaluate'
                    }
                ))
        
        return recommendations
    
    def _analyze_network_optimization(self, df: pd.DataFrame) -> List[OptimizationRecommendation]:
        """分析网络优化"""
        recommendations = []
        
        # 查找网络服务
        network_services = df[df['service'].str.contains('Network|VPC|CDN|Load Balancer', case=False, na=False)]
        
        if not network_services.empty:
            # 按区域分析网络成本
            region_costs = network_services.groupby('region')['cost'].sum()
            high_cost_regions = region_costs[region_costs > region_costs.quantile(0.8)]
            
            for region, cost in high_cost_regions.items():
                recommendations.append(OptimizationRecommendation(
                    type=OptimizationType.NETWORK_OPTIMIZATION,
                    resource_id=f"network-{region}",
                    resource_name=f"Network resources in {region}",
                    current_cost=cost,
                    potential_savings=cost * 0.2,  # 网络优化可节省20%
                    confidence=0.6,
                    description=f"区域 {region} 的网络成本较高,建议优化数据传输和CDN配置",
                    implementation_effort="Medium",
                    risk_level="Low",
                    details={
                        'optimization_areas': ['Data transfer', 'CDN configuration', 'Load balancer optimization'],
                        'region': region
                    }
                ))
        
        return recommendations
    
    def _analyze_scheduling_optimization(self, df: pd.DataFrame) -> List[OptimizationRecommendation]:
        """分析调度优化"""
        recommendations = []
        
        # 查找可调度的资源
        schedulable_services = df[df['service'].str.contains('Compute|EC2|Virtual Machines|Database', case=False, na=False)]
        
        if not schedulable_services.empty:
            # 分析使用模式:在筛选出的子集上添加小时列,避免直接修改原始 DataFrame
            schedulable_services = schedulable_services.copy()
            schedulable_services['hour'] = pd.to_datetime(schedulable_services['period_start']).dt.hour
            hourly_usage = schedulable_services.groupby(['resource_id', 'hour'])['usage_quantity'].mean().unstack(fill_value=0)
            
            # 查找有明显使用模式的资源
            for resource_id in hourly_usage.index:
                usage_pattern = hourly_usage.loc[resource_id]
                peak_hours = usage_pattern.nlargest(8).index.tolist()  # 使用量最高的8个小时视为峰值时段
                off_peak_hours = usage_pattern.nsmallest(8).index.tolist()  # 使用量最低的8个小时视为低谷时段
                
                if usage_pattern.max() > usage_pattern.mean() * 2:  # 峰值是平均值的2倍以上
                    resource_cost = schedulable_services[schedulable_services['resource_id'] == resource_id]['cost'].sum()
                    
                    recommendations.append(OptimizationRecommendation(
                        type=OptimizationType.SCHEDULING,
                        resource_id=resource_id,
                        resource_name=resource_id,
                        current_cost=resource_cost,
                        potential_savings=resource_cost * 0.3,  # 调度优化可节省30%
                        confidence=0.7,
                        description=f"资源 {resource_id} 有明显的使用模式,建议实施自动调度",
                        implementation_effort="High",
                        risk_level="Medium",
                        details={
                            'peak_hours': peak_hours,
                            'off_peak_hours': off_peak_hours,
                            'scaling_strategy': 'Time-based auto scaling'
                        }
                    ))
        
        return recommendations
    
    def create_cost_dashboard_data(self) -> Dict[str, Any]:
        """创建成本仪表板数据"""
        if not self.cost_data:
            return {}
        
        df = pd.DataFrame([asdict(cost) for cost in self.cost_data])
        df['period_start'] = pd.to_datetime(df['period_start'])
        
        # 准备仪表板数据
        dashboard_data = {
            'summary': {
                'total_cost': df['cost'].sum(),
                'total_resources': len(df['resource_id'].unique()),
                'cost_trend': 'increasing',  # 占位值,实际应由 analyze_cost_trends 的趋势结果填充
                'top_cost_driver': df.groupby('service')['cost'].sum().idxmax()
            },
            'charts': {
                'cost_by_provider': df.groupby('provider')['cost'].sum().to_dict(),
                'cost_by_service': df.groupby('service')['cost'].sum().nlargest(10).to_dict(),
                'cost_by_region': df.groupby('region')['cost'].sum().to_dict(),
                'daily_cost_trend': df.groupby('period_start')['cost'].sum().to_dict()
            },
            'recommendations_summary': {
                'total_recommendations': len(self.recommendations),
                'potential_savings': sum(rec.potential_savings for rec in self.recommendations),
                'high_priority_count': len([rec for rec in self.recommendations if rec.confidence > 0.8])
            },
            'alerts': []
        }
        
        # 生成告警
        total_cost = dashboard_data['summary']['total_cost']
        if total_cost > 10000:  # 假设阈值
            dashboard_data['alerts'].append({
                'type': 'high_cost',
                'message': f"总成本 ${total_cost:.2f} 超过预算阈值",
                'severity': 'warning'
            })
        
        # 检查成本增长
        daily_costs = df.groupby('period_start')['cost'].sum().sort_index()
        if len(daily_costs) > 7:
            recent_avg = daily_costs.tail(7).mean()
            previous_avg = daily_costs.head(7).mean()
            
            if recent_avg > previous_avg * 1.2:  # 增长超过20%
                dashboard_data['alerts'].append({
                    'type': 'cost_spike',
                    'message': f"最近7天成本增长 {((recent_avg - previous_avg) / previous_avg * 100):.1f}%",
                    'severity': 'critical'
                })
        
        return dashboard_data
    
    def export_recommendations(self, format: str = 'json') -> str:
        """导出优化建议"""
        if format == 'json':
            return json.dumps([asdict(rec) for rec in self.recommendations], 
                            indent=2, default=str, ensure_ascii=False)
        elif format == 'csv':
            df = pd.DataFrame([asdict(rec) for rec in self.recommendations])
            return df.to_csv(index=False)
        else:
            raise ValueError(f"不支持的格式: {format}")

def main():
    """主函数"""
    # 创建成本优化器
    optimizer = MultiCloudCostOptimizer()
    
    # 设置时间范围
    end_date = datetime.now()
    start_date = end_date - timedelta(days=30)
    
    # 收集成本数据
    print("收集成本数据...")
    cost_data = optimizer.collect_cost_data(start_date, end_date)
    print(f"收集到 {len(cost_data)} 条成本记录")
    
    # 分析成本趋势
    print("\n分析成本趋势...")
    trends = optimizer.analyze_cost_trends()
    print(f"总成本: ${trends.get('total_cost', 0):.2f}")
    print(f"日均成本: ${trends.get('daily_average', 0):.2f}")
    print("按云提供商分布:")
    for provider, cost in trends.get('cost_by_provider', {}).items():
        print(f"  {provider}: ${cost:.2f}")
    
    # 生成优化建议
    print("\n生成优化建议...")
    recommendations = optimizer.generate_optimization_recommendations()
    print(f"生成 {len(recommendations)} 条优化建议")
    
    # 显示前5条建议
    print("\n前5条优化建议:")
    for i, rec in enumerate(recommendations[:5], 1):
        print(f"{i}. [{rec.type.value}] {rec.description}")
        print(f"   当前成本: ${rec.current_cost:.2f}, 潜在节省: ${rec.potential_savings:.2f}")
        print(f"   置信度: {rec.confidence:.1%}, 实施难度: {rec.implementation_effort}")
    
    # 创建仪表板数据
    print("\n创建成本仪表板...")
    dashboard = optimizer.create_cost_dashboard_data()
    print(f"总潜在节省: ${dashboard.get('recommendations_summary', {}).get('potential_savings', 0):.2f}")
    
    if dashboard.get('alerts'):
        print("\n成本告警:")
        for alert in dashboard['alerts']:
            print(f"  [{alert['severity']}] {alert['message']}")
    
    # 导出建议
    print("\n导出优化建议...")
    json_export = optimizer.export_recommendations('json')
    with open('cost_optimization_recommendations.json', 'w', encoding='utf-8') as f:
        f.write(json_export)
    print("建议已导出到 cost_optimization_recommendations.json")

if __name__ == "__main__":
    main()

监控与可观测性

多云环境需要统一的监控和可观测性平台,以实现跨云的性能监控、故障诊断和运维管理。

统一监控平台

# prometheus-multicloud-config.yml
global:
  scrape_interval: 15s
  evaluation_interval: 15s

rule_files:
  - "multicloud_rules.yml"

alerting:
  alertmanagers:
    - static_configs:
        - targets:
          - alertmanager:9093

scrape_configs:
  # AWS CloudWatch Exporter
  - job_name: 'aws-cloudwatch'
    static_configs:
      - targets: ['cloudwatch-exporter:9106']
    scrape_interval: 60s
    metrics_path: /metrics
    params:
      region: ['us-east-1', 'us-west-2', 'eu-west-1']

  # Azure Monitor Exporter
  - job_name: 'azure-monitor'
    static_configs:
      - targets: ['azure-exporter:9090']
    scrape_interval: 60s

  # GCP Monitoring Exporter
  - job_name: 'gcp-monitoring'
    static_configs:
      - targets: ['gcp-exporter:9091']
    scrape_interval: 60s

  # Kubernetes clusters
  - job_name: 'kubernetes-apiservers'
    kubernetes_sd_configs:
    - role: endpoints
    scheme: https
    tls_config:
      ca_file: /var/run/secrets/kubernetes.io/serviceaccount/ca.crt
    bearer_token_file: /var/run/secrets/kubernetes.io/serviceaccount/token
    relabel_configs:
    - source_labels: [__meta_kubernetes_namespace, __meta_kubernetes_service_name, __meta_kubernetes_endpoint_port_name]
      action: keep
      regex: default;kubernetes;https

  # Multi-cloud application metrics
  - job_name: 'multicloud-apps'
    consul_sd_configs:
      - server: 'consul:8500'
        services: ['web-app', 'api-service', 'database']
    relabel_configs:
    - source_labels: [__meta_consul_service]
      target_label: service
    - source_labels: [__meta_consul_node]
      target_label: instance
    - source_labels: [__meta_consul_tags]
      regex: '.*,cloud:([^,]+),.*'
      target_label: cloud_provider

多云告警规则

# multicloud_rules.yml
groups:
- name: multicloud.rules
  rules:
  # 跨云资源可用性告警
  - alert: MultiCloudServiceDown
    expr: up{job=~"aws-.*|azure-.*|gcp-.*"} == 0
    for: 5m
    labels:
      severity: critical
      category: availability
    annotations:
      summary: "Multi-cloud service {{ $labels.job }} is down"
      description: "Service {{ $labels.job }} on {{ $labels.cloud_provider }} has been down for more than 5 minutes."

  # 成本异常告警
  - alert: HighCostIncrease
    expr: increase(cloud_cost_total[1h]) > 100
    for: 10m
    labels:
      severity: warning
      category: cost
    annotations:
      summary: "High cost increase detected"
      description: "Cloud cost increased by ${{ $value }} in the last hour on {{ $labels.cloud_provider }}."

  # 跨云延迟告警
  - alert: HighCrossCloudLatency
    expr: histogram_quantile(0.95, rate(http_request_duration_seconds_bucket{job="multicloud-apps"}[5m])) > 0.5
    for: 5m
    labels:
      severity: warning
      category: performance
    annotations:
      summary: "High cross-cloud latency detected"
      description: "95th percentile latency is {{ $value }}s for {{ $labels.service }} between {{ $labels.source_cloud }} and {{ $labels.target_cloud }}."

  # 资源利用率告警
  - alert: LowResourceUtilization
    expr: avg_over_time(cpu_usage_percent[24h]) < 20
    for: 1h
    labels:
      severity: info
      category: optimization
    annotations:
      summary: "Low resource utilization detected"
      description: "Resource {{ $labels.instance }} on {{ $labels.cloud_provider }} has average CPU utilization of {{ $value }}% over 24 hours."

  # 多云数据同步告警
  - alert: DataSyncFailure
    expr: increase(data_sync_failures_total[1h]) > 0
    for: 0m
    labels:
      severity: critical
      category: data_integrity
    annotations:
      summary: "Data synchronization failure"
      description: "Data sync failed {{ $value }} times in the last hour between {{ $labels.source_cloud }} and {{ $labels.target_cloud }}."

- name: multicloud.cost.rules
  rules:
  # 成本预测规则
  - record: cloud_cost_prediction_7d
    expr: predict_linear(cloud_cost_total[7d], 7*24*3600)

  - record: cloud_cost_efficiency_ratio
    expr: cloud_revenue_total / cloud_cost_total

  # 资源利用率记录
  - record: resource_utilization_avg_24h
    expr: avg_over_time(cpu_usage_percent[24h])

  - record: storage_efficiency_ratio
    expr: storage_used_bytes / storage_allocated_bytes
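
上面的告警与记录规则引用了 cloud_cost_total、cpu_usage_percent 等自定义指标,这些指标需要由自建的 exporter 统一暴露后才能被 Prometheus 抓取。下面给出一个最小示意(基于 prometheus_client 库;指标取值使用随机数演示,端口 9105 为假设值,实际应对接各云的成本与监控 API):

#!/usr/bin/env python3
"""自定义多云指标导出示意(指标名与 multicloud_rules.yml 中引用的保持一致,数值为演示用的假数据)"""

import random
import time

from prometheus_client import Gauge, start_http_server

# 与告警规则中引用的指标同名
cloud_cost_total = Gauge('cloud_cost_total', '各云提供商累计成本(美元)', ['cloud_provider'])
cpu_usage_percent = Gauge('cpu_usage_percent', '实例CPU利用率(百分比)', ['cloud_provider', 'instance'])

def collect_once() -> None:
    """采集一轮指标;实际场景应调用各云的成本/监控 API,此处用随机数演示"""
    for provider in ('aws', 'azure', 'gcp'):
        cloud_cost_total.labels(cloud_provider=provider).set(random.uniform(1000, 5000))
        cpu_usage_percent.labels(cloud_provider=provider, instance=f'{provider}-demo-01').set(
            random.uniform(10, 90)
        )

if __name__ == "__main__":
    start_http_server(9105)  # 暴露 /metrics 端点,供 Prometheus 抓取(端口为假设)
    while True:
        collect_once()
        time.sleep(60)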

可观测性仪表板

#!/usr/bin/env python3
"""
多云可观测性仪表板
提供统一的多云监控和可视化界面
"""

import json
import logging
from typing import Dict, List, Any, Optional
from datetime import datetime, timedelta
from dataclasses import dataclass, asdict
import requests
import pandas as pd
import plotly.graph_objects as go
import plotly.express as px
from plotly.subplots import make_subplots
import streamlit as st

# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@dataclass
class MetricData:
    """指标数据类"""
    timestamp: datetime
    value: float
    labels: Dict[str, str]
    metric_name: str

@dataclass
class AlertData:
    """告警数据类"""
    alert_name: str
    severity: str
    status: str
    started_at: datetime
    labels: Dict[str, str]
    annotations: Dict[str, str]

class MultiCloudObservabilityDashboard:
    """多云可观测性仪表板"""
    
    def __init__(self, prometheus_url: str = "http://prometheus:9090"):
        self.prometheus_url = prometheus_url
        self.metrics_cache = {}
        self.alerts_cache = []
        
    def query_prometheus(self, query: str, start_time: Optional[datetime] = None, end_time: Optional[datetime] = None) -> List[MetricData]:
        """查询Prometheus指标"""
        try:
            if start_time and end_time:
                # 范围查询
                params = {
                    'query': query,
                    'start': start_time.isoformat(),
                    'end': end_time.isoformat(),
                    'step': '60s'
                }
                response = requests.get(f"{self.prometheus_url}/api/v1/query_range", params=params)
            else:
                # 即时查询
                params = {'query': query}
                response = requests.get(f"{self.prometheus_url}/api/v1/query", params=params)
            
            response.raise_for_status()
            data = response.json()
            
            metrics = []
            if data['status'] == 'success':
                for result in data['data']['result']:
                    labels = result['metric']
                    
                    if 'values' in result:  # 范围查询
                        for timestamp, value in result['values']:
                            metrics.append(MetricData(
                                timestamp=datetime.fromtimestamp(float(timestamp)),
                                value=float(value),
                                labels=labels,
                                metric_name=query
                            ))
                    else:  # 即时查询
                        timestamp, value = result['value']
                        metrics.append(MetricData(
                            timestamp=datetime.fromtimestamp(float(timestamp)),
                            value=float(value),
                            labels=labels,
                            metric_name=query
                        ))
            
            return metrics
            
        except Exception as e:
            logger.error(f"查询Prometheus失败: {e}")
            return []
    
    def get_active_alerts(self) -> List[AlertData]:
        """获取活跃告警"""
        try:
            response = requests.get(f"{self.prometheus_url}/api/v1/alerts")
            response.raise_for_status()
            data = response.json()
            
            alerts = []
            if data['status'] == 'success':
                for alert in data['data']['alerts']:
                    alerts.append(AlertData(
                        alert_name=alert['labels']['alertname'],
                        severity=alert['labels'].get('severity', 'unknown'),
                        status=alert['state'],
                        # activeAt 为 RFC3339 时间戳,可能带纳秒小数,先截断小数部分再解析
                        started_at=datetime.fromisoformat(alert['activeAt'].split('.')[0].rstrip('Z')),
                        labels=alert['labels'],
                        annotations=alert.get('annotations', {})
                    ))
            
            self.alerts_cache = alerts
            return alerts
            
        except Exception as e:
            logger.error(f"获取告警失败: {e}")
            return []
    
    def create_cost_overview_chart(self) -> go.Figure:
        """创建成本概览图表"""
        # 查询各云提供商的成本数据
        end_time = datetime.now()
        start_time = end_time - timedelta(days=7)
        
        cost_metrics = self.query_prometheus(
            'sum by (cloud_provider) (cloud_cost_total)',
            start_time, end_time
        )
        
        if not cost_metrics:
            return go.Figure()
        
        # 转换为DataFrame
        df = pd.DataFrame([
            {
                'timestamp': metric.timestamp,
                'cost': metric.value,
                'provider': metric.labels.get('cloud_provider', 'unknown')
            }
            for metric in cost_metrics
        ])
        
        # 创建堆叠面积图
        fig = px.area(
            df, 
            x='timestamp', 
            y='cost', 
            color='provider',
            title='多云成本趋势 (7天)',
            labels={'cost': '成本 ($)', 'timestamp': '时间'}
        )
        
        fig.update_layout(
            xaxis_title="时间",
            yaxis_title="成本 ($)",
            legend_title="云提供商"
        )
        
        return fig
    
    def create_resource_utilization_chart(self) -> go.Figure:
        """创建资源利用率图表"""
        # 查询CPU利用率数据
        cpu_metrics = self.query_prometheus(
            'avg by (cloud_provider) (cpu_usage_percent)'
        )
        
        # 查询内存利用率数据
        memory_metrics = self.query_prometheus(
            'avg by (cloud_provider) (memory_usage_percent)'
        )
        
        # 查询存储利用率数据
        storage_metrics = self.query_prometheus(
            'avg by (cloud_provider) (storage_efficiency_ratio * 100)'
        )
        
        providers = list(set([m.labels.get('cloud_provider', 'unknown') for m in cpu_metrics]))
        
        cpu_values = []
        memory_values = []
        storage_values = []
        
        for provider in providers:
            cpu_val = next((m.value for m in cpu_metrics if m.labels.get('cloud_provider') == provider), 0)
            memory_val = next((m.value for m in memory_metrics if m.labels.get('cloud_provider') == provider), 0)
            storage_val = next((m.value for m in storage_metrics if m.labels.get('cloud_provider') == provider), 0)
            
            cpu_values.append(cpu_val)
            memory_values.append(memory_val)
            storage_values.append(storage_val)
        
        # 创建分组柱状图
        fig = go.Figure(data=[
            go.Bar(name='CPU', x=providers, y=cpu_values),
            go.Bar(name='内存', x=providers, y=memory_values),
            go.Bar(name='存储', x=providers, y=storage_values)
        ])
        
        fig.update_layout(
            title='各云提供商资源利用率',
            xaxis_title='云提供商',
            yaxis_title='利用率 (%)',
            barmode='group'
        )
        
        return fig
    
    def create_performance_metrics_chart(self) -> go.Figure:
        """创建性能指标图表"""
        end_time = datetime.now()
        start_time = end_time - timedelta(hours=24)
        
        # 查询响应时间数据
        latency_metrics = self.query_prometheus(
            'histogram_quantile(0.95, rate(http_request_duration_seconds_bucket[5m]))',
            start_time, end_time
        )
        
        # 查询错误率数据
        error_metrics = self.query_prometheus(
            'rate(http_requests_total{status=~"5.."}[5m]) / rate(http_requests_total[5m]) * 100',
            start_time, end_time
        )
        
        # 创建子图
        fig = make_subplots(
            rows=2, cols=1,
            subplot_titles=('响应时间 (95th percentile)', '错误率'),
            vertical_spacing=0.1
        )
        
        if latency_metrics:
            df_latency = pd.DataFrame([
                {
                    'timestamp': metric.timestamp,
                    'latency': metric.value,
                    'service': metric.labels.get('service', 'unknown')
                }
                for metric in latency_metrics
            ])
            
            for service in df_latency['service'].unique():
                service_data = df_latency[df_latency['service'] == service]
                fig.add_trace(
                    go.Scatter(
                        x=service_data['timestamp'],
                        y=service_data['latency'],
                        mode='lines',
                        name=f'{service} 延迟',
                        line=dict(width=2)
                    ),
                    row=1, col=1
                )
        
        if error_metrics:
            df_error = pd.DataFrame([
                {
                    'timestamp': metric.timestamp,
                    'error_rate': metric.value,
                    'service': metric.labels.get('service', 'unknown')
                }
                for metric in error_metrics
            ])
            
            for service in df_error['service'].unique():
                service_data = df_error[df_error['service'] == service]
                fig.add_trace(
                    go.Scatter(
                        x=service_data['timestamp'],
                        y=service_data['error_rate'],
                        mode='lines',
                        name=f'{service} 错误率',
                        line=dict(width=2)
                    ),
                    row=2, col=1
                )
        
        fig.update_xaxes(title_text="时间", row=2, col=1)
        fig.update_yaxes(title_text="延迟 (秒)", row=1, col=1)
        fig.update_yaxes(title_text="错误率 (%)", row=2, col=1)
        
        fig.update_layout(
            title='应用性能指标 (24小时)',
            height=600
        )
        
        return fig
    
    def create_alerts_summary(self) -> Dict[str, Any]:
        """创建告警摘要"""
        alerts = self.get_active_alerts()
        
        summary = {
            'total_alerts': len(alerts),
            'critical_alerts': len([a for a in alerts if a.severity == 'critical']),
            'warning_alerts': len([a for a in alerts if a.severity == 'warning']),
            'info_alerts': len([a for a in alerts if a.severity == 'info']),
            'alerts_by_category': {},
            'recent_alerts': []
        }
        
        # 按类别统计告警
        for alert in alerts:
            category = alert.labels.get('category', 'other')
            if category not in summary['alerts_by_category']:
                summary['alerts_by_category'][category] = 0
            summary['alerts_by_category'][category] += 1
        
        # 最近的告警
        recent_alerts = sorted(alerts, key=lambda x: x.started_at, reverse=True)[:10]
        summary['recent_alerts'] = [
            {
                'name': alert.alert_name,
                'severity': alert.severity,
                'started_at': alert.started_at.strftime('%Y-%m-%d %H:%M:%S'),
                'description': alert.annotations.get('summary', ''),
                'cloud_provider': alert.labels.get('cloud_provider', 'unknown')
            }
            for alert in recent_alerts
        ]
        
        return summary
    
    def generate_health_score(self) -> Dict[str, float]:
        """生成健康评分"""
        scores = {}
        
        # 可用性评分 (基于服务正常运行时间)
        uptime_metrics = self.query_prometheus('avg(up)')
        availability_score = uptime_metrics[0].value * 100 if uptime_metrics else 0
        
        # 性能评分 (基于响应时间)
        latency_metrics = self.query_prometheus(
            'histogram_quantile(0.95, rate(http_request_duration_seconds_bucket[5m]))'
        )
        if latency_metrics:
            avg_latency = sum(m.value for m in latency_metrics) / len(latency_metrics)
            performance_score = max(0, 100 - (avg_latency * 100))  # 延迟越低分数越高
        else:
            performance_score = 100
        
        # 成本效率评分 (基于资源利用率)
        utilization_metrics = self.query_prometheus('avg(cpu_usage_percent)')
        if utilization_metrics:
            avg_utilization = utilization_metrics[0].value
            # 理想利用率在60-80%之间
            if 60 <= avg_utilization <= 80:
                cost_efficiency_score = 100
            elif avg_utilization < 60:
                cost_efficiency_score = avg_utilization / 60 * 100
            else:
                cost_efficiency_score = max(0, 100 - (avg_utilization - 80) * 2)
        else:
            cost_efficiency_score = 50
        
        # 安全评分 (基于告警数量)
        alerts = self.get_active_alerts()
        critical_alerts = len([a for a in alerts if a.severity == 'critical'])
        security_score = max(0, 100 - critical_alerts * 10)
        
        # 综合评分
        overall_score = (
            availability_score * 0.3 +
            performance_score * 0.25 +
            cost_efficiency_score * 0.25 +
            security_score * 0.2
        )
        
        scores = {
            'overall': round(overall_score, 1),
            'availability': round(availability_score, 1),
            'performance': round(performance_score, 1),
            'cost_efficiency': round(cost_efficiency_score, 1),
            'security': round(security_score, 1)
        }
        
        return scores

def create_streamlit_dashboard():
    """创建Streamlit仪表板"""
    st.set_page_config(
        page_title="多云可观测性仪表板",
        page_icon="☁️",
        layout="wide"
    )
    
    st.title("☁️ 多云可观测性仪表板")
    
    # 初始化仪表板
    dashboard = MultiCloudObservabilityDashboard()
    
    # 侧边栏配置
    st.sidebar.header("配置")
    prometheus_url = st.sidebar.text_input(
        "Prometheus URL", 
        value="http://prometheus:9090"
    )
    dashboard.prometheus_url = prometheus_url
    
    refresh_interval = st.sidebar.selectbox(
        "刷新间隔",
        options=[30, 60, 300, 600],
        index=1,
        format_func=lambda x: f"{x}秒"
    )
    
    # 手动刷新(refresh_interval 可配合自动刷新组件使用,此处仅作配置展示)
    if st.sidebar.button("刷新数据"):
        st.rerun()  # 旧版 Streamlit 可使用 st.experimental_rerun()
    
    # 健康评分
    st.header("🎯 系统健康评分")
    health_scores = dashboard.generate_health_score()
    
    col1, col2, col3, col4, col5 = st.columns(5)
    
    with col1:
        st.metric("综合评分", f"{health_scores['overall']}/100")
    with col2:
        st.metric("可用性", f"{health_scores['availability']}/100")
    with col3:
        st.metric("性能", f"{health_scores['performance']}/100")
    with col4:
        st.metric("成本效率", f"{health_scores['cost_efficiency']}/100")
    with col5:
        st.metric("安全性", f"{health_scores['security']}/100")
    
    # 告警摘要
    st.header("🚨 告警摘要")
    alerts_summary = dashboard.create_alerts_summary()
    
    col1, col2, col3, col4 = st.columns(4)
    with col1:
        st.metric("总告警", alerts_summary['total_alerts'])
    with col2:
        st.metric("严重告警", alerts_summary['critical_alerts'])
    with col3:
        st.metric("警告告警", alerts_summary['warning_alerts'])
    with col4:
        st.metric("信息告警", alerts_summary['info_alerts'])
    
    # 最近告警
    if alerts_summary['recent_alerts']:
        st.subheader("最近告警")
        alerts_df = pd.DataFrame(alerts_summary['recent_alerts'])
        st.dataframe(alerts_df, use_container_width=True)
    
    # 图表展示
    st.header("📊 监控图表")
    
    # 成本概览
    st.subheader("成本概览")
    cost_chart = dashboard.create_cost_overview_chart()
    st.plotly_chart(cost_chart, use_container_width=True)
    
    # 资源利用率和性能指标
    col1, col2 = st.columns(2)
    
    with col1:
        st.subheader("资源利用率")
        utilization_chart = dashboard.create_resource_utilization_chart()
        st.plotly_chart(utilization_chart, use_container_width=True)
    
    with col2:
        st.subheader("性能指标")
        performance_chart = dashboard.create_performance_metrics_chart()
        st.plotly_chart(performance_chart, use_container_width=True)

def main():
    """主函数"""
    create_streamlit_dashboard()

if __name__ == "__main__":
    main()

最佳实践与建议

1. 治理框架建立

组织架构

  • 建立多云治理委员会,包含技术、财务、安全等部门代表
  • 定义清晰的角色和职责分工
  • 制定多云策略和标准操作程序

政策制定

  • 云服务选择标准和评估流程
  • 数据分类和存储位置要求(见下方示例)
  • 安全和合规基线要求
  • 成本控制和预算管理政策
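
这些政策要真正落地,通常需要以"策略即代码"的方式自动校验。下面是一个针对数据存储位置要求的最小示意脚本(其中的数据分类等级和区域白名单均为假设的内部策略,并非任何云厂商的官方规范):

#!/usr/bin/env python3
"""数据存储位置策略检查示意(策略内容为假设)"""

from dataclasses import dataclass
from typing import Dict, List

# 假设的策略:不同数据分类允许部署的区域白名单
REGION_POLICY: Dict[str, List[str]] = {
    "public": ["us-east-1", "eu-west-1", "cn-hangzhou"],
    "internal": ["eu-west-1", "cn-hangzhou"],
    "confidential": ["cn-hangzhou"],  # 假设机密数据仅允许部署在境内区域
}

@dataclass
class Resource:
    resource_id: str
    region: str
    tags: Dict[str, str]

def check_data_residency(resources: List[Resource]) -> List[str]:
    """返回违反数据存储位置策略的资源清单"""
    violations = []
    for res in resources:
        classification = res.tags.get("data_classification", "internal")
        allowed = REGION_POLICY.get(classification, [])
        if res.region not in allowed:
            violations.append(
                f"{res.resource_id}: 分类 {classification} 不允许部署在 {res.region}"
            )
    return violations

if __name__ == "__main__":
    demo = [Resource("db-001", "us-east-1", {"data_classification": "confidential"})]
    for line in check_data_residency(demo):
        print(line)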

2. 技术实施要点

标准化

  • 统一的资源标记和命名规范(见下方示例)
  • 标准化的部署模板和流程
  • 一致的监控和日志格式
  • 统一的身份认证和授权机制
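
以资源标记规范为例,可以用一个简单的校验函数在 CI 或定期巡检任务中强制执行。下面是一个示意实现(必填标签 owner、cost_center、environment 及其取值范围均为假设的内部约定):

#!/usr/bin/env python3
"""统一资源标签规范校验示意(标签键与取值范围为假设的内部规范)"""

from typing import Dict, List

# 假设的必填标签及取值约束:None 表示任意非空值,集合表示枚举值
REQUIRED_TAGS = {
    "owner": None,
    "cost_center": None,
    "environment": {"prod", "staging", "dev"},
}

def validate_tags(resource_id: str, tags: Dict[str, str]) -> List[str]:
    """返回该资源不符合标签规范的问题列表"""
    issues = []
    for key, allowed in REQUIRED_TAGS.items():
        value = tags.get(key, "").strip()
        if not value:
            issues.append(f"{resource_id}: 缺少必填标签 {key}")
        elif allowed is not None and value not in allowed:
            issues.append(f"{resource_id}: 标签 {key}={value} 不在允许取值 {sorted(allowed)} 内")
    return issues

if __name__ == "__main__":
    print(validate_tags("i-0abc123", {"owner": "team-a", "environment": "production"}))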

自动化

  • 基础设施即代码(IaC)实践
  • 自动化的合规检查和修复
  • 持续的成本优化和资源调整(见下方示例)
  • 自动化的备份和灾难恢复
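
作为资源调整自动化的一个具体场景,下面是一个基于时间的非生产实例启停脚本示意(以 AWS EC2 为例;schedule=office-hours 标签和 8:00-20:00 工作时段均为假设的内部约定,实际可部署为定时任务或函数计算):

#!/usr/bin/env python3
"""非生产实例定时启停示意(标签约定与工作时段为假设)"""

from datetime import datetime

import boto3

WORK_START, WORK_END = 8, 20  # 假设的工作时段:8:00-20:00

def desired_state(now: datetime) -> str:
    """根据当前时间决定实例的期望状态:工作日工作时段运行,其余时间停止"""
    in_office_hours = now.weekday() < 5 and WORK_START <= now.hour < WORK_END
    return "running" if in_office_hours else "stopped"

def enforce_schedule(region: str = "us-east-1") -> None:
    ec2 = boto3.client("ec2", region_name=region)
    # 仅处理打了调度标签的非生产实例
    reservations = ec2.describe_instances(
        Filters=[{"Name": "tag:schedule", "Values": ["office-hours"]}]
    )["Reservations"]
    instance_ids = [i["InstanceId"] for r in reservations for i in r["Instances"]]
    if not instance_ids:
        return
    if desired_state(datetime.now()) == "stopped":
        ec2.stop_instances(InstanceIds=instance_ids)
    else:
        ec2.start_instances(InstanceIds=instance_ids)

if __name__ == "__main__":
    print("当前期望状态:", desired_state(datetime.now()))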

3. 成本优化策略

预算管理

  • 设置详细的成本预算和告警(见下方示例)
  • 定期进行成本审查和分析
  • 建立成本分摊和计费机制
  • 实施成本优化激励措施
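
预算告警可以从一个很简单的阈值检查开始,再逐步接入账单数据和通知渠道。下面是一个示意函数(各成本中心的预算金额以及 80%/100% 的告警阈值均为假设值):

#!/usr/bin/env python3
"""预算告警阈值检查示意(预算金额与阈值为假设值)"""

from typing import Optional

# 假设的月度预算(美元),按成本中心划分
MONTHLY_BUDGETS = {"platform": 8000.0, "data": 5000.0, "ml": 3000.0}
ALERT_THRESHOLDS = [0.8, 1.0]  # 达到预算的80%、100%时触发告警

def budget_alert(cost_center: str, month_to_date_cost: float) -> Optional[str]:
    """根据当月已发生成本返回告警信息,未超阈值则返回 None"""
    budget = MONTHLY_BUDGETS.get(cost_center)
    if budget is None:
        return f"{cost_center}: 未配置预算,建议补充"
    ratio = month_to_date_cost / budget
    for threshold in sorted(ALERT_THRESHOLDS, reverse=True):
        if ratio >= threshold:
            return (f"{cost_center}: 当月成本 ${month_to_date_cost:.2f} "
                    f"已达预算的 {ratio:.0%}(阈值 {threshold:.0%})")
    return None

if __name__ == "__main__":
    print(budget_alert("platform", 6800.0))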

资源优化

  • 定期评估资源使用情况
  • 实施自动化的资源调度
  • 优化数据传输和存储策略
  • 合理使用预留实例和Spot实例(见下方示例)
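
在评估是否购买预留实例时,可以先用一个粗略的节省估算做初步筛选,再结合厂商报价细化。下面是一个示意函数(40% 的折扣率为假设值,实际应以各云厂商的报价和承诺期限为准):

#!/usr/bin/env python3
"""预留实例收益粗估示意(折扣率为假设值)"""

def reserved_instance_savings(on_demand_monthly: float,
                              utilization: float,
                              discount: float = 0.4) -> float:
    """
    估算改用预留实例后的月度节省。
    on_demand_monthly: 当前按需计费的月成本
    utilization: 实例的平均使用率(0-1),使用率越低,预留的收益越小
    discount: 预留实例相对按需的折扣率(此处假设为40%)
    """
    # 只有持续运行的部分才值得预留,按使用率折算可覆盖的成本
    coverable_cost = on_demand_monthly * min(max(utilization, 0.0), 1.0)
    return coverable_cost * discount

if __name__ == "__main__":
    # 例:月按需成本 $1200、平均使用率 90% 的实例,约可节省 $432/月
    print(f"${reserved_instance_savings(1200.0, 0.9):.2f}")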

总结

多云治理与成本优化是一个持续的过程,需要技术、流程和组织的协同配合。通过建立完善的治理框架、实施有效的成本控制措施、部署统一的监控平台,企业可以充分发挥多云架构的优势,同时控制复杂性和成本。

关键成功因素包括:

  1. 统一的治理框架 - 建立清晰的政策、流程和标准
  2. 自动化工具 - 减少手动操作,提高效率和一致性
  3. 持续监控 - 实时掌握资源状态和成本变化
  4. 优化文化 - 培养成本意识和优化习惯
  5. 技能建设 - 提升团队的多云管理能力

随着云技术的不断发展,多云治理和成本优化的工具和方法也在持续演进。企业需要保持学习和适应,不断完善自己的多云管理体系,以实现业务目标和技术效益的最佳平衡。
