多云环境下的架构治理与成本优化:构建统一、高效的云管理平台
目录
多云架构概述
多云环境已成为现代企业IT架构的主流选择,它提供了更好的灵活性、可靠性和成本效益。然而,多云环境也带来了复杂的治理和管理挑战。
graph TB
subgraph "多云架构全景图"
subgraph "治理层"
A[统一治理平台]
B[策略管理]
C[合规监控]
end
subgraph "管理层"
D[身份管理]
E[资源编排]
F[成本管理]
G[监控告警]
end
subgraph "云服务提供商"
H[AWS]
I[Azure]
J[GCP]
K[阿里云]
end
subgraph "应用层"
L[Web应用]
M[数据库]
N[存储服务]
O[AI/ML服务]
end
end
A --> D
A --> E
A --> F
A --> G
D --> H
D --> I
D --> J
D --> K
E --> H
E --> I
E --> J
E --> K
H --> L
H --> M
I --> N
J --> O
多云架构分析器
#!/usr/bin/env python3
"""
多云架构分析器
分析和评估多云环境的架构特征和优化建议
"""
import json
import boto3
import azure.identity
import azure.mgmt.resource
from google.cloud import resource_manager
from typing import Dict, List, Any, Optional
from dataclasses import dataclass, asdict
from datetime import datetime, timedelta
import logging
# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class CloudProvider:
    """Configuration for one cloud provider account/region under management."""
    name: str                            # provider name, e.g. "AWS" / "Azure" / "GCP"
    region: str                          # default region for this provider
    credentials: Dict[str, str]          # provider-specific credential fields
    services: List[str]                  # services in use on this provider
    cost_center: str                     # accounting cost center this provider bills to
    compliance_requirements: List[str]   # e.g. ["SOC2", "ISO27001"]
@dataclass
class ResourceInventory:
    """One inventoried cloud resource with its estimated cost and compliance state."""
    provider: str            # "AWS" / "Azure" / "GCP"
    region: str              # region the resource lives in
    resource_type: str       # e.g. "EC2", "RDS", "S3", "VirtualMachine"
    resource_id: str         # provider-native resource identifier
    name: str                # display name (falls back to the id)
    status: str              # provider-reported lifecycle status
    cost_monthly: float      # rough estimated monthly cost in USD
    tags: Dict[str, str]     # resource tags/labels
    compliance_status: str   # "Compliant" / "Warning" / "Non-Compliant"
class MultiCloudAnalyzer:
    """Multi-cloud architecture analyzer.

    Builds a cross-provider resource inventory (AWS / Azure / GCP),
    attaches rough monthly cost estimates and tag-based compliance
    status, and produces a consolidated analysis report.

    Fixes over the original version:
    - bare ``except:`` clauses narrowed to ``except Exception``;
    - the compliance tally no longer raises ``KeyError`` on an
      unexpected ``compliance_status`` value;
    - cross-block type annotations are quoted (forward references) so
      the class can be defined without those names in scope.
    """

    def __init__(self):
        self.providers = {}           # provider name -> CloudProvider
        self.resource_inventory = []  # flat list of ResourceInventory records
        self.cost_data = {}
        self.compliance_rules = []

    def add_cloud_provider(self, provider: "CloudProvider"):
        """Register a cloud provider configuration."""
        self.providers[provider.name] = provider
        logger.info(f"添加云服务提供商: {provider.name}")

    def analyze_aws_resources(self, aws_config: Dict[str, str]) -> List["ResourceInventory"]:
        """Inventory EC2, RDS and S3 resources for one AWS account.

        Returns an empty list on any failure instead of raising.
        NOTE(review): describe_instances / describe_db_instances are not
        paginated here — large accounts may be truncated; confirm.
        """
        try:
            session = boto3.Session(
                aws_access_key_id=aws_config.get('access_key'),
                aws_secret_access_key=aws_config.get('secret_key'),
                region_name=aws_config.get('region', 'us-east-1')
            )
            resources = []
            # EC2 instances
            ec2 = session.client('ec2')
            instances = ec2.describe_instances()
            for reservation in instances['Reservations']:
                for instance in reservation['Instances']:
                    tags = {tag['Key']: tag['Value'] for tag in instance.get('Tags', [])}
                    resources.append(ResourceInventory(
                        provider="AWS",
                        # AZ like "us-east-1a" -> region "us-east-1"
                        region=instance['Placement']['AvailabilityZone'][:-1],
                        resource_type="EC2",
                        resource_id=instance['InstanceId'],
                        name=tags.get('Name', instance['InstanceId']),
                        status=instance['State']['Name'],
                        cost_monthly=self._estimate_ec2_cost(instance['InstanceType']),
                        tags=tags,
                        compliance_status=self._check_compliance(tags)
                    ))
            # RDS instances
            rds = session.client('rds')
            db_instances = rds.describe_db_instances()
            for db in db_instances['DBInstances']:
                tags = {tag['Key']: tag['Value'] for tag in db.get('TagList', [])}
                resources.append(ResourceInventory(
                    provider="AWS",
                    region=db['AvailabilityZone'][:-1] if db.get('AvailabilityZone') else aws_config.get('region'),
                    resource_type="RDS",
                    resource_id=db['DBInstanceIdentifier'],
                    name=db['DBInstanceIdentifier'],
                    status=db['DBInstanceStatus'],
                    cost_monthly=self._estimate_rds_cost(db['DBInstanceClass']),
                    tags=tags,
                    compliance_status=self._check_compliance(tags)
                ))
            # S3 buckets
            s3 = session.client('s3')
            buckets = s3.list_buckets()
            for bucket in buckets['Buckets']:
                try:
                    tags_response = s3.get_bucket_tagging(Bucket=bucket['Name'])
                    tags = {tag['Key']: tag['Value'] for tag in tags_response.get('TagSet', [])}
                except Exception:
                    # get_bucket_tagging raises when the bucket has no tag set
                    tags = {}
                resources.append(ResourceInventory(
                    provider="AWS",
                    region=aws_config.get('region'),
                    resource_type="S3",
                    resource_id=bucket['Name'],
                    name=bucket['Name'],
                    status="Active",
                    cost_monthly=self._estimate_s3_cost(bucket['Name'], session),
                    tags=tags,
                    compliance_status=self._check_compliance(tags)
                ))
            return resources
        except Exception as e:
            logger.error(f"分析AWS资源失败: {e}")
            return []

    def analyze_azure_resources(self, azure_config: Dict[str, str]) -> List["ResourceInventory"]:
        """Inventory all resources reachable through one Azure subscription.

        Returns an empty list on any failure instead of raising.
        """
        try:
            credential = azure.identity.ClientSecretCredential(
                tenant_id=azure_config.get('tenant_id'),
                client_id=azure_config.get('client_id'),
                client_secret=azure_config.get('client_secret')
            )
            resource_client = azure.mgmt.resource.ResourceManagementClient(
                credential, azure_config.get('subscription_id')
            )
            resources = []
            # Walk every resource group, then every resource inside it.
            resource_groups = resource_client.resource_groups.list()
            for rg in resource_groups:
                rg_resources = resource_client.resources.list_by_resource_group(rg.name)
                for resource in rg_resources:
                    tags = resource.tags or {}
                    resources.append(ResourceInventory(
                        provider="Azure",
                        region=resource.location,
                        # "Microsoft.Compute/virtualMachines" -> "virtualMachines"
                        resource_type=resource.type.split('/')[-1],
                        resource_id=resource.id,
                        name=resource.name,
                        status="Active",
                        cost_monthly=self._estimate_azure_cost(resource.type),
                        tags=tags,
                        compliance_status=self._check_compliance(tags)
                    ))
            return resources
        except Exception as e:
            logger.error(f"分析Azure资源失败: {e}")
            return []

    def analyze_gcp_resources(self, gcp_config: Dict[str, str]) -> List["ResourceInventory"]:
        """Inventory GCP resources.

        Currently returns a simulated sample inventory; real GCP
        authentication/listing is intentionally not wired up here.
        """
        try:
            # 这里简化处理,实际需要配置GCP认证
            resources = []
            # Simulated GCP resources standing in for API results.
            sample_resources = [
                {
                    "name": "web-server-instance",
                    "type": "compute.instances",
                    "zone": "us-central1-a",
                    "status": "RUNNING",
                    "labels": {"env": "production", "team": "backend"}
                },
                {
                    "name": "database-instance",
                    "type": "sql.instances",
                    "region": "us-central1",
                    "status": "RUNNABLE",
                    "labels": {"env": "production", "team": "data"}
                }
            ]
            for resource in sample_resources:
                resources.append(ResourceInventory(
                    provider="GCP",
                    region=resource.get('zone', resource.get('region', 'us-central1')),
                    resource_type=resource['type'].split('.')[-1],
                    resource_id=resource['name'],
                    name=resource['name'],
                    status=resource['status'],
                    cost_monthly=self._estimate_gcp_cost(resource['type']),
                    tags=resource.get('labels', {}),
                    compliance_status=self._check_compliance(resource.get('labels', {}))
                ))
            return resources
        except Exception as e:
            logger.error(f"分析GCP资源失败: {e}")
            return []

    def _estimate_ec2_cost(self, instance_type: str) -> float:
        """Rough monthly USD cost for an EC2 instance type (50.0 default)."""
        cost_map = {
            't3.micro': 8.5,
            't3.small': 17.0,
            't3.medium': 34.0,
            't3.large': 68.0,
            'm5.large': 88.0,
            'm5.xlarge': 176.0,
            'c5.large': 78.0,
            'r5.large': 115.0
        }
        return cost_map.get(instance_type, 50.0)

    def _estimate_rds_cost(self, instance_class: str) -> float:
        """Rough monthly USD cost for an RDS instance class (75.0 default)."""
        cost_map = {
            'db.t3.micro': 15.0,
            'db.t3.small': 30.0,
            'db.t3.medium': 60.0,
            'db.m5.large': 150.0,
            'db.r5.large': 200.0
        }
        return cost_map.get(instance_class, 75.0)

    def _estimate_s3_cost(self, bucket_name: str, session) -> float:
        """Estimate monthly S3 cost from the CloudWatch BucketSizeBytes metric.

        Falls back to a flat 10.0 when the metric is unavailable.
        """
        try:
            cloudwatch = session.client('cloudwatch')
            response = cloudwatch.get_metric_statistics(
                Namespace='AWS/S3',
                MetricName='BucketSizeBytes',
                Dimensions=[
                    {'Name': 'BucketName', 'Value': bucket_name},
                    {'Name': 'StorageType', 'Value': 'StandardStorage'}
                ],
                StartTime=datetime.now() - timedelta(days=1),
                EndTime=datetime.now(),
                Period=86400,
                Statistics=['Average']
            )
            if response['Datapoints']:
                size_bytes = response['Datapoints'][0]['Average']
                size_gb = size_bytes / (1024 ** 3)
                return size_gb * 0.023  # $0.023 per GB (S3 Standard list price)
        except Exception:
            # Best-effort estimate: fall through to the default below.
            pass
        return 10.0  # default estimate

    def _estimate_azure_cost(self, resource_type: str) -> float:
        """Rough monthly USD cost keyed by the Azure resource type suffix."""
        cost_map = {
            'virtualMachines': 80.0,
            'sqlDatabases': 120.0,
            'storageAccounts': 15.0,
            'networkSecurityGroups': 0.0,
            'publicIPAddresses': 3.0
        }
        return cost_map.get(resource_type.split('/')[-1], 25.0)

    def _estimate_gcp_cost(self, resource_type: str) -> float:
        """Rough monthly USD cost for a GCP resource type (30.0 default)."""
        cost_map = {
            'instances': 75.0,
            'sql.instances': 110.0,
            'storage.buckets': 12.0
        }
        return cost_map.get(resource_type, 30.0)

    def _check_compliance(self, tags: Dict[str, str]) -> str:
        """Classify tag compliance: all required tags present -> Compliant,
        one missing -> Warning, more than one missing -> Non-Compliant."""
        required_tags = ['Environment', 'Owner', 'Project']
        missing_tags = [tag for tag in required_tags if tag not in tags]
        if not missing_tags:
            return "Compliant"
        elif len(missing_tags) <= 1:
            return "Warning"
        else:
            return "Non-Compliant"

    def generate_multi_cloud_report(self) -> Dict[str, Any]:
        """Aggregate the current inventory into a cost/compliance report.

        Returns {} on failure.
        """
        try:
            # Per-provider rollups.
            provider_summary = {}
            total_cost = 0
            compliance_summary = {"Compliant": 0, "Warning": 0, "Non-Compliant": 0}
            for resource in self.resource_inventory:
                provider = resource.provider
                if provider not in provider_summary:
                    provider_summary[provider] = {
                        "resource_count": 0,
                        "total_cost": 0,
                        "resource_types": set(),
                        "regions": set()
                    }
                provider_summary[provider]["resource_count"] += 1
                provider_summary[provider]["total_cost"] += resource.cost_monthly
                provider_summary[provider]["resource_types"].add(resource.resource_type)
                provider_summary[provider]["regions"].add(resource.region)
                total_cost += resource.cost_monthly
                # .get avoids KeyError on a status outside the three known buckets.
                status = resource.compliance_status
                compliance_summary[status] = compliance_summary.get(status, 0) + 1
            # Convert sets to lists so the report is JSON-serializable.
            for provider in provider_summary:
                provider_summary[provider]["resource_types"] = list(provider_summary[provider]["resource_types"])
                provider_summary[provider]["regions"] = list(provider_summary[provider]["regions"])
            # Cost breakdowns.
            cost_analysis = {
                "total_monthly_cost": total_cost,
                "cost_by_provider": {
                    provider: data["total_cost"]
                    for provider, data in provider_summary.items()
                },
                "cost_by_resource_type": {},
                "cost_optimization_opportunities": []
            }
            for resource in self.resource_inventory:
                resource_type = resource.resource_type
                if resource_type not in cost_analysis["cost_by_resource_type"]:
                    cost_analysis["cost_by_resource_type"][resource_type] = 0
                cost_analysis["cost_by_resource_type"][resource_type] += resource.cost_monthly
            # Stopped-but-billed resources are savings opportunities.
            for resource in self.resource_inventory:
                if resource.status in ['stopped', 'deallocated'] and resource.cost_monthly > 0:
                    cost_analysis["cost_optimization_opportunities"].append({
                        "type": "unused_resource",
                        "resource": f"{resource.provider}:{resource.resource_id}",
                        "potential_savings": resource.cost_monthly,
                        "recommendation": "Consider terminating unused resource"
                    })
            return {
                "timestamp": datetime.now().isoformat(),
                "summary": {
                    "total_resources": len(self.resource_inventory),
                    "total_providers": len(provider_summary),
                    "total_monthly_cost": total_cost,
                    "compliance_score": compliance_summary["Compliant"] / len(self.resource_inventory) * 100 if self.resource_inventory else 0
                },
                "provider_summary": provider_summary,
                "cost_analysis": cost_analysis,
                "compliance_summary": compliance_summary,
                "recommendations": self._generate_recommendations()
            }
        except Exception as e:
            logger.error(f"生成多云分析报告失败: {e}")
            return {}

    def _generate_recommendations(self) -> List[Dict[str, str]]:
        """Produce static optimization recommendations based on current state."""
        recommendations = []
        # Cost: only worth comparing prices with more than one provider.
        if len(self.providers) > 1:
            recommendations.append({
                "category": "Cost Optimization",
                "priority": "High",
                "title": "实施多云成本比较",
                "description": "定期比较不同云服务提供商的价格,选择最具成本效益的服务"
            })
        # Compliance: surface untagged resources.
        non_compliant_count = sum(1 for r in self.resource_inventory if r.compliance_status == "Non-Compliant")
        if non_compliant_count > 0:
            recommendations.append({
                "category": "Compliance",
                "priority": "Critical",
                "title": "修复合规性问题",
                "description": f"有{non_compliant_count}个资源不符合标签要求,需要立即修复"
            })
        # Always-on architecture/security recommendations.
        recommendations.append({
            "category": "Architecture",
            "priority": "Medium",
            "title": "实施统一监控",
            "description": "建立跨云的统一监控和告警系统"
        })
        recommendations.append({
            "category": "Security",
            "priority": "High",
            "title": "统一身份管理",
            "description": "实施跨云的统一身份和访问管理(IAM)系统"
        })
        return recommendations
# 使用示例
def main():
    """Demo entry point: analyze a simulated two-provider estate."""
    analyzer = MultiCloudAnalyzer()
    print("=== 多云架构分析示例 ===")
    # Register the providers under management.
    for provider in (
        CloudProvider(
            name="AWS",
            region="us-east-1",
            credentials={"access_key": "AKIA...", "secret_key": "xxx"},
            services=["EC2", "RDS", "S3", "Lambda"],
            cost_center="IT-Infrastructure",
            compliance_requirements=["SOC2", "ISO27001"]
        ),
        CloudProvider(
            name="Azure",
            region="East US",
            credentials={"tenant_id": "xxx", "client_id": "xxx", "client_secret": "xxx"},
            services=["VirtualMachines", "SQLDatabase", "Storage"],
            cost_center="IT-Infrastructure",
            compliance_requirements=["SOC2", "GDPR"]
        ),
    ):
        analyzer.add_cloud_provider(provider)
    # Stub inventory standing in for live provider API calls.
    analyzer.resource_inventory = [
        ResourceInventory(
            provider="AWS",
            region="us-east-1",
            resource_type="EC2",
            resource_id="i-1234567890abcdef0",
            name="web-server-1",
            status="running",
            cost_monthly=88.0,
            tags={"Environment": "production", "Owner": "team-a", "Project": "web-app"},
            compliance_status="Compliant"
        ),
        ResourceInventory(
            provider="Azure",
            region="East US",
            resource_type="VirtualMachine",
            resource_id="/subscriptions/.../vm-web-2",
            name="web-server-2",
            status="running",
            cost_monthly=95.0,
            tags={"Environment": "production", "Project": "web-app"},
            compliance_status="Warning"
        ),
        ResourceInventory(
            provider="AWS",
            region="us-west-2",
            resource_type="RDS",
            resource_id="database-prod",
            name="main-database",
            status="available",
            cost_monthly=150.0,
            tags={"Environment": "production", "Owner": "team-b"},
            compliance_status="Warning"
        ),
    ]
    # Build and print the consolidated report.
    report = analyzer.generate_multi_cloud_report()
    print("\n多云架构分析报告:")
    print(json.dumps(report, indent=2, ensure_ascii=False))

if __name__ == "__main__":
    main()
多云治理框架
建立统一的多云治理框架是管理复杂多云环境的关键。
治理策略管理器
#!/usr/bin/env python3
"""
多云治理策略管理器
提供统一的策略定义、执行和监控能力
"""
import json
import yaml
import re
from typing import Dict, List, Any, Optional, Union
from dataclasses import dataclass, asdict
from datetime import datetime, timedelta
from enum import Enum
import logging
# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class PolicyType(Enum):
    """Category of a governance policy."""
    SECURITY = "security"
    COST = "cost"
    COMPLIANCE = "compliance"
    RESOURCE = "resource"
    NETWORK = "network"
class PolicySeverity(Enum):
    """Severity ranking for a policy and the violations it produces."""
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"
@dataclass
class GovernancePolicy:
    """A declarative governance rule: match conditions plus enforcement actions."""
    id: str
    name: str
    description: str
    policy_type: PolicyType
    severity: PolicySeverity
    # Condition dicts: {resource_type, field, operator, value}; evaluated together.
    conditions: List[Dict[str, Any]]
    # Action dicts, e.g. {"type": "notify", "channels": [...], "recipients": [...]}.
    actions: List[Dict[str, Any]]
    enabled: bool = True
    created_at: Optional[datetime] = None
    updated_at: Optional[datetime] = None
@dataclass
class PolicyViolation:
    """A recorded instance of a resource violating one governance policy."""
    policy_id: str           # id of the violated GovernancePolicy
    resource_id: str         # offending resource identifier
    provider: str            # cloud provider the resource belongs to
    violation_type: str      # mirrors PolicyType.value of the policy
    description: str
    severity: PolicySeverity
    detected_at: datetime
    resolved: bool = False   # flipped when remediation completes
class MultiCloudGovernanceManager:
    """Multi-cloud governance manager: policy store, evaluation and reporting.

    Fixes over the original version:
    - ``_check_policy_conditions`` no longer returns True when none of a
      policy's conditions apply to the resource's type (the old behavior
      flagged every resource for every policy);
    - annotations referencing types not defined in this script
      (``ResourceInventory``) or defined in sibling blocks are quoted as
      forward references so the class can be defined standalone.
    """

    def __init__(self):
        self.policies = {}     # policy id -> GovernancePolicy (active policies)
        self.violations = []   # accumulated PolicyViolation records
        self.policy_templates = self._load_policy_templates()

    def _load_policy_templates(self) -> Dict[str, "GovernancePolicy"]:
        """Build the built-in template catalog (security/cost/compliance/resource)."""
        templates = {}
        # Security: forbid world-open security groups.
        templates['security_sg_open'] = GovernancePolicy(
            id="security_sg_open",
            name="禁止开放安全组",
            description="检测并阻止创建允许0.0.0.0/0访问的安全组",
            policy_type=PolicyType.SECURITY,
            severity=PolicySeverity.CRITICAL,
            conditions=[
                {
                    "resource_type": "SecurityGroup",
                    "field": "ingress_rules",
                    "operator": "contains",
                    "value": "0.0.0.0/0"
                }
            ],
            actions=[
                {
                    "type": "block",
                    "message": "不允许创建开放的安全组规则"
                },
                {
                    "type": "notify",
                    "channels": ["email", "slack"],
                    "recipients": ["security-team@company.com"]
                }
            ]
        )
        # Cost: gate creation of large instance sizes behind approval.
        templates['cost_large_instance'] = GovernancePolicy(
            id="cost_large_instance",
            name="大型实例成本控制",
            description="监控和控制大型实例的创建",
            policy_type=PolicyType.COST,
            severity=PolicySeverity.HIGH,
            conditions=[
                {
                    "resource_type": "VirtualMachine",
                    "field": "instance_size",
                    "operator": "in",
                    "value": ["xlarge", "2xlarge", "4xlarge"]
                }
            ],
            actions=[
                {
                    "type": "require_approval",
                    "approvers": ["cost-center-manager"]
                },
                {
                    "type": "notify",
                    "channels": ["email"],
                    "recipients": ["finance-team@company.com"]
                }
            ]
        )
        # Compliance: enforce the mandatory tag set on every resource.
        templates['compliance_tagging'] = GovernancePolicy(
            id="compliance_tagging",
            name="强制资源标签",
            description="确保所有资源都有必需的标签",
            policy_type=PolicyType.COMPLIANCE,
            severity=PolicySeverity.MEDIUM,
            conditions=[
                {
                    "resource_type": "*",
                    "field": "tags",
                    "operator": "missing",
                    "value": ["Environment", "Owner", "Project", "CostCenter"]
                }
            ],
            actions=[
                {
                    "type": "auto_tag",
                    "default_tags": {
                        "Environment": "unspecified",
                        "Owner": "unknown",
                        "Project": "unassigned",
                        "CostCenter": "default"
                    }
                },
                {
                    "type": "notify",
                    "channels": ["email"],
                    "recipients": ["governance-team@company.com"]
                }
            ]
        )
        # Resource lifecycle: schedule old, under-utilized resources for deletion.
        templates['resource_lifecycle'] = GovernancePolicy(
            id="resource_lifecycle",
            name="资源生命周期管理",
            description="自动管理资源的生命周期",
            policy_type=PolicyType.RESOURCE,
            severity=PolicySeverity.MEDIUM,
            conditions=[
                {
                    "resource_type": "*",
                    "field": "age_days",
                    "operator": "greater_than",
                    "value": 90
                },
                {
                    "resource_type": "*",
                    "field": "utilization",
                    "operator": "less_than",
                    "value": 10
                }
            ],
            actions=[
                {
                    "type": "schedule_deletion",
                    "grace_period_days": 7
                },
                {
                    "type": "notify",
                    "channels": ["email"],
                    "recipients": ["resource-owners"]
                }
            ]
        )
        return templates

    def create_policy(self, policy: "GovernancePolicy") -> bool:
        """Activate *policy*, stamping creation/update times. Returns success."""
        try:
            policy.created_at = datetime.now()
            policy.updated_at = datetime.now()
            self.policies[policy.id] = policy
            logger.info(f"策略创建成功: {policy.name}")
            return True
        except Exception as e:
            logger.error(f"创建策略失败: {e}")
            return False

    def update_policy(self, policy_id: str, updates: Dict[str, Any]) -> bool:
        """Apply attribute *updates* to an existing policy. Returns success."""
        try:
            if policy_id not in self.policies:
                logger.error(f"策略不存在: {policy_id}")
                return False
            policy = self.policies[policy_id]
            # Only set attributes the dataclass actually declares.
            for key, value in updates.items():
                if hasattr(policy, key):
                    setattr(policy, key, value)
            policy.updated_at = datetime.now()
            logger.info(f"策略更新成功: {policy.name}")
            return True
        except Exception as e:
            logger.error(f"更新策略失败: {e}")
            return False

    def evaluate_resource(self, resource: "ResourceInventory") -> List["PolicyViolation"]:
        """Evaluate *resource* against all enabled policies.

        Records a PolicyViolation and runs the policy's actions for each
        match; returns the violations found ([] on error).
        """
        violations = []
        try:
            for policy in self.policies.values():
                if not policy.enabled:
                    continue
                if self._check_policy_conditions(resource, policy):
                    violation = PolicyViolation(
                        policy_id=policy.id,
                        resource_id=resource.resource_id,
                        provider=resource.provider,
                        violation_type=policy.policy_type.value,
                        description=f"资源违反策略: {policy.name}",
                        severity=policy.severity,
                        detected_at=datetime.now()
                    )
                    violations.append(violation)
                    # Enforce immediately (notify / block / auto-tag / ...).
                    self._execute_policy_actions(resource, policy, violation)
            return violations
        except Exception as e:
            logger.error(f"评估资源策略失败: {e}")
            return []

    def _check_policy_conditions(self, resource: "ResourceInventory",
                                 policy: "GovernancePolicy") -> bool:
        """Return True when every *applicable* condition matches the resource.

        A condition applies when its resource_type is '*' or equals the
        resource's type; non-applicable conditions are skipped.  If no
        condition applies at all, the policy does NOT match (the original
        implementation returned True here, which flagged every resource
        against every policy regardless of type).
        """
        try:
            any_applicable = False
            for condition in policy.conditions:
                resource_type = condition.get('resource_type', '*')
                if resource_type != '*' and resource_type != resource.resource_type:
                    continue
                any_applicable = True
                field = condition.get('field')
                operator = condition.get('operator')
                value = condition.get('value')
                resource_value = self._get_resource_field_value(resource, field)
                if not self._evaluate_condition(resource_value, operator, value):
                    return False
            return any_applicable
        except Exception as e:
            logger.error(f"检查策略条件失败: {e}")
            return False

    def _get_resource_field_value(self, resource: "ResourceInventory", field: str) -> Any:
        """Resolve a condition field name to the resource's value for it."""
        if field == "tags":
            return resource.tags
        elif field == "status":
            return resource.status
        elif field == "cost_monthly":
            return resource.cost_monthly
        elif field == "age_days":
            # Placeholder: real implementation would derive from creation time.
            return 30  # default: 30 days
        elif field == "utilization":
            # Placeholder: real implementation would query monitoring data.
            return 15  # default: 15%
        else:
            return getattr(resource, field, None)

    def _evaluate_condition(self, resource_value: Any, operator: str, condition_value: Any) -> bool:
        """Apply one comparison operator; unknown operators/errors yield False."""
        try:
            if operator == "equals":
                return resource_value == condition_value
            elif operator == "not_equals":
                return resource_value != condition_value
            elif operator == "contains":
                return condition_value in str(resource_value)
            elif operator == "missing":
                # For tag dicts: any required key absent counts as missing.
                if isinstance(resource_value, dict) and isinstance(condition_value, list):
                    return any(tag not in resource_value for tag in condition_value)
                return resource_value is None
            elif operator == "greater_than":
                return float(resource_value) > float(condition_value)
            elif operator == "less_than":
                return float(resource_value) < float(condition_value)
            elif operator == "in":
                return resource_value in condition_value
            else:
                return False
        except Exception as e:
            logger.error(f"评估条件失败: {e}")
            return False

    def _execute_policy_actions(self, resource: "ResourceInventory",
                                policy: "GovernancePolicy", violation: "PolicyViolation"):
        """Dispatch each configured action for a confirmed violation."""
        try:
            for action in policy.actions:
                action_type = action.get('type')
                if action_type == "block":
                    logger.warning(f"阻止操作: {action.get('message', '策略违规')}")
                elif action_type == "notify":
                    self._send_notification(action, violation)
                elif action_type == "auto_tag":
                    self._auto_tag_resource(resource, action.get('default_tags', {}))
                elif action_type == "require_approval":
                    self._require_approval(resource, action.get('approvers', []))
                elif action_type == "schedule_deletion":
                    grace_period = action.get('grace_period_days', 7)
                    self._schedule_resource_deletion(resource, grace_period)
        except Exception as e:
            logger.error(f"执行策略动作失败: {e}")

    def _send_notification(self, action: Dict[str, Any], violation: "PolicyViolation"):
        """Fan a violation notice out to every (channel, recipient) pair (log-only stub)."""
        channels = action.get('channels', [])
        recipients = action.get('recipients', [])
        message = f"策略违规检测: {violation.description}"
        for channel in channels:
            for recipient in recipients:
                logger.info(f"发送{channel}通知到{recipient}: {message}")

    def _auto_tag_resource(self, resource: "ResourceInventory", default_tags: Dict[str, str]):
        """Fill in any missing required tags with their defaults (in place)."""
        for tag_key, tag_value in default_tags.items():
            if tag_key not in resource.tags:
                resource.tags[tag_key] = tag_value
                logger.info(f"自动添加标签 {tag_key}={tag_value} 到资源 {resource.resource_id}")

    def _require_approval(self, resource: "ResourceInventory", approvers: List[str]):
        """Record that the resource needs sign-off (log-only stub)."""
        logger.info(f"资源 {resource.resource_id} 需要以下人员审批: {', '.join(approvers)}")

    def _schedule_resource_deletion(self, resource: "ResourceInventory", grace_period_days: int):
        """Record a pending deletion after the grace period (log-only stub)."""
        deletion_date = datetime.now() + timedelta(days=grace_period_days)
        logger.info(f"资源 {resource.resource_id} 计划于 {deletion_date} 删除")

    def generate_governance_report(self) -> Dict[str, Any]:
        """Summarize policies, violations and the compliance score. {} on error."""
        try:
            # Policy rollups by type and severity.
            policy_stats = {
                "total_policies": len(self.policies),
                "enabled_policies": len([p for p in self.policies.values() if p.enabled]),
                "policies_by_type": {},
                "policies_by_severity": {}
            }
            for policy in self.policies.values():
                policy_type = policy.policy_type.value
                severity = policy.severity.value
                policy_stats["policies_by_type"][policy_type] = policy_stats["policies_by_type"].get(policy_type, 0) + 1
                policy_stats["policies_by_severity"][severity] = policy_stats["policies_by_severity"].get(severity, 0) + 1
            # Violation rollups by type, severity and provider.
            violation_stats = {
                "total_violations": len(self.violations),
                "unresolved_violations": len([v for v in self.violations if not v.resolved]),
                "violations_by_type": {},
                "violations_by_severity": {},
                "violations_by_provider": {}
            }
            for violation in self.violations:
                violation_type = violation.violation_type
                severity = violation.severity.value
                provider = violation.provider
                violation_stats["violations_by_type"][violation_type] = violation_stats["violations_by_type"].get(violation_type, 0) + 1
                violation_stats["violations_by_severity"][severity] = violation_stats["violations_by_severity"].get(severity, 0) + 1
                violation_stats["violations_by_provider"][provider] = violation_stats["violations_by_provider"].get(provider, 0) + 1
            return {
                "timestamp": datetime.now().isoformat(),
                "policy_statistics": policy_stats,
                "violation_statistics": violation_stats,
                "compliance_score": self._calculate_compliance_score(),
                "recommendations": self._generate_governance_recommendations()
            }
        except Exception as e:
            logger.error(f"生成治理报告失败: {e}")
            return {}

    def _calculate_compliance_score(self) -> float:
        """Percentage of recorded violations that are resolved (100 when none)."""
        if not self.violations:
            return 100.0
        total_violations = len(self.violations)
        resolved_violations = len([v for v in self.violations if v.resolved])
        return (resolved_violations / total_violations) * 100

    def _generate_governance_recommendations(self) -> List[Dict[str, str]]:
        """Derive actionable recommendations from unresolved violations and coverage."""
        recommendations = []
        unresolved_critical = len([v for v in self.violations
                                   if not v.resolved and v.severity == PolicySeverity.CRITICAL])
        if unresolved_critical > 0:
            recommendations.append({
                "priority": "Critical",
                "title": "立即处理关键违规",
                "description": f"有{unresolved_critical}个关键违规需要立即处理"
            })
        if len(self.policies) < 5:
            recommendations.append({
                "priority": "Medium",
                "title": "扩展策略覆盖",
                "description": "建议添加更多治理策略以提高覆盖率"
            })
        return recommendations
# 使用示例
def main():
    """Demo entry point: exercise the governance manager on sample data."""
    governance = MultiCloudGovernanceManager()
    print("=== 多云治理管理示例 ===")
    # Promote every built-in template to an active policy.
    for template in governance.policy_templates.values():
        governance.create_policy(template)
    # Simulated violations detected earlier.
    governance.violations = [
        PolicyViolation(
            policy_id="security_sg_open",
            resource_id="sg-12345",
            provider="AWS",
            violation_type="security",
            description="安全组允许0.0.0.0/0访问",
            severity=PolicySeverity.CRITICAL,
            detected_at=datetime.now() - timedelta(hours=2)
        ),
        PolicyViolation(
            policy_id="compliance_tagging",
            resource_id="vm-67890",
            provider="Azure",
            violation_type="compliance",
            description="缺少必需标签",
            severity=PolicySeverity.MEDIUM,
            detected_at=datetime.now() - timedelta(hours=1)
        ),
    ]
    # Build and print the governance report.
    report = governance.generate_governance_report()
    print("\n治理报告:")
    print(json.dumps(report, indent=2, ensure_ascii=False))

if __name__ == "__main__":
    main()
统一身份与访问管理
多云环境需要统一的身份和访问管理系统来确保安全性和一致性。
多云IAM管理器
#!/usr/bin/env python3
"""
多云统一身份与访问管理器
提供跨云平台的统一身份认证和权限管理
"""
import json
import jwt
import hashlib
import secrets
from typing import Dict, List, Any, Optional
from dataclasses import dataclass, asdict
from datetime import datetime, timedelta
from enum import Enum
import logging
# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class PermissionLevel(Enum):
    """Access level granted by a permission (ascending privilege)."""
    READ = "read"
    WRITE = "write"
    ADMIN = "admin"
    OWNER = "owner"
class ResourceType(Enum):
    """Resource category a permission applies to; ALL is the wildcard."""
    COMPUTE = "compute"
    STORAGE = "storage"
    DATABASE = "database"
    NETWORK = "network"
    SECURITY = "security"
    ALL = "*"
@dataclass
class User:
    """An IAM user account."""
    id: str
    username: str
    email: str
    full_name: str
    department: str
    role: str                               # display/business role label
    active: bool = True                     # inactive users cannot authenticate
    created_at: Optional[datetime] = None   # set on create_user
    last_login: Optional[datetime] = None   # updated on successful authentication
@dataclass
class Role:
    """A named bundle of permissions assignable to users."""
    id: str
    name: str
    description: str
    # Permission dicts: {resource_type, permission_level, cloud_providers, resource_filter?}.
    permissions: List[Dict[str, Any]]
    cloud_providers: List[str]              # clouds this role reaches
    created_at: Optional[datetime] = None
@dataclass
class Permission:
    """A single typed permission grant (structured alternative to the dict form)."""
    resource_type: ResourceType
    permission_level: PermissionLevel
    cloud_provider: str
    region: Optional[str] = None                       # None = all regions
    resource_filter: Optional[Dict[str, str]] = None   # tag constraints, e.g. {"Environment": "development"}
@dataclass
class AccessToken:
    """An issued session token and its granted scope."""
    token: str                    # signed JWT string
    user_id: str
    expires_at: datetime
    scopes: List[str]             # "resource_type:permission_level" strings
    cloud_providers: List[str]    # clouds the token may touch
class MultiCloudIAMManager:
    """Unified identity and access manager across cloud providers."""

    def __init__(self, secret_key: str):
        # Key used to sign and verify JWT access tokens.
        self.secret_key = secret_key
        self.users = {}          # user_id -> User
        self.roles = {}          # role_id -> Role
        self.user_roles = {}     # user_id -> [role_ids]
        self.active_tokens = {}  # raw token string -> AccessToken
        self.audit_logs = []     # append-only list of audit event dicts
        # Seed the built-in role set (readonly/developer/ops/security_admin).
        self._initialize_default_roles()
def _initialize_default_roles(self):
    """Populate self.roles with the four built-in roles.

    readonly: view-only everywhere; developer: write access limited to
    development-tagged compute/storage; ops: admin everywhere;
    security_admin: owner of security plus admin of network resources.
    """
    self.roles = {
        "readonly": Role(
            id="readonly",
            name="只读用户",
            description="只能查看资源,无法修改",
            permissions=[
                {
                    "resource_type": ResourceType.ALL.value,
                    "permission_level": PermissionLevel.READ.value,
                    "cloud_providers": ["AWS", "Azure", "GCP"]
                }
            ],
            cloud_providers=["AWS", "Azure", "GCP"],
            created_at=datetime.now()
        ),
        "developer": Role(
            id="developer",
            name="开发者",
            description="可以管理开发环境资源",
            permissions=[
                {
                    "resource_type": ResourceType.COMPUTE.value,
                    "permission_level": PermissionLevel.WRITE.value,
                    "cloud_providers": ["AWS", "Azure", "GCP"],
                    "resource_filter": {"Environment": "development"}
                },
                {
                    "resource_type": ResourceType.STORAGE.value,
                    "permission_level": PermissionLevel.WRITE.value,
                    "cloud_providers": ["AWS", "Azure", "GCP"],
                    "resource_filter": {"Environment": "development"}
                }
            ],
            cloud_providers=["AWS", "Azure", "GCP"],
            created_at=datetime.now()
        ),
        "ops": Role(
            id="ops",
            name="运维工程师",
            description="可以管理生产环境资源",
            permissions=[
                {
                    "resource_type": ResourceType.ALL.value,
                    "permission_level": PermissionLevel.ADMIN.value,
                    "cloud_providers": ["AWS", "Azure", "GCP"]
                }
            ],
            cloud_providers=["AWS", "Azure", "GCP"],
            created_at=datetime.now()
        ),
        "security_admin": Role(
            id="security_admin",
            name="安全管理员",
            description="管理安全相关资源和策略",
            permissions=[
                {
                    "resource_type": ResourceType.SECURITY.value,
                    "permission_level": PermissionLevel.OWNER.value,
                    "cloud_providers": ["AWS", "Azure", "GCP"]
                },
                {
                    "resource_type": ResourceType.NETWORK.value,
                    "permission_level": PermissionLevel.ADMIN.value,
                    "cloud_providers": ["AWS", "Azure", "GCP"]
                }
            ],
            cloud_providers=["AWS", "Azure", "GCP"],
            created_at=datetime.now()
        ),
    }
def create_user(self, user: User) -> bool:
    """Register *user*, stamp its creation time and start an empty role list.

    Returns True on success, False on any error.
    """
    try:
        user.created_at = datetime.now()
        self.users[user.id] = user
        self.user_roles[user.id] = []
        # Audit trail first, then operator-facing log line.
        self._log_audit_event("USER_CREATED", user.id, {"username": user.username})
        logger.info(f"用户创建成功: {user.username}")
        return True
    except Exception as exc:
        logger.error(f"创建用户失败: {exc}")
        return False
def assign_role(self, user_id: str, role_id: str) -> bool:
    """Attach role *role_id* to user *user_id* (idempotent).

    Returns True when the role is (or already was) assigned; False when
    either id is unknown or an error occurs.
    """
    try:
        # Guard clauses: both sides must exist.
        if user_id not in self.users:
            logger.error(f"用户不存在: {user_id}")
            return False
        if role_id not in self.roles:
            logger.error(f"角色不存在: {role_id}")
            return False
        assigned = self.user_roles[user_id]
        if role_id in assigned:
            # Already assigned — nothing to do.
            return True
        assigned.append(role_id)
        self._log_audit_event("ROLE_ASSIGNED", user_id, {
            "role_id": role_id,
            "role_name": self.roles[role_id].name
        })
        logger.info(f"角色分配成功: {user_id} -> {role_id}")
        return True
    except Exception as exc:
        logger.error(f"分配角色失败: {exc}")
        return False
def authenticate_user(self, username: str, password: str) -> Optional[AccessToken]:
    """Authenticate *username* and mint an 8-hour JWT-backed access token.

    Returns the AccessToken, or None when the user is unknown/inactive
    or an error occurs.

    NOTE(review): *password* is currently never checked (the original
    code skipped verification too) — a real deployment must hash-verify
    it before issuing a token.
    """
    try:
        # Locate the first active user with this username.
        user = next(
            (u for u in self.users.values() if u.username == username and u.active),
            None
        )
        if user is None:
            logger.warning(f"用户认证失败: {username}")
            return None
        # 简化密码验证(实际应用中应使用安全的密码哈希)
        # 这里假设密码验证通过
        # Collect scopes and reachable clouds from the user's roles.
        scopes = []
        cloud_providers = set()
        for role_id in self.user_roles[user.id]:
            role = self.roles[role_id]
            for permission in role.permissions:
                scopes.append(f"{permission['resource_type']}:{permission['permission_level']}")
            cloud_providers.update(role.cloud_providers)
        # Sign the session claims into a JWT.
        payload = {
            "user_id": user.id,
            "username": user.username,
            "scopes": scopes,
            "cloud_providers": list(cloud_providers),
            "iat": datetime.now(),
            "exp": datetime.now() + timedelta(hours=8),
        }
        token = jwt.encode(payload, self.secret_key, algorithm="HS256")
        access_token = AccessToken(
            token=token,
            user_id=user.id,
            expires_at=payload["exp"],
            scopes=scopes,
            cloud_providers=list(cloud_providers)
        )
        self.active_tokens[token] = access_token
        user.last_login = datetime.now()
        self._log_audit_event("USER_LOGIN", user.id, {"username": username})
        logger.info(f"用户认证成功: {username}")
        return access_token
    except Exception as exc:
        logger.error(f"用户认证失败: {exc}")
        return None
def validate_token(self, token: str) -> Optional[AccessToken]:
    """Return the AccessToken for *token*, or None if unknown, expired or invalid."""
    try:
        access_token = self.active_tokens.get(token)
        if access_token is None:
            return None
        # Expired tokens are evicted from the active set.
        if datetime.now() > access_token.expires_at:
            del self.active_tokens[token]
            return None
        # Verify the JWT signature; raises InvalidTokenError on tampering.
        jwt.decode(token, self.secret_key, algorithms=["HS256"])
        return access_token
    except jwt.InvalidTokenError:
        logger.warning("无效的访问令牌")
        return None
    except Exception as exc:
        logger.error(f"验证令牌失败: {exc}")
        return None
def check_permission(self, token: str, resource_type: str,
                     permission_level: str, cloud_provider: str,
                     resource_tags: Optional[Dict[str, str]] = None) -> bool:
    """Check whether *token* may perform *permission_level* on *resource_type*.

    Access requires a valid token, reach to *cloud_provider*, and a
    matching scope (exact, wildcard resource, or elevated admin/owner).
    Role permissions that declare a ``resource_filter`` only grant access
    when EVERY filter tag matches the resource's tags.

    Fix over the original: a filter mismatch previously `continue`d to the
    next filter key and then granted access unconditionally, so filtered
    permissions (e.g. the developer role's Environment=development scope)
    never actually restricted anything.
    """
    try:
        access_token = self.validate_token(token)
        if not access_token:
            return False
        # The token must reach the requested cloud at all.
        if cloud_provider not in access_token.cloud_providers:
            return False
        # Scope check: exact, wildcard resource type, or elevated level.
        required_scope = f"{resource_type}:{permission_level}"
        all_scope = f"*:{permission_level}"
        admin_scope = f"{resource_type}:admin"
        owner_scope = f"{resource_type}:owner"
        has_permission = (
            required_scope in access_token.scopes or
            all_scope in access_token.scopes or
            admin_scope in access_token.scopes or
            owner_scope in access_token.scopes
        )
        if not has_permission:
            return False
        # Apply resource filters declared on the user's role permissions.
        saw_filtered_mismatch = False
        for role_id in self.user_roles.get(access_token.user_id, []):
            role = self.roles[role_id]
            for permission in role.permissions:
                if (permission['resource_type'] in [resource_type, '*'] and
                        cloud_provider in permission.get('cloud_providers', [])):
                    resource_filter = permission.get('resource_filter')
                    if not resource_filter or not resource_tags:
                        # Unfiltered applicable permission: grant.
                        return True
                    if all(resource_tags.get(k) == v
                           for k, v in resource_filter.items()):
                        return True
                    saw_filtered_mismatch = True
        # Only filtered entries applied and none matched the tags: deny.
        if saw_filtered_mismatch:
            return False
        # No role-level entry applied at all: fall back to the scope check.
        return has_permission
    except Exception as e:
        logger.error(f"检查权限失败: {e}")
        return False
def revoke_token(self, token: str) -> bool:
    """Revoke an active access token.

    Returns True when the token was found and removed, False otherwise.
    Successful revocations are recorded in the audit trail with a
    truncated token prefix.
    """
    try:
        record = self.active_tokens.pop(token, None)
        if record is None:
            return False
        self._log_audit_event("TOKEN_REVOKED", record.user_id, {"token": token[:10] + "..."})
        logger.info("访问令牌已撤销")
        return True
    except Exception as e:
        logger.error(f"撤销令牌失败: {e}")
        return False
def _log_audit_event(self, event_type: str, user_id: str, details: Dict[str, Any]):
    """Append a timestamped structured entry to the in-memory audit trail."""
    self.audit_logs.append({
        "timestamp": datetime.now().isoformat(),
        "event_type": event_type,
        "user_id": user_id,
        "details": details,
    })
    logger.info(f"审计事件: {event_type} - {user_id}")
def generate_access_report(self) -> Dict[str, Any]:
    """Build a snapshot report of users, tokens and audit activity.

    Returns a dict with per-department/per-role user counts, active
    token counts per user, audit event totals, the ten most recent
    audit events, and security recommendations. Returns an empty dict
    on any internal failure.
    """
    try:
        # Aggregate user counts by department and by assigned role name.
        users_by_department: Dict[str, int] = {}
        users_by_role: Dict[str, int] = {}
        for account in self.users.values():
            dept = account.department
            users_by_department[dept] = users_by_department.get(dept, 0) + 1
            for rid in self.user_roles.get(account.id, []):
                rname = self.roles[rid].name
                users_by_role[rname] = users_by_role.get(rname, 0) + 1
        # Count active tokens per owning user.
        tokens_by_user: Dict[str, int] = {}
        for record in self.active_tokens.values():
            tokens_by_user[record.user_id] = tokens_by_user.get(record.user_id, 0) + 1
        # Tally audit events per event type.
        events_by_type: Dict[str, int] = {}
        for entry in self.audit_logs:
            etype = entry["event_type"]
            events_by_type[etype] = events_by_type.get(etype, 0) + 1
        return {
            "timestamp": datetime.now().isoformat(),
            "user_statistics": {
                "total_users": len(self.users),
                "active_users": sum(1 for u in self.users.values() if u.active),
                "users_by_department": users_by_department,
                "users_by_role": users_by_role,
            },
            "token_statistics": {
                "active_tokens": len(self.active_tokens),
                "expired_tokens": 0,
                "tokens_by_user": tokens_by_user,
            },
            "audit_statistics": {
                "total_events": len(self.audit_logs),
                "events_by_type": events_by_type,
                "recent_events": self.audit_logs[-10:] if self.audit_logs else [],
            },
            "security_recommendations": self._generate_security_recommendations(),
        }
    except Exception as e:
        logger.error(f"生成访问报告失败: {e}")
        return {}
def _generate_security_recommendations(self) -> List[Dict[str, str]]:
    """Derive security recommendations from the current IAM state.

    Flags: users inactive for more than 90 days, tokens still tracked as
    active but already expired, and an excessive share (>20%) of users
    holding admin/owner-level permissions.
    """
    recommendations: List[Dict[str, str]] = []
    # Users whose last login is older than 90 days.
    inactive_users = [
        user.username
        for user in self.users.values()
        if user.last_login and (datetime.now() - user.last_login).days > 90
    ]
    if inactive_users:
        recommendations.append({
            "priority": "Medium",
            "title": "清理非活跃用户",
            "description": f"有{len(inactive_users)}个用户超过90天未登录,建议审查并禁用"
        })
    # Tokens still in the active set but past their expiry timestamp.
    expired_tokens = [token for token, access_token in self.active_tokens.items()
                      if datetime.now() > access_token.expires_at]
    if expired_tokens:
        recommendations.append({
            "priority": "High",
            "title": "清理过期令牌",
            "description": f"有{len(expired_tokens)}个过期令牌需要清理"
        })
    # Users holding admin/owner permissions. Bug fix: the original inner
    # `break` left only the permission loop, so a user with several
    # admin-bearing roles was appended once per role, inflating the
    # ratio below. A set plus a per-user break counts each user once.
    admin_users = set()
    for user_id, role_ids in self.user_roles.items():
        for role_id in role_ids:
            role = self.roles[role_id]
            if any(p.get('permission_level') in ('admin', 'owner')
                   for p in role.permissions):
                admin_users.add(self.users[user_id].username)
                break  # one admin-bearing role is enough for this user
    if len(admin_users) > len(self.users) * 0.2:  # more than 20% of users are admins
        recommendations.append({
            "priority": "High",
            "title": "审查管理员权限",
            "description": "管理员权限用户比例过高,建议审查权限分配"
        })
    return recommendations
# Usage example
def main():
    """Entry point: demonstrate multi-cloud IAM management end to end."""
    iam_manager = MultiCloudIAMManager(secret_key="your-secret-key-here")
    print("=== 多云IAM管理示例 ===")
    # Seed three demo accounts across departments.
    demo_users = [
        User(id="user1", username="john.doe", email="john.doe@company.com",
             full_name="John Doe", department="Engineering", role="Senior Developer"),
        User(id="user2", username="jane.smith", email="jane.smith@company.com",
             full_name="Jane Smith", department="Operations", role="DevOps Engineer"),
        User(id="user3", username="bob.wilson", email="bob.wilson@company.com",
             full_name="Bob Wilson", department="Security", role="Security Engineer"),
    ]
    for account in demo_users:
        iam_manager.create_user(account)
    # Attach one role to each account.
    for uid, role_id in (("user1", "developer"),
                         ("user2", "ops"),
                         ("user3", "security_admin")):
        iam_manager.assign_role(uid, role_id)
    # Authenticate and exercise tag-filtered permission checks.
    token = iam_manager.authenticate_user("john.doe", "password123")
    if token:
        print(f"\n认证成功,令牌: {token.token[:50]}...")
        for env, label in (("development", "开发环境计算资源写权限"),
                           ("production", "生产环境计算资源写权限")):
            allowed = iam_manager.check_permission(
                token.token, "compute", "write", "AWS", {"Environment": env})
            print(f"{label}: {allowed}")
    # Summarize users, tokens and audit activity.
    report = iam_manager.generate_access_report()
    print("\n访问管理报告:")
    print(json.dumps(report, indent=2, ensure_ascii=False))
if __name__ == "__main__":
    main()
资源管理与编排
多云环境需要统一的资源管理和编排能力,以实现跨云的资源协调和自动化部署。
多云资源编排器
#!/bin/bash
# Multi-cloud resource orchestration deployment script.
# Provides unified resource deployment and management across cloud platforms.
# Abort on the first failing command.
set -e
# Paths resolved relative to this script's real location.
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
CONFIG_DIR="${SCRIPT_DIR}/config"
TERRAFORM_DIR="${SCRIPT_DIR}/terraform"
ANSIBLE_DIR="${SCRIPT_DIR}/ansible"
# ANSI color codes for log output.
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
BLUE='\033[0;34m'
NC='\033[0m' # No Color
# Logging helpers: each prints a colorized severity tag and the message.
# printf '%b' interprets the same backslash escapes as `echo -e`.
log_info() {
    printf '%b\n' "${GREEN}[INFO]${NC} $1"
}
log_warn() {
    printf '%b\n' "${YELLOW}[WARN]${NC} $1"
}
log_error() {
    printf '%b\n' "${RED}[ERROR]${NC} $1"
}
log_debug() {
    printf '%b\n' "${BLUE}[DEBUG]${NC} $1"
}
# Verify that every required CLI tool is on PATH; exit 1 otherwise.
check_dependencies() {
    log_info "检查依赖工具..."
    local missing_deps=()
    local dep
    for dep in terraform ansible kubectl aws az gcloud; do
        command -v "$dep" > /dev/null 2>&1 || missing_deps+=("$dep")
    done
    if [ ${#missing_deps[@]} -gt 0 ]; then
        log_error "缺少以下依赖工具: ${missing_deps[*]}"
        log_info "请安装缺少的工具后重试"
        exit 1
    fi
    log_info "所有依赖工具已安装"
}
# Initialize the Terraform configuration.
# Writes main.tf (providers/backend/variables) plus per-cloud resource files
# (aws.tf, azure.tf, gcp.tf) into $TERRAFORM_DIR. Heredoc delimiters are
# quoted ('EOF') so no shell expansion occurs inside the generated files.
init_terraform() {
    log_info "初始化Terraform配置..."
    mkdir -p "$TERRAFORM_DIR"
    # Main configuration: required providers, S3 state backend, variables.
    cat > "$TERRAFORM_DIR/main.tf" << 'EOF'
# 多云资源编排主配置
terraform {
  required_version = ">= 1.0"
  required_providers {
    aws = {
      source = "hashicorp/aws"
      version = "~> 5.0"
    }
    azurerm = {
      source = "hashicorp/azurerm"
      version = "~> 3.0"
    }
    google = {
      source = "hashicorp/google"
      version = "~> 4.0"
    }
  }
  backend "s3" {
    # 配置远程状态存储
    bucket = "terraform-state-bucket"
    key = "multi-cloud/terraform.tfstate"
    region = "us-east-1"
  }
}
# AWS Provider
provider "aws" {
  region = var.aws_region
  default_tags {
    tags = var.common_tags
  }
}
# Azure Provider
provider "azurerm" {
  features {}
  subscription_id = var.azure_subscription_id
}
# Google Cloud Provider
provider "google" {
  project = var.gcp_project_id
  region = var.gcp_region
}
# 变量定义
variable "aws_region" {
  description = "AWS region"
  type = string
  default = "us-east-1"
}
variable "azure_subscription_id" {
  description = "Azure subscription ID"
  type = string
}
variable "gcp_project_id" {
  description = "GCP project ID"
  type = string
}
variable "gcp_region" {
  description = "GCP region"
  type = string
  default = "us-central1"
}
variable "common_tags" {
  description = "Common tags for all resources"
  type = map(string)
  default = {
    Environment = "production"
    Project = "multi-cloud"
    ManagedBy = "terraform"
  }
}
variable "environment" {
  description = "Environment name"
  type = string
  default = "production"
}
EOF
    # AWS resources: VPC + subnets + NAT, security group, EKS cluster and IAM.
    cat > "$TERRAFORM_DIR/aws.tf" << 'EOF'
# AWS 资源配置
# VPC
resource "aws_vpc" "main" {
  cidr_block = "10.0.0.0/16"
  enable_dns_hostnames = true
  enable_dns_support = true
  tags = merge(var.common_tags, {
    Name = "${var.environment}-vpc"
  })
}
# 子网
resource "aws_subnet" "public" {
  count = 2
  vpc_id = aws_vpc.main.id
  cidr_block = "10.0.${count.index + 1}.0/24"
  availability_zone = data.aws_availability_zones.available.names[count.index]
  map_public_ip_on_launch = true
  tags = merge(var.common_tags, {
    Name = "${var.environment}-public-subnet-${count.index + 1}"
    Type = "public"
  })
}
resource "aws_subnet" "private" {
  count = 2
  vpc_id = aws_vpc.main.id
  cidr_block = "10.0.${count.index + 10}.0/24"
  availability_zone = data.aws_availability_zones.available.names[count.index]
  tags = merge(var.common_tags, {
    Name = "${var.environment}-private-subnet-${count.index + 1}"
    Type = "private"
  })
}
# 互联网网关
resource "aws_internet_gateway" "main" {
  vpc_id = aws_vpc.main.id
  tags = merge(var.common_tags, {
    Name = "${var.environment}-igw"
  })
}
# NAT网关
resource "aws_eip" "nat" {
  count = 2
  domain = "vpc"
  tags = merge(var.common_tags, {
    Name = "${var.environment}-nat-eip-${count.index + 1}"
  })
}
resource "aws_nat_gateway" "main" {
  count = 2
  allocation_id = aws_eip.nat[count.index].id
  subnet_id = aws_subnet.public[count.index].id
  tags = merge(var.common_tags, {
    Name = "${var.environment}-nat-${count.index + 1}"
  })
  depends_on = [aws_internet_gateway.main]
}
# 路由表
resource "aws_route_table" "public" {
  vpc_id = aws_vpc.main.id
  route {
    cidr_block = "0.0.0.0/0"
    gateway_id = aws_internet_gateway.main.id
  }
  tags = merge(var.common_tags, {
    Name = "${var.environment}-public-rt"
  })
}
resource "aws_route_table" "private" {
  count = 2
  vpc_id = aws_vpc.main.id
  route {
    cidr_block = "0.0.0.0/0"
    nat_gateway_id = aws_nat_gateway.main[count.index].id
  }
  tags = merge(var.common_tags, {
    Name = "${var.environment}-private-rt-${count.index + 1}"
  })
}
# 路由表关联
resource "aws_route_table_association" "public" {
  count = 2
  subnet_id = aws_subnet.public[count.index].id
  route_table_id = aws_route_table.public.id
}
resource "aws_route_table_association" "private" {
  count = 2
  subnet_id = aws_subnet.private[count.index].id
  route_table_id = aws_route_table.private[count.index].id
}
# 安全组
resource "aws_security_group" "web" {
  name_prefix = "${var.environment}-web-"
  vpc_id = aws_vpc.main.id
  ingress {
    from_port = 80
    to_port = 80
    protocol = "tcp"
    cidr_blocks = ["0.0.0.0/0"]
  }
  ingress {
    from_port = 443
    to_port = 443
    protocol = "tcp"
    cidr_blocks = ["0.0.0.0/0"]
  }
  egress {
    from_port = 0
    to_port = 0
    protocol = "-1"
    cidr_blocks = ["0.0.0.0/0"]
  }
  tags = merge(var.common_tags, {
    Name = "${var.environment}-web-sg"
  })
}
# EKS集群
resource "aws_eks_cluster" "main" {
  name = "${var.environment}-eks-cluster"
  role_arn = aws_iam_role.eks_cluster.arn
  version = "1.28"
  vpc_config {
    subnet_ids = concat(aws_subnet.public[*].id, aws_subnet.private[*].id)
    endpoint_private_access = true
    endpoint_public_access = true
  }
  enabled_cluster_log_types = ["api", "audit", "authenticator", "controllerManager", "scheduler"]
  tags = var.common_tags
  depends_on = [
    aws_iam_role_policy_attachment.eks_cluster_policy,
    aws_iam_role_policy_attachment.eks_vpc_resource_controller,
  ]
}
# EKS节点组
resource "aws_eks_node_group" "main" {
  cluster_name = aws_eks_cluster.main.name
  node_group_name = "${var.environment}-eks-nodes"
  node_role_arn = aws_iam_role.eks_node_group.arn
  subnet_ids = aws_subnet.private[*].id
  instance_types = ["t3.medium"]
  scaling_config {
    desired_size = 2
    max_size = 4
    min_size = 1
  }
  update_config {
    max_unavailable = 1
  }
  tags = var.common_tags
  depends_on = [
    aws_iam_role_policy_attachment.eks_worker_node_policy,
    aws_iam_role_policy_attachment.eks_cni_policy,
    aws_iam_role_policy_attachment.eks_container_registry_policy,
  ]
}
# 数据源
data "aws_availability_zones" "available" {
  state = "available"
}
# IAM角色
resource "aws_iam_role" "eks_cluster" {
  name = "${var.environment}-eks-cluster-role"
  assume_role_policy = jsonencode({
    Statement = [{
      Action = "sts:AssumeRole"
      Effect = "Allow"
      Principal = {
        Service = "eks.amazonaws.com"
      }
    }]
    Version = "2012-10-17"
  })
}
resource "aws_iam_role_policy_attachment" "eks_cluster_policy" {
  policy_arn = "arn:aws:iam::aws:policy/AmazonEKSClusterPolicy"
  role = aws_iam_role.eks_cluster.name
}
resource "aws_iam_role_policy_attachment" "eks_vpc_resource_controller" {
  policy_arn = "arn:aws:iam::aws:policy/AmazonEKSVPCResourceController"
  role = aws_iam_role.eks_cluster.name
}
resource "aws_iam_role" "eks_node_group" {
  name = "${var.environment}-eks-node-group-role"
  assume_role_policy = jsonencode({
    Statement = [{
      Action = "sts:AssumeRole"
      Effect = "Allow"
      Principal = {
        Service = "ec2.amazonaws.com"
      }
    }]
    Version = "2012-10-17"
  })
}
resource "aws_iam_role_policy_attachment" "eks_worker_node_policy" {
  policy_arn = "arn:aws:iam::aws:policy/AmazonEKSWorkerNodePolicy"
  role = aws_iam_role.eks_node_group.name
}
resource "aws_iam_role_policy_attachment" "eks_cni_policy" {
  policy_arn = "arn:aws:iam::aws:policy/AmazonEKS_CNI_Policy"
  role = aws_iam_role.eks_node_group.name
}
resource "aws_iam_role_policy_attachment" "eks_container_registry_policy" {
  policy_arn = "arn:aws:iam::aws:policy/AmazonEC2ContainerRegistryReadOnly"
  role = aws_iam_role.eks_node_group.name
}
# 输出
output "aws_vpc_id" {
  value = aws_vpc.main.id
}
output "aws_eks_cluster_name" {
  value = aws_eks_cluster.main.name
}
output "aws_eks_cluster_endpoint" {
  value = aws_eks_cluster.main.endpoint
}
EOF
    # Azure resources: resource group, VNet/subnets, NSG, AKS cluster.
    cat > "$TERRAFORM_DIR/azure.tf" << 'EOF'
# Azure 资源配置
# 资源组
resource "azurerm_resource_group" "main" {
  name = "${var.environment}-rg"
  location = "East US"
  tags = var.common_tags
}
# 虚拟网络
resource "azurerm_virtual_network" "main" {
  name = "${var.environment}-vnet"
  address_space = ["10.1.0.0/16"]
  location = azurerm_resource_group.main.location
  resource_group_name = azurerm_resource_group.main.name
  tags = var.common_tags
}
# 子网
resource "azurerm_subnet" "public" {
  name = "${var.environment}-public-subnet"
  resource_group_name = azurerm_resource_group.main.name
  virtual_network_name = azurerm_virtual_network.main.name
  address_prefixes = ["10.1.1.0/24"]
}
resource "azurerm_subnet" "private" {
  name = "${var.environment}-private-subnet"
  resource_group_name = azurerm_resource_group.main.name
  virtual_network_name = azurerm_virtual_network.main.name
  address_prefixes = ["10.1.2.0/24"]
}
# 网络安全组
resource "azurerm_network_security_group" "web" {
  name = "${var.environment}-web-nsg"
  location = azurerm_resource_group.main.location
  resource_group_name = azurerm_resource_group.main.name
  security_rule {
    name = "HTTP"
    priority = 1001
    direction = "Inbound"
    access = "Allow"
    protocol = "Tcp"
    source_port_range = "*"
    destination_port_range = "80"
    source_address_prefix = "*"
    destination_address_prefix = "*"
  }
  security_rule {
    name = "HTTPS"
    priority = 1002
    direction = "Inbound"
    access = "Allow"
    protocol = "Tcp"
    source_port_range = "*"
    destination_port_range = "443"
    source_address_prefix = "*"
    destination_address_prefix = "*"
  }
  tags = var.common_tags
}
# AKS集群
resource "azurerm_kubernetes_cluster" "main" {
  name = "${var.environment}-aks-cluster"
  location = azurerm_resource_group.main.location
  resource_group_name = azurerm_resource_group.main.name
  dns_prefix = "${var.environment}-aks"
  default_node_pool {
    name = "default"
    node_count = 2
    vm_size = "Standard_D2_v2"
    vnet_subnet_id = azurerm_subnet.private.id
  }
  identity {
    type = "SystemAssigned"
  }
  network_profile {
    network_plugin = "azure"
    network_policy = "azure"
  }
  tags = var.common_tags
}
# 输出
output "azure_resource_group_name" {
  value = azurerm_resource_group.main.name
}
output "azure_aks_cluster_name" {
  value = azurerm_kubernetes_cluster.main.name
}
output "azure_aks_cluster_fqdn" {
  value = azurerm_kubernetes_cluster.main.fqdn
}
EOF
    # GCP resources: VPC, subnets with secondary ranges, firewall, GKE cluster.
    cat > "$TERRAFORM_DIR/gcp.tf" << 'EOF'
# GCP 资源配置
# VPC网络
resource "google_compute_network" "main" {
  name = "${var.environment}-vpc"
  auto_create_subnetworks = false
}
# 子网
resource "google_compute_subnetwork" "public" {
  name = "${var.environment}-public-subnet"
  ip_cidr_range = "10.2.1.0/24"
  region = var.gcp_region
  network = google_compute_network.main.id
  secondary_ip_range {
    range_name = "pods"
    ip_cidr_range = "10.2.16.0/20"
  }
  secondary_ip_range {
    range_name = "services"
    ip_cidr_range = "10.2.32.0/20"
  }
}
resource "google_compute_subnetwork" "private" {
  name = "${var.environment}-private-subnet"
  ip_cidr_range = "10.2.2.0/24"
  region = var.gcp_region
  network = google_compute_network.main.id
}
# 防火墙规则
resource "google_compute_firewall" "web" {
  name = "${var.environment}-web-firewall"
  network = google_compute_network.main.name
  allow {
    protocol = "tcp"
    ports = ["80", "443"]
  }
  source_ranges = ["0.0.0.0/0"]
  target_tags = ["web"]
}
# GKE集群
resource "google_container_cluster" "main" {
  name = "${var.environment}-gke-cluster"
  location = var.gcp_region
  # 删除默认节点池
  remove_default_node_pool = true
  initial_node_count = 1
  network = google_compute_network.main.name
  subnetwork = google_compute_subnetwork.public.name
  ip_allocation_policy {
    cluster_secondary_range_name = "pods"
    services_secondary_range_name = "services"
  }
  workload_identity_config {
    workload_pool = "${var.gcp_project_id}.svc.id.goog"
  }
}
# GKE节点池
resource "google_container_node_pool" "main" {
  name = "${var.environment}-node-pool"
  location = var.gcp_region
  cluster = google_container_cluster.main.name
  node_count = 2
  node_config {
    preemptible = false
    machine_type = "e2-medium"
    service_account = google_service_account.gke.email
    oauth_scopes = [
      "https://www.googleapis.com/auth/cloud-platform"
    ]
    workload_metadata_config {
      mode = "GKE_METADATA"
    }
  }
}
# 服务账号
resource "google_service_account" "gke" {
  account_id = "${var.environment}-gke-sa"
  display_name = "GKE Service Account"
}
# 输出
output "gcp_network_name" {
  value = google_compute_network.main.name
}
output "gcp_gke_cluster_name" {
  value = google_container_cluster.main.name
}
output "gcp_gke_cluster_endpoint" {
  value = google_container_cluster.main.endpoint
}
EOF
    log_info "Terraform配置初始化完成"
}
# Initialize the Ansible configuration.
# Writes ansible.cfg, three dynamic-inventory plugin configs (AWS/Azure/GCP),
# and the multi-cloud deployment playbook into $ANSIBLE_DIR.
init_ansible() {
    log_info "初始化Ansible配置..."
    mkdir -p "$ANSIBLE_DIR"/{playbooks,roles,inventory}
    # Base Ansible settings and enabled dynamic-inventory plugins.
    cat > "$ANSIBLE_DIR/ansible.cfg" << 'EOF'
[defaults]
host_key_checking = False
inventory = inventory/
roles_path = roles/
remote_user = ubuntu
private_key_file = ~/.ssh/id_rsa
[inventory]
enable_plugins = aws_ec2, azure_rm, gcp_compute
[ssh_connection]
ssh_args = -o ControlMaster=auto -o ControlPersist=60s
pipelining = True
EOF
    # Dynamic inventory configs, one per cloud provider.
    cat > "$ANSIBLE_DIR/inventory/aws_ec2.yml" << 'EOF'
plugin: aws_ec2
regions:
  - us-east-1
  - us-west-2
keyed_groups:
  - key: tags
    prefix: tag
  - key: instance_type
    prefix: instance_type
  - key: placement.region
    prefix: aws_region
compose:
  ansible_host: public_ip_address
EOF
    cat > "$ANSIBLE_DIR/inventory/azure_rm.yml" << 'EOF'
plugin: azure_rm
include_vm_resource_groups:
  - "*"
auth_source: auto
keyed_groups:
  - key: tags
    prefix: tag
  - key: location
    prefix: azure_location
compose:
  ansible_host: public_ipv4_addresses[0] | default(private_ipv4_addresses[0])
EOF
    cat > "$ANSIBLE_DIR/inventory/gcp_compute.yml" << 'EOF'
plugin: gcp_compute
projects:
  - your-gcp-project-id
auth_kind: serviceaccount
service_account_file: ~/.gcp/credentials.json
keyed_groups:
  - key: labels
    prefix: label
  - key: zone
    prefix: gcp_zone
compose:
  ansible_host: networkInterfaces[0].accessConfigs[0].natIP | default(networkInterfaces[0].networkIP)
EOF
    # Deployment playbook: app container rollout plus node monitoring.
    cat > "$ANSIBLE_DIR/playbooks/multi-cloud-deploy.yml" << 'EOF'
---
- name: 多云应用部署
  hosts: all
  become: yes
  vars:
    app_name: "multi-cloud-app"
    app_version: "1.0.0"
    docker_image: "nginx:latest"
  tasks:
    - name: 更新包管理器
      package:
        update_cache: yes
      when: ansible_os_family == "Debian"
    - name: 安装Docker
      package:
        name: docker.io
        state: present
    - name: 启动Docker服务
      service:
        name: docker
        state: started
        enabled: yes
    - name: 拉取应用镜像
      docker_image:
        name: "{{ docker_image }}"
        source: pull
    - name: 运行应用容器
      docker_container:
        name: "{{ app_name }}"
        image: "{{ docker_image }}"
        state: started
        restart_policy: always
        ports:
          - "80:80"
        env:
          APP_ENV: "{{ ansible_hostname }}"
          CLOUD_PROVIDER: "{{ group_names[0] | regex_replace('^tag_', '') }}"
    - name: 配置防火墙
      ufw:
        rule: allow
        port: "80"
        proto: tcp
      when: ansible_os_family == "Debian"
- name: 配置监控
  hosts: all
  become: yes
  tasks:
    - name: 安装Node Exporter
      get_url:
        url: https://github.com/prometheus/node_exporter/releases/download/v1.6.1/node_exporter-1.6.1.linux-amd64.tar.gz
        dest: /tmp/node_exporter.tar.gz
    - name: 解压Node Exporter
      unarchive:
        src: /tmp/node_exporter.tar.gz
        dest: /tmp/
        remote_src: yes
    - name: 复制Node Exporter二进制文件
      copy:
        src: /tmp/node_exporter-1.6.1.linux-amd64/node_exporter
        dest: /usr/local/bin/node_exporter
        mode: '0755'
        remote_src: yes
    - name: 创建Node Exporter服务
      copy:
        content: |
          [Unit]
          Description=Node Exporter
          After=network.target
          [Service]
          Type=simple
          ExecStart=/usr/local/bin/node_exporter
          Restart=always
          [Install]
          WantedBy=multi-user.target
        dest: /etc/systemd/system/node_exporter.service
    - name: 启动Node Exporter服务
      systemd:
        name: node_exporter
        state: started
        enabled: yes
        daemon_reload: yes
EOF
    log_info "Ansible配置初始化完成"
}
# Provision multi-cloud infrastructure with Terraform.
# $1: target environment name (defaults to "production").
deploy_infrastructure() {
    local environment="${1:-production}"
    log_info "开始部署多云基础设施 (环境: $environment)..."
    cd "$TERRAFORM_DIR"
    # Initialize providers and the remote state backend.
    log_info "初始化Terraform..."
    terraform init
    # Write per-run variables. Unquoted EOF so $environment expands here.
    # NOTE(review): subscription/project ids below are placeholders and must
    # be replaced with real values before a real deployment.
    cat > terraform.tfvars << EOF
environment = "$environment"
aws_region = "us-east-1"
azure_subscription_id = "your-azure-subscription-id"
gcp_project_id = "your-gcp-project-id"
gcp_region = "us-central1"
common_tags = {
  Environment = "$environment"
  Project = "multi-cloud"
  ManagedBy = "terraform"
  Owner = "devops-team"
}
EOF
    # Validate, plan, then apply the saved plan.
    log_info "验证Terraform配置..."
    terraform validate
    log_info "生成部署计划..."
    terraform plan -out=tfplan
    log_info "执行基础设施部署..."
    terraform apply tfplan
    # Persist outputs for configure_kubernetes to consume.
    log_info "获取部署输出..."
    terraform output -json > ../outputs.json
    log_info "多云基础设施部署完成"
}
# Deploy the application layer to all inventoried hosts via Ansible.
deploy_applications() {
    log_info "开始部署应用..."
    cd "$ANSIBLE_DIR"
    # Snapshot the dynamic inventory (useful for debugging/auditing).
    log_info "更新动态清单..."
    ansible-inventory --list > inventory/dynamic_inventory.json
    # Run the multi-cloud deployment playbook against the live inventory.
    log_info "执行应用部署..."
    ansible-playbook -i inventory/ playbooks/multi-cloud-deploy.yml
    log_info "应用部署完成"
}
# Merge kubeconfig entries for every cluster recorded in outputs.json.
# Requires deploy_infrastructure to have written $TERRAFORM_DIR/../outputs.json.
configure_kubernetes() {
    log_info "配置Kubernetes集群..."
    # AWS EKS
    if [ -f "$TERRAFORM_DIR/../outputs.json" ]; then
        local aws_cluster_name=$(jq -r '.aws_eks_cluster_name.value' "$TERRAFORM_DIR/../outputs.json")
        if [ "$aws_cluster_name" != "null" ]; then
            log_info "配置AWS EKS集群: $aws_cluster_name"
            aws eks update-kubeconfig --region us-east-1 --name "$aws_cluster_name" --alias aws-cluster
        fi
        # Azure AKS
        local azure_cluster_name=$(jq -r '.azure_aks_cluster_name.value' "$TERRAFORM_DIR/../outputs.json")
        local azure_rg_name=$(jq -r '.azure_resource_group_name.value' "$TERRAFORM_DIR/../outputs.json")
        if [ "$azure_cluster_name" != "null" ] && [ "$azure_rg_name" != "null" ]; then
            log_info "配置Azure AKS集群: $azure_cluster_name"
            az aks get-credentials --resource-group "$azure_rg_name" --name "$azure_cluster_name" --admin --context azure-cluster
        fi
        # GCP GKE
        # NOTE(review): the project id is hard-coded below — keep it in sync
        # with terraform.tfvars.
        local gcp_cluster_name=$(jq -r '.gcp_gke_cluster_name.value' "$TERRAFORM_DIR/../outputs.json")
        if [ "$gcp_cluster_name" != "null" ]; then
            log_info "配置GCP GKE集群: $gcp_cluster_name"
            gcloud container clusters get-credentials "$gcp_cluster_name" --region us-central1 --project your-gcp-project-id
            kubectl config rename-context "gke_your-gcp-project-id_us-central1_$gcp_cluster_name" gcp-cluster
        fi
    fi
    # List the kubectl contexts now available.
    log_info "Kubernetes集群配置完成"
    kubectl config get-contexts
}
# Tear down all multi-cloud infrastructure after interactive confirmation.
destroy_infrastructure() {
    log_warn "准备销毁多云基础设施..."
    local confirm
    read -p "确认要销毁所有资源吗?(yes/no): " confirm
    if [ "$confirm" != "yes" ]; then
        log_info "取消销毁操作"
        return 0
    fi
    cd "$TERRAFORM_DIR"
    log_info "开始销毁基础设施..."
    terraform destroy -auto-approve
    log_info "基础设施销毁完成"
}
# Print usage information for this script ($0 expands in the heredoc).
show_help() {
    cat << EOF
多云资源编排部署脚本
用法: $0 [命令] [选项]
命令:
  init              初始化配置文件
  deploy [环境名]   部署基础设施 (默认: production)
  deploy-apps       部署应用
  configure-k8s     配置Kubernetes集群
  destroy           销毁基础设施
  help              显示帮助信息
示例:
  $0 init           # 初始化配置
  $0 deploy staging # 部署staging环境
  $0 deploy-apps    # 部署应用
  $0 configure-k8s  # 配置K8s集群
  $0 destroy        # 销毁基础设施
EOF
}
# Dispatch the requested sub-command (defaults to help).
main() {
    local cmd="${1:-help}"
    case "$cmd" in
        init)
            check_dependencies
            init_terraform
            init_ansible
            ;;
        deploy)
            check_dependencies
            deploy_infrastructure "${2:-production}"
            ;;
        deploy-apps)
            check_dependencies
            deploy_applications
            ;;
        configure-k8s)
            check_dependencies
            configure_kubernetes
            ;;
        destroy)
            check_dependencies
            destroy_infrastructure
            ;;
        help|*)
            show_help
            ;;
    esac
}
# Entry point
main "$@"
多云资源管理器
#!/usr/bin/env python3
"""
Multi-cloud resource manager.

Provides unified resource management and orchestration capabilities
across AWS, Azure, GCP and Kubernetes clusters.
"""
import json
import logging
import asyncio
from typing import Dict, List, Any, Optional
from datetime import datetime, timedelta
from dataclasses import dataclass, asdict
from enum import Enum
import boto3
from azure.identity import DefaultAzureCredential
from azure.mgmt.resource import ResourceManagementClient
from azure.mgmt.compute import ComputeManagementClient
from google.cloud import resource_manager
from google.cloud import compute_v1
import kubernetes
from kubernetes import client, config
# Module-level logging configuration
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class CloudProvider(Enum):
    """Supported cloud service providers."""
    AWS = "aws"
    AZURE = "azure"
    GCP = "gcp"
class ResourceType(Enum):
    """Broad resource categories used to normalize provider-specific types."""
    COMPUTE = "compute"
    STORAGE = "storage"
    NETWORK = "network"
    DATABASE = "database"
    KUBERNETES = "kubernetes"
@dataclass
class CloudResource:
    """Normalized description of a single cloud resource."""
    id: str                      # provider-specific unique identifier
    name: str                    # human-readable name (may equal id)
    type: ResourceType           # normalized resource category
    provider: CloudProvider      # originating cloud
    region: str                  # provider region, e.g. "us-east-1"
    status: str                  # provider-specific lifecycle state
    tags: Dict[str, str]         # normalized tags/labels
    metadata: Dict[str, Any]     # provider-specific extra attributes
    created_at: datetime         # creation timestamp as reported by the API
    cost_estimate: float = 0.0   # estimated monthly cost; 0.0 when unknown
@dataclass
class DeploymentTemplate:
    """Declarative template describing resources to deploy on one provider."""
    name: str                         # unique template name (registry key)
    description: str                  # human-readable summary
    provider: CloudProvider           # target cloud for all resources
    resources: List[Dict[str, Any]]   # resource specs, provider-specific shape
    parameters: Dict[str, Any]        # default parameter values
    dependencies: List[str]           # names of templates that must deploy first
class MultiCloudResourceManager:
"""多云资源管理器"""
def __init__(self):
    """Create an empty manager and eagerly initialize cloud SDK clients."""
    self.aws_session = None        # boto3.Session after _initialize_clients
    self.azure_credential = None   # azure.identity credential
    self.gcp_client = None         # google resource_manager client
    self.k8s_clients = {}          # context name -> kubernetes ApiClient
    self.resources_cache = {}      # CloudProvider -> last discovered resources
    self.templates = {}            # template name -> DeploymentTemplate
    self._initialize_clients()
def _initialize_clients(self):
    """Initialize SDK clients for AWS, Azure, GCP and Kubernetes.

    Each client relies on ambient credentials (environment variables,
    config files, instance metadata). A Kubernetes failure is downgraded
    to a warning; any other failure is logged and leaves the clients
    only partially initialized.
    """
    try:
        # AWS: default credential chain via boto3.
        self.aws_session = boto3.Session()
        logger.info("AWS客户端初始化成功")
        # Azure: DefaultAzureCredential tries env vars, managed identity, CLI, etc.
        self.azure_credential = DefaultAzureCredential()
        logger.info("Azure客户端初始化成功")
        # GCP: application default credentials.
        self.gcp_client = resource_manager.Client()
        logger.info("GCP客户端初始化成功")
        # Kubernetes: local kubeconfig; optional, so failure is non-fatal.
        try:
            config.load_kube_config()
            self.k8s_clients['default'] = client.ApiClient()
            logger.info("Kubernetes客户端初始化成功")
        except Exception as e:
            logger.warning(f"Kubernetes客户端初始化失败: {e}")
    except Exception as e:
        logger.error(f"客户端初始化失败: {e}")
async def discover_resources(self, providers: List[CloudProvider] = None) -> Dict[CloudProvider, List[CloudResource]]:
    """Discover resources across the given providers (all three by default).

    Results are cached per provider in ``self.resources_cache``; a
    provider whose discovery raises contributes an empty list and the
    error is logged rather than propagated.
    """
    targets = providers if providers is not None else [
        CloudProvider.AWS, CloudProvider.AZURE, CloudProvider.GCP]
    dispatch = {
        CloudProvider.AWS: self._discover_aws_resources,
        CloudProvider.AZURE: self._discover_azure_resources,
        CloudProvider.GCP: self._discover_gcp_resources,
    }
    discovered: Dict[CloudProvider, List[CloudResource]] = {}
    for provider in targets:
        try:
            handler = dispatch.get(provider)
            found = await handler() if handler else []
            discovered[provider] = found
            self.resources_cache[provider] = found
            logger.info(f"发现 {provider.value} 资源: {len(found)} 个")
        except Exception as e:
            logger.error(f"发现 {provider.value} 资源失败: {e}")
            discovered[provider] = []
    return discovered
async def _discover_aws_resources(self) -> List[CloudResource]:
    """Enumerate AWS EC2 instances, RDS databases and EKS clusters.

    Returns normalized CloudResource records. Partial results are
    returned if an API call fails midway: the error is logged, not raised.
    """
    resources = []
    try:
        # EC2 instances
        ec2 = self.aws_session.client('ec2')
        instances = ec2.describe_instances()
        for reservation in instances['Reservations']:
            for instance in reservation['Instances']:
                resource = CloudResource(
                    id=instance['InstanceId'],
                    # Fall back to the instance id when no Name tag exists.
                    name=self._get_tag_value(instance.get('Tags', []), 'Name', instance['InstanceId']),
                    type=ResourceType.COMPUTE,
                    provider=CloudProvider.AWS,
                    # AZ minus its trailing letter yields the region (us-east-1a -> us-east-1).
                    region=instance['Placement']['AvailabilityZone'][:-1],
                    status=instance['State']['Name'],
                    tags=self._parse_aws_tags(instance.get('Tags', [])),
                    metadata={
                        'instance_type': instance['InstanceType'],
                        'vpc_id': instance.get('VpcId'),
                        'subnet_id': instance.get('SubnetId'),
                        'security_groups': [sg['GroupId'] for sg in instance.get('SecurityGroups', [])]
                    },
                    created_at=instance['LaunchTime']
                )
                resources.append(resource)
        # RDS database instances
        rds = self.aws_session.client('rds')
        db_instances = rds.describe_db_instances()
        for db_instance in db_instances['DBInstances']:
            resource = CloudResource(
                id=db_instance['DBInstanceIdentifier'],
                name=db_instance['DBInstanceIdentifier'],
                type=ResourceType.DATABASE,
                provider=CloudProvider.AWS,
                # Multi-AZ instances may lack an AvailabilityZone field.
                region=db_instance['AvailabilityZone'][:-1] if db_instance.get('AvailabilityZone') else 'unknown',
                status=db_instance['DBInstanceStatus'],
                tags=self._parse_aws_tags(db_instance.get('TagList', [])),
                metadata={
                    'engine': db_instance['Engine'],
                    'engine_version': db_instance['EngineVersion'],
                    'instance_class': db_instance['DBInstanceClass'],
                    'allocated_storage': db_instance['AllocatedStorage']
                },
                created_at=db_instance['InstanceCreateTime']
            )
            resources.append(resource)
        # EKS clusters
        eks = self.aws_session.client('eks')
        clusters = eks.list_clusters()
        for cluster_name in clusters['clusters']:
            cluster_info = eks.describe_cluster(name=cluster_name)['cluster']
            resource = CloudResource(
                id=cluster_info['arn'],
                name=cluster_name,
                type=ResourceType.KUBERNETES,
                provider=CloudProvider.AWS,
                # ARN format: arn:aws:eks:<region>:<account>:cluster/<name>
                region=cluster_info['arn'].split(':')[3],
                status=cluster_info['status'],
                tags=cluster_info.get('tags', {}),
                metadata={
                    'version': cluster_info['version'],
                    'platform_version': cluster_info['platformVersion'],
                    'endpoint': cluster_info['endpoint']
                },
                created_at=cluster_info['createdAt']
            )
            resources.append(resource)
    except Exception as e:
        logger.error(f"发现AWS资源失败: {e}")
    return resources
async def _discover_azure_resources(self) -> List[CloudResource]:
    """Enumerate Azure virtual machines via the Compute management API.

    AKS discovery is intentionally omitted in this example. Errors are
    logged and the resources found so far are returned.
    """
    resources = []
    try:
        subscription_id = "your-subscription-id"  # TODO: load from configuration
        # Virtual machines
        compute_client = ComputeManagementClient(self.azure_credential, subscription_id)
        vms = compute_client.virtual_machines.list_all()
        for vm in vms:
            resource = CloudResource(
                id=vm.id,
                name=vm.name,
                type=ResourceType.COMPUTE,
                provider=CloudProvider.AZURE,
                region=vm.location,
                status=vm.provisioning_state,
                tags=vm.tags or {},
                metadata={
                    'vm_size': vm.hardware_profile.vm_size,
                    'os_type': vm.storage_profile.os_disk.os_type.value if vm.storage_profile.os_disk.os_type else 'unknown',
                    # Resource id format: /subscriptions/<id>/resourceGroups/<rg>/...
                    'resource_group': vm.id.split('/')[4]
                },
                # The compute API does not expose a creation time; "now" is a placeholder.
                created_at=datetime.now()
            )
            resources.append(resource)
        # AKS clusters would require the Container Service client;
        # skipped to keep the example simple.
    except Exception as e:
        logger.error(f"发现Azure资源失败: {e}")
    return resources
async def _discover_gcp_resources(self) -> List[CloudResource]:
    """Enumerate GCP Compute Engine instances across all project zones.

    GKE discovery is intentionally omitted in this example. Errors are
    logged and the resources found so far are returned.
    """
    resources = []
    try:
        project_id = "your-project-id"  # TODO: load from configuration
        # Compute Engine instances
        instances_client = compute_v1.InstancesClient()
        # Walk every zone in the project and list its instances.
        zones_client = compute_v1.ZonesClient()
        zones = zones_client.list(project=project_id)
        for zone in zones:
            instances = instances_client.list(project=project_id, zone=zone.name)
            for instance in instances:
                resource = CloudResource(
                    id=str(instance.id),
                    name=instance.name,
                    type=ResourceType.COMPUTE,
                    provider=CloudProvider.GCP,
                    # zone.region is a URL; keep only the trailing region name.
                    region=zone.region.split('/')[-1],
                    status=instance.status,
                    tags=dict(instance.labels) if instance.labels else {},
                    metadata={
                        'machine_type': instance.machine_type.split('/')[-1],
                        'zone': zone.name,
                        'network_interfaces': len(instance.network_interfaces)
                    },
                    # NOTE(review): assumes an ISO-8601 timestamp; stripping 'Z'
                    # may not cover offset forms like "+00:00" on older Python
                    # versions — confirm against the actual API payload.
                    created_at=datetime.fromisoformat(instance.creation_timestamp.rstrip('Z'))
                )
                resources.append(resource)
        # GKE clusters would require the Container client; skipped here.
    except Exception as e:
        logger.error(f"发现GCP资源失败: {e}")
    return resources
def create_deployment_template(self, template: DeploymentTemplate) -> bool:
    """Register *template* in the local registry, keyed by its name.

    Returns True when the template was stored, False on failure.
    """
    try:
        self.templates[template.name] = template
        logger.info(f"创建部署模板: {template.name}")
    except Exception as exc:
        logger.error(f"创建部署模板失败: {exc}")
        return False
    return True
async def deploy_from_template(self, template_name: str, parameters: Dict[str, Any] = None) -> Dict[str, Any]:
    """Deploy the named template, merging *parameters* over its defaults.

    Returns a result dict whose status is 'completed' or 'failed', with
    created resource ids and any error strings. Raises ValueError when
    *template_name* is not registered.
    """
    template = self.templates.get(template_name)
    if template is None:
        raise ValueError(f"模板 {template_name} 不存在")
    # Call-site parameters override the template's defaults.
    merged = dict(template.parameters)
    merged.update(parameters or {})
    outcome: Dict[str, Any] = {
        'template_name': template_name,
        'provider': template.provider.value,
        'status': 'deploying',
        'resources': [],
        'errors': [],
    }
    try:
        handlers = {
            CloudProvider.AWS: self._deploy_aws_template,
            CloudProvider.AZURE: self._deploy_azure_template,
            CloudProvider.GCP: self._deploy_gcp_template,
        }
        handler = handlers.get(template.provider)
        if handler is None:
            raise ValueError(f"不支持的云提供商: {template.provider}")
        outcome.update(await handler(template, merged))
        outcome['status'] = 'completed'
    except Exception as e:
        logger.error(f"模板部署失败: {e}")
        outcome['status'] = 'failed'
        outcome['errors'].append(str(e))
    return outcome
async def _deploy_aws_template(self, template: DeploymentTemplate, parameters: Dict[str, Any]) -> Dict[str, Any]:
"""部署AWS模板"""
# 这里实现AWS CloudFormation或Terraform部署逻辑
# 为简化示例,返回模拟结果
return {
'resources': [f"aws-resource-{i}" for i in range(len(template.resources))],
'deployment_id': f"aws-deployment-{datetime.now().strftime('%Y%m%d%H%M%S')}"
}
async def _deploy_azure_template(self, template: DeploymentTemplate, parameters: Dict[str, Any]) -> Dict[str, Any]:
"""部署Azure模板"""
# 这里实现Azure ARM模板部署逻辑
# 为简化示例,返回模拟结果
return {
'resources': [f"azure-resource-{i}" for i in range(len(template.resources))],
'deployment_id': f"azure-deployment-{datetime.now().strftime('%Y%m%d%H%M%S')}"
}
async def _deploy_gcp_template(self, template: DeploymentTemplate, parameters: Dict[str, Any]) -> Dict[str, Any]:
"""部署GCP模板"""
# 这里实现GCP Deployment Manager部署逻辑
# 为简化示例,返回模拟结果
return {
'resources': [f"gcp-resource-{i}" for i in range(len(template.resources))],
'deployment_id': f"gcp-deployment-{datetime.now().strftime('%Y%m%d%H%M%S')}"
}
def sync_kubernetes_resources(self, cluster_contexts: Optional[List[str]] = None) -> Dict[str, Dict[str, List[Dict[str, Any]]]]:
    """Collect namespaces, deployments and services from each K8s cluster.

    Args:
        cluster_contexts: kubeconfig context names to sync; defaults to
            one context per cloud provider.

    Returns:
        Mapping of context name to a resource summary dict; a context
        whose sync fails maps to an empty dict.
    """
    if cluster_contexts is None:
        cluster_contexts = ['aws-cluster', 'azure-cluster', 'gcp-cluster']
    k8s_resources = {}
    for context in cluster_contexts:
        try:
            # Point the kubernetes client at this cluster's context.
            config.load_kube_config(context=context)
            v1 = client.CoreV1Api()
            apps_v1 = client.AppsV1Api()
            resources = {
                'namespaces': [],
                'deployments': [],
                'services': [],
                'pods': []  # NOTE(review): declared but never populated below
            }
            # Namespaces.
            namespaces = v1.list_namespace()
            for ns in namespaces.items:
                resources['namespaces'].append({
                    'name': ns.metadata.name,
                    'status': ns.status.phase,
                    'created_at': ns.metadata.creation_timestamp.isoformat()
                })
            # Deployments across all namespaces.
            deployments = apps_v1.list_deployment_for_all_namespaces()
            for deploy in deployments.items:
                resources['deployments'].append({
                    'name': deploy.metadata.name,
                    'namespace': deploy.metadata.namespace,
                    'replicas': deploy.spec.replicas,
                    'ready_replicas': deploy.status.ready_replicas or 0,
                    'created_at': deploy.metadata.creation_timestamp.isoformat()
                })
            # Services across all namespaces.
            services = v1.list_service_for_all_namespaces()
            for svc in services.items:
                resources['services'].append({
                    'name': svc.metadata.name,
                    'namespace': svc.metadata.namespace,
                    'type': svc.spec.type,
                    'cluster_ip': svc.spec.cluster_ip,
                    'created_at': svc.metadata.creation_timestamp.isoformat()
                })
            k8s_resources[context] = resources
            logger.info(f"同步 {context} 集群资源完成")
        except Exception as e:
            # A failing cluster yields an empty dict instead of aborting the sync.
            logger.error(f"同步 {context} 集群资源失败: {e}")
            k8s_resources[context] = {}
    return k8s_resources
def generate_resource_report(self) -> Dict[str, Any]:
    """Build a summary report over all cached resources.

    Aggregates counts by provider, type and region, embeds per-resource
    details, and appends optimization recommendations.
    """
    report = {
        'generated_at': datetime.now().isoformat(),
        'summary': {
            'total_resources': 0,
            'by_provider': {},
            'by_type': {},
            'by_region': {}
        },
        'details': {},
        'recommendations': []
    }
    summary = report['summary']
    for provider, resources in self.resources_cache.items():
        provider_key = provider.value
        summary['by_provider'][provider_key] = len(resources)
        summary['total_resources'] += len(resources)
        detail_list = report['details'].setdefault(provider_key, [])
        for item in resources:
            # Tally per type and per region.
            type_key = item.type.value
            summary['by_type'][type_key] = summary['by_type'].get(type_key, 0) + 1
            summary['by_region'][item.region] = summary['by_region'].get(item.region, 0) + 1
            detail_list.append(asdict(item))
    # Derive recommendations from the aggregated counts.
    self._generate_recommendations(report)
    return report
def _generate_recommendations(self, report: Dict[str, Any]):
"""生成优化建议"""
recommendations = []
# 检查资源分布
provider_counts = report['summary']['by_provider']
if len(provider_counts) > 1:
max_provider = max(provider_counts, key=provider_counts.get)
max_count = provider_counts[max_provider]
total_count = report['summary']['total_resources']
if max_count / total_count > 0.8:
recommendations.append({
'type': 'distribution',
'priority': 'medium',
'message': f"资源过度集中在 {max_provider},建议考虑多云分布以提高可用性"
})
# 检查区域分布
region_counts = report['summary']['by_region']
if len(region_counts) == 1:
recommendations.append({
'type': 'availability',
'priority': 'high',
'message': "所有资源都在同一区域,建议部署到多个区域以提高容灾能力"
})
# 检查资源类型
type_counts = report['summary']['by_type']
if 'kubernetes' in type_counts and type_counts['kubernetes'] > 1:
recommendations.append({
'type': 'optimization',
'priority': 'low',
'message': "检测到多个Kubernetes集群,建议评估是否可以合并以降低管理复杂度"
})
report['recommendations'] = recommendations
def _get_tag_value(self, tags: List[Dict], key: str, default: str = "") -> str:
"""获取标签值"""
for tag in tags:
if tag.get('Key') == key:
return tag.get('Value', default)
return default
def _parse_aws_tags(self, tags: List[Dict]) -> Dict[str, str]:
"""解析AWS标签"""
return {tag['Key']: tag['Value'] for tag in tags}
def main():
    """Demo entry point for the multi-cloud resource manager.

    Fix: this script calls asyncio.run() but never imports asyncio at
    module level, so main() raised NameError; import it locally here.
    """
    import asyncio

    # Create the multi-cloud resource manager.
    manager = MultiCloudResourceManager()

    async def run_demo():
        # Discover resources across providers.
        print("发现多云资源...")
        resources = await manager.discover_resources([CloudProvider.AWS])
        for provider, resource_list in resources.items():
            print(f"\n{provider.value} 资源:")
            for resource in resource_list[:3]:  # show only the first 3
                print(f" - {resource.name} ({resource.type.value}) - {resource.status}")
        # Register a deployment template.
        print("\n创建部署模板...")
        template = DeploymentTemplate(
            name="web-app-template",
            description="Web应用部署模板",
            provider=CloudProvider.AWS,
            resources=[
                {"type": "compute", "count": 2},
                {"type": "load_balancer", "count": 1},
                {"type": "database", "count": 1}
            ],
            parameters={
                "instance_type": "t3.medium",
                "region": "us-east-1"
            },
            dependencies=[]
        )
        manager.create_deployment_template(template)
        # Deploy from the template with an override parameter.
        print("\n从模板部署资源...")
        deployment_result = await manager.deploy_from_template(
            "web-app-template",
            {"instance_type": "t3.large"}
        )
        print(f"部署状态: {deployment_result['status']}")
        print(f"部署ID: {deployment_result.get('deployment_id', 'N/A')}")
        # Sync Kubernetes resources.
        print("\n同步Kubernetes资源...")
        k8s_resources = manager.sync_kubernetes_resources(['default'])
        for context, resources in k8s_resources.items():
            if resources:
                print(f"{context} 集群:")
                print(f" 命名空间: {len(resources.get('namespaces', []))}")
                print(f" 部署: {len(resources.get('deployments', []))}")
                print(f" 服务: {len(resources.get('services', []))}")
        # Produce the aggregate resource report.
        print("\n生成资源报告...")
        report = manager.generate_resource_report()
        print(f"总资源数: {report['summary']['total_resources']}")
        print(f"云提供商分布: {report['summary']['by_provider']}")
        print(f"资源类型分布: {report['summary']['by_type']}")
        if report['recommendations']:
            print("\n优化建议:")
            for rec in report['recommendations']:
                print(f" [{rec['priority']}] {rec['message']}")

    # Run the demo.
    asyncio.run(run_demo())


if __name__ == "__main__":
    main()
成本优化与监控
多云环境的成本优化需要综合考虑资源使用效率、定价策略和自动化管理。
成本优化分析器
#!/usr/bin/env python3
"""
多云成本优化分析器
提供跨云平台的成本分析和优化建议
"""
import json
import logging
from typing import Dict, List, Any, Optional, Tuple
from datetime import datetime, timedelta
from dataclasses import dataclass, asdict
from enum import Enum
import pandas as pd
import numpy as np
from collections import defaultdict
import boto3
from azure.identity import DefaultAzureCredential
from azure.mgmt.consumption import ConsumptionManagementClient
from google.cloud import billing
# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class OptimizationType(Enum):
    """Categories of cost-optimization recommendations.

    Each member corresponds to one `_analyze_*` method on
    MultiCloudCostOptimizer.
    """
    RIGHT_SIZING = "right_sizing"                  # downsize over-provisioned compute
    RESERVED_INSTANCES = "reserved_instances"      # commit stable workloads
    SPOT_INSTANCES = "spot_instances"              # preemptible capacity for batch jobs
    STORAGE_OPTIMIZATION = "storage_optimization"  # storage class / lifecycle tuning
    NETWORK_OPTIMIZATION = "network_optimization"  # data transfer / CDN tuning
    SCHEDULING = "scheduling"                      # time-based start/stop or scaling
@dataclass
class CostData:
    """One billing line item normalized across cloud providers."""
    provider: str            # 'AWS' / 'Azure' / 'GCP'
    service: str             # provider service name, e.g. 'Compute Engine'
    resource_id: str
    resource_name: str
    region: str
    cost: float              # cost for the period, expressed in `currency`
    currency: str
    period_start: datetime
    period_end: datetime
    usage_quantity: float    # amount consumed, expressed in `usage_unit`
    usage_unit: str
    tags: Dict[str, str]     # provider tags; empty when unavailable
@dataclass
class OptimizationRecommendation:
    """A single actionable cost-optimization suggestion."""
    type: OptimizationType
    resource_id: str
    resource_name: str
    current_cost: float
    potential_savings: float    # estimated absolute savings, same unit as current_cost
    confidence: float           # 0..1; >0.8 counts as high priority in the dashboard
    description: str
    implementation_effort: str  # 'Low' / 'Medium' / 'High'
    risk_level: str             # 'Low' / 'Medium'
    details: Dict[str, Any]     # free-form, recommendation-type specific data
class MultiCloudCostOptimizer:
    """Collects cost data from AWS/Azure/GCP and derives optimization advice."""
    def __init__(self):
        # Provider clients; populated by _initialize_clients().
        self.aws_session = None
        self.azure_credential = None
        self.gcp_billing_client = None
        self.cost_data = []        # CostData records, filled by collect_cost_data()
        self.recommendations = []  # filled by generate_optimization_recommendations()
        self._initialize_clients()
def _initialize_clients(self):
    """Initialize the per-provider billing clients.

    Fix: the original wrapped all three initializations in one try-block,
    so a failure with one cloud (e.g. missing Azure credentials) aborted
    the remaining client setups. Each provider now fails independently;
    failures are logged and the corresponding client stays None.
    """
    try:
        self.aws_session = boto3.Session()
        logger.info("AWS成本管理客户端初始化成功")
    except Exception as e:
        logger.error(f"客户端初始化失败: {e}")
    try:
        self.azure_credential = DefaultAzureCredential()
        logger.info("Azure成本管理客户端初始化成功")
    except Exception as e:
        logger.error(f"客户端初始化失败: {e}")
    try:
        self.gcp_billing_client = billing.CloudBillingClient()
        logger.info("GCP计费客户端初始化成功")
    except Exception as e:
        logger.error(f"客户端初始化失败: {e}")
def collect_cost_data(self, start_date: datetime, end_date: datetime) -> List[CostData]:
    """Gather normalized cost records from every provider for the window.

    Results are cached on self.cost_data and also returned.
    """
    collectors = (
        self._collect_aws_costs,
        self._collect_azure_costs,
        self._collect_gcp_costs,
    )
    combined = []
    for collect in collectors:
        combined.extend(collect(start_date, end_date))
    self.cost_data = combined
    logger.info(f"收集到 {len(combined)} 条成本数据")
    return combined
def _collect_aws_costs(self, start_date: datetime, end_date: datetime) -> List[CostData]:
"""收集AWS成本数据"""
cost_data = []
try:
ce_client = self.aws_session.client('ce')
response = ce_client.get_cost_and_usage(
TimePeriod={
'Start': start_date.strftime('%Y-%m-%d'),
'End': end_date.strftime('%Y-%m-%d')
},
Granularity='DAILY',
Metrics=['BlendedCost', 'UsageQuantity'],
GroupBy=[
{'Type': 'DIMENSION', 'Key': 'SERVICE'},
{'Type': 'DIMENSION', 'Key': 'REGION'}
]
)
for result in response['ResultsByTime']:
period_start = datetime.strptime(result['TimePeriod']['Start'], '%Y-%m-%d')
period_end = datetime.strptime(result['TimePeriod']['End'], '%Y-%m-%d')
for group in result['Groups']:
service = group['Keys'][0] if len(group['Keys']) > 0 else 'Unknown'
region = group['Keys'][1] if len(group['Keys']) > 1 else 'Unknown'
cost = float(group['Metrics']['BlendedCost']['Amount'])
usage = float(group['Metrics']['UsageQuantity']['Amount'])
cost_data.append(CostData(
provider='AWS',
service=service,
resource_id=f"aws-{service}-{region}",
resource_name=f"{service} in {region}",
region=region,
cost=cost,
currency=group['Metrics']['BlendedCost']['Unit'],
period_start=period_start,
period_end=period_end,
usage_quantity=usage,
usage_unit=group['Metrics']['UsageQuantity']['Unit'],
tags={}
))
except Exception as e:
logger.error(f"收集AWS成本数据失败: {e}")
return cost_data
def _collect_azure_costs(self, start_date: datetime, end_date: datetime) -> List[CostData]:
    """Collect Azure consumption (usage detail) records for the window.

    Failures are logged and an empty list is returned.
    """
    cost_data = []
    try:
        # NOTE(review): hard-coded placeholder — should be read from configuration.
        subscription_id = "your-subscription-id"
        consumption_client = ConsumptionManagementClient(
            self.azure_credential,
            subscription_id
        )
        # Usage details restricted to the requested date window.
        usage_details = consumption_client.usage_details.list(
            scope=f"/subscriptions/{subscription_id}",
            filter=f"properties/usageStart ge '{start_date.strftime('%Y-%m-%d')}' and properties/usageEnd le '{end_date.strftime('%Y-%m-%d')}'"
        )
        for usage in usage_details:
            cost_data.append(CostData(
                provider='Azure',
                service=usage.consumed_service,
                resource_id=usage.instance_id or 'unknown',
                resource_name=usage.instance_name or usage.consumed_service,
                region=usage.resource_location or 'unknown',
                cost=float(usage.cost),
                currency=usage.billing_currency,
                period_start=usage.usage_start,
                period_end=usage.usage_end,
                usage_quantity=float(usage.usage_quantity),
                usage_unit=usage.unit_of_measure,
                tags=usage.tags or {}
            ))
    except Exception as e:
        logger.error(f"收集Azure成本数据失败: {e}")
    return cost_data
def _collect_gcp_costs(self, start_date: datetime, end_date: datetime) -> List[CostData]:
"""收集GCP成本数据"""
cost_data = []
try:
# GCP BigQuery导出的计费数据查询
# 这里需要配置BigQuery客户端和查询
# 为简化示例,返回模拟数据
services = ['Compute Engine', 'Cloud Storage', 'BigQuery', 'Cloud SQL']
regions = ['us-central1', 'us-east1', 'europe-west1']
current_date = start_date
while current_date < end_date:
for service in services:
for region in regions:
cost_data.append(CostData(
provider='GCP',
service=service,
resource_id=f"gcp-{service.lower().replace(' ', '-')}-{region}",
resource_name=f"{service} in {region}",
region=region,
cost=np.random.uniform(10, 1000),
currency='USD',
period_start=current_date,
period_end=current_date + timedelta(days=1),
usage_quantity=np.random.uniform(1, 100),
usage_unit='hours',
tags={}
))
current_date += timedelta(days=1)
except Exception as e:
logger.error(f"收集GCP成本数据失败: {e}")
return cost_data
def analyze_cost_trends(self) -> Dict[str, Any]:
    """Summarize collected cost data: totals, breakdowns, growth and trend.

    Returns an empty dict when no cost data has been collected yet.
    """
    if not self.cost_data:
        return {}
    frame = pd.DataFrame([asdict(entry) for entry in self.cost_data])
    frame['period_start'] = pd.to_datetime(frame['period_start'])
    per_day = frame.groupby('period_start')['cost'].sum().sort_index()
    analysis = {
        'total_cost': frame['cost'].sum(),
        'daily_average': per_day.mean(),
        'cost_by_provider': frame.groupby('provider')['cost'].sum().to_dict(),
        'cost_by_service': frame.groupby('service')['cost'].sum().to_dict(),
        'cost_by_region': frame.groupby('region')['cost'].sum().to_dict(),
        'trend_analysis': {},
        'growth_rate': {}
    }
    if len(per_day) > 1:
        # Week-over-week growth: first 7 days vs last 7 days.
        first_week = per_day.head(7).mean()
        last_week = per_day.tail(7).mean()
        if first_week > 0:
            analysis['growth_rate']['weekly'] = ((last_week - first_week) / first_week) * 100
        # Linear fit over the daily series gives the overall direction.
        xs = np.arange(len(per_day))
        slope, _intercept = np.polyfit(xs, per_day.values, 1)
        analysis['trend_analysis']['slope'] = slope
        analysis['trend_analysis']['direction'] = 'increasing' if slope > 0 else 'decreasing'
    return analysis
def generate_optimization_recommendations(self) -> List[OptimizationRecommendation]:
    """Run every cost analyzer and return recommendations ranked by savings.

    The ranked list is also cached on self.recommendations. Returns an
    empty list (without touching the cache) when no cost data exists.
    """
    if not self.cost_data:
        return []
    frame = pd.DataFrame([asdict(entry) for entry in self.cost_data])
    analyzers = (
        self._analyze_right_sizing,             # downsize expensive compute
        self._analyze_reserved_instances,       # commit stable workloads
        self._analyze_spot_instances,           # preemptible batch capacity
        self._analyze_storage_optimization,     # tiering / lifecycle
        self._analyze_network_optimization,     # transfer / CDN
        self._analyze_scheduling_optimization,  # time-based scaling
    )
    ranked = []
    for analyze in analyzers:
        ranked.extend(analyze(frame))
    # Biggest potential savings first.
    ranked.sort(key=lambda rec: rec.potential_savings, reverse=True)
    self.recommendations = ranked
    return ranked
def _analyze_right_sizing(self, df: pd.DataFrame) -> List[OptimizationRecommendation]:
"""分析右尺寸优化"""
recommendations = []
# 查找高成本计算资源
compute_services = df[df['service'].str.contains('Compute|EC2|Virtual Machines', case=False, na=False)]
if not compute_services.empty:
high_cost_resources = compute_services.groupby(['resource_id', 'resource_name'])['cost'].sum()
high_cost_resources = high_cost_resources[high_cost_resources > high_cost_resources.quantile(0.8)]
for resource_id, cost in high_cost_resources.items():
recommendations.append(OptimizationRecommendation(
type=OptimizationType.RIGHT_SIZING,
resource_id=resource_id[0],
resource_name=resource_id[1],
current_cost=cost,
potential_savings=cost * 0.3, # 假设可节省30%
confidence=0.7,
description=f"资源 {resource_id[1]} 的成本较高,建议评估是否可以降低配置",
implementation_effort="Medium",
risk_level="Low",
details={
'current_monthly_cost': cost,
'recommended_action': 'Downsize instance type',
'monitoring_period': '2 weeks'
}
))
return recommendations
def _analyze_reserved_instances(self, df: pd.DataFrame) -> List[OptimizationRecommendation]:
    """Recommend reserved instances for stable, higher-cost compute resources.

    A resource qualifies when its usage is steady (std/mean < 0.2) and
    its total cost is above the 60th percentile.
    """
    recommendations = []
    # Compute-like services only.
    compute_services = df[df['service'].str.contains('Compute|EC2|Virtual Machines', case=False, na=False)]
    if not compute_services.empty:
        # Per-resource cost totals plus usage mean/std to gauge stability.
        resource_usage = compute_services.groupby(['resource_id', 'resource_name']).agg({
            'cost': 'sum',
            'usage_quantity': ['mean', 'std']
        }).reset_index()
        # Flatten the MultiIndex columns produced by the nested agg.
        resource_usage.columns = ['resource_id', 'resource_name', 'total_cost', 'avg_usage', 'usage_std']
        # Stable (usage variation < 20%) AND relatively expensive (> p60).
        # NOTE(review): divides by avg_usage — a zero average would yield
        # inf/NaN and silently drop the row from the filter; confirm intended.
        stable_resources = resource_usage[
            (resource_usage['usage_std'] / resource_usage['avg_usage'] < 0.2) &
            (resource_usage['total_cost'] > resource_usage['total_cost'].quantile(0.6))
        ]
        for _, resource in stable_resources.iterrows():
            savings = resource['total_cost'] * 0.4  # reserved pricing assumed ~40% cheaper
            recommendations.append(OptimizationRecommendation(
                type=OptimizationType.RESERVED_INSTANCES,
                resource_id=resource['resource_id'],
                resource_name=resource['resource_name'],
                current_cost=resource['total_cost'],
                potential_savings=savings,
                confidence=0.8,
                description=f"资源 {resource['resource_name']} 使用稳定,建议购买预留实例",
                implementation_effort="Low",
                risk_level="Low",
                details={
                    'usage_stability': resource['usage_std'] / resource['avg_usage'],
                    'recommended_term': '1 year',
                    'payment_option': 'Partial Upfront'
                }
            ))
    return recommendations
def _analyze_spot_instances(self, df: pd.DataFrame) -> List[OptimizationRecommendation]:
"""分析Spot实例优化"""
recommendations = []
# 查找可容错的工作负载
batch_services = df[df['service'].str.contains('Batch|Lambda|Functions', case=False, na=False)]
if not batch_services.empty:
high_cost_batch = batch_services.groupby(['resource_id', 'resource_name'])['cost'].sum()
high_cost_batch = high_cost_batch[high_cost_batch > high_cost_batch.quantile(0.7)]
for resource_id, cost in high_cost_batch.items():
recommendations.append(OptimizationRecommendation(
type=OptimizationType.SPOT_INSTANCES,
resource_id=resource_id[0],
resource_name=resource_id[1],
current_cost=cost,
potential_savings=cost * 0.6, # Spot实例可节省60%
confidence=0.6,
description=f"批处理工作负载 {resource_id[1]} 建议使用Spot实例",
implementation_effort="Medium",
risk_level="Medium",
details={
'workload_type': 'Batch processing',
'fault_tolerance': 'Required',
'recommended_strategy': 'Mixed instance types'
}
))
return recommendations
def _analyze_storage_optimization(self, df: pd.DataFrame) -> List[OptimizationRecommendation]:
"""分析存储优化"""
recommendations = []
# 查找存储服务
storage_services = df[df['service'].str.contains('Storage|S3|Blob|Cloud Storage', case=False, na=False)]
if not storage_services.empty:
high_cost_storage = storage_services.groupby(['resource_id', 'resource_name'])['cost'].sum()
high_cost_storage = high_cost_storage[high_cost_storage > high_cost_storage.quantile(0.7)]
for resource_id, cost in high_cost_storage.items():
recommendations.append(OptimizationRecommendation(
type=OptimizationType.STORAGE_OPTIMIZATION,
resource_id=resource_id[0],
resource_name=resource_id[1],
current_cost=cost,
potential_savings=cost * 0.25, # 存储优化可节省25%
confidence=0.8,
description=f"存储资源 {resource_id[1]} 建议优化存储类别和生命周期策略",
implementation_effort="Low",
risk_level="Low",
details={
'optimization_type': 'Storage class transition',
'lifecycle_policy': 'Recommended',
'compression': 'Evaluate'
}
))
return recommendations
def _analyze_network_optimization(self, df: pd.DataFrame) -> List[OptimizationRecommendation]:
"""分析网络优化"""
recommendations = []
# 查找网络服务
network_services = df[df['service'].str.contains('Network|VPC|CDN|Load Balancer', case=False, na=False)]
if not network_services.empty:
# 按区域分析网络成本
region_costs = network_services.groupby('region')['cost'].sum()
high_cost_regions = region_costs[region_costs > region_costs.quantile(0.8)]
for region, cost in high_cost_regions.items():
recommendations.append(OptimizationRecommendation(
type=OptimizationType.NETWORK_OPTIMIZATION,
resource_id=f"network-{region}",
resource_name=f"Network resources in {region}",
current_cost=cost,
potential_savings=cost * 0.2, # 网络优化可节省20%
confidence=0.6,
description=f"区域 {region} 的网络成本较高,建议优化数据传输和CDN配置",
implementation_effort="Medium",
risk_level="Low",
details={
'optimization_areas': ['Data transfer', 'CDN configuration', 'Load balancer optimization'],
'region': region
}
))
return recommendations
def _analyze_scheduling_optimization(self, df: pd.DataFrame) -> List[OptimizationRecommendation]:
"""分析调度优化"""
recommendations = []
# 查找可调度的资源
schedulable_services = df[df['service'].str.contains('Compute|EC2|Virtual Machines|Database', case=False, na=False)]
if not schedulable_services.empty:
# 分析使用模式
df['hour'] = pd.to_datetime(df['period_start']).dt.hour
hourly_usage = schedulable_services.groupby(['resource_id', 'hour'])['usage_quantity'].mean().unstack(fill_value=0)
# 查找有明显使用模式的资源
for resource_id in hourly_usage.index:
usage_pattern = hourly_usage.loc[resource_id]
peak_hours = usage_pattern.nlargest(8).index.tolist() # 前8小时为峰值
off_peak_hours = usage_pattern.nsmallest(8).index.tolist() # 后8小时为低谷
if usage_pattern.max() > usage_pattern.mean() * 2: # 峰值是平均值的2倍以上
resource_cost = schedulable_services[schedulable_services['resource_id'] == resource_id]['cost'].sum()
recommendations.append(OptimizationRecommendation(
type=OptimizationType.SCHEDULING,
resource_id=resource_id,
resource_name=resource_id,
current_cost=resource_cost,
potential_savings=resource_cost * 0.3, # 调度优化可节省30%
confidence=0.7,
description=f"资源 {resource_id} 有明显的使用模式,建议实施自动调度",
implementation_effort="High",
risk_level="Medium",
details={
'peak_hours': peak_hours,
'off_peak_hours': off_peak_hours,
'scaling_strategy': 'Time-based auto scaling'
}
))
return recommendations
def create_cost_dashboard_data(self) -> Dict[str, Any]:
    """Assemble summary, chart series, recommendation totals and alerts.

    Returns an empty dict when no cost data has been collected.
    """
    if not self.cost_data:
        return {}
    frame = pd.DataFrame([asdict(entry) for entry in self.cost_data])
    frame['period_start'] = pd.to_datetime(frame['period_start'])
    by_service = frame.groupby('service')['cost'].sum()
    total_cost = frame['cost'].sum()
    dashboard = {
        'summary': {
            'total_cost': total_cost,
            'total_resources': len(frame['resource_id'].unique()),
            'cost_trend': 'increasing',  # placeholder; see analyze_cost_trends
            'top_cost_driver': by_service.idxmax()
        },
        'charts': {
            'cost_by_provider': frame.groupby('provider')['cost'].sum().to_dict(),
            'cost_by_service': by_service.nlargest(10).to_dict(),
            'cost_by_region': frame.groupby('region')['cost'].sum().to_dict(),
            'daily_cost_trend': frame.groupby('period_start')['cost'].sum().to_dict()
        },
        'recommendations_summary': {
            'total_recommendations': len(self.recommendations),
            'potential_savings': sum(rec.potential_savings for rec in self.recommendations),
            'high_priority_count': len([rec for rec in self.recommendations if rec.confidence > 0.8])
        },
        'alerts': []
    }
    # Budget-threshold alert.
    if total_cost > 10000:
        dashboard['alerts'].append({
            'type': 'high_cost',
            'message': f"总成本 ${total_cost:.2f} 超过预算阈值",
            'severity': 'warning'
        })
    # Spike alert: last 7 days vs first 7 days, growth above 20%.
    per_day = frame.groupby('period_start')['cost'].sum().sort_index()
    if len(per_day) > 7:
        recent_avg = per_day.tail(7).mean()
        previous_avg = per_day.head(7).mean()
        if recent_avg > previous_avg * 1.2:
            dashboard['alerts'].append({
                'type': 'cost_spike',
                'message': f"最近7天成本增长 {((recent_avg - previous_avg) / previous_avg * 100):.1f}%",
                'severity': 'critical'
            })
    return dashboard
def export_recommendations(self, format: str = 'json') -> str:
    """Serialize the cached recommendations as 'json' or 'csv'.

    Raises:
        ValueError: For any other format string.
    """
    if format == 'json':
        return json.dumps([asdict(rec) for rec in self.recommendations],
                          indent=2, default=str, ensure_ascii=False)
    if format == 'csv':
        return pd.DataFrame([asdict(rec) for rec in self.recommendations]).to_csv(index=False)
    raise ValueError(f"不支持的格式: {format}")
def main():
    """Demo entry point: collect costs, analyze trends, recommend, export.

    Side effect: writes cost_optimization_recommendations.json to the
    current working directory.
    """
    # Create the optimizer (initializes the per-provider clients).
    optimizer = MultiCloudCostOptimizer()
    # Analysis window: the past 30 days.
    end_date = datetime.now()
    start_date = end_date - timedelta(days=30)
    # Collect cost data across all providers.
    print("收集成本数据...")
    cost_data = optimizer.collect_cost_data(start_date, end_date)
    print(f"收集到 {len(cost_data)} 条成本记录")
    # Trend analysis.
    print("\n分析成本趋势...")
    trends = optimizer.analyze_cost_trends()
    print(f"总成本: ${trends.get('total_cost', 0):.2f}")
    print(f"日均成本: ${trends.get('daily_average', 0):.2f}")
    print("按云提供商分布:")
    for provider, cost in trends.get('cost_by_provider', {}).items():
        print(f" {provider}: ${cost:.2f}")
    # Optimization recommendations.
    print("\n生成优化建议...")
    recommendations = optimizer.generate_optimization_recommendations()
    print(f"生成 {len(recommendations)} 条优化建议")
    # Show only the top five (list is sorted by potential savings).
    print("\n前5条优化建议:")
    for i, rec in enumerate(recommendations[:5], 1):
        print(f"{i}. [{rec.type.value}] {rec.description}")
        print(f" 当前成本: ${rec.current_cost:.2f}, 潜在节省: ${rec.potential_savings:.2f}")
        print(f" 置信度: {rec.confidence:.1%}, 实施难度: {rec.implementation_effort}")
    # Dashboard data and alerts.
    print("\n创建成本仪表板...")
    dashboard = optimizer.create_cost_dashboard_data()
    print(f"总潜在节省: ${dashboard.get('recommendations_summary', {}).get('potential_savings', 0):.2f}")
    if dashboard.get('alerts'):
        print("\n成本告警:")
        for alert in dashboard['alerts']:
            print(f" [{alert['severity']}] {alert['message']}")
    # Export recommendations to a JSON file.
    print("\n导出优化建议...")
    json_export = optimizer.export_recommendations('json')
    with open('cost_optimization_recommendations.json', 'w', encoding='utf-8') as f:
        f.write(json_export)
    print("建议已导出到 cost_optimization_recommendations.json")


if __name__ == "__main__":
    main()
监控与可观测性
多云环境需要统一的监控和可观测性平台,以实现跨云的性能监控、故障诊断和运维管理。
统一监控平台
# prometheus-multicloud-config.yml
# Unified Prometheus scrape configuration covering the AWS/Azure/GCP
# exporters, Kubernetes API servers, and Consul-registered apps.
global:
  scrape_interval: 15s
  evaluation_interval: 15s

rule_files:
  - "multicloud_rules.yml"

alerting:
  alertmanagers:
    - static_configs:
        - targets:
            - alertmanager:9093

scrape_configs:
  # AWS CloudWatch Exporter
  - job_name: 'aws-cloudwatch'
    static_configs:
      - targets: ['cloudwatch-exporter:9106']
    scrape_interval: 60s
    metrics_path: /metrics
    params:
      region: ['us-east-1', 'us-west-2', 'eu-west-1']
  # Azure Monitor Exporter
  - job_name: 'azure-monitor'
    static_configs:
      - targets: ['azure-exporter:9090']
    scrape_interval: 60s
  # GCP Monitoring Exporter
  - job_name: 'gcp-monitoring'
    static_configs:
      - targets: ['gcp-exporter:9091']
    scrape_interval: 60s
  # Kubernetes clusters
  - job_name: 'kubernetes-apiservers'
    kubernetes_sd_configs:
      - role: endpoints
    scheme: https
    tls_config:
      ca_file: /var/run/secrets/kubernetes.io/serviceaccount/ca.crt
    bearer_token_file: /var/run/secrets/kubernetes.io/serviceaccount/token
    relabel_configs:
      # Keep only the default/kubernetes https endpoint.
      - source_labels: [__meta_kubernetes_namespace, __meta_kubernetes_service_name, __meta_kubernetes_endpoint_port_name]
        action: keep
        regex: default;kubernetes;https
  # Multi-cloud application metrics
  - job_name: 'multicloud-apps'
    consul_sd_configs:
      - server: 'consul:8500'
        services: ['web-app', 'api-service', 'database']
    relabel_configs:
      - source_labels: [__meta_consul_service]
        target_label: service
      - source_labels: [__meta_consul_node]
        target_label: instance
      # Extract the provider name from a `cloud:<name>` Consul tag.
      - source_labels: [__meta_consul_tags]
        regex: '.*,cloud:([^,]+),.*'
        target_label: cloud_provider
多云告警规则
# multicloud_rules.yml
# Alerting and recording rules for the multi-cloud Prometheus setup.
groups:
  - name: multicloud.rules
    rules:
      # Cross-cloud service availability
      - alert: MultiCloudServiceDown
        expr: up{job=~"aws-.*|azure-.*|gcp-.*"} == 0
        for: 5m
        labels:
          severity: critical
          category: availability
        annotations:
          summary: "Multi-cloud service {{ $labels.job }} is down"
          description: "Service {{ $labels.job }} on {{ $labels.cloud_provider }} has been down for more than 5 minutes."
      # Cost anomaly
      - alert: HighCostIncrease
        expr: increase(cloud_cost_total[1h]) > 100
        for: 10m
        labels:
          severity: warning
          category: cost
        annotations:
          summary: "High cost increase detected"
          description: "Cloud cost increased by ${{ $value }} in the last hour on {{ $labels.cloud_provider }}."
      # Cross-cloud latency
      - alert: HighCrossCloudLatency
        expr: histogram_quantile(0.95, rate(http_request_duration_seconds_bucket{job="multicloud-apps"}[5m])) > 0.5
        for: 5m
        labels:
          severity: warning
          category: performance
        annotations:
          summary: "High cross-cloud latency detected"
          description: "95th percentile latency is {{ $value }}s for {{ $labels.service }} between {{ $labels.source_cloud }} and {{ $labels.target_cloud }}."
      # Resource utilization
      - alert: LowResourceUtilization
        expr: avg_over_time(cpu_usage_percent[24h]) < 20
        for: 1h
        labels:
          severity: info
          category: optimization
        annotations:
          summary: "Low resource utilization detected"
          description: "Resource {{ $labels.instance }} on {{ $labels.cloud_provider }} has average CPU utilization of {{ $value }}% over 24 hours."
      # Multi-cloud data synchronization
      - alert: DataSyncFailure
        expr: increase(data_sync_failures_total[1h]) > 0
        for: 0m
        labels:
          severity: critical
          category: data_integrity
        annotations:
          summary: "Data synchronization failure"
          description: "Data sync failed {{ $value }} times in the last hour between {{ $labels.source_cloud }} and {{ $labels.target_cloud }}."
  - name: multicloud.cost.rules
    rules:
      # Cost prediction: 7-day linear extrapolation
      - record: cloud_cost_prediction_7d
        expr: predict_linear(cloud_cost_total[7d], 7*24*3600)
      - record: cloud_cost_efficiency_ratio
        expr: cloud_revenue_total / cloud_cost_total
      # Utilization recording rules
      - record: resource_utilization_avg_24h
        expr: avg_over_time(cpu_usage_percent[24h])
      - record: storage_efficiency_ratio
        expr: storage_used_bytes / storage_allocated_bytes
可观测性仪表板
#!/usr/bin/env python3
"""
多云可观测性仪表板
提供统一的多云监控和可视化界面
"""
import json
import logging
from typing import Dict, List, Any, Optional
from datetime import datetime, timedelta
from dataclasses import dataclass, asdict
import requests
import pandas as pd
import plotly.graph_objects as go
import plotly.express as px
from plotly.subplots import make_subplots
import streamlit as st
# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class MetricData:
    """One sample of a Prometheus query result."""
    timestamp: datetime     # sample time (naive, converted from epoch seconds)
    value: float
    labels: Dict[str, str]  # Prometheus label set for the series
    metric_name: str        # the PromQL query string that produced this sample
@dataclass
class AlertData:
    """One alert as reported by the Prometheus alerts API."""
    alert_name: str
    severity: str               # from labels; 'unknown' when absent
    status: str                 # Prometheus alert state (e.g. 'firing')
    started_at: datetime
    labels: Dict[str, str]
    annotations: Dict[str, str]
class MultiCloudObservabilityDashboard:
    """Unified multi-cloud monitoring/visualization backed by Prometheus."""
    def __init__(self, prometheus_url: str = "http://prometheus:9090"):
        self.prometheus_url = prometheus_url
        self.metrics_cache = {}  # reserved for query-result caching; not yet used here
        self.alerts_cache = []   # last result of get_active_alerts()
def query_prometheus(self, query: str, start_time: Optional[datetime] = None, end_time: Optional[datetime] = None) -> List[MetricData]:
    """Run a PromQL query (range query when both bounds given, else instant).

    Fix: the original requests.get calls had no timeout, so a hung
    Prometheus would block the dashboard forever; a 30s timeout is set.
    Also fixes the implicit-Optional annotations on the time bounds.

    Args:
        query: PromQL expression.
        start_time/end_time: When both are provided, a range query with
            a 60s step is issued; otherwise an instant query.

    Returns:
        Flat list of MetricData samples; empty on any failure.
    """
    try:
        if start_time and end_time:
            # Range query.
            params = {
                'query': query,
                'start': start_time.isoformat(),
                'end': end_time.isoformat(),
                'step': '60s'
            }
            endpoint = f"{self.prometheus_url}/api/v1/query_range"
        else:
            # Instant query.
            params = {'query': query}
            endpoint = f"{self.prometheus_url}/api/v1/query"
        response = requests.get(endpoint, params=params, timeout=30)
        response.raise_for_status()
        data = response.json()
        metrics = []
        if data['status'] == 'success':
            for result in data['data']['result']:
                labels = result['metric']
                # Range results carry 'values' (many samples); instant
                # results carry a single 'value'.
                samples = result['values'] if 'values' in result else [result['value']]
                for timestamp, value in samples:
                    metrics.append(MetricData(
                        timestamp=datetime.fromtimestamp(float(timestamp)),
                        value=float(value),
                        labels=labels,
                        metric_name=query
                    ))
        return metrics
    except Exception as e:
        logger.error(f"查询Prometheus失败: {e}")
        return []
def get_active_alerts(self) -> List[AlertData]:
    """Fetch active alerts from Prometheus; caches and returns them.

    Fix: added a request timeout so a hung Prometheus cannot block the
    dashboard forever (the original had none).

    Returns:
        Parsed AlertData list; empty on any failure.
    """
    try:
        response = requests.get(f"{self.prometheus_url}/api/v1/alerts", timeout=30)
        response.raise_for_status()
        data = response.json()
        alerts = []
        if data['status'] == 'success':
            for alert in data['data']['alerts']:
                alerts.append(AlertData(
                    alert_name=alert['labels']['alertname'],
                    severity=alert['labels'].get('severity', 'unknown'),
                    status=alert['state'],
                    # NOTE(review): rstrip('Z') only handles second-precision
                    # RFC3339; fractional/nanosecond timestamps would still
                    # fail fromisoformat — confirm what Prometheus emits.
                    started_at=datetime.fromisoformat(alert['activeAt'].rstrip('Z')),
                    labels=alert['labels'],
                    annotations=alert.get('annotations', {})
                ))
        self.alerts_cache = alerts
        return alerts
    except Exception as e:
        logger.error(f"获取告警失败: {e}")
        return []
def create_cost_overview_chart(self) -> go.Figure:
    """Build a stacked area chart of per-provider cost over the last 7 days.

    Returns:
        A plotly Figure; empty Figure when no cost metrics are available.
    """
    # Per-provider cost totals over the trailing week.
    end_time = datetime.now()
    start_time = end_time - timedelta(days=7)
    cost_metrics = self.query_prometheus(
        'sum by (cloud_provider) (cloud_cost_total)',
        start_time, end_time
    )
    if not cost_metrics:
        return go.Figure()
    # Flatten the samples into a DataFrame for plotting.
    df = pd.DataFrame([
        {
            'timestamp': metric.timestamp,
            'cost': metric.value,
            'provider': metric.labels.get('cloud_provider', 'unknown')
        }
        for metric in cost_metrics
    ])
    # Stacked area chart: one band per provider.
    fig = px.area(
        df,
        x='timestamp',
        y='cost',
        color='provider',
        title='多云成本趋势 (7天)',
        labels={'cost': '成本 ($)', 'timestamp': '时间'}
    )
    fig.update_layout(
        xaxis_title="时间",
        yaxis_title="成本 ($)",
        legend_title="云提供商"
    )
    return fig
def create_resource_utilization_chart(self) -> go.Figure:
    """Build a grouped bar chart of CPU/memory/storage utilization per provider.

    Fixes: the provider axis is now the union of providers seen in all three
    metric streams — previously it was derived from the CPU metrics alone, so
    a provider with memory/storage samples but no CPU samples was silently
    omitted. Per-provider lookup is O(1) via dicts instead of a linear
    ``next()`` scan per provider per metric. Providers are sorted for a
    deterministic axis order.
    """
    cpu_metrics = self.query_prometheus(
        'avg by (cloud_provider) (cpu_usage_percent)'
    )
    memory_metrics = self.query_prometheus(
        'avg by (cloud_provider) (memory_usage_percent)'
    )
    storage_metrics = self.query_prometheus(
        'avg by (cloud_provider) (storage_efficiency_ratio * 100)'
    )

    def by_provider(metrics):
        # Map provider label -> sample value (instant query: one per series).
        return {m.labels.get('cloud_provider', 'unknown'): m.value for m in metrics}

    cpu_map = by_provider(cpu_metrics)
    memory_map = by_provider(memory_metrics)
    storage_map = by_provider(storage_metrics)

    providers = sorted(set(cpu_map) | set(memory_map) | set(storage_map))

    # Grouped bar chart; missing samples default to 0 as before.
    fig = go.Figure(data=[
        go.Bar(name='CPU', x=providers, y=[cpu_map.get(p, 0) for p in providers]),
        go.Bar(name='内存', x=providers, y=[memory_map.get(p, 0) for p in providers]),
        go.Bar(name='存储', x=providers, y=[storage_map.get(p, 0) for p in providers])
    ])
    fig.update_layout(
        title='各云提供商资源利用率',
        xaxis_title='云提供商',
        yaxis_title='利用率 (%)',
        barmode='group'
    )
    return fig
def create_performance_metrics_chart(self) -> go.Figure:
    """Build two stacked subplots over the last 24 hours.

    Top: 95th-percentile HTTP request latency; bottom: HTTP 5xx error rate.
    One line trace per distinct 'service' label in each subplot.
    """
    window_end = datetime.now()
    window_start = window_end - timedelta(hours=24)

    latency_metrics = self.query_prometheus(
        'histogram_quantile(0.95, rate(http_request_duration_seconds_bucket[5m]))',
        window_start, window_end
    )
    error_metrics = self.query_prometheus(
        'rate(http_requests_total{status=~"5.."}[5m]) / rate(http_requests_total[5m]) * 100',
        window_start, window_end
    )

    fig = make_subplots(
        rows=2, cols=1,
        subplot_titles=('响应时间 (95th percentile)', '错误率'),
        vertical_spacing=0.1
    )

    def plot_series(metrics, value_column, name_suffix, row):
        # Shared trace-building logic for both subplots.
        if not metrics:
            return
        frame = pd.DataFrame([
            {
                'timestamp': m.timestamp,
                value_column: m.value,
                'service': m.labels.get('service', 'unknown')
            }
            for m in metrics
        ])
        for service in frame['service'].unique():
            subset = frame[frame['service'] == service]
            fig.add_trace(
                go.Scatter(
                    x=subset['timestamp'],
                    y=subset[value_column],
                    mode='lines',
                    name=f'{service} {name_suffix}',
                    line=dict(width=2)
                ),
                row=row, col=1
            )

    plot_series(latency_metrics, 'latency', '延迟', 1)
    plot_series(error_metrics, 'error_rate', '错误率', 2)

    fig.update_xaxes(title_text="时间", row=2, col=1)
    fig.update_yaxes(title_text="延迟 (秒)", row=1, col=1)
    fig.update_yaxes(title_text="错误率 (%)", row=2, col=1)
    fig.update_layout(
        title='应用性能指标 (24小时)',
        height=600
    )
    return fig
def create_alerts_summary(self) -> Dict[str, Any]:
    """Summarize the currently active alerts.

    Returns a dict with total and per-severity counts, counts grouped by the
    'category' label, and the ten most recently started alerts.
    """
    active = self.get_active_alerts()

    def count_severity(level):
        # Number of active alerts at the given severity level.
        return sum(1 for a in active if a.severity == level)

    # Tally alerts per 'category' label ('other' when absent).
    category_counts = {}
    for item in active:
        category = item.labels.get('category', 'other')
        category_counts[category] = category_counts.get(category, 0) + 1

    newest_first = sorted(active, key=lambda a: a.started_at, reverse=True)

    return {
        'total_alerts': len(active),
        'critical_alerts': count_severity('critical'),
        'warning_alerts': count_severity('warning'),
        'info_alerts': count_severity('info'),
        'alerts_by_category': category_counts,
        'recent_alerts': [
            {
                'name': a.alert_name,
                'severity': a.severity,
                'started_at': a.started_at.strftime('%Y-%m-%d %H:%M:%S'),
                'description': a.annotations.get('summary', ''),
                'cloud_provider': a.labels.get('cloud_provider', 'unknown')
            }
            for a in newest_first[:10]
        ],
    }
def generate_health_score(self) -> Dict[str, float]:
    """Compute weighted system health scores from Prometheus metrics.

    Returns:
        Dict with keys 'overall', 'availability', 'performance',
        'cost_efficiency', 'security', each rounded to one decimal place.
        'overall' is a weighted sum (30/25/25/20%).

    Cleanup: removed the dead ``scores = {}`` assignment that was
    immediately overwritten, and the intermediate list built only to count
    critical alerts.
    """
    # Availability: avg(up) is a 0..1 ratio of scrape targets that are up.
    uptime_metrics = self.query_prometheus('avg(up)')
    availability_score = uptime_metrics[0].value * 100 if uptime_metrics else 0

    # Performance: based on p95 request latency; lower latency → higher score.
    latency_metrics = self.query_prometheus(
        'histogram_quantile(0.95, rate(http_request_duration_seconds_bucket[5m]))'
    )
    if latency_metrics:
        avg_latency = sum(m.value for m in latency_metrics) / len(latency_metrics)
        performance_score = max(0, 100 - (avg_latency * 100))
    else:
        # No traffic data: assume healthy rather than penalizing.
        performance_score = 100

    # Cost efficiency: ideal average CPU utilization band is 60-80%.
    utilization_metrics = self.query_prometheus('avg(cpu_usage_percent)')
    if utilization_metrics:
        avg_utilization = utilization_metrics[0].value
        if 60 <= avg_utilization <= 80:
            cost_efficiency_score = 100
        elif avg_utilization < 60:
            cost_efficiency_score = avg_utilization / 60 * 100
        else:
            cost_efficiency_score = max(0, 100 - (avg_utilization - 80) * 2)
    else:
        # Unknown utilization: neutral score.
        cost_efficiency_score = 50

    # Security: each active critical alert deducts 10 points.
    critical_alerts = sum(
        1 for a in self.get_active_alerts() if a.severity == 'critical'
    )
    security_score = max(0, 100 - critical_alerts * 10)

    overall_score = (
        availability_score * 0.3 +
        performance_score * 0.25 +
        cost_efficiency_score * 0.25 +
        security_score * 0.2
    )
    return {
        'overall': round(overall_score, 1),
        'availability': round(availability_score, 1),
        'performance': round(performance_score, 1),
        'cost_efficiency': round(cost_efficiency_score, 1),
        'security': round(security_score, 1)
    }
def create_streamlit_dashboard():
    """Render the multi-cloud observability dashboard as a Streamlit page."""
    st.set_page_config(
        page_title="多云可观测性仪表板",
        page_icon="☁️",
        layout="wide"
    )
    st.title("☁️ 多云可观测性仪表板")

    dashboard = MultiCloudObservabilityDashboard()

    # Sidebar: data-source configuration.
    st.sidebar.header("配置")
    dashboard.prometheus_url = st.sidebar.text_input(
        "Prometheus URL",
        value="http://prometheus:9090"
    )
    # NOTE(review): selected but not wired to any timer in this function.
    refresh_interval = st.sidebar.selectbox(
        "刷新间隔",
        options=[30, 60, 300, 600],
        index=1,
        format_func=lambda x: f"{x}秒"
    )
    if st.sidebar.button("刷新数据"):
        st.experimental_rerun()

    # Health scores: one metric widget per dimension.
    st.header("🎯 系统健康评分")
    health_scores = dashboard.generate_health_score()
    score_items = [
        ("综合评分", 'overall'),
        ("可用性", 'availability'),
        ("性能", 'performance'),
        ("成本效率", 'cost_efficiency'),
        ("安全性", 'security'),
    ]
    for column, (label, key) in zip(st.columns(5), score_items):
        with column:
            st.metric(label, f"{health_scores[key]}/100")

    # Alert summary: total and per-severity counts.
    st.header("🚨 告警摘要")
    alerts_summary = dashboard.create_alerts_summary()
    count_items = [
        ("总告警", 'total_alerts'),
        ("严重告警", 'critical_alerts'),
        ("警告告警", 'warning_alerts'),
        ("信息告警", 'info_alerts'),
    ]
    for column, (label, key) in zip(st.columns(4), count_items):
        with column:
            st.metric(label, alerts_summary[key])

    if alerts_summary['recent_alerts']:
        st.subheader("最近告警")
        st.dataframe(
            pd.DataFrame(alerts_summary['recent_alerts']),
            use_container_width=True
        )

    # Charts.
    st.header("📊 监控图表")
    st.subheader("成本概览")
    st.plotly_chart(dashboard.create_cost_overview_chart(), use_container_width=True)

    left, right = st.columns(2)
    with left:
        st.subheader("资源利用率")
        st.plotly_chart(
            dashboard.create_resource_utilization_chart(),
            use_container_width=True
        )
    with right:
        st.subheader("性能指标")
        st.plotly_chart(
            dashboard.create_performance_metrics_chart(),
            use_container_width=True
        )
def main():
    """Entry point: launch the Streamlit observability dashboard."""
    create_streamlit_dashboard()
# Run the dashboard only when executed as a script, not on import.
if __name__ == "__main__":
    main()
最佳实践与建议
1. 治理框架建立
组织架构
- 建立多云治理委员会,包含技术、财务、安全等部门代表
- 定义清晰的角色和职责分工
- 制定多云策略和标准操作程序
政策制定
- 云服务选择标准和评估流程
- 数据分类和存储位置要求
- 安全和合规基线要求
- 成本控制和预算管理政策
2. 技术实施要点
标准化
- 统一的资源标记和命名规范
- 标准化的部署模板和流程
- 一致的监控和日志格式
- 统一的身份认证和授权机制
自动化
- 基础设施即代码(IaC)实践
- 自动化的合规检查和修复
- 持续的成本优化和资源调整
- 自动化的备份和灾难恢复
3. 成本优化策略
预算管理
- 设置详细的成本预算和告警
- 定期进行成本审查和分析
- 建立成本分摊和计费机制
- 实施成本优化激励措施
资源优化
- 定期评估资源使用情况
- 实施自动化的资源调度
- 优化数据传输和存储策略
- 合理使用预留实例和Spot实例
总结
多云治理与成本优化是一个持续的过程,需要技术、流程和组织的协同配合。通过建立完善的治理框架、实施有效的成本控制措施、部署统一的监控平台,企业可以充分发挥多云架构的优势,同时控制复杂性和成本。
关键成功因素包括:
- 统一的治理框架 - 建立清晰的政策、流程和标准
- 自动化工具 - 减少手动操作,提高效率和一致性
- 持续监控 - 实时掌握资源状态和成本变化
- 优化文化 - 培养成本意识和优化习惯
- 技能建设 - 提升团队的多云管理能力
随着云技术的不断发展,多云治理和成本优化的工具和方法也在持续演进。企业需要保持学习和适应,不断完善自己的多云管理体系,以实现业务目标和技术效益的最佳平衡。