
Cloud Infrastructure Architecture Design and Practice: Building a Scalable, Highly Available Modern Cloud Platform



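Contents

  1. Cloud Infrastructure Architecture Overview
  2. Network Architecture Design
  3. Compute Resource Architecture
  4. Storage Architecture Design
  5. Security Architecture Implementation
  6. Monitoring and Observability
  7. Infrastructure as Code
  8. Cost Optimization Strategies
  9. Summary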

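Cloud Infrastructure Architecture Overview

A modern cloud infrastructure architecture has to satisfy several requirements at once: high availability, scalability, security, and cost efficiency. This article walks through how to design and implement a complete cloud infrastructure architecture, using AWS and the boto3 SDK for the examples.

Architecture Design Principles

# Cloud infrastructure architecture design principles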
architecture_principles:
  scalability:
    horizontal_scaling: true
    auto_scaling: true
    load_balancing: true
    
  availability:
    multi_az_deployment: true
    disaster_recovery: true
    fault_tolerance: true
    
  security:
    defense_in_depth: true
    zero_trust_model: true
    encryption_everywhere: true
    
  cost_optimization:
    resource_rightsizing: true
    reserved_instances: true
    spot_instances: true
    
  operational_excellence:
    infrastructure_as_code: true
    automated_deployment: true
    monitoring_alerting: true

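Layered Architecture Model

graph TB
    subgraph "Application Layer"
        A[Web Application] --> B[API Services]
        B --> C[Microservices]
    end
    
    subgraph "Platform Layer"
        D[Container Orchestration] --> E[Service Mesh]
        E --> F[API Gateway]
    end
    
    subgraph "Infrastructure Layer"
        G[Compute Resources] --> H[Storage Systems]
        H --> I[Network Architecture]
    end
    
    subgraph "Security Layer"
        J[Identity and Authentication] --> K[Access Control]
        K --> L[Data Encryption]
    end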
    
    A --> D
    D --> G
    G --> J

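Network Architecture Design

The network is a core component of cloud infrastructure: it has to provide secure, high-performance, and scalable connectivity.

VPC Network Design

#!/usr/bin/env python3
"""
Cloud network architecture manager.
Automates management of VPCs, subnets, route tables, and security groups.
"""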

import boto3
import json
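from typing import Dict, List, Optional
from dataclasses import dataclass, field
import ipaddress

@dataclass
class SubnetConfig:
    """Subnet configuration"""
    name: str
    cidr: str
    availability_zone: str
    subnet_type: str  # public, private, database
    route_table: str  # key of the route table to associate: public, private, database

@dataclass
class VPCConfig:
    """VPC configuration"""
    name: str
    cidr: str
    enable_dns_hostnames: bool = True
    enable_dns_support: bool = True
    # default_factory avoids a None default, which would break iteration over subnets
    subnets: List[SubnetConfig] = field(default_factory=list)

class CloudNetworkManager:
    """Cloud network architecture manager"""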
    
    def __init__(self, region: str = 'us-west-2'):
        self.ec2 = boto3.client('ec2', region_name=region)
        self.region = region
        
    def create_vpc_architecture(self, config: VPCConfig) -> Optional[Dict]:
        """Create the complete VPC architecture; returns None on failure"""
        try:
            # Create the VPC
            vpc_response = self.ec2.create_vpc(
                CidrBlock=config.cidr,
                TagSpecifications=[{
                    'ResourceType': 'vpc',
                    'Tags': [{'Key': 'Name', 'Value': config.name}]
                }]
            )
            vpc_id = vpc_response['Vpc']['VpcId']
            
            # Enable DNS hostnames and DNS resolution (one attribute per call)
            self.ec2.modify_vpc_attribute(
                VpcId=vpc_id,
                EnableDnsHostnames={'Value': config.enable_dns_hostnames}
            )
            self.ec2.modify_vpc_attribute(
                VpcId=vpc_id,
                EnableDnsSupport={'Value': config.enable_dns_support}
            )
            
            # Create the Internet Gateway
            igw_response = self.ec2.create_internet_gateway(
                TagSpecifications=[{
                    'ResourceType': 'internet-gateway',
                    'Tags': [{'Key': 'Name', 'Value': f'{config.name}-igw'}]
                }]
            )
            igw_id = igw_response['InternetGateway']['InternetGatewayId']
            
            # Attach the Internet Gateway to the VPC
            self.ec2.attach_internet_gateway(
                InternetGatewayId=igw_id,
                VpcId=vpc_id
            )
            
            # Create the subnets
            subnets = {}
            for subnet_config in config.subnets:
                subnet_id = self._create_subnet(vpc_id, subnet_config)
                subnets[subnet_config.name] = subnet_id
            
            # Create the route tables
            route_tables = self._create_route_tables(vpc_id, config.name, igw_id)
            
            # Associate subnets with route tables
            self._associate_subnets_to_route_tables(subnets, route_tables, config.subnets)
            
            # Create the NAT Gateways
            nat_gateways = self._create_nat_gateways(subnets, config.subnets)
            
            # Update the private route table
            self._update_private_route_tables(route_tables, nat_gateways)
            
            return {
                'vpc_id': vpc_id,
                'internet_gateway_id': igw_id,
                'subnets': subnets,
                'route_tables': route_tables,
                'nat_gateways': nat_gateways
            }
            
        except Exception as e:
            print(f"Failed to create VPC architecture: {str(e)}")
            return None
    
    def _create_subnet(self, vpc_id: str, config: SubnetConfig) -> str:
        """Create a subnet"""
        response = self.ec2.create_subnet(
            VpcId=vpc_id,
            CidrBlock=config.cidr,
            AvailabilityZone=config.availability_zone,
            TagSpecifications=[{
                'ResourceType': 'subnet',
                'Tags': [
                    {'Key': 'Name', 'Value': config.name},
                    {'Key': 'Type', 'Value': config.subnet_type}
                ]
            }]
        )
        return response['Subnet']['SubnetId']
    
    def _create_route_tables(self, vpc_id: str, vpc_name: str, igw_id: str) -> Dict:
        """Create the public, private, and database route tables"""
        route_tables = {}
        
        # Public route table
        public_rt_response = self.ec2.create_route_table(
            VpcId=vpc_id,
            TagSpecifications=[{
                'ResourceType': 'route-table',
                'Tags': [{'Key': 'Name', 'Value': f'{vpc_name}-public-rt'}]
            }]
        )
        public_rt_id = public_rt_response['RouteTable']['RouteTableId']
        
        # Add the default route to the Internet Gateway
        self.ec2.create_route(
            RouteTableId=public_rt_id,
            DestinationCidrBlock='0.0.0.0/0',
            GatewayId=igw_id
        )
        
        route_tables['public'] = public_rt_id
        
        # Private route table
        private_rt_response = self.ec2.create_route_table(
            VpcId=vpc_id,
            TagSpecifications=[{
                'ResourceType': 'route-table',
                'Tags': [{'Key': 'Name', 'Value': f'{vpc_name}-private-rt'}]
            }]
        )
        route_tables['private'] = private_rt_response['RouteTable']['RouteTableId']
        
        # Database route table (no default route, so no internet access)
        db_rt_response = self.ec2.create_route_table(
            VpcId=vpc_id,
            TagSpecifications=[{
                'ResourceType': 'route-table',
                'Tags': [{'Key': 'Name', 'Value': f'{vpc_name}-database-rt'}]
            }]
        )
        route_tables['database'] = db_rt_response['RouteTable']['RouteTableId']
        
        return route_tables
    
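    def _create_nat_gateways(self, subnets: Dict, subnet_configs: List[SubnetConfig]) -> Dict:
        """Create NAT Gateways, keyed by availability zone"""
        nat_gateways = {}
        
        # Create one NAT Gateway in each public subnet
        for config in subnet_configs:
            if config.subnet_type == 'public':
                # Allocate an Elastic IP
                eip_response = self.ec2.allocate_address(Domain='vpc')
                allocation_id = eip_response['AllocationId']
                
                # Create the NAT Gateway
                nat_response = self.ec2.create_nat_gateway(
                    SubnetId=subnets[config.name],
                    AllocationId=allocation_id,
                    TagSpecifications=[{
                        'ResourceType': 'nat-gateway',
                        'Tags': [{'Key': 'Name', 'Value': f'{config.name}-nat'}]
                    }]
                )
                
                nat_gateways[config.availability_zone] = nat_response['NatGateway']['NatGatewayId']
        
        return nat_gateways
    
    def _associate_subnets_to_route_tables(self, subnets: Dict, route_tables: Dict,
                                           subnet_configs: List[SubnetConfig]) -> None:
        """Associate each subnet with the route table named in its config"""
        for config in subnet_configs:
            self.ec2.associate_route_table(
                SubnetId=subnets[config.name],
                RouteTableId=route_tables[config.route_table]
            )
    
    def _update_private_route_tables(self, route_tables: Dict, nat_gateways: Dict) -> None:
        """Point the private route table's default route at a NAT Gateway"""
        if not nat_gateways:
            return
        nat_ids = list(nat_gateways.values())
        
        # A NAT Gateway must reach the 'available' state before a route can target it
        self.ec2.get_waiter('nat_gateway_available').wait(NatGatewayIds=nat_ids)
        
        # This design has a single private route table, so every private subnet
        # shares the first NAT Gateway. One private route table per AZ would
        # avoid cross-AZ traffic and the dependency on a single gateway.
        self.ec2.create_route(
            RouteTableId=route_tables['private'],
            DestinationCidrBlock='0.0.0.0/0',
            NatGatewayId=nat_ids[0]
        )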
    
    def create_security_groups(self, vpc_id: str) -> Dict:
        """Create the security groups for the web, application, and database tiers"""
        security_groups = {}
        
        # Web tier security group
        web_sg = self.ec2.create_security_group(
            GroupName='web-tier-sg',
            Description='Security group for web tier',
            VpcId=vpc_id,
            TagSpecifications=[{
                'ResourceType': 'security-group',
                'Tags': [{'Key': 'Name', 'Value': 'web-tier-sg'}]
            }]
        )
        web_sg_id = web_sg['GroupId']
        
        # Web tier inbound rules: HTTP and HTTPS from anywhere
        self.ec2.authorize_security_group_ingress(
            GroupId=web_sg_id,
            IpPermissions=[
                {
                    'IpProtocol': 'tcp',
                    'FromPort': 80,
                    'ToPort': 80,
                    'IpRanges': [{'CidrIp': '0.0.0.0/0'}]
                },
                {
                    'IpProtocol': 'tcp',
                    'FromPort': 443,
                    'ToPort': 443,
                    'IpRanges': [{'CidrIp': '0.0.0.0/0'}]
                }
            ]
        )
        
        security_groups['web'] = web_sg_id
        
        # Application tier security group
        app_sg = self.ec2.create_security_group(
            GroupName='app-tier-sg',
            Description='Security group for application tier',
            VpcId=vpc_id,
            TagSpecifications=[{
                'ResourceType': 'security-group',
                'Tags': [{'Key': 'Name', 'Value': 'app-tier-sg'}]
            }]
        )
        app_sg_id = app_sg['GroupId']
        
        # Application tier inbound rules (only traffic from the web tier is allowed)
        self.ec2.authorize_security_group_ingress(
            GroupId=app_sg_id,
            IpPermissions=[
                {
                    'IpProtocol': 'tcp',
                    'FromPort': 8080,
                    'ToPort': 8080,
                    'UserIdGroupPairs': [{'GroupId': web_sg_id}]
                }
            ]
        )
        
        security_groups['app'] = app_sg_id
        
        # Database tier security group
        db_sg = self.ec2.create_security_group(
            GroupName='database-tier-sg',
            Description='Security group for database tier',
            VpcId=vpc_id,
            TagSpecifications=[{
                'ResourceType': 'security-group',
                'Tags': [{'Key': 'Name', 'Value': 'database-tier-sg'}]
            }]
        )
        db_sg_id = db_sg['GroupId']
        
        # Database tier inbound rules (only MySQL and PostgreSQL traffic from the application tier)
        self.ec2.authorize_security_group_ingress(
            GroupId=db_sg_id,
            IpPermissions=[
                {
                    'IpProtocol': 'tcp',
                    'FromPort': 3306,
                    'ToPort': 3306,
                    'UserIdGroupPairs': [{'GroupId': app_sg_id}]
                },
                {
                    'IpProtocol': 'tcp',
                    'FromPort': 5432,
                    'ToPort': 5432,
                    'UserIdGroupPairs': [{'GroupId': app_sg_id}]
                }
            ]
        )
        
        security_groups['database'] = db_sg_id
        
        return security_groups

def main():
    """Main function: demonstrates creating the network architecture"""
    # Network manager
    network_manager = CloudNetworkManager()
    
    # VPC configuration
    vpc_config = VPCConfig(
        name='production-vpc',
        cidr='10.0.0.0/16',
        subnets=[
            SubnetConfig('public-subnet-1a', '10.0.1.0/24', 'us-west-2a', 'public', 'public'),
            SubnetConfig('public-subnet-1b', '10.0.2.0/24', 'us-west-2b', 'public', 'public'),
            SubnetConfig('private-subnet-1a', '10.0.10.0/24', 'us-west-2a', 'private', 'private'),
            SubnetConfig('private-subnet-1b', '10.0.11.0/24', 'us-west-2b', 'private', 'private'),
            SubnetConfig('database-subnet-1a', '10.0.20.0/24', 'us-west-2a', 'database', 'database'),
            SubnetConfig('database-subnet-1b', '10.0.21.0/24', 'us-west-2b', 'database', 'database')
        ]
    )
    
    # Create the VPC architecture
    print("Creating VPC architecture...")
    vpc_result = network_manager.create_vpc_architecture(vpc_config)
    
    if vpc_result:
        print(f"VPC created: {vpc_result['vpc_id']}")
        
        # Create the security groups
        print("Creating security groups...")
        security_groups = network_manager.create_security_groups(vpc_result['vpc_id'])
        print(f"Security groups created: {security_groups}")
        
        # Print the architecture details
        architecture_info = {
            'vpc': vpc_result,
            'security_groups': security_groups
        }
        
        print("\n=== Network architecture created ===")
        print(json.dumps(architecture_info, indent=2, default=str))
    else:
        print("Failed to create VPC architecture")

if __name__ == "__main__":
    main()
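
The subnet layout passed to VPCConfig can be checked offline before any AWS call is made. The following is a minimal sketch using only the standard-library ipaddress module (which the script imports but does not use); the function name validate_subnet_plan and the PLAN dictionary are illustrative assumptions, not part of the original script.

```python
import ipaddress
from typing import Dict, List


def validate_subnet_plan(vpc_cidr: str, subnet_cidrs: Dict[str, str]) -> List[str]:
    """Return a list of problems; an empty list means the plan is consistent.

    Hypothetical helper: checks that every subnet lies inside the VPC range
    and that no two subnets overlap.
    """
    vpc = ipaddress.ip_network(vpc_cidr)
    problems = []
    parsed = {}
    for name, cidr in subnet_cidrs.items():
        net = ipaddress.ip_network(cidr)
        if not net.subnet_of(vpc):
            problems.append(f"{name} ({cidr}) is outside {vpc_cidr}")
        parsed[name] = net

    # Compare every pair of subnets once
    names = sorted(parsed)
    for i, first in enumerate(names):
        for second in names[i + 1:]:
            if parsed[first].overlaps(parsed[second]):
                problems.append(f"{first} overlaps {second}")
    return problems


# The same six subnets that main() passes to VPCConfig
PLAN = {
    'public-subnet-1a': '10.0.1.0/24',
    'public-subnet-1b': '10.0.2.0/24',
    'private-subnet-1a': '10.0.10.0/24',
    'private-subnet-1b': '10.0.11.0/24',
    'database-subnet-1a': '10.0.20.0/24',
    'database-subnet-1b': '10.0.21.0/24',
}

issues = validate_subnet_plan('10.0.0.0/16', PLAN)
print(issues if issues else "subnet plan is consistent")
```

Running a check like this first is cheap, whereas a bad CIDR discovered halfway through create_vpc_architecture leaves a partially built VPC that has to be cleaned up by hand.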

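Compute Resource Architecture

The compute architecture has to balance performance, scalability, cost, and management complexity.

Auto Scaling Management

#!/usr/bin/env python3
"""
Compute resource auto scaling manager.
Provides metric-based auto scaling, load balancing, and instance management.
"""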

import boto3
import json
import time
from typing import Dict, List, Optional
from dataclasses import dataclass
from datetime import datetime, timedelta

@dataclass
class AutoScalingConfig:
    """Auto Scaling group configuration"""
    name: str
    min_size: int
    max_size: int
    desired_capacity: int
    target_group_arn: str
    launch_template_id: str
    subnet_ids: List[str]
    health_check_type: str = 'ELB'
    health_check_grace_period: int = 300

@dataclass
class ScalingPolicy:
    """Scaling policy"""
    name: str
    policy_type: str
    metric_name: str
    target_value: float
    scale_out_cooldown: int = 300
    scale_in_cooldown: int = 300

class ComputeResourceManager:
    """Compute resource manager"""
    
    def __init__(self, region: str = 'us-west-2'):
        self.autoscaling = boto3.client('autoscaling', region_name=region)
        self.ec2 = boto3.client('ec2', region_name=region)
        self.elbv2 = boto3.client('elbv2', region_name=region)
        self.cloudwatch = boto3.client('cloudwatch', region_name=region)
        self.region = region
    
    def create_launch_template(self, name: str, config: Dict) -> Optional[str]:
        """Create a launch template; returns None on failure"""
        # Imported locally so this method stays self-contained
        import base64
        
        try:
            # Launch templates expect UserData to be base64-encoded
            user_data = base64.b64encode(
                config.get('user_data', '').encode('utf-8')
            ).decode('ascii')
            
            template_data = {
                'ImageId': config['ami_id'],
                'InstanceType': config['instance_type'],
                'SecurityGroupIds': config['security_group_ids'],
                'UserData': user_data,
                'BlockDeviceMappings': [
                    {
                        'DeviceName': '/dev/xvda',
                        'Ebs': {
                            'VolumeSize': config.get('volume_size', 20),
                            'VolumeType': config.get('volume_type', 'gp3'),
                            'DeleteOnTermination': True,
                            'Encrypted': True
                        }
                    }
                ],
                'TagSpecifications': [
                    {
                        'ResourceType': 'instance',
                        'Tags': [
                            {'Key': 'Name', 'Value': f'{name}-instance'},
                            {'Key': 'Environment', 'Value': config.get('environment', 'production')},
                            {'Key': 'Application', 'Value': config.get('application', 'web-app')}
                        ]
                    }
                ]
            }
            
            # Optional settings are added only when provided: boto3 rejects None
            # values, and an instance profile that does not exist blocks launches
            if config.get('key_name'):
                template_data['KeyName'] = config['key_name']
            if config.get('iam_instance_profile'):
                template_data['IamInstanceProfile'] = {'Name': config['iam_instance_profile']}
            
            response = self.ec2.create_launch_template(
                LaunchTemplateName=name,
                LaunchTemplateData=template_data
            )
            
            return response['LaunchTemplate']['LaunchTemplateId']
            
        except Exception as e:
            print(f"Failed to create launch template: {str(e)}")
            return None
    
    def create_auto_scaling_group(self, config: AutoScalingConfig) -> bool:
        """Create the Auto Scaling group"""
        try:
            response = self.autoscaling.create_auto_scaling_group(
                AutoScalingGroupName=config.name,
                LaunchTemplate={
                    'LaunchTemplateId': config.launch_template_id,
                    'Version': '$Latest'
                },
                MinSize=config.min_size,
                MaxSize=config.max_size,
                DesiredCapacity=config.desired_capacity,
                VPCZoneIdentifier=','.join(config.subnet_ids),
                TargetGroupARNs=[config.target_group_arn],
                HealthCheckType=config.health_check_type,
                HealthCheckGracePeriod=config.health_check_grace_period,
                Tags=[
                    {
                        'Key': 'Name',
                        'Value': config.name,
                        'PropagateAtLaunch': True,
                        'ResourceId': config.name,
                        'ResourceType': 'auto-scaling-group'
                    }
                ]
            )
            
            print(f"Auto Scaling group {config.name} created")
            return True
            
        except Exception as e:
            print(f"Failed to create Auto Scaling group: {str(e)}")
            return False
    
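    def create_scaling_policies(self, asg_name: str, policies: List[ScalingPolicy]) -> Dict:
        """Create target tracking scaling policies"""
        policy_arns = {}
        
        for policy in policies:
            try:
                metric_spec = {'PredefinedMetricType': policy.metric_name}
                if policy.metric_name == 'ALBRequestCountPerTarget':
                    # This metric requires a ResourceLabel naming the ALB and target group
                    metric_spec['ResourceLabel'] = self._get_alb_resource_label(asg_name)
                
                # Create the target tracking scaling policy. EC2 Auto Scaling's
                # TargetTrackingConfiguration has no cooldown fields, so the
                # scale-out cooldown is applied as the instance warm-up time.
                response = self.autoscaling.put_scaling_policy(
                    AutoScalingGroupName=asg_name,
                    PolicyName=policy.name,
                    PolicyType=policy.policy_type,
                    EstimatedInstanceWarmup=policy.scale_out_cooldown,
                    TargetTrackingConfiguration={
                        'PredefinedMetricSpecification': metric_spec,
                        'TargetValue': policy.target_value
                    }
                )
                
                policy_arns[policy.name] = response['PolicyARN']
                print(f"Scaling policy {policy.name} created")
                
            except Exception as e:
                print(f"Failed to create scaling policy {policy.name}: {str(e)}")
        
        return policy_arns
    
    def _get_alb_resource_label(self, asg_name: str) -> str:
        """Build the ResourceLabel from the group's first target group and its load balancer"""
        asg = self.autoscaling.describe_auto_scaling_groups(
            AutoScalingGroupNames=[asg_name]
        )['AutoScalingGroups'][0]
        tg_arn = asg['TargetGroupARNs'][0]
        target_group = self.elbv2.describe_target_groups(
            TargetGroupArns=[tg_arn]
        )['TargetGroups'][0]
        lb_arn = target_group['LoadBalancerArns'][0]
        # Label format: app/<lb-name>/<lb-id>/targetgroup/<tg-name>/<tg-id>
        return f"{lb_arn.split(':loadbalancer/')[1]}/{tg_arn.split(':')[-1]}"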
    
    def create_load_balancer(self, name: str, subnet_ids: List[str], security_group_ids: List[str]) -> Dict:
        """Create an Application Load Balancer with a target group and listener"""
        try:
            # Create the load balancer
            lb_response = self.elbv2.create_load_balancer(
                Name=name,
                Subnets=subnet_ids,
                SecurityGroups=security_group_ids,
                Scheme='internet-facing',
                Type='application',
                IpAddressType='ipv4',
                Tags=[
                    {'Key': 'Name', 'Value': name},
                    {'Key': 'Environment', 'Value': 'production'}
                ]
            )
            
            lb_arn = lb_response['LoadBalancers'][0]['LoadBalancerArn']
            lb_dns = lb_response['LoadBalancers'][0]['DNSName']
            
            # Create the target group
            tg_response = self.elbv2.create_target_group(
                Name=f'{name}-tg',
                Protocol='HTTP',
                Port=80,
                VpcId=self._get_vpc_id_from_subnet(subnet_ids[0]),
                HealthCheckProtocol='HTTP',
                HealthCheckPath='/health',
                HealthCheckIntervalSeconds=30,
                HealthCheckTimeoutSeconds=5,
                HealthyThresholdCount=2,
                UnhealthyThresholdCount=3,
                Tags=[
                    {'Key': 'Name', 'Value': f'{name}-tg'}
                ]
            )
            
            tg_arn = tg_response['TargetGroups'][0]['TargetGroupArn']
            
            # Create the listener
            listener_response = self.elbv2.create_listener(
                LoadBalancerArn=lb_arn,
                Protocol='HTTP',
                Port=80,
                DefaultActions=[
                    {
                        'Type': 'forward',
                        'TargetGroupArn': tg_arn
                    }
                ]
            )
            
            return {
                'load_balancer_arn': lb_arn,
                'load_balancer_dns': lb_dns,
                'target_group_arn': tg_arn,
                'listener_arn': listener_response['Listeners'][0]['ListenerArn']
            }
            
        except Exception as e:
            print(f"Failed to create load balancer: {str(e)}")
            return None
    
    def _get_vpc_id_from_subnet(self, subnet_id: str) -> str:
        """Look up the VPC ID for a subnet ID"""
        response = self.ec2.describe_subnets(SubnetIds=[subnet_id])
        return response['Subnets'][0]['VpcId']
    
    def monitor_scaling_activities(self, asg_name: str) -> List[Dict]:
        """Fetch the most recent scaling activities"""
        try:
            response = self.autoscaling.describe_scaling_activities(
                AutoScalingGroupName=asg_name,
                MaxRecords=10
            )
            
            activities = []
            for activity in response['Activities']:
                activities.append({
                    'activity_id': activity['ActivityId'],
                    'description': activity['Description'],
                    'cause': activity['Cause'],
                    'start_time': activity['StartTime'],
                    'end_time': activity.get('EndTime'),
                    'status_code': activity['StatusCode'],
                    'status_message': activity.get('StatusMessage', '')
                })
            
            return activities
            
        except Exception as e:
            print(f"Failed to fetch scaling activities: {str(e)}")
            return []
    
    def get_instance_metrics(self, instance_ids: List[str], hours: int = 1) -> Dict:
        """Fetch CloudWatch metrics for the given instances"""
        end_time = datetime.utcnow()
        start_time = end_time - timedelta(hours=hours)
        
        metrics = {}
        
        for instance_id in instance_ids:
            try:
                # CPU utilization
                cpu_response = self.cloudwatch.get_metric_statistics(
                    Namespace='AWS/EC2',
                    MetricName='CPUUtilization',
                    Dimensions=[
                        {'Name': 'InstanceId', 'Value': instance_id}
                    ],
                    StartTime=start_time,
                    EndTime=end_time,
                    Period=300,
                    Statistics=['Average', 'Maximum']
                )
                
                # Inbound network traffic
                network_in_response = self.cloudwatch.get_metric_statistics(
                    Namespace='AWS/EC2',
                    MetricName='NetworkIn',
                    Dimensions=[
                        {'Name': 'InstanceId', 'Value': instance_id}
                    ],
                    StartTime=start_time,
                    EndTime=end_time,
                    Period=300,
                    Statistics=['Sum']
                )
                
                metrics[instance_id] = {
                    'cpu_utilization': cpu_response['Datapoints'],
                    'network_in': network_in_response['Datapoints']
                }
                
            except Exception as e:
                print(f"Failed to fetch metrics for instance {instance_id}: {str(e)}")
                metrics[instance_id] = {}
        
        return metrics

def main():
    """Main function: demonstrates creating the compute resource architecture"""
    # Compute resource manager
    compute_manager = ComputeResourceManager()
    
    # Launch template configuration
    launch_template_config = {
        'ami_id': 'ami-0c02fb55956c7d316',  # Amazon Linux 2; AMI IDs are region-specific, replace with one valid in your region
        'instance_type': 't3.medium',
        'security_group_ids': ['sg-12345678'],  # Replace with a real security group ID
        'key_name': 'my-key-pair',
        'environment': 'production',
        'application': 'web-app',
        'volume_size': 20,
        'user_data': '''#!/bin/bash
yum update -y
yum install -y httpd
systemctl start httpd
systemctl enable httpd
echo "<h1>Hello from Auto Scaling Instance</h1>" > /var/www/html/index.html
# Serve the /health path that the target group health check requests
echo "ok" > /var/www/html/health
'''
    }
    
    # Create the launch template
    print("Creating launch template...")
    launch_template_id = compute_manager.create_launch_template(
        'web-app-template',
        launch_template_config
    )
    
    if launch_template_id:
        print(f"Launch template created: {launch_template_id}")
        
        # Create the load balancer
        print("Creating load balancer...")
        lb_result = compute_manager.create_load_balancer(
            'web-app-alb',
            ['subnet-12345678', 'subnet-87654321'],  # Replace with real subnet IDs
            ['sg-12345678']  # Replace with a real security group ID
        )
        
        if lb_result:
            print(f"Load balancer created: {lb_result['load_balancer_dns']}")
            
            # Auto Scaling group configuration
            asg_config = AutoScalingConfig(
                name='web-app-asg',
                min_size=2,
                max_size=10,
                desired_capacity=3,
                target_group_arn=lb_result['target_group_arn'],
                launch_template_id=launch_template_id,
                subnet_ids=['subnet-12345678', 'subnet-87654321']  # Replace with real subnet IDs
            )
            
            # Create the Auto Scaling group
            print("Creating Auto Scaling group...")
            if compute_manager.create_auto_scaling_group(asg_config):
                
                # Scaling policies
                scaling_policies = [
                    ScalingPolicy(
                        name='cpu-target-tracking',
                        policy_type='TargetTrackingScaling',
                        metric_name='ASGAverageCPUUtilization',
                        target_value=70.0
                    ),
                    ScalingPolicy(
                        name='request-count-target-tracking',
                        policy_type='TargetTrackingScaling',
                        metric_name='ALBRequestCountPerTarget',
                        target_value=1000.0
                    )
                ]
                
                # Create the scaling policies
                print("Creating scaling policies...")
                policy_arns = compute_manager.create_scaling_policies(
                    asg_config.name,
                    scaling_policies
                )
                
                print("\n=== Compute resource architecture created ===")
                print(f"Load balancer DNS: {lb_result['load_balancer_dns']}")
                print(f"Auto Scaling group: {asg_config.name}")
                print(f"Scaling policies: {list(policy_arns.keys())}")
                
                # Check scaling activities
                print("\nChecking scaling activities...")
                time.sleep(10)  # Give the group a moment to record its first activities
                activities = compute_manager.monitor_scaling_activities(asg_config.name)
                for activity in activities[:3]:  # Show the three most recent activities
                    print(f"Activity: {activity['description']}")
                    print(f"Status: {activity['status_code']}")
                    print(f"Time: {activity['start_time']}")
                    print("---")

if __name__ == "__main__":
    main()
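
To build intuition for what a target value such as 70% CPU implies, the arithmetic behind target tracking can be approximated offline. The sketch below assumes a simple proportional model (capacity scales with the ratio of the observed metric to the target, clamped to the group's bounds). AWS does not publish the exact algorithm, so treat this as a rough estimate; the function name estimate_desired_capacity is hypothetical.

```python
import math


def estimate_desired_capacity(current_capacity: int, metric_value: float,
                              target_value: float, min_size: int, max_size: int) -> int:
    """Approximate the capacity a target tracking policy would aim for.

    Assumption: load is spread evenly, so the per-instance metric falls in
    proportion as instances are added. Real policies also apply warm-up
    periods and scale in more conservatively than they scale out.
    """
    if target_value <= 0:
        raise ValueError("target_value must be positive")
    raw = math.ceil(current_capacity * metric_value / target_value)
    # Never go outside the Auto Scaling group's configured bounds
    return max(min_size, min(max_size, raw))


# Group bounds from the example: min 2, max 10, currently 3 instances
for cpu in (20.0, 70.0, 90.0, 700.0):
    desired = estimate_desired_capacity(3, cpu, 70.0, min_size=2, max_size=10)
    print(f"average CPU {cpu:>5.1f}% -> about {desired} instances")
```

The clamp is the important design point: min_size and max_size bound the outcome no matter how far the metric drifts from the target.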

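Storage Architecture Design

The storage architecture has to balance performance, durability, availability, and cost, and should offer several storage tiers.

Multi-Tier Storage Management

#!/usr/bin/env python3
"""
Cloud storage architecture manager.
Automates multi-tier storage, data lifecycle management, and backup policies.
"""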
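
Tiering is usually expressed as lifecycle rules that move objects to cheaper storage classes as they age. As a minimal sketch, the helper below builds one rule in the dictionary shape that the S3 put_bucket_lifecycle_configuration request expects, and checks the ordering before anything is sent. The name build_lifecycle_rule, the prefix, and the day thresholds are illustrative assumptions; STANDARD_IA and GLACIER are real S3 storage class names.

```python
from typing import Dict, List, Optional, Tuple


def build_lifecycle_rule(rule_id: str, prefix: str,
                         transitions: List[Tuple[int, str]],
                         expiration_days: Optional[int] = None) -> Dict:
    """Build one S3 lifecycle rule dict from (days, storage_class) pairs.

    Hypothetical helper: sorts transitions by age and rejects plans where
    two transitions share a day or expiration precedes the last transition.
    """
    ordered = sorted(transitions)
    days = [d for d, _ in ordered]
    if len(set(days)) != len(days):
        raise ValueError("two transitions share the same day")
    if expiration_days is not None and ordered and expiration_days <= ordered[-1][0]:
        raise ValueError("expiration must come after the last transition")

    rule = {
        'ID': rule_id,
        'Status': 'Enabled',
        'Filter': {'Prefix': prefix},
        'Transitions': [{'Days': d, 'StorageClass': c} for d, c in ordered],
    }
    if expiration_days is not None:
        rule['Expiration'] = {'Days': expiration_days}
    return rule


# Example: logs move to infrequent access after 30 days, to Glacier after 90,
# and are deleted after a year (thresholds chosen only for illustration)
rule = build_lifecycle_rule(
    'archive-logs', 'logs/',
    [(90, 'GLACIER'), (30, 'STANDARD_IA')],
    expiration_days=365,
)
print(rule)
```

Validating the plan locally keeps ordering mistakes out of the API call, where they would otherwise surface as a request error for the whole configuration.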

import boto3
import json
from typing import Dict, List, Optional
from dataclasses import dataclass
from datetime import datetime, timedelta

@dataclass
class StorageClass:
    """Storage class configuration"""
    name: str
    storage_class: str
    transition_days: int
    description: str

@dataclass
class LifecycleRule:
    """Lifecycle rule"""
    rule_id: str
    prefix: str
    status: str
    transitions: List[StorageClass]
    expiration_days: Optional[int] = None

class CloudStorageManager:
    """Cloud storage architecture manager"""
    
    def __init__(self, region: str = 'us-west-2'):
        self.s3 = boto3.client('s3', region_name=region)
        self.efs = boto3.client('efs', region_name=region)
        self.fsx = boto3.client('fsx', region_name=region)
        self.region = region
    
    def create_s3_storage_architecture(self, bucket_configs: List[Dict]) -> Dict:
        """Create the S3 storage architecture"""
        created_buckets = {}
        
        for config in bucket_configs:
            try:
                bucket_name = config['name']
                
                # Create the S3 bucket (us-east-1 must omit LocationConstraint)
                if self.region == 'us-east-1':
                    self.s3.create_bucket(Bucket=bucket_name)
                else:
                    self.s3.create_bucket(
                        Bucket=bucket_name,
                        CreateBucketConfiguration={'LocationConstraint': self.region}
                    )
                
                # Configure versioning
                if config.get('versioning', False):
                    self.s3.put_bucket_versioning(
                        Bucket=bucket_name,
                        VersioningConfiguration={'Status': 'Enabled'}
                    )
                
                # Configure server-side encryption
                if config.get('encryption', True):
                    self.s3.put_bucket_encryption(
                        Bucket=bucket_name,
                        ServerSideEncryptionConfiguration={
                            'Rules': [
                                {
                                    'ApplyServerSideEncryptionByDefault': {
                                        'SSEAlgorithm': 'AES256'
                                    }
                                }
                            ]
                        }
                    )
                
                # Configure the lifecycle policy
                if 'lifecycle_rules' in config:
                    self._configure_lifecycle_policy(bucket_name, config['lifecycle_rules'])
                
                # Configure cross-region replication
                if 'replication' in config:
                    self._configure_replication(bucket_name, config['replication'])
                
                # Configure access logging
                if config.get('access_logging', False):
                    self._configure_access_logging(bucket_name, config.get('log_bucket'))
                
                # Configure tags
                if 'tags' in config:
                    self.s3.put_bucket_tagging(
                        Bucket=bucket_name,
                        Tagging={'TagSet': config['tags']}
                    )
                
                created_buckets[bucket_name] = {
                    'region': self.region,
                    'versioning': config.get('versioning', False),
                    'encryption': config.get('encryption', True),
                    'lifecycle_configured': 'lifecycle_rules' in config
                }
                
                print(f"S3 bucket {bucket_name} created")
                
            except Exception as e:
                print(f"Failed to create S3 bucket {config['name']}: {str(e)}")
        
        return created_buckets
    
    def _configure_lifecycle_policy(self, bucket_name: str, lifecycle_rules: List[LifecycleRule]):
        """Configure the bucket lifecycle policy"""
        rules = []
        
        for rule in lifecycle_rules:
            rule_config = {
                'ID': rule.rule_id,
                'Status': rule.status,
                'Filter': {'Prefix': rule.prefix},
                'Transitions': []
            }
            
            # Add the storage class transitions
            for transition in rule.transitions:
                rule_config['Transitions'].append({
                    'Days': transition.transition_days,
                    'StorageClass': transition.storage_class
                })
            
            # Add the expiration rule
            if rule.expiration_days:
                rule_config['Expiration'] = {'Days': rule.expiration_days}
            
            rules.append(rule_config)
        
        try:
            self.s3.put_bucket_lifecycle_configuration(
                Bucket=bucket_name,
                LifecycleConfiguration={'Rules': rules}
            )
            print(f"Lifecycle policy configured: {bucket_name}")
        except Exception as e:
            print(f"Failed to configure lifecycle policy: {str(e)}")
    
    def create_efs_file_system(self, name: str, config: Dict) -> Dict:
        """Create an EFS file system with its mount targets"""
        try:
            # Create the EFS file system
            response = self.efs.create_file_system(
                CreationToken=f"{name}-{int(datetime.now().timestamp())}",
                PerformanceMode=config.get('performance_mode', 'generalPurpose'),
                ThroughputMode=config.get('throughput_mode', 'bursting'),
                Encrypted=config.get('encrypted', True),
                Tags=[
                    {'Key': 'Name', 'Value': name},
                    {'Key': 'Environment', 'Value': config.get('environment', 'production')}
                ]
            )
            
            file_system_id = response['FileSystemId']
            
            # 创建挂载目标
            mount_targets = []
            for subnet_config in config.get('mount_targets', []):
                mt_response = self.efs.create_mount_target(
                    FileSystemId=file_system_id,
                    SubnetId=subnet_config['subnet_id'],
                    SecurityGroups=subnet_config['security_groups']
                )
                mount_targets.append(mt_response['MountTargetId'])
            
            # 配置生命周期策略
            if 'lifecycle_policy' in config:
                self.efs.put_lifecycle_configuration(
                    FileSystemId=file_system_id,
                    LifecyclePolicies=[
                        {
                            'TransitionToIA': config['lifecycle_policy']['transition_to_ia']
                        }
                    ]
                )
            
            return {
                'file_system_id': file_system_id,
                'mount_targets': mount_targets,
                'dns_name': f"{file_system_id}.efs.{self.region}.amazonaws.com"
            }
            
        except Exception as e:
            print(f"创建EFS文件系统失败: {str(e)}")
            return None
    
    def create_fsx_file_system(self, name: str, config: Dict) -> Dict:
        """Create an FSx file system (Lustre or Windows File Server)."""
        try:
            # Arguments shared by both file system types
            common_args = {
                'StorageCapacity': config['storage_capacity'],
                'SubnetIds': config['subnet_ids'],
                'SecurityGroupIds': config['security_group_ids'],
                'Tags': [{'Key': 'Name', 'Value': name}]
            }
            
            if config['file_system_type'] == 'LUSTRE':
                deployment_type = config.get('deployment_type', 'SCRATCH_2')
                lustre_config = {'DeploymentType': deployment_type}
                # PerUnitStorageThroughput applies only to persistent deployment types
                if deployment_type.startswith('PERSISTENT'):
                    lustre_config['PerUnitStorageThroughput'] = config.get('throughput', 50)
                response = self.fsx.create_file_system(
                    FileSystemType='LUSTRE',
                    LustreConfiguration=lustre_config,
                    **common_args
                )
            elif config['file_system_type'] == 'WINDOWS':
                windows_config = {
                    'ThroughputCapacity': config.get('throughput_capacity', 8),
                    'DeploymentType': config.get('deployment_type', 'SINGLE_AZ_1')
                }
                # Pass ActiveDirectoryId only when set; None fails parameter validation
                if config.get('active_directory_id'):
                    windows_config['ActiveDirectoryId'] = config['active_directory_id']
                response = self.fsx.create_file_system(
                    FileSystemType='WINDOWS',
                    WindowsConfiguration=windows_config,
                    **common_args
                )
            else:
                raise ValueError(f"Unsupported file system type: {config['file_system_type']}")
            
            file_system = response['FileSystem']
            return {
                'file_system_id': file_system['FileSystemId'],
                'dns_name': file_system.get('DNSName'),
                'lifecycle_status': file_system['Lifecycle']
            }
            
        except Exception as e:
            print(f"Failed to create FSx file system: {str(e)}")
            return None
    
    def create_backup_strategy(self, resources: Dict) -> Dict:
        """Create an AWS Backup plan and assign resources to it."""
        backup_plans = {}
        
        try:
            backup_client = boto3.client('backup', region_name=self.region)
            
            # Define the backup plan
            backup_plan = {
                'BackupPlanName': 'comprehensive-backup-plan',
                'Rules': [
                    {
                        'RuleName': 'daily-backup',
                        'TargetBackupVaultName': 'Default',  # the vault must already exist
                        'ScheduleExpression': 'cron(0 2 * * ? *)',  # daily at 02:00 UTC
                        'StartWindowMinutes': 60,
                        'CompletionWindowMinutes': 120,
                        'Lifecycle': {
                            'MoveToColdStorageAfterDays': 30,
                            'DeleteAfterDays': 365
                        },
                        'RecoveryPointTags': {
                            'BackupType': 'Daily',
                            'Environment': 'Production'
                        }
                    },
                    {
                        'RuleName': 'weekly-backup',
                        'TargetBackupVaultName': 'Default',  # the vault must already exist
                        'ScheduleExpression': 'cron(0 3 ? * SUN *)',  # every Sunday at 03:00 UTC
                        'StartWindowMinutes': 60,
                        'CompletionWindowMinutes': 180,
                        'Lifecycle': {
                            'MoveToColdStorageAfterDays': 7,
                            'DeleteAfterDays': 2555  # about 7 years
                        },
                        'RecoveryPointTags': {
                            'BackupType': 'Weekly',
                            'Environment': 'Production'
                        }
                    }
                ]
            }
            
            plan_response = backup_client.create_backup_plan(BackupPlan=backup_plan)
            backup_plan_id = plan_response['BackupPlanId']
            
            # Create the backup selection
            backup_selection = {
                'SelectionName': 'production-resources',
                'IamRoleArn': 'arn:aws:iam::123456789012:role/aws-backup-service-role',  # replace with your IAM role
                'Resources': [],
                # Each condition is a ConditionKey / ConditionValue pair
                'Conditions': {
                    'StringEquals': [
                        {
                            'ConditionKey': 'aws:ResourceTag/Environment',
                            'ConditionValue': 'Production'
                        }
                    ]
                }
            }
            
            # Add resource ARNs to the backup selection
            for resource_type, resource_list in resources.items():
                if resource_type == 'ec2_instances':
                    for instance_id in resource_list:
                        backup_selection['Resources'].append(f'arn:aws:ec2:{self.region}:*:instance/{instance_id}')
                elif resource_type == 'ebs_volumes':
                    for volume_id in resource_list:
                        backup_selection['Resources'].append(f'arn:aws:ec2:{self.region}:*:volume/{volume_id}')
                elif resource_type == 'efs_file_systems':
                    for fs_id in resource_list:
                        backup_selection['Resources'].append(f'arn:aws:elasticfilesystem:{self.region}:*:file-system/{fs_id}')
            
            selection_response = backup_client.create_backup_selection(
                BackupPlanId=backup_plan_id,
                BackupSelection=backup_selection
            )
            
            backup_plans['comprehensive'] = {
                'backup_plan_id': backup_plan_id,
                'backup_selection_id': selection_response['SelectionId']
            }
            
            print("Backup strategy created successfully")
            
        except Exception as e:
            print(f"Failed to create backup strategy: {str(e)}")
        
        return backup_plans
    
    def monitor_storage_costs(self, bucket_names: List[str]) -> Dict:
        """Estimate monthly S3 storage cost from CloudWatch bucket size metrics."""
        cost_data = {}
        
        try:
            cloudwatch = boto3.client('cloudwatch', region_name=self.region)
            
            for bucket_name in bucket_names:
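                # Fetch the bucket size metric. S3 publishes it once per day,
                # so query a two-day window to be sure a datapoint is included.
                response = cloudwatch.get_metric_statistics(
                    Namespace='AWS/S3',
                    MetricName='BucketSizeBytes',
                    Dimensions=[
                        {'Name': 'BucketName', 'Value': bucket_name},
                        {'Name': 'StorageType', 'Value': 'StandardStorage'}
                    ],
                    StartTime=datetime.utcnow() - timedelta(days=2),
                    EndTime=datetime.utcnow(),
                    Period=86400,  # one day
                    Statistics=['Average']
                )
                
                if response['Datapoints']:
                    # Datapoints are not guaranteed to be ordered; pick the latest
                    latest = max(response['Datapoints'], key=lambda dp: dp['Timestamp'])
                    size_bytes = latest['Average']
                    size_gb = size_bytes / (1024 ** 3)
                    
                    # Rough estimate from a Standard storage list price (varies by region and tier)
                    estimated_monthly_cost = size_gb * 0.023  # $0.023 per GB/month for Standard storage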
                    
                    cost_data[bucket_name] = {
                        'size_gb': round(size_gb, 2),
                        'estimated_monthly_cost': round(estimated_monthly_cost, 2)
                    }
                
        except Exception as e:
            print(f"Failed to fetch storage cost data: {str(e)}")
        
        return cost_data

def main():
    """Entry point: demonstrates building the storage architecture."""
    # Storage manager
    storage_manager = CloudStorageManager()
    
    # S3 bucket configuration
    s3_bucket_configs = [
        {
            'name': 'production-app-data',
            'versioning': True,
            'encryption': True,
            'access_logging': True,
            'lifecycle_rules': [
                LifecycleRule(
                    rule_id='data-lifecycle',
                    prefix='data/',
                    status='Enabled',
                    transitions=[
                        StorageClass('IA', 'STANDARD_IA', 30, 'Infrequent Access'),
                        StorageClass('Glacier', 'GLACIER', 90, 'Glacier'),
                        StorageClass('DeepArchive', 'DEEP_ARCHIVE', 365, 'Deep Archive')
                    ],
                    expiration_days=2555  # about 7 years
                )
            ],
            'tags': [
                {'Key': 'Environment', 'Value': 'Production'},
                {'Key': 'Application', 'Value': 'WebApp'},
                {'Key': 'DataClassification', 'Value': 'Sensitive'}
            ]
        },
        {
            'name': 'production-backup-data',
            'versioning': True,
            'encryption': True,
            'lifecycle_rules': [
                LifecycleRule(
                    rule_id='backup-lifecycle',
                    prefix='backups/',
                    status='Enabled',
                    transitions=[
                        StorageClass('Glacier', 'GLACIER', 1, 'Immediate Glacier'),
                        StorageClass('DeepArchive', 'DEEP_ARCHIVE', 30, 'Deep Archive')
                    ]
                )
            ],
            'tags': [
                {'Key': 'Environment', 'Value': 'Production'},
                {'Key': 'Purpose', 'Value': 'Backup'}
            ]
        }
    ]
    
    # Create the S3 storage architecture
    print("Creating S3 storage architecture...")
    s3_buckets = storage_manager.create_s3_storage_architecture(s3_bucket_configs)
    
    # EFS file system configuration
    efs_config = {
        'performance_mode': 'generalPurpose',
        'throughput_mode': 'provisioned',
        'provisioned_throughput': 100,
        'encrypted': True,
        'environment': 'production',
        'mount_targets': [
            {
                'subnet_id': 'subnet-12345678',  # replace with a real subnet ID
                'security_groups': ['sg-12345678']  # replace with a real security group ID
            }
        ],
        'lifecycle_policy': {
            'transition_to_ia': 'AFTER_30_DAYS'
        }
    }
    
    # Create the EFS file system
    print("Creating EFS file system...")
    efs_result = storage_manager.create_efs_file_system('production-efs', efs_config)
    
    # FSx file system configuration
    fsx_config = {
        'file_system_type': 'LUSTRE',
        'storage_capacity': 1200,  # GB
        'subnet_ids': ['subnet-12345678'],  # replace with a real subnet ID
        'security_group_ids': ['sg-12345678'],  # replace with a real security group ID
        'deployment_type': 'PERSISTENT_1',
        'throughput': 50
    }
    
    # Create the FSx file system
    print("Creating FSx file system...")
    fsx_result = storage_manager.create_fsx_file_system('production-fsx', fsx_config)
    
    # Create the backup strategy
    backup_resources = {
        'ec2_instances': ['i-1234567890abcdef0'],  # replace with a real instance ID
        'ebs_volumes': ['vol-1234567890abcdef0'],  # replace with a real volume ID
        'efs_file_systems': [efs_result['file_system_id']] if efs_result else []
    }
    
    print("Creating backup strategy...")
    backup_plans = storage_manager.create_backup_strategy(backup_resources)
    
    # Monitor storage costs
    print("Monitoring storage costs...")
    cost_data = storage_manager.monitor_storage_costs(list(s3_buckets.keys()))
    
    print("\n=== Storage architecture created ===")
    print(f"S3 buckets: {list(s3_buckets.keys())}")
    if efs_result:
        print(f"EFS file system: {efs_result['file_system_id']}")
    if fsx_result:
        print(f"FSx file system: {fsx_result['file_system_id']}")
    print(f"Backup plans: {list(backup_plans.keys())}")
    
    if cost_data:
        print("\nStorage cost analysis:")
        for bucket, data in cost_data.items():
            print(f"{bucket}: {data['size_gb']} GB, estimated monthly cost: ${data['estimated_monthly_cost']}")

if __name__ == "__main__":
    main()
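
The lifecycle rules in the script above move objects through progressively colder storage classes. As a minimal sketch of a pre-flight check, the snippet below verifies that transition days strictly increase and that the infrequent-access classes are not targeted before day 30. `Transition`, `MIN_DAYS`, and `validate_transitions` are hypothetical names introduced for illustration; they are not part of the storage manager, and the check covers only the rules stated here.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class Transition:
    """Hypothetical stand-in for one transition in a lifecycle rule."""
    storage_class: str
    days: int


# Assumption for this sketch: only the 30-day minimum for the
# infrequent-access classes is checked.
MIN_DAYS = {"STANDARD_IA": 30, "ONEZONE_IA": 30}


def validate_transitions(transitions: List[Transition]) -> List[str]:
    """Return human-readable problems; an empty list means no issue was found."""
    problems = []
    previous_days = -1
    for t in transitions:
        # Each transition must happen later than the one before it
        if t.days <= previous_days:
            problems.append(
                f"{t.storage_class}: day {t.days} is not after day {previous_days}"
            )
        # Some classes cannot be targeted before a minimum object age
        minimum = MIN_DAYS.get(t.storage_class, 0)
        if t.days < minimum:
            problems.append(
                f"{t.storage_class}: needs at least {minimum} days, got {t.days}"
            )
        previous_days = t.days
    return problems


if __name__ == "__main__":
    rule = [
        Transition("STANDARD_IA", 30),
        Transition("GLACIER", 90),
        Transition("DEEP_ARCHIVE", 365),
    ]
    print(validate_transitions(rule) or "no problems found")
```

Running a check like this before calling the S3 API surfaces configuration mistakes locally instead of as a rejected request.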

Security Architecture Implementation

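A cloud security architecture needs layered defenses: network security, identity and authentication, data protection, and compliance management.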
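
As one illustration of the network layer, the sketch below flags ingress rules that expose anything other than web ports to the whole internet. The rule dictionaries, `ALLOWED_PUBLIC_PORTS`, and `find_risky_rules` are assumptions made for this example, not an AWS data format or API.

```python
from typing import Dict, List

PUBLIC_CIDR = "0.0.0.0/0"
# Assumption: only the web tier ports may be reachable from anywhere.
ALLOWED_PUBLIC_PORTS = {80, 443}


def find_risky_rules(rules: List[Dict]) -> List[Dict]:
    """Return the rules that open non-web ports to every IPv4 address."""
    risky = []
    for rule in rules:
        # Rules scoped to a specific CIDR or security group are not flagged
        if rule.get("cidr") != PUBLIC_CIDR:
            continue
        ports = range(rule["from_port"], rule["to_port"] + 1)
        if any(port not in ALLOWED_PUBLIC_PORTS for port in ports):
            risky.append(rule)
    return risky


if __name__ == "__main__":
    sample_rules = [
        {"from_port": 443, "to_port": 443, "cidr": "0.0.0.0/0"},
        {"from_port": 22, "to_port": 22, "cidr": "0.0.0.0/0"},
        {"from_port": 3306, "to_port": 3306, "source_group": "sg-app"},
    ]
    for rule in find_risky_rules(sample_rules):
        print("publicly exposed:", rule)
```

A check of this kind fits naturally into a deployment pipeline, where it can block a change before the rule reaches a real security group.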

Zero-Trust Security Model

#!/bin/bash
# Cloud security architecture deployment script
# Automates the deployment of a zero-trust security model

set -euo pipefail

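# Configuration variables
REGION="us-west-2"
ENVIRONMENT="production"
PROJECT_NAME="secure-cloud-platform"

# Make the AWS CLI use the configured region for every command
export AWS_DEFAULT_REGION="$REGION"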

# Logging helper
log() {
    echo "[$(date +'%Y-%m-%d %H:%M:%S')] $1"
}

# Error handling
error_exit() {
    log "Error: $1"
    exit 1
}

# Create IAM roles and policies
create_iam_security_roles() {
    log "Creating IAM security roles and policies..."
    
    # Write the zero-trust access policy
    cat > zero-trust-policy.json << 'EOF'
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Deny",
            "Action": "*",
            "Resource": "*",
            "Condition": {
                "Bool": {
                    "aws:SecureTransport": "false"
                }
            }
        },
        {
            "Effect": "Deny",
            "Action": "*",
            "Resource": "*",
            "Condition": {
                "StringNotEquals": {
                    "aws:RequestedRegion": [
                        "us-west-2",
                        "us-east-1"
                    ]
                }
            }
        },
        {
            "Effect": "Allow",
            "Action": [
                "logs:CreateLogGroup",
                "logs:CreateLogStream",
                "logs:PutLogEvents"
            ],
            "Resource": "arn:aws:logs:*:*:*"
        }
    ]
}
EOF

    # Create the policy
    aws iam create-policy \
        --policy-name ZeroTrustSecurityPolicy \
        --policy-document file://zero-trust-policy.json \
        --description "Zero Trust Security Policy" || true
    
    # Create the security audit role
    cat > security-audit-trust-policy.json << 'EOF'
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "Service": "lambda.amazonaws.com"
            },
            "Action": "sts:AssumeRole"
        }
    ]
}
EOF

    aws iam create-role \
        --role-name SecurityAuditRole \
        --assume-role-policy-document file://security-audit-trust-policy.json \
        --description "Role for security audit functions" || true
    
    # Attach the managed policy to the role
    aws iam attach-role-policy \
        --role-name SecurityAuditRole \
        --policy-arn arn:aws:iam::aws:policy/SecurityAudit || true
    
    log "IAM security roles created"
}

# Configure VPC security groups
configure_security_groups() {
    log "Configuring VPC security groups..."
    
    # Look up the default VPC ID
    VPC_ID=$(aws ec2 describe-vpcs \
        --filters "Name=is-default,Values=true" \
        --query 'Vpcs[0].VpcId' \
        --output text)
    
    if [ "$VPC_ID" = "None" ]; then
        error_exit "Default VPC not found"
    fi
    
    # Create a security group, or look up its ID when it already exists,
    # so that the rules below always reference a real group ID
    ensure_security_group() {
        local name="$1" description="$2" sg_id
        sg_id=$(aws ec2 create-security-group \
            --group-name "$name" \
            --description "$description" \
            --vpc-id "$VPC_ID" \
            --query 'GroupId' \
            --output text 2>/dev/null) || \
        sg_id=$(aws ec2 describe-security-groups \
            --filters "Name=group-name,Values=$name" "Name=vpc-id,Values=$VPC_ID" \
            --query 'SecurityGroups[0].GroupId' \
            --output text)
        if [ "$sg_id" = "None" ]; then
            echo "Security group not found: $name" >&2
            return 1
        fi
        echo "$sg_id"
    }
    
    # Web tier security group
    WEB_SG_ID=$(ensure_security_group "${PROJECT_NAME}-web-sg" \
        "Web tier security group with zero trust principles")
    
    # Allow inbound HTTPS (a duplicate-rule error on re-runs is ignored)
    aws ec2 authorize-security-group-ingress \
        --group-id "$WEB_SG_ID" \
        --protocol tcp \
        --port 443 \
        --cidr 0.0.0.0/0 || true
    
    # Allow inbound HTTP so it can be redirected to HTTPS
    aws ec2 authorize-security-group-ingress \
        --group-id "$WEB_SG_ID" \
        --protocol tcp \
        --port 80 \
        --cidr 0.0.0.0/0 || true
    
    log "Web tier security group ready: $WEB_SG_ID"
    
    # Application tier security group
    APP_SG_ID=$(ensure_security_group "${PROJECT_NAME}-app-sg" \
        "Application tier security group")
    
    # Only allow traffic from the web tier
    aws ec2 authorize-security-group-ingress \
        --group-id "$APP_SG_ID" \
        --protocol tcp \
        --port 8080 \
        --source-group "$WEB_SG_ID" || true
    
    log "Application tier security group ready: $APP_SG_ID"
    
    # Database tier security group
    DB_SG_ID=$(ensure_security_group "${PROJECT_NAME}-db-sg" \
        "Database tier security group")
    
    # Only allow database connections (MySQL, PostgreSQL) from the application tier
    aws ec2 authorize-security-group-ingress \
        --group-id "$DB_SG_ID" \
        --protocol tcp \
        --port 3306 \
        --source-group "$APP_SG_ID" || true
    
    aws ec2 authorize-security-group-ingress \
        --group-id "$DB_SG_ID" \
        --protocol tcp \
        --port 5432 \
        --source-group "$APP_SG_ID" || true
    
    log "Database tier security group ready: $DB_SG_ID"
}

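# Configure WAF (Shield Standard protection is applied by AWS automatically)
configure_waf_protection() {
    log "Configuring WAF and DDoS protection..."
    
    # Write the WAF web ACL definition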
    cat > waf-rules.json << 'EOF'
{
    "Name": "ProductionWebACL",
    "Scope": "CLOUDFRONT",
    "DefaultAction": {
        "Allow": {}
    },
    "Rules": [
        {
            "Name": "AWSManagedRulesCommonRuleSet",
            "Priority": 1,
            "OverrideAction": {
                "None": {}
            },
            "Statement": {
                "ManagedRuleGroupStatement": {
                    "VendorName": "AWS",
                    "Name": "AWSManagedRulesCommonRuleSet"
                }
            },
            "VisibilityConfig": {
                "SampledRequestsEnabled": true,
                "CloudWatchMetricsEnabled": true,
                "MetricName": "CommonRuleSetMetric"
            }
        },
        {
            "Name": "AWSManagedRulesKnownBadInputsRuleSet",
            "Priority": 2,
            "OverrideAction": {
                "None": {}
            },
            "Statement": {
                "ManagedRuleGroupStatement": {
                    "VendorName": "AWS",
                    "Name": "AWSManagedRulesKnownBadInputsRuleSet"
                }
            },
            "VisibilityConfig": {
                "SampledRequestsEnabled": true,
                "CloudWatchMetricsEnabled": true,
                "MetricName": "KnownBadInputsMetric"
            }
        }
    ],
    "VisibilityConfig": {
        "SampledRequestsEnabled": true,
        "CloudWatchMetricsEnabled": true,
        "MetricName": "ProductionWebACLMetric"
    }
}
EOF

    # Create the WAF web ACL (CLOUDFRONT scope must be created in us-east-1)
    aws wafv2 create-web-acl \
        --scope CLOUDFRONT \
        --cli-input-json file://waf-rules.json \
        --region us-east-1 || true
    
    log "WAF configuration complete"
}

# Configure CloudTrail auditing
configure_cloudtrail() {
    log "Configuring CloudTrail auditing..."
    
    # Create the trail (the S3 bucket must already exist and
    # carry a bucket policy that lets CloudTrail write to it)
    aws cloudtrail create-trail \
        --name "${PROJECT_NAME}-audit-trail" \
        --s3-bucket-name "${PROJECT_NAME}-audit-logs" \
        --include-global-service-events \
        --is-multi-region-trail \
        --enable-log-file-validation || true
    
    # Start logging
    aws cloudtrail start-logging \
        --name "${PROJECT_NAME}-audit-trail" || true
    
    log "CloudTrail auditing configured"
}

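# Main function
main() {
    log "Starting cloud security architecture deployment..."
    
    create_iam_security_roles
    configure_security_groups
    configure_waf_protection
    configure_cloudtrail
    
    log "Cloud security architecture deployment complete"
}

# Run the main function
main "$@"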
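
The policy written by the script above relies on an explicit `Deny` taking precedence over any `Allow`. The sketch below models that precedence in a deliberately simplified form: it supports only the `Bool` and `StringNotEquals` operators, ignores `Resource` matching, and matches actions case-sensitively. It is a teaching model with hypothetical function names, not the real IAM evaluation engine.

```python
from fnmatch import fnmatchcase
from typing import Dict, List


def _as_list(value) -> list:
    return value if isinstance(value, list) else [value]


def _condition_matches(condition: Dict, context: Dict) -> bool:
    """Evaluate the two condition operators supported by this sketch."""
    for operator, checks in condition.items():
        for key, expected in checks.items():
            actual = context.get(key)
            if operator == "Bool":
                if str(actual).lower() != str(expected).lower():
                    return False
            elif operator == "StringNotEquals":
                if actual in _as_list(expected):
                    return False
            else:
                raise ValueError(f"unsupported operator: {operator}")
    return True


def evaluate(statements: List[Dict], action: str, context: Dict) -> str:
    """Return 'ExplicitDeny', 'Allow', or 'ImplicitDeny' for one request."""
    decision = "ImplicitDeny"  # nothing is allowed unless a statement says so
    for statement in statements:
        patterns = _as_list(statement["Action"])
        if not any(fnmatchcase(action, p) for p in patterns):
            continue
        if not _condition_matches(statement.get("Condition", {}), context):
            continue
        if statement["Effect"] == "Deny":
            return "ExplicitDeny"  # a matching Deny always wins
        decision = "Allow"
    return decision


if __name__ == "__main__":
    statements = [
        {"Effect": "Deny", "Action": "*",
         "Condition": {"Bool": {"aws:SecureTransport": "false"}}},
        {"Effect": "Allow", "Action": ["logs:PutLogEvents"]},
    ]
    print(evaluate(statements, "logs:PutLogEvents", {"aws:SecureTransport": True}))
    print(evaluate(statements, "logs:PutLogEvents", {"aws:SecureTransport": False}))
```

The key design point is the early return on `Deny`: the order of statements does not matter, because no later `Allow` can override it.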

Monitoring and Observability

A thorough monitoring system is key to keeping cloud infrastructure running reliably.

Comprehensive Monitoring System

#!/usr/bin/env python3
"""
Cloud infrastructure monitoring system
Provides monitoring, alerting, and observability features
"""

import boto3
import json
import time
from typing import Dict, List, Optional
from dataclasses import dataclass
from datetime import datetime, timedelta

@dataclass
class MetricConfig:
    """Metric configuration for an alarm."""
    name: str
    namespace: str
    metric_name: str
    dimensions: List[Dict]
    statistic: str
    threshold: float
    comparison_operator: str
    evaluation_periods: int = 2
    period: int = 300

@dataclass
class AlarmConfig:
    """Alarm configuration."""
    alarm_name: str
    alarm_description: str
    metric_config: MetricConfig
    alarm_actions: List[str]
    ok_actions: Optional[List[str]] = None
    treat_missing_data: str = 'breaching'

class CloudMonitoringSystem:
    """Cloud monitoring system built on CloudWatch, SNS, and CloudWatch Logs."""
    
    def __init__(self, region: str = 'us-west-2'):
        self.cloudwatch = boto3.client('cloudwatch', region_name=region)
        self.sns = boto3.client('sns', region_name=region)
        self.logs = boto3.client('logs', region_name=region)
        self.region = region
    
    def create_monitoring_dashboard(self, dashboard_name: str, widgets: List[Dict]) -> bool:
        """Create a CloudWatch dashboard from a list of widgets."""
        try:
            dashboard_body = {
                "widgets": widgets
            }
            
            self.cloudwatch.put_dashboard(
                DashboardName=dashboard_name,
                DashboardBody=json.dumps(dashboard_body)
            )
            
            print(f"Dashboard {dashboard_name} created successfully")
            return True
            
        except Exception as e:
            print(f"Failed to create dashboard: {str(e)}")
            return False
    
    def create_comprehensive_alarms(self, resource_configs: Dict) -> Dict:
        """Create alarms for EC2 instances, RDS instances, and load balancers."""
        created_alarms = {}
        
        # Create the SNS topic used for alarm notifications
        sns_topic_arn = self._create_sns_topic('infrastructure-alerts')
        if sns_topic_arn is None:
            # Without a notification target the alarm actions would be invalid
            return created_alarms
        
        # EC2 instance alarms
        if 'ec2_instances' in resource_configs:
            for instance_id in resource_configs['ec2_instances']:
                alarms = self._create_ec2_alarms(instance_id, sns_topic_arn)
                created_alarms[f'ec2-{instance_id}'] = alarms
        
        # RDS database alarms
        if 'rds_instances' in resource_configs:
            for db_instance_id in resource_configs['rds_instances']:
                alarms = self._create_rds_alarms(db_instance_id, sns_topic_arn)
                created_alarms[f'rds-{db_instance_id}'] = alarms
        
        # Load balancer alarms
        if 'load_balancers' in resource_configs:
            for lb_arn in resource_configs['load_balancers']:
                alarms = self._create_alb_alarms(lb_arn, sns_topic_arn)
                created_alarms[f'alb-{lb_arn.split("/")[-1]}'] = alarms
        
        return created_alarms
    
    def _create_sns_topic(self, topic_name: str) -> Optional[str]:
        """Create an SNS topic and return its ARN, or None on failure."""
        try:
            response = self.sns.create_topic(Name=topic_name)
            topic_arn = response['TopicArn']
            
            # Subscribe an email endpoint (the recipient must confirm the subscription)
            self.sns.subscribe(
                TopicArn=topic_arn,
                Protocol='email',
                Endpoint='admin@example.com'  # replace with a real address
            )
            
            return topic_arn
            
        except Exception as e:
            print(f"Failed to create SNS topic: {str(e)}")
            return None
    
    def _create_ec2_alarms(self, instance_id: str, sns_topic_arn: str) -> List[str]:
        """Create CPU and status check alarms for an EC2 instance."""
        alarm_names = []
        
        # CPU utilization alarm
        cpu_alarm_config = AlarmConfig(
            alarm_name=f'{instance_id}-high-cpu',
            alarm_description=f'High CPU utilization for instance {instance_id}',
            metric_config=MetricConfig(
                name='CPUUtilization',
                namespace='AWS/EC2',
                metric_name='CPUUtilization',
                dimensions=[{'Name': 'InstanceId', 'Value': instance_id}],
                statistic='Average',
                threshold=80.0,
                comparison_operator='GreaterThanThreshold'
            ),
            alarm_actions=[sns_topic_arn]
        )
        
        if self._create_alarm(cpu_alarm_config):
            alarm_names.append(cpu_alarm_config.alarm_name)
        
        # Status check alarm
        status_alarm_config = AlarmConfig(
            alarm_name=f'{instance_id}-status-check',
            alarm_description=f'Instance status check failed for {instance_id}',
            metric_config=MetricConfig(
                name='StatusCheckFailed',
                namespace='AWS/EC2',
                metric_name='StatusCheckFailed',
                dimensions=[{'Name': 'InstanceId', 'Value': instance_id}],
                statistic='Maximum',
                threshold=0.0,
                comparison_operator='GreaterThanThreshold'
            ),
            alarm_actions=[sns_topic_arn]
        )
        
        if self._create_alarm(status_alarm_config):
            alarm_names.append(status_alarm_config.alarm_name)
        
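        return alarm_names
    
    def _create_rds_alarms(self, db_instance_id: str, sns_topic_arn: str) -> List[str]:
        """Create RDS alarms (minimal sketch; the threshold is an illustrative default)."""
        config = AlarmConfig(
            alarm_name=f'{db_instance_id}-high-cpu',
            alarm_description=f'High CPU utilization for RDS instance {db_instance_id}',
            metric_config=MetricConfig(
                name='CPUUtilization',
                namespace='AWS/RDS',
                metric_name='CPUUtilization',
                dimensions=[{'Name': 'DBInstanceIdentifier', 'Value': db_instance_id}],
                statistic='Average',
                threshold=80.0,
                comparison_operator='GreaterThanThreshold'
            ),
            alarm_actions=[sns_topic_arn]
        )
        return [config.alarm_name] if self._create_alarm(config) else []
    
    def _create_alb_alarms(self, lb_arn: str, sns_topic_arn: str) -> List[str]:
        """Create ALB alarms (minimal sketch; the threshold is an illustrative default)."""
        # The LoadBalancer dimension expects the ARN suffix, e.g. app/<name>/<id>
        lb_dimension = lb_arn.split(':loadbalancer/')[-1]
        lb_id = lb_arn.split('/')[-1]
        config = AlarmConfig(
            alarm_name=f'{lb_id}-elb-5xx',
            alarm_description=f'Elevated 5XX responses from load balancer {lb_dimension}',
            metric_config=MetricConfig(
                name='HTTPCode_ELB_5XX_Count',
                namespace='AWS/ApplicationELB',
                metric_name='HTTPCode_ELB_5XX_Count',
                dimensions=[{'Name': 'LoadBalancer', 'Value': lb_dimension}],
                statistic='Sum',
                threshold=10.0,
                comparison_operator='GreaterThanThreshold'
            ),
            alarm_actions=[sns_topic_arn],
            treat_missing_data='notBreaching'  # treat periods without datapoints as healthy
        )
        return [config.alarm_name] if self._create_alarm(config) else []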
    
    def _create_alarm(self, config: AlarmConfig) -> bool:
        """Create a CloudWatch alarm from an AlarmConfig."""
        try:
            self.cloudwatch.put_metric_alarm(
                AlarmName=config.alarm_name,
                AlarmDescription=config.alarm_description,
                ActionsEnabled=True,
                AlarmActions=config.alarm_actions,
                OKActions=config.ok_actions or [],
                MetricName=config.metric_config.metric_name,
                Namespace=config.metric_config.namespace,
                Statistic=config.metric_config.statistic,
                Dimensions=config.metric_config.dimensions,
                Period=config.metric_config.period,
                EvaluationPeriods=config.metric_config.evaluation_periods,
                Threshold=config.metric_config.threshold,
                ComparisonOperator=config.metric_config.comparison_operator,
                TreatMissingData=config.treat_missing_data
            )
            
            print(f"Alarm {config.alarm_name} created successfully")
            return True
            
        except Exception as e:
            print(f"Failed to create alarm {config.alarm_name}: {str(e)}")
            return False
    
    def create_log_groups(self, log_group_configs: List[Dict]) -> Dict:
        """Create CloudWatch log groups and set their retention."""
        created_log_groups = {}
        
        for config in log_group_configs:
            try:
                log_group_name = config['name']
                
                self.logs.create_log_group(
                    logGroupName=log_group_name,
                    tags=config.get('tags', {})
                )
                
                # Set the retention period
                if 'retention_days' in config:
                    self.logs.put_retention_policy(
                        logGroupName=log_group_name,
                        retentionInDays=config['retention_days']
                    )
                
                created_log_groups[log_group_name] = {
                    'retention_days': config.get('retention_days'),
                    'tags': config.get('tags', {})
                }
                
                print(f"Log group {log_group_name} created successfully")
                
            except Exception as e:
                print(f"Failed to create log group {config['name']}: {str(e)}")
        
        return created_log_groups
    
    def setup_custom_metrics(self) -> bool:
        """Publish sample custom metrics."""
        try:
            # Example of publishing custom metric data
            self.cloudwatch.put_metric_data(
                Namespace='CustomApp/Performance',
                MetricData=[
                    {
                        'MetricName': 'ApplicationResponseTime',
                        'Value': 150.0,
                        'Unit': 'Milliseconds',
                        'Timestamp': datetime.utcnow(),
                        'Dimensions': [
                            {'Name': 'Environment', 'Value': 'Production'},
                            {'Name': 'Service', 'Value': 'WebAPI'}
                        ]
                    },
                    {
                        'MetricName': 'ActiveUserSessions',
                        'Value': 1250,
                        'Unit': 'Count',
                        'Timestamp': datetime.utcnow(),
                        'Dimensions': [
                            {'Name': 'Environment', 'Value': 'Production'},
                            {'Name': 'Application', 'Value': 'WebApp'}
                        ]
                    }
                ]
            )
            
            print("Custom metrics published successfully")
            return True
            
        except Exception as e:
            print(f"Failed to publish custom metrics: {str(e)}")
            return False

def main():
    """Entry point: demonstrates setting up the monitoring system."""
    # Monitoring system
    monitoring = CloudMonitoringSystem()
    
    # Dashboard widgets
    dashboard_widgets = [
        {
            "type": "metric",
            "x": 0, "y": 0,
            "width": 12, "height": 6,
            "properties": {
                "metrics": [
                    ["AWS/EC2", "CPUUtilization", "InstanceId", "i-1234567890abcdef0"],
                    ["AWS/ApplicationELB", "TargetResponseTime", "LoadBalancer", "app/my-load-balancer/50dc6c495c0c9188"]
                ],
                "period": 300,
                "stat": "Average",
                "region": "us-west-2",
                "title": "EC2 and ALB Performance"
            }
        },
        {
            "type": "log",
            "x": 0, "y": 6,
            "width": 24, "height": 6,
            "properties": {
                "query": "SOURCE '/aws/lambda/my-function' | fields @timestamp, @message | sort @timestamp desc | limit 20",
                "region": "us-west-2",
                "title": "Recent Application Logs"
            }
        }
    ]
    
    # Create the dashboard
    print("Creating monitoring dashboard...")
    monitoring.create_monitoring_dashboard('infrastructure-overview', dashboard_widgets)
    
    # Resources to monitor
    resource_configs = {
        'ec2_instances': ['i-1234567890abcdef0'],  # replace with a real instance ID
        'rds_instances': ['mydb-instance'],  # replace with a real RDS instance ID
        'load_balancers': ['arn:aws:elasticloadbalancing:us-west-2:123456789012:loadbalancer/app/my-load-balancer/50dc6c495c0c9188']
    }
    
    # Create the alarms
    print("Creating alarms...")
    alarms = monitoring.create_comprehensive_alarms(resource_configs)
    
    # Log group configuration
    log_group_configs = [
        {
            'name': '/aws/ec2/application',
            'retention_days': 30,
            'tags': {'Environment': 'Production', 'Service': 'Application'}
        },
        {
            'name': '/aws/lambda/security-audit',
            'retention_days': 90,
            'tags': {'Environment': 'Production', 'Service': 'Security'}
        }
    ]
    
    # Create the log groups
    print("Creating log groups...")
    log_groups = monitoring.create_log_groups(log_group_configs)
    
    # Publish custom metrics
    print("Publishing custom metrics...")
    monitoring.setup_custom_metrics()
    
    print("\n=== Monitoring system setup complete ===")
    print(f"Alarm count: {sum(len(names) for names in alarms.values())}")
    print(f"Log groups: {list(log_groups.keys())}")

if __name__ == "__main__":
    main()
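
The `MetricConfig` defaults in the script above (`evaluation_periods=2`, `period=300`) mean an alarm fires only after two consecutive five-minute periods breach the threshold. The sketch below models that rule for a "greater than threshold" comparison. It is a simplified illustration with a hypothetical `alarm_state` function; CloudWatch's actual handling of missing data has more cases than are shown here.

```python
from typing import List, Optional


def alarm_state(
    datapoints: List[Optional[float]],
    threshold: float,
    evaluation_periods: int = 2,
    treat_missing_as_breaching: bool = True,
) -> str:
    """Return 'ALARM', 'OK', or 'INSUFFICIENT_DATA' for the latest periods.

    Each entry in `datapoints` is one period's value, oldest first;
    None stands for a period with no data.
    """
    recent = datapoints[-evaluation_periods:]
    if len(recent) < evaluation_periods:
        return "INSUFFICIENT_DATA"

    def breaching(value: Optional[float]) -> bool:
        if value is None:
            return treat_missing_as_breaching
        return value > threshold

    # The alarm fires only when every evaluated period breaches
    return "ALARM" if all(breaching(v) for v in recent) else "OK"


if __name__ == "__main__":
    cpu_history = [50.0, 85.0, 90.0]
    print(alarm_state(cpu_history, threshold=80.0))
```

Requiring consecutive breaches trades a few minutes of detection delay for fewer notifications caused by short spikes.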

Infrastructure as Code

Terraform is used here to version infrastructure and automate its deployment.
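
The Terraform template in this section lists its /24 subnets by hand inside the 10.0.0.0/16 VPC range. As a sketch of how such a layout can be derived rather than typed, the snippet below carves the VPC CIDR into /24 blocks with Python's standard `ipaddress` module. The `plan_subnets` function and the tier offsets (1, 10, 20) are assumptions chosen to mirror the template's values; they are not part of any Terraform module.

```python
import ipaddress
from typing import Dict, List

# Assumption: index of the first /24 block used by each tier,
# chosen so the result mirrors the hard-coded subnet lists.
TIER_OFFSETS = {"public": 1, "private": 10, "database": 20}


def plan_subnets(vpc_cidr: str, az_count: int = 3, new_prefix: int = 24) -> Dict[str, List[str]]:
    """Split a VPC CIDR into per-tier subnets, one per availability zone."""
    network = ipaddress.ip_network(vpc_cidr)
    blocks = list(network.subnets(new_prefix=new_prefix))
    plan = {}
    for tier, start in TIER_OFFSETS.items():
        plan[tier] = [str(blocks[start + i]) for i in range(az_count)]
    return plan


if __name__ == "__main__":
    for tier, cidrs in plan_subnets("10.0.0.0/16").items():
        print(tier, cidrs)
```

Deriving subnets from a single CIDR keeps the tiers consistent when the VPC range changes, which is the same motivation behind computing them inside Terraform instead of listing them.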

Terraform Configuration Template

# main.tf - main infrastructure configuration
terraform {
  required_version = ">= 1.0"
  required_providers {
    aws = {
      source  = "hashicorp/aws"
      version = "~> 5.0"
    }
  }
  
  backend "s3" {
    bucket         = "terraform-state-bucket"
    key            = "infrastructure/terraform.tfstate"
    region         = "us-west-2"
    encrypt        = true
    dynamodb_table = "terraform-locks"
  }
}

provider "aws" {
  region = var.aws_region
  
  default_tags {
    tags = {
      Environment = var.environment
      Project     = var.project_name
      ManagedBy   = "Terraform"
    }
  }
}

# Variable definitions
variable "aws_region" {
  description = "AWS region"
  type        = string
  default     = "us-west-2"
}

variable "environment" {
  description = "Environment name"
  type        = string
  default     = "production"
}

variable "project_name" {
  description = "Project name"
  type        = string
  default     = "cloud-infrastructure"
}

variable "vpc_cidr" {
  description = "VPC CIDR block"
  type        = string
  default     = "10.0.0.0/16"
}

# VPC module
module "vpc" {
  source = "./modules/vpc"
  
  name               = "${var.project_name}-vpc"
  cidr               = var.vpc_cidr
  availability_zones = ["${var.aws_region}a", "${var.aws_region}b", "${var.aws_region}c"]
  
  public_subnets   = ["10.0.1.0/24", "10.0.2.0/24", "10.0.3.0/24"]
  private_subnets  = ["10.0.10.0/24", "10.0.11.0/24", "10.0.12.0/24"]
  database_subnets = ["10.0.20.0/24", "10.0.21.0/24", "10.0.22.0/24"]
  
  enable_nat_gateway = true
  enable_vpn_gateway = false
  enable_dns_hostnames = true
  enable_dns_support = true
  
  tags = {
    Environment = var.environment
    Project     = var.project_name
  }
}

# Security group module
module "security_groups" {
  source = "./modules/security-groups"
  
  vpc_id = module.vpc.vpc_id
  
  web_ingress_rules = [
    {
      from_port   = 80
      to_port     = 80
      protocol    = "tcp"
      cidr_blocks = ["0.0.0.0/0"]
    },
    {
      from_port   = 443
      to_port     = 443
      protocol    = "tcp"
      cidr_blocks = ["0.0.0.0/0"]
    }
  ]
  
  tags = {
    Environment = var.environment
    Project     = var.project_name
  }
}

# Compute module
module "compute" {
  source = "./modules/compute"
  
  vpc_id             = module.vpc.vpc_id
  private_subnet_ids = module.vpc.private_subnets
  public_subnet_ids  = module.vpc.public_subnets
  
  web_security_group_id = module.security_groups.web_security_group_id
  app_security_group_id = module.security_groups.app_security_group_id
  
  instance_type = "t3.medium"
  min_size      = 2
  max_size      = 10
  desired_size  = 3
  
  tags = {
    Environment = var.environment
    Project     = var.project_name
  }
}

# Database module
module "database" {
  source = "./modules/database"
  
  vpc_id                     = module.vpc.vpc_id
  database_subnet_ids        = module.vpc.database_subnets
  database_security_group_id = module.security_groups.database_security_group_id
  
  engine            = "mysql"
  engine_version    = "8.0"
  instance_class    = "db.t3.medium"
  allocated_storage = 100
  
  db_name  = "production_db"
  username = "admin"
  
  backup_retention_period = 7
  backup_window           = "03:00-04:00"
  maintenance_window      = "sun:04:00-sun:05:00"
  
  multi_az            = true
  storage_encrypted   = true
  deletion_protection = true
  
  tags = {
    Environment = var.environment
    Project     = var.project_name
  }
}

# Monitoring module
module "monitoring" {
  source = "./modules/monitoring"
  
  project_name = var.project_name
  environment  = var.environment
  
  ec2_instance_ids = module.compute.instance_ids
  rds_instance_id  = module.database.db_instance_id
  alb_arn         = module.compute.load_balancer_arn
  
  notification_email = "admin@example.com"
  
  tags = {
    Environment = var.environment
    Project     = var.project_name
  }
}

# Outputs
output "vpc_id" {
  description = "VPC ID"
  value       = module.vpc.vpc_id
}

output "load_balancer_dns" {
  description = "Load balancer DNS name"
  value       = module.compute.load_balancer_dns_name
}

output "database_endpoint" {
  description = "Database endpoint"
  value       = module.database.db_instance_endpoint
  sensitive   = true
}

output "monitoring_dashboard_url" {
  description = "CloudWatch dashboard URL"
  value       = module.monitoring.dashboard_url
}
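
The CIDR blocks passed to the VPC module have to sit inside the VPC range and must not overlap one another. As a rough pre-flight check, separate from the Terraform configuration itself, the subnet plan can be validated with Python's standard `ipaddress` module. This is only a sketch: the function name `validate_subnet_plan` is hypothetical, and the CIDR values are copied from the configuration above.

```python
import ipaddress
from itertools import combinations
from typing import Dict, List


def validate_subnet_plan(vpc_cidr: str, subnets: Dict[str, List[str]]) -> List[str]:
    """Return a list of problems; an empty list means the plan is consistent."""
    vpc = ipaddress.ip_network(vpc_cidr)
    labelled = [
        (tier, ipaddress.ip_network(cidr))
        for tier, cidrs in subnets.items()
        for cidr in cidrs
    ]
    problems = []

    # Every subnet must be contained in the VPC range
    for tier, net in labelled:
        if not net.subnet_of(vpc):
            problems.append(f"{tier} subnet {net} is outside VPC {vpc}")

    # No two subnets may share addresses
    for (tier_a, net_a), (tier_b, net_b) in combinations(labelled, 2):
        if net_a.overlaps(net_b):
            problems.append(f"{tier_a} subnet {net_a} overlaps {tier_b} subnet {net_b}")

    return problems


plan = {
    "public": ["10.0.1.0/24", "10.0.2.0/24", "10.0.3.0/24"],
    "private": ["10.0.10.0/24", "10.0.11.0/24", "10.0.12.0/24"],
    "database": ["10.0.20.0/24", "10.0.21.0/24", "10.0.22.0/24"],
}
print(validate_subnet_plan("10.0.0.0/16", plan))  # prints []
```

A check like this could run in CI before `terraform plan`, so that an address-planning mistake is caught before any provider call is made.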

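Cost Optimization Strategy

Cost optimization combines trend analysis, cleanup of unused resources, instance rightsizing, and budget alerts to keep cloud spending under control.

Cost Optimization Automation

#!/usr/bin/env python3
"""
Cloud cost optimization manager
Provides cost analysis, resource optimization recommendations, and automated cost controls
"""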

import boto3
from typing import Dict, List, Optional
from dataclasses import dataclass
from datetime import datetime, timedelta

@dataclass
class CostOptimizationRule:
    """Cost optimization rule"""
    name: str
    resource_type: str
    condition: str
    action: str
    threshold: float
    enabled: bool = True

class CloudCostOptimizer:
    """Cloud cost optimizer"""
    
    def __init__(self, region: str = 'us-west-2'):
        self.ce = boto3.client('ce', region_name='us-east-1')  # The Cost Explorer API endpoint is served from us-east-1
        self.ec2 = boto3.client('ec2', region_name=region)
        self.rds = boto3.client('rds', region_name=region)
        self.cloudwatch = boto3.client('cloudwatch', region_name=region)
        self.region = region
    
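    def analyze_cost_trends(self, days: int = 30) -> Dict:
        """Analyze cost trends over the last `days` days."""
        try:
            # Cost Explorer treats End as exclusive, so today's partial data is left out
            end_date = datetime.now().strftime('%Y-%m-%d')
            start_date = (datetime.now() - timedelta(days=days)).strftime('%Y-%m-%d')
            
            # Fetch daily costs, grouped by service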
            response = self.ce.get_cost_and_usage(
                TimePeriod={
                    'Start': start_date,
                    'End': end_date
                },
                Granularity='DAILY',
                Metrics=['BlendedCost'],
                GroupBy=[
                    {'Type': 'DIMENSION', 'Key': 'SERVICE'}
                ]
            )
            
            cost_data = {}
            total_cost = 0
            
            for result in response['ResultsByTime']:
                date = result['TimePeriod']['Start']
                daily_total = 0
                
                for group in result['Groups']:
                    service = group['Keys'][0]
                    cost = float(group['Metrics']['BlendedCost']['Amount'])
                    daily_total += cost
                    
                    if service not in cost_data:
                        cost_data[service] = []
                    cost_data[service].append({'date': date, 'cost': cost})
                
                total_cost += daily_total
            
            return {
                'total_cost': round(total_cost, 2),
                'daily_average': round(total_cost / days, 2),
                'service_breakdown': cost_data
            }
            
        except Exception as e:
            print(f"Failed to analyze cost trends: {str(e)}")
            return {}
    
    def identify_unused_resources(self) -> Dict:
        """Identify unused resources."""
        unused_resources = {
            'ec2_instances': [],
            'ebs_volumes': [],
            'elastic_ips': [],
            'load_balancers': []
        }
        
        try:
            # Find stopped EC2 instances
            ec2_response = self.ec2.describe_instances(
                Filters=[
                    {'Name': 'instance-state-name', 'Values': ['stopped']}
                ]
            )
            
            for reservation in ec2_response['Reservations']:
                for instance in reservation['Instances']:
                    # StateTransitionReason records why and when the instance was stopped
                    if 'StateTransitionReason' in instance:
                        unused_resources['ec2_instances'].append({
                            'instance_id': instance['InstanceId'],
                            'instance_type': instance['InstanceType'],
                            'state': instance['State']['Name'],
                            'state_transition_reason': instance['StateTransitionReason'],
                            'launch_time': instance['LaunchTime']
                        })
            
            # Find unattached EBS volumes
            ebs_response = self.ec2.describe_volumes(
                Filters=[
                    {'Name': 'status', 'Values': ['available']}
                ]
            )
            
            for volume in ebs_response['Volumes']:
                unused_resources['ebs_volumes'].append({
                    'volume_id': volume['VolumeId'],
                    'size': volume['Size'],
                    'volume_type': volume['VolumeType'],
                    'create_time': volume['CreateTime']
                })
            
            # Find Elastic IPs that are not associated with any resource
            eip_response = self.ec2.describe_addresses()
            
            for address in eip_response['Addresses']:
                if 'InstanceId' not in address and 'NetworkInterfaceId' not in address:
                    unused_resources['elastic_ips'].append({
                        'allocation_id': address['AllocationId'],
                        'public_ip': address['PublicIp']
                    })
            
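            # Note: load balancers are not inspected in this example, so
            # unused_resources['load_balancers'] stays empty.
            print("Unused resource scan complete")
            
        except Exception as e:
            print(f"Failed to identify unused resources: {str(e)}")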
        
        return unused_resources
    
    def get_rightsizing_recommendations(self) -> List[Dict]:
        """Get instance rightsizing recommendations."""
        recommendations = []
        
        try:
            # Look at CPU utilization over the past 14 days
            end_time = datetime.utcnow()
            start_time = end_time - timedelta(days=14)
            
            # List all running instances
            instances_response = self.ec2.describe_instances(
                Filters=[
                    {'Name': 'instance-state-name', 'Values': ['running']}
                ]
            )
            
            for reservation in instances_response['Reservations']:
                for instance in reservation['Instances']:
                    instance_id = instance['InstanceId']
                    instance_type = instance['InstanceType']
                    
                    # Fetch CPU utilization metrics
                    cpu_response = self.cloudwatch.get_metric_statistics(
                        Namespace='AWS/EC2',
                        MetricName='CPUUtilization',
                        Dimensions=[
                            {'Name': 'InstanceId', 'Value': instance_id}
                        ],
                        StartTime=start_time,
                        EndTime=end_time,
                        Period=3600,  # 1 hour
                        Statistics=['Average', 'Maximum']
                    )
                    
                    if cpu_response['Datapoints']:
                        avg_cpu = sum(dp['Average'] for dp in cpu_response['Datapoints']) / len(cpu_response['Datapoints'])
                        max_cpu = max(dp['Maximum'] for dp in cpu_response['Datapoints'])
                        
                        # Generate a rightsizing recommendation
                        recommendation = self._generate_rightsizing_recommendation(
                            instance_id, instance_type, avg_cpu, max_cpu
                        )
                        
                        if recommendation:
                            recommendations.append(recommendation)
            
        except Exception as e:
            print(f"Failed to get rightsizing recommendations: {str(e)}")
        
        return recommendations
    
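    def _generate_rightsizing_recommendation(self, instance_id: str, current_type: str, 
                                           avg_cpu: float, max_cpu: float) -> Optional[Dict]:
        """Generate a rightsizing recommendation for one instance."""
        # Instance type map (simplified; cost_factor is a relative price, not a real rate)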
        instance_families = {
            't3.nano': {'cpu': 2, 'memory': 0.5, 'cost_factor': 1},
            't3.micro': {'cpu': 2, 'memory': 1, 'cost_factor': 2},
            't3.small': {'cpu': 2, 'memory': 2, 'cost_factor': 4},
            't3.medium': {'cpu': 2, 'memory': 4, 'cost_factor': 8},
            't3.large': {'cpu': 2, 'memory': 8, 'cost_factor': 16},
            't3.xlarge': {'cpu': 4, 'memory': 16, 'cost_factor': 32}
        }
        
        if current_type not in instance_families:
            return None
        
        current_specs = instance_families[current_type]
        
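        # Optimization logic
        if avg_cpu < 10 and max_cpu < 30:
            # CPU utilization is very low: recommend downsizing by one step.
            # Candidates must be cheaper and keep at least 2 vCPUs and 1 GiB of memory.
            candidates = [
                (instance_type, specs)
                for instance_type, specs in instance_families.items()
                if (specs['cost_factor'] < current_specs['cost_factor'] and
                    specs['cpu'] >= 2 and specs['memory'] >= 1)
            ]
            
            if candidates:
                # Choose the largest cheaper type rather than jumping to the smallest one
                instance_type, specs = max(candidates, key=lambda item: item[1]['cost_factor'])
                potential_savings = (current_specs['cost_factor'] - specs['cost_factor']) / current_specs['cost_factor'] * 100
                
                return {
                    'instance_id': instance_id,
                    'current_type': current_type,
                    'recommended_type': instance_type,
                    'avg_cpu_utilization': round(avg_cpu, 2),
                    'max_cpu_utilization': round(max_cpu, 2),
                    'potential_savings_percent': round(potential_savings, 2),
                    'reason': 'Low CPU utilization detected'
                }
        
        elif avg_cpu > 70 or max_cpu > 90:
            # CPU utilization is very high: recommend the next larger type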
            for instance_type, specs in instance_families.items():
                if specs['cost_factor'] > current_specs['cost_factor']:
                    return {
                        'instance_id': instance_id,
                        'current_type': current_type,
                        'recommended_type': instance_type,
                        'avg_cpu_utilization': round(avg_cpu, 2),
                        'max_cpu_utilization': round(max_cpu, 2),
                        'reason': 'High CPU utilization detected - performance risk'
                    }
        
        return None
    
    def create_cost_budget(self, budget_name: str, budget_amount: float, 
                          notification_email: str) -> bool:
        """Create a cost budget with email notifications."""
        try:
            budgets_client = boto3.client('budgets', region_name='us-east-1')
            
            budget = {
                'BudgetName': budget_name,
                'BudgetLimit': {
                    'Amount': str(budget_amount),
                    'Unit': 'USD'
                },
                'TimeUnit': 'MONTHLY',
                'BudgetType': 'COST',
                'CostFilters': {
                    'Service': ['Amazon Elastic Compute Cloud - Compute']
                }
            }
            
            # Budget notifications
            notifications = [
                {
                    'Notification': {
                        'NotificationType': 'ACTUAL',
                        'ComparisonOperator': 'GREATER_THAN',
                        'Threshold': 80.0,
                        'ThresholdType': 'PERCENTAGE'
                    },
                    'Subscribers': [
                        {
                            'SubscriptionType': 'EMAIL',
                            'Address': notification_email
                        }
                    ]
                },
                {
                    'Notification': {
                        'NotificationType': 'FORECASTED',
                        'ComparisonOperator': 'GREATER_THAN',
                        'Threshold': 100.0,
                        'ThresholdType': 'PERCENTAGE'
                    },
                    'Subscribers': [
                        {
                            'SubscriptionType': 'EMAIL',
                            'Address': notification_email
                        }
                    ]
                }
            ]
            
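            # Look up the caller's account ID via STS instead of hard-coding it
            account_id = boto3.client('sts').get_caller_identity()['Account']
            
            budgets_client.create_budget(
                AccountId=account_id,
                Budget=budget,
                NotificationsWithSubscribers=notifications
            )
            
            print(f"Cost budget {budget_name} created")
            return True
            
        except Exception as e:
            print(f"Failed to create cost budget: {str(e)}")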
            return False

def main():
    """Entry point: demonstrates cost optimization."""
    # Cost optimizer
    cost_optimizer = CloudCostOptimizer()
    
    # Analyze cost trends
    print("Analyzing cost trends...")
    cost_trends = cost_optimizer.analyze_cost_trends(30)
    
    if cost_trends:
        print(f"Total cost over the past 30 days: ${cost_trends['total_cost']}")
        print(f"Average daily cost: ${cost_trends['daily_average']}")
    
    # Identify unused resources
    print("\nIdentifying unused resources...")
    unused_resources = cost_optimizer.identify_unused_resources()
    
    for resource_type, resources in unused_resources.items():
        if resources:
            print(f"{resource_type}: {len(resources)} unused")
    
    # Get rightsizing recommendations
    print("\nGetting instance rightsizing recommendations...")
    recommendations = cost_optimizer.get_rightsizing_recommendations()
    
    for rec in recommendations:
        print(f"Instance {rec['instance_id']}: {rec['current_type']} -> {rec['recommended_type']}")
        if 'potential_savings_percent' in rec:
            print(f"  Potential savings: {rec['potential_savings_percent']}%")
        print(f"  Reason: {rec['reason']}")
    
    # Create a cost budget
    print("\nCreating cost budget...")
    cost_optimizer.create_cost_budget(
        'monthly-infrastructure-budget',
        1000.0,  # $1000 per month
        'admin@example.com'
    )
    
    print("\n=== Cost optimization analysis complete ===")

if __name__ == "__main__":
    main()
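
The `describe_instances` calls in the optimizer each issue a single request. For accounts with many instances, paginated requests are generally the safer choice, and boto3 exposes a paginator for this operation. The sketch below shows one way to iterate over every instance; `iter_instances` is a hypothetical helper that is not part of the script above, and it takes the EC2 client as a parameter so that it can be exercised without AWS credentials.

```python
from typing import Any, Dict, Iterator, List


def iter_instances(ec2_client: Any, states: List[str]) -> Iterator[Dict]:
    """Yield every EC2 instance in the given states, following pagination."""
    # get_paginator('describe_instances') is a standard boto3 EC2 client call
    paginator = ec2_client.get_paginator("describe_instances")
    pages = paginator.paginate(
        Filters=[{"Name": "instance-state-name", "Values": states}]
    )
    for page in pages:
        for reservation in page.get("Reservations", []):
            # Each reservation groups one or more instance descriptions
            yield from reservation.get("Instances", [])
```

Inside `CloudCostOptimizer`, the nested reservation loops could then be written as `for instance in iter_instances(self.ec2, ['running']):`.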

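Summary

Designing cloud infrastructure is a complex systems-engineering task that has to balance the following core elements:

Core Elements of Architecture Design

  1. Network architecture

    • VPC design and subnet planning
    • Security group and network ACL configuration
    • Load balancing and traffic distribution
    • NAT Gateway and Internet Gateway management
  2. Compute resource management

    • Auto scaling policies
    • Instance type selection and optimization
    • Launch templates and configuration management
    • Containerization and orchestration platforms
  3. Storage architecture

    • Tiered storage strategy
    • Data lifecycle management
    • Backup and disaster recovery
    • Performance optimization and cost control
  4. Security architecture

    • Zero-trust security model
    • Authentication and access control
    • Data encryption and network security
    • Auditing and compliance management
  5. Monitoring and observability

    • Comprehensive monitoring metrics
    • Intelligent alerting and notification
    • Log aggregation and analysis
    • Performance optimization recommendations
  6. Infrastructure as Code

    • Modular Terraform design
    • Version control and change management
    • Automated deployment pipelines
    • Environment consistency

Best Practice Recommendations

  1. Design principles

    • Use modular, reusable architecture designs
    • Apply layered defenses and redundancy
    • Prioritize automation and standardization
    • Optimize and iterate continuously
  2. Operations

    • Build a thorough monitoring and alerting system
    • Automate failure recovery
    • Run regular security audits and compliance checks
    • Keep optimizing cost and adjusting resources
  3. Team collaboration

    • Define clear roles and responsibilities
    • Enforce code review and change control
    • Provide adequate documentation and training
    • Establish effective communication and collaboration practices

A systematic approach to architecture design and implementation yields a modern cloud infrastructure platform that is highly available, performant, secure, and cost-efficient, giving the business a solid technical foundation.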


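The code examples and configuration templates in this article can serve as a starting point for real projects, but adapt and test them against your own requirements before using them in production.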