Cloud Infrastructure Architecture Overview
A modern cloud infrastructure architecture has to satisfy several requirements at once: high availability, scalability, security, and cost efficiency. This article takes an in-depth look at how to design and implement a complete cloud infrastructure architecture.
Architecture Design Principles
# Cloud infrastructure architecture design principles
architecture_principles:
  scalability:
    horizontal_scaling: true
    auto_scaling: true
    load_balancing: true
  availability:
    multi_az_deployment: true
    disaster_recovery: true
    fault_tolerance: true
  security:
    defense_in_depth: true
    zero_trust_model: true
    encryption_everywhere: true
  cost_optimization:
    resource_rightsizing: true
    reserved_instances: true
    spot_instances: true
  operational_excellence:
    infrastructure_as_code: true
    automated_deployment: true
    monitoring_alerting: true
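A practical way to keep these principles from becoming shelfware is to check them in a pipeline. The short sketch below assumes the block above is saved as architecture_principles.yaml and that PyYAML is installed; the file name and the checks themselves are illustrative assumptions.
#!/usr/bin/env python3
"""Sketch: validate the architecture-principles configuration in CI.
Assumes the YAML above is saved as architecture_principles.yaml and PyYAML is installed."""
import sys
import yaml

REQUIRED_SECTIONS = ["scalability", "availability", "security",
                     "cost_optimization", "operational_excellence"]

def validate_principles(path: str) -> bool:
    with open(path, "r", encoding="utf-8") as f:
        data = yaml.safe_load(f)
    principles = data.get("architecture_principles", {})
    missing = [s for s in REQUIRED_SECTIONS if s not in principles]
    if missing:
        print(f"Missing principle sections: {missing}")
        return False
    # Every leaf setting should be a boolean so the pipeline can enforce it strictly
    for section, flags in principles.items():
        for key, value in flags.items():
            if not isinstance(value, bool):
                print(f"{section}.{key} should be a boolean, got {type(value).__name__}")
                return False
    return True

if __name__ == "__main__":
    sys.exit(0 if validate_principles("architecture_principles.yaml") else 1)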
Architecture Layer Model
graph TB
    subgraph "Application Layer"
        A[Web Application] --> B[API Services]
        B --> C[Microservices]
    end
    subgraph "Platform Layer"
        D[Container Orchestration] --> E[Service Mesh]
        E --> F[API Gateway]
    end
    subgraph "Infrastructure Layer"
        G[Compute Resources] --> H[Storage Systems]
        H --> I[Network Architecture]
    end
    subgraph "Security Layer"
        J[Identity Authentication] --> K[Access Control]
        K --> L[Data Encryption]
    end
    A --> D
    D --> G
    G --> J
Network Architecture Design
The network architecture is the core component of any cloud infrastructure: it must provide secure, high-performance, and scalable connectivity.
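Before diving into the full VPC management code, it is worth validating the CIDR plan itself. The short sketch below uses only the Python standard-library ipaddress module to check that every subnet falls inside the VPC range and that no two subnets overlap; the VPC and subnet CIDRs shown are illustrative assumptions.
#!/usr/bin/env python3
"""Sketch: validate a VPC/subnet CIDR plan with the standard-library ipaddress module.
The CIDR values below are illustrative assumptions."""
import ipaddress
from itertools import combinations

VPC_CIDR = "10.0.0.0/16"
SUBNET_CIDRS = {
    "public-1a": "10.0.1.0/24",
    "public-1b": "10.0.2.0/24",
    "private-1a": "10.0.10.0/24",
    "database-1a": "10.0.20.0/24",
}

def validate_subnet_plan(vpc_cidr: str, subnets: dict) -> bool:
    vpc = ipaddress.ip_network(vpc_cidr)
    nets = {name: ipaddress.ip_network(cidr) for name, cidr in subnets.items()}
    ok = True
    # Every subnet must be fully contained in the VPC range
    for name, net in nets.items():
        if not net.subnet_of(vpc):
            print(f"{name} ({net}) is not inside VPC {vpc}")
            ok = False
    # No two subnets may overlap
    for (n1, a), (n2, b) in combinations(nets.items(), 2):
        if a.overlaps(b):
            print(f"{n1} and {n2} overlap: {a} / {b}")
            ok = False
    return ok

if __name__ == "__main__":
    print("Subnet plan OK" if validate_subnet_plan(VPC_CIDR, SUBNET_CIDRS) else "Subnet plan has issues")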
VPC Network Design
#!/usr/bin/env python3
"""
云网络架构管理器
提供VPC、子网、路由表和安全组的自动化管理功能
"""
import boto3
import json
from typing import Dict, List, Optional
from dataclasses import dataclass
import ipaddress
@dataclass
class SubnetConfig:
"""子网配置"""
name: str
cidr: str
availability_zone: str
subnet_type: str # public, private, database
route_table: str
@dataclass
class VPCConfig:
"""VPC配置"""
name: str
cidr: str
enable_dns_hostnames: bool = True
enable_dns_support: bool = True
subnets: List[SubnetConfig] = None
class CloudNetworkManager:
"""云网络架构管理器"""
def __init__(self, region: str = 'us-west-2'):
self.ec2 = boto3.client('ec2', region_name=region)
self.region = region
def create_vpc_architecture(self, config: VPCConfig) -> Dict:
"""创建完整的VPC架构"""
try:
# 创建VPC
vpc_response = self.ec2.create_vpc(
CidrBlock=config.cidr,
TagSpecifications=[{
'ResourceType': 'vpc',
'Tags': [{'Key': 'Name', 'Value': config.name}]
}]
)
vpc_id = vpc_response['Vpc']['VpcId']
# 启用DNS解析
self.ec2.modify_vpc_attribute(
VpcId=vpc_id,
EnableDnsHostnames={'Value': config.enable_dns_hostnames}
)
self.ec2.modify_vpc_attribute(
VpcId=vpc_id,
EnableDnsSupport={'Value': config.enable_dns_support}
)
# 创建Internet Gateway
igw_response = self.ec2.create_internet_gateway(
TagSpecifications=[{
'ResourceType': 'internet-gateway',
'Tags': [{'Key': 'Name', 'Value': f'{config.name}-igw'}]
}]
)
igw_id = igw_response['InternetGateway']['InternetGatewayId']
# 附加Internet Gateway到VPC
self.ec2.attach_internet_gateway(
InternetGatewayId=igw_id,
VpcId=vpc_id
)
# 创建子网
subnets = {}
for subnet_config in config.subnets:
subnet_id = self._create_subnet(vpc_id, subnet_config)
subnets[subnet_config.name] = subnet_id
# 创建路由表
route_tables = self._create_route_tables(vpc_id, config.name, igw_id)
# 关联子网到路由表
self._associate_subnets_to_route_tables(subnets, route_tables, config.subnets)
# 创建NAT Gateway
nat_gateways = self._create_nat_gateways(subnets, config.subnets)
# 更新私有路由表
self._update_private_route_tables(route_tables, nat_gateways)
return {
'vpc_id': vpc_id,
'internet_gateway_id': igw_id,
'subnets': subnets,
'route_tables': route_tables,
'nat_gateways': nat_gateways
}
except Exception as e:
print(f"创建VPC架构失败: {str(e)}")
return None
def _create_subnet(self, vpc_id: str, config: SubnetConfig) -> str:
"""创建子网"""
response = self.ec2.create_subnet(
VpcId=vpc_id,
CidrBlock=config.cidr,
AvailabilityZone=config.availability_zone,
TagSpecifications=[{
'ResourceType': 'subnet',
'Tags': [
{'Key': 'Name', 'Value': config.name},
{'Key': 'Type', 'Value': config.subnet_type}
]
}]
)
return response['Subnet']['SubnetId']
def _create_route_tables(self, vpc_id: str, vpc_name: str, igw_id: str) -> Dict:
"""创建路由表"""
route_tables = {}
# 公有路由表
public_rt_response = self.ec2.create_route_table(
VpcId=vpc_id,
TagSpecifications=[{
'ResourceType': 'route-table',
'Tags': [{'Key': 'Name', 'Value': f'{vpc_name}-public-rt'}]
}]
)
public_rt_id = public_rt_response['RouteTable']['RouteTableId']
# 添加到Internet Gateway的路由
self.ec2.create_route(
RouteTableId=public_rt_id,
DestinationCidrBlock='0.0.0.0/0',
GatewayId=igw_id
)
route_tables['public'] = public_rt_id
# 私有路由表
private_rt_response = self.ec2.create_route_table(
VpcId=vpc_id,
TagSpecifications=[{
'ResourceType': 'route-table',
'Tags': [{'Key': 'Name', 'Value': f'{vpc_name}-private-rt'}]
}]
)
route_tables['private'] = private_rt_response['RouteTable']['RouteTableId']
# 数据库路由表
db_rt_response = self.ec2.create_route_table(
VpcId=vpc_id,
TagSpecifications=[{
'ResourceType': 'route-table',
'Tags': [{'Key': 'Name', 'Value': f'{vpc_name}-database-rt'}]
}]
)
route_tables['database'] = db_rt_response['RouteTable']['RouteTableId']
return route_tables
def _create_nat_gateways(self, subnets: Dict, subnet_configs: List[SubnetConfig]) -> Dict:
"""创建NAT Gateway"""
nat_gateways = {}
# 为每个公有子网创建NAT Gateway
for config in subnet_configs:
if config.subnet_type == 'public':
# 分配弹性IP
eip_response = self.ec2.allocate_address(Domain='vpc')
allocation_id = eip_response['AllocationId']
# 创建NAT Gateway
nat_response = self.ec2.create_nat_gateway(
SubnetId=subnets[config.name],
AllocationId=allocation_id,
TagSpecifications=[{
'ResourceType': 'nat-gateway',
'Tags': [{'Key': 'Name', 'Value': f'{config.name}-nat'}]
}]
)
nat_gateways[config.availability_zone] = nat_response['NatGateway']['NatGatewayId']
return nat_gateways
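# The two helper methods below are called from create_vpc_architecture but were not
# shown in the original listing; these are minimal illustrative implementations.
# They assume SubnetConfig.route_table is one of 'public', 'private', or 'database'.
def _associate_subnets_to_route_tables(self, subnets: Dict, route_tables: Dict,
                                        subnet_configs: List[SubnetConfig]) -> None:
    """Associate each subnet with the route table named in its configuration."""
    for config in subnet_configs:
        self.ec2.associate_route_table(
            RouteTableId=route_tables[config.route_table],
            SubnetId=subnets[config.name]
        )

def _update_private_route_tables(self, route_tables: Dict, nat_gateways: Dict) -> None:
    """Add a default route through a NAT Gateway to the private and database route tables.
    For simplicity this sketch uses the first available NAT Gateway."""
    if not nat_gateways:
        return
    nat_gateway_id = next(iter(nat_gateways.values()))
    for key in ('private', 'database'):
        if key in route_tables:
            self.ec2.create_route(
                RouteTableId=route_tables[key],
                DestinationCidrBlock='0.0.0.0/0',
                NatGatewayId=nat_gateway_id
            )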
def create_security_groups(self, vpc_id: str) -> Dict:
"""创建安全组"""
security_groups = {}
# Web层安全组
web_sg = self.ec2.create_security_group(
GroupName='web-tier-sg',
Description='Security group for web tier',
VpcId=vpc_id,
TagSpecifications=[{
'ResourceType': 'security-group',
'Tags': [{'Key': 'Name', 'Value': 'web-tier-sg'}]
}]
)
web_sg_id = web_sg['GroupId']
# Web层入站规则
self.ec2.authorize_security_group_ingress(
GroupId=web_sg_id,
IpPermissions=[
{
'IpProtocol': 'tcp',
'FromPort': 80,
'ToPort': 80,
'IpRanges': [{'CidrIp': '0.0.0.0/0'}]
},
{
'IpProtocol': 'tcp',
'FromPort': 443,
'ToPort': 443,
'IpRanges': [{'CidrIp': '0.0.0.0/0'}]
}
]
)
security_groups['web'] = web_sg_id
# 应用层安全组
app_sg = self.ec2.create_security_group(
GroupName='app-tier-sg',
Description='Security group for application tier',
VpcId=vpc_id,
TagSpecifications=[{
'ResourceType': 'security-group',
'Tags': [{'Key': 'Name', 'Value': 'app-tier-sg'}]
}]
)
app_sg_id = app_sg['GroupId']
# 应用层入站规则(仅允许来自Web层的流量)
self.ec2.authorize_security_group_ingress(
GroupId=app_sg_id,
IpPermissions=[
{
'IpProtocol': 'tcp',
'FromPort': 8080,
'ToPort': 8080,
'UserIdGroupPairs': [{'GroupId': web_sg_id}]
}
]
)
security_groups['app'] = app_sg_id
# 数据库层安全组
db_sg = self.ec2.create_security_group(
GroupName='database-tier-sg',
Description='Security group for database tier',
VpcId=vpc_id,
TagSpecifications=[{
'ResourceType': 'security-group',
'Tags': [{'Key': 'Name', 'Value': 'database-tier-sg'}]
}]
)
db_sg_id = db_sg['GroupId']
# 数据库层入站规则(仅允许来自应用层的流量)
self.ec2.authorize_security_group_ingress(
GroupId=db_sg_id,
IpPermissions=[
{
'IpProtocol': 'tcp',
'FromPort': 3306,
'ToPort': 3306,
'UserIdGroupPairs': [{'GroupId': app_sg_id}]
},
{
'IpProtocol': 'tcp',
'FromPort': 5432,
'ToPort': 5432,
'UserIdGroupPairs': [{'GroupId': app_sg_id}]
}
]
)
security_groups['database'] = db_sg_id
return security_groups
def main():
"""主函数 - 演示网络架构创建"""
# 网络管理器
network_manager = CloudNetworkManager()
# VPC配置
vpc_config = VPCConfig(
name='production-vpc',
cidr='10.0.0.0/16',
subnets=[
SubnetConfig('public-subnet-1a', '10.0.1.0/24', 'us-west-2a', 'public', 'public'),
SubnetConfig('public-subnet-1b', '10.0.2.0/24', 'us-west-2b', 'public', 'public'),
SubnetConfig('private-subnet-1a', '10.0.10.0/24', 'us-west-2a', 'private', 'private'),
SubnetConfig('private-subnet-1b', '10.0.11.0/24', 'us-west-2b', 'private', 'private'),
SubnetConfig('database-subnet-1a', '10.0.20.0/24', 'us-west-2a', 'database', 'database'),
SubnetConfig('database-subnet-1b', '10.0.21.0/24', 'us-west-2b', 'database', 'database')
]
)
# 创建VPC架构
print("创建VPC架构...")
vpc_result = network_manager.create_vpc_architecture(vpc_config)
if vpc_result:
print(f"VPC创建成功: {vpc_result['vpc_id']}")
# 创建安全组
print("创建安全组...")
security_groups = network_manager.create_security_groups(vpc_result['vpc_id'])
print(f"安全组创建完成: {security_groups}")
# 输出架构信息
architecture_info = {
'vpc': vpc_result,
'security_groups': security_groups
}
print("\n=== 网络架构创建完成 ===")
print(json.dumps(architecture_info, indent=2, default=str))
else:
print("VPC架构创建失败")
if __name__ == "__main__":
main()
Compute Resource Architecture
The compute architecture has to balance performance, scalability, cost, and management complexity.
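Target tracking is at the heart of the auto scaling used below: the desired capacity is roughly the current capacity scaled by the ratio of the current metric value to the target value, rounded up and clamped to the group's size limits. The sketch below only illustrates that calculation; in practice AWS Auto Scaling performs it for you, and the numbers are assumptions.
#!/usr/bin/env python3
"""Sketch: how target-tracking scaling estimates desired capacity.
Conceptual illustration only; AWS Auto Scaling performs this calculation itself."""
import math

def desired_capacity(current_capacity: int, current_metric: float,
                     target_metric: float, min_size: int, max_size: int) -> int:
    """Estimate the new desired capacity and clamp it to [min_size, max_size]."""
    if current_metric <= 0:
        return max(min_size, min(current_capacity, max_size))
    estimate = math.ceil(current_capacity * current_metric / target_metric)
    return max(min_size, min(estimate, max_size))

if __name__ == "__main__":
    # 3 instances at 90% average CPU with a 70% target -> roughly 4 instances
    print(desired_capacity(current_capacity=3, current_metric=90.0,
                           target_metric=70.0, min_size=2, max_size=10))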
Auto Scaling Management
#!/usr/bin/env python3
"""
计算资源自动扩缩容管理器
提供基于指标的自动扩缩容、负载均衡和实例管理功能
"""
import boto3
import json
import time
from typing import Dict, List, Optional
from dataclasses import dataclass
from datetime import datetime, timedelta
@dataclass
class AutoScalingConfig:
"""自动扩缩容配置"""
name: str
min_size: int
max_size: int
desired_capacity: int
target_group_arn: str
launch_template_id: str
subnet_ids: List[str]
health_check_type: str = 'ELB'
health_check_grace_period: int = 300
@dataclass
class ScalingPolicy:
"""扩缩容策略"""
name: str
policy_type: str
metric_name: str
target_value: float
scale_out_cooldown: int = 300
scale_in_cooldown: int = 300
class ComputeResourceManager:
"""计算资源管理器"""
def __init__(self, region: str = 'us-west-2'):
self.autoscaling = boto3.client('autoscaling', region_name=region)
self.ec2 = boto3.client('ec2', region_name=region)
self.elbv2 = boto3.client('elbv2', region_name=region)
self.cloudwatch = boto3.client('cloudwatch', region_name=region)
self.region = region
def create_launch_template(self, name: str, config: Dict) -> str:
"""创建启动模板"""
try:
response = self.ec2.create_launch_template(
LaunchTemplateName=name,
LaunchTemplateData={
'ImageId': config['ami_id'],
'InstanceType': config['instance_type'],
'KeyName': config.get('key_name'),
'SecurityGroupIds': config['security_group_ids'],
'UserData': config.get('user_data', ''),
'IamInstanceProfile': {
'Name': config.get('iam_instance_profile', 'EC2-Default-Role')
},
'BlockDeviceMappings': [
{
'DeviceName': '/dev/xvda',
'Ebs': {
'VolumeSize': config.get('volume_size', 20),
'VolumeType': config.get('volume_type', 'gp3'),
'DeleteOnTermination': True,
'Encrypted': True
}
}
],
'TagSpecifications': [
{
'ResourceType': 'instance',
'Tags': [
{'Key': 'Name', 'Value': f'{name}-instance'},
{'Key': 'Environment', 'Value': config.get('environment', 'production')},
{'Key': 'Application', 'Value': config.get('application', 'web-app')}
]
}
]
}
)
return response['LaunchTemplate']['LaunchTemplateId']
except Exception as e:
print(f"创建启动模板失败: {str(e)}")
return None
def create_auto_scaling_group(self, config: AutoScalingConfig) -> bool:
"""创建自动扩缩容组"""
try:
response = self.autoscaling.create_auto_scaling_group(
AutoScalingGroupName=config.name,
LaunchTemplate={
'LaunchTemplateId': config.launch_template_id,
'Version': '$Latest'
},
MinSize=config.min_size,
MaxSize=config.max_size,
DesiredCapacity=config.desired_capacity,
VPCZoneIdentifier=','.join(config.subnet_ids),
TargetGroupARNs=[config.target_group_arn],
HealthCheckType=config.health_check_type,
HealthCheckGracePeriod=config.health_check_grace_period,
Tags=[
{
'Key': 'Name',
'Value': config.name,
'PropagateAtLaunch': True,
'ResourceId': config.name,
'ResourceType': 'auto-scaling-group'
}
]
)
print(f"自动扩缩容组 {config.name} 创建成功")
return True
except Exception as e:
print(f"创建自动扩缩容组失败: {str(e)}")
return False
def create_scaling_policies(self, asg_name: str, policies: List[ScalingPolicy]) -> Dict:
"""创建扩缩容策略"""
policy_arns = {}
for policy in policies:
try:
# 创建目标跟踪扩缩容策略
response = self.autoscaling.put_scaling_policy(
AutoScalingGroupName=asg_name,
PolicyName=policy.name,
PolicyType=policy.policy_type,
TargetTrackingConfiguration={
'PredefinedMetricSpecification': {
'PredefinedMetricType': policy.metric_name
},
'TargetValue': policy.target_value,
'ScaleOutCooldown': policy.scale_out_cooldown,
'ScaleInCooldown': policy.scale_in_cooldown
}
)
policy_arns[policy.name] = response['PolicyARN']
print(f"扩缩容策略 {policy.name} 创建成功")
except Exception as e:
print(f"创建扩缩容策略 {policy.name} 失败: {str(e)}")
return policy_arns
def create_load_balancer(self, name: str, subnet_ids: List[str], security_group_ids: List[str]) -> Dict:
"""创建应用负载均衡器"""
try:
# 创建负载均衡器
lb_response = self.elbv2.create_load_balancer(
Name=name,
Subnets=subnet_ids,
SecurityGroups=security_group_ids,
Scheme='internet-facing',
Type='application',
IpAddressType='ipv4',
Tags=[
{'Key': 'Name', 'Value': name},
{'Key': 'Environment', 'Value': 'production'}
]
)
lb_arn = lb_response['LoadBalancers'][0]['LoadBalancerArn']
lb_dns = lb_response['LoadBalancers'][0]['DNSName']
# 创建目标组
tg_response = self.elbv2.create_target_group(
Name=f'{name}-tg',
Protocol='HTTP',
Port=80,
VpcId=self._get_vpc_id_from_subnet(subnet_ids[0]),
HealthCheckProtocol='HTTP',
HealthCheckPath='/health',
HealthCheckIntervalSeconds=30,
HealthCheckTimeoutSeconds=5,
HealthyThresholdCount=2,
UnhealthyThresholdCount=3,
Tags=[
{'Key': 'Name', 'Value': f'{name}-tg'}
]
)
tg_arn = tg_response['TargetGroups'][0]['TargetGroupArn']
# 创建监听器
listener_response = self.elbv2.create_listener(
LoadBalancerArn=lb_arn,
Protocol='HTTP',
Port=80,
DefaultActions=[
{
'Type': 'forward',
'TargetGroupArn': tg_arn
}
]
)
return {
'load_balancer_arn': lb_arn,
'load_balancer_dns': lb_dns,
'target_group_arn': tg_arn,
'listener_arn': listener_response['Listeners'][0]['ListenerArn']
}
except Exception as e:
print(f"创建负载均衡器失败: {str(e)}")
return None
def _get_vpc_id_from_subnet(self, subnet_id: str) -> str:
"""从子网ID获取VPC ID"""
response = self.ec2.describe_subnets(SubnetIds=[subnet_id])
return response['Subnets'][0]['VpcId']
def monitor_scaling_activities(self, asg_name: str) -> List[Dict]:
"""监控扩缩容活动"""
try:
response = self.autoscaling.describe_scaling_activities(
AutoScalingGroupName=asg_name,
MaxRecords=10
)
activities = []
for activity in response['Activities']:
activities.append({
'activity_id': activity['ActivityId'],
'description': activity['Description'],
'cause': activity['Cause'],
'start_time': activity['StartTime'],
'end_time': activity.get('EndTime'),
'status_code': activity['StatusCode'],
'status_message': activity.get('StatusMessage', '')
})
return activities
except Exception as e:
print(f"获取扩缩容活动失败: {str(e)}")
return []
def get_instance_metrics(self, instance_ids: List[str], hours: int = 1) -> Dict:
"""获取实例指标"""
end_time = datetime.utcnow()
start_time = end_time - timedelta(hours=hours)
metrics = {}
for instance_id in instance_ids:
try:
# CPU利用率
cpu_response = self.cloudwatch.get_metric_statistics(
Namespace='AWS/EC2',
MetricName='CPUUtilization',
Dimensions=[
{'Name': 'InstanceId', 'Value': instance_id}
],
StartTime=start_time,
EndTime=end_time,
Period=300,
Statistics=['Average', 'Maximum']
)
# 网络输入
network_in_response = self.cloudwatch.get_metric_statistics(
Namespace='AWS/EC2',
MetricName='NetworkIn',
Dimensions=[
{'Name': 'InstanceId', 'Value': instance_id}
],
StartTime=start_time,
EndTime=end_time,
Period=300,
Statistics=['Sum']
)
metrics[instance_id] = {
'cpu_utilization': cpu_response['Datapoints'],
'network_in': network_in_response['Datapoints']
}
except Exception as e:
print(f"获取实例 {instance_id} 指标失败: {str(e)}")
metrics[instance_id] = {}
return metrics
def main():
"""主函数 - 演示计算资源架构创建"""
# 计算资源管理器
compute_manager = ComputeResourceManager()
# 启动模板配置
launch_template_config = {
'ami_id': 'ami-0c02fb55956c7d316', # Amazon Linux 2
'instance_type': 't3.medium',
'security_group_ids': ['sg-12345678'], # 替换为实际的安全组ID
'key_name': 'my-key-pair',
'environment': 'production',
'application': 'web-app',
'volume_size': 20,
'user_data': '''#!/bin/bash
yum update -y
yum install -y httpd
systemctl start httpd
systemctl enable httpd
echo "<h1>Hello from Auto Scaling Instance</h1>" > /var/www/html/index.html
'''
}
# 创建启动模板
print("创建启动模板...")
launch_template_id = compute_manager.create_launch_template(
'web-app-template',
launch_template_config
)
if launch_template_id:
print(f"启动模板创建成功: {launch_template_id}")
# 创建负载均衡器
print("创建负载均衡器...")
lb_result = compute_manager.create_load_balancer(
'web-app-alb',
['subnet-12345678', 'subnet-87654321'], # 替换为实际的子网ID
['sg-12345678'] # 替换为实际的安全组ID
)
if lb_result:
print(f"负载均衡器创建成功: {lb_result['load_balancer_dns']}")
# 自动扩缩容组配置
asg_config = AutoScalingConfig(
name='web-app-asg',
min_size=2,
max_size=10,
desired_capacity=3,
target_group_arn=lb_result['target_group_arn'],
launch_template_id=launch_template_id,
subnet_ids=['subnet-12345678', 'subnet-87654321'] # 替换为实际的子网ID
)
# 创建自动扩缩容组
print("创建自动扩缩容组...")
if compute_manager.create_auto_scaling_group(asg_config):
# 扩缩容策略
scaling_policies = [
ScalingPolicy(
name='cpu-target-tracking',
policy_type='TargetTrackingScaling',
metric_name='ASGAverageCPUUtilization',
target_value=70.0
),
ScalingPolicy(
name='request-count-target-tracking',
policy_type='TargetTrackingScaling',
metric_name='ALBRequestCountPerTarget',
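# Note: ALBRequestCountPerTarget also requires a ResourceLabel in the
# TargetTrackingConfiguration (app/<alb-name>/<alb-id>/targetgroup/<tg-name>/<tg-id>);
# it is omitted here for brevity, so this policy would need that field added to be accepted.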
target_value=1000.0
)
]
# 创建扩缩容策略
print("创建扩缩容策略...")
policy_arns = compute_manager.create_scaling_policies(
asg_config.name,
scaling_policies
)
print("\n=== 计算资源架构创建完成 ===")
print(f"负载均衡器DNS: {lb_result['load_balancer_dns']}")
print(f"自动扩缩容组: {asg_config.name}")
print(f"扩缩容策略: {list(policy_arns.keys())}")
# 监控扩缩容活动
print("\n监控扩缩容活动...")
time.sleep(10) # 等待一段时间
activities = compute_manager.monitor_scaling_activities(asg_config.name)
for activity in activities[:3]: # 显示最近3个活动
print(f"活动: {activity['description']}")
print(f"状态: {activity['status_code']}")
print(f"时间: {activity['start_time']}")
print("---")
if __name__ == "__main__":
main()
Storage Architecture Design
The storage architecture must balance performance, durability, availability, and cost, and provide a multi-tier storage solution.
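The value of tiering comes from the large per-GB price differences between storage classes. The sketch below estimates a monthly S3 bill across tiers; the per-GB prices are rough assumptions for illustration only, so check the official pricing pages before relying on them.
#!/usr/bin/env python3
"""Sketch: estimate monthly S3 cost across storage tiers.
The per-GB prices are rough assumptions; consult official pricing before use."""

# Assumed USD per GB-month, for illustration of the tiering effect only
PRICE_PER_GB_MONTH = {
    "STANDARD": 0.023,
    "STANDARD_IA": 0.0125,
    "GLACIER": 0.004,
    "DEEP_ARCHIVE": 0.00099,
}

def estimate_monthly_cost(size_gb_by_class: dict) -> float:
    """Sum the estimated monthly storage cost per storage class."""
    return round(sum(size_gb * PRICE_PER_GB_MONTH[cls]
                     for cls, size_gb in size_gb_by_class.items()), 2)

if __name__ == "__main__":
    # 1 TB hot data + 2 TB infrequent access + 10 TB deep archive
    usage = {"STANDARD": 1024, "STANDARD_IA": 2048, "DEEP_ARCHIVE": 10240}
    print(f"Estimated monthly cost: ${estimate_monthly_cost(usage)}")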
Multi-Tier Storage Management
#!/usr/bin/env python3
"""
云存储架构管理器
提供多层存储、数据生命周期管理和备份策略的自动化管理
"""
import boto3
import json
from typing import Dict, List, Optional
from dataclasses import dataclass
from datetime import datetime, timedelta
@dataclass
class StorageClass:
"""存储类别配置"""
name: str
storage_class: str
transition_days: int
description: str
@dataclass
class LifecycleRule:
"""生命周期规则"""
rule_id: str
prefix: str
status: str
transitions: List[StorageClass]
expiration_days: Optional[int] = None
class CloudStorageManager:
"""云存储架构管理器"""
def __init__(self, region: str = 'us-west-2'):
self.s3 = boto3.client('s3', region_name=region)
self.efs = boto3.client('efs', region_name=region)
self.fsx = boto3.client('fsx', region_name=region)
self.region = region
def create_s3_storage_architecture(self, bucket_configs: List[Dict]) -> Dict:
"""创建S3存储架构"""
created_buckets = {}
for config in bucket_configs:
try:
bucket_name = config['name']
# 创建S3存储桶
if self.region == 'us-east-1':
self.s3.create_bucket(Bucket=bucket_name)
else:
self.s3.create_bucket(
Bucket=bucket_name,
CreateBucketConfiguration={'LocationConstraint': self.region}
)
# 配置版本控制
if config.get('versioning', False):
self.s3.put_bucket_versioning(
Bucket=bucket_name,
VersioningConfiguration={'Status': 'Enabled'}
)
# 配置服务器端加密
if config.get('encryption', True):
self.s3.put_bucket_encryption(
Bucket=bucket_name,
ServerSideEncryptionConfiguration={
'Rules': [
{
'ApplyServerSideEncryptionByDefault': {
'SSEAlgorithm': 'AES256'
}
}
]
}
)
# 配置生命周期策略
if 'lifecycle_rules' in config:
self._configure_lifecycle_policy(bucket_name, config['lifecycle_rules'])
# 配置跨区域复制
if 'replication' in config:
self._configure_replication(bucket_name, config['replication'])
# 配置访问日志
if config.get('access_logging', False):
self._configure_access_logging(bucket_name, config.get('log_bucket'))
# 配置标签
if 'tags' in config:
self.s3.put_bucket_tagging(
Bucket=bucket_name,
Tagging={'TagSet': config['tags']}
)
created_buckets[bucket_name] = {
'region': self.region,
'versioning': config.get('versioning', False),
'encryption': config.get('encryption', True),
'lifecycle_configured': 'lifecycle_rules' in config
}
print(f"S3存储桶 {bucket_name} 创建成功")
except Exception as e:
print(f"创建S3存储桶 {config['name']} 失败: {str(e)}")
return created_buckets
def _configure_lifecycle_policy(self, bucket_name: str, lifecycle_rules: List[LifecycleRule]):
"""配置生命周期策略"""
rules = []
for rule in lifecycle_rules:
rule_config = {
'ID': rule.rule_id,
'Status': rule.status,
'Filter': {'Prefix': rule.prefix},
'Transitions': []
}
# 添加转换规则
for transition in rule.transitions:
rule_config['Transitions'].append({
'Days': transition.transition_days,
'StorageClass': transition.storage_class
})
# 添加过期规则
if rule.expiration_days:
rule_config['Expiration'] = {'Days': rule.expiration_days}
rules.append(rule_config)
try:
self.s3.put_bucket_lifecycle_configuration(
Bucket=bucket_name,
LifecycleConfiguration={'Rules': rules}
)
print(f"生命周期策略配置成功: {bucket_name}")
except Exception as e:
print(f"配置生命周期策略失败: {str(e)}")
def create_efs_file_system(self, name: str, config: Dict) -> Dict:
"""创建EFS文件系统"""
try:
# 创建EFS文件系统
response = self.efs.create_file_system(
CreationToken=f"{name}-{int(datetime.now().timestamp())}",
PerformanceMode=config.get('performance_mode', 'generalPurpose'),
ThroughputMode=config.get('throughput_mode', 'bursting'),
Encrypted=config.get('encrypted', True),
Tags=[
{'Key': 'Name', 'Value': name},
{'Key': 'Environment', 'Value': config.get('environment', 'production')}
]
)
file_system_id = response['FileSystemId']
# 创建挂载目标
mount_targets = []
for subnet_config in config.get('mount_targets', []):
mt_response = self.efs.create_mount_target(
FileSystemId=file_system_id,
SubnetId=subnet_config['subnet_id'],
SecurityGroups=subnet_config['security_groups']
)
mount_targets.append(mt_response['MountTargetId'])
# 配置生命周期策略
if 'lifecycle_policy' in config:
self.efs.put_lifecycle_configuration(
FileSystemId=file_system_id,
LifecyclePolicies=[
{
'TransitionToIA': config['lifecycle_policy']['transition_to_ia']
}
]
)
return {
'file_system_id': file_system_id,
'mount_targets': mount_targets,
'dns_name': f"{file_system_id}.efs.{self.region}.amazonaws.com"
}
except Exception as e:
print(f"创建EFS文件系统失败: {str(e)}")
return None
def create_fsx_file_system(self, name: str, config: Dict) -> Dict:
"""创建FSx文件系统"""
try:
# 根据文件系统类型创建FSx
if config['file_system_type'] == 'LUSTRE':
response = self.fsx.create_file_system(
FileSystemType='LUSTRE',
StorageCapacity=config['storage_capacity'],
SubnetIds=config['subnet_ids'],
SecurityGroupIds=config['security_group_ids'],
Tags=[
{'Key': 'Name', 'Value': name}
],
LustreConfiguration={
'DeploymentType': config.get('deployment_type', 'SCRATCH_2'),
'PerUnitStorageThroughput': config.get('throughput', 50)
}
)
elif config['file_system_type'] == 'WINDOWS':
response = self.fsx.create_file_system(
FileSystemType='WINDOWS',
StorageCapacity=config['storage_capacity'],
SubnetIds=config['subnet_ids'],
SecurityGroupIds=config['security_group_ids'],
Tags=[
{'Key': 'Name', 'Value': name}
],
WindowsConfiguration={
'ActiveDirectoryId': config.get('active_directory_id'),
'ThroughputCapacity': config.get('throughput_capacity', 8),
'DeploymentType': config.get('deployment_type', 'SINGLE_AZ_1')
}
)
return {
'file_system_id': response['FileSystem']['FileSystemId'],
'dns_name': response['FileSystem']['DNSName'],
'lifecycle_status': response['FileSystem']['Lifecycle']
}
except Exception as e:
print(f"创建FSx文件系统失败: {str(e)}")
return None
def create_backup_strategy(self, resources: Dict) -> Dict:
"""创建备份策略"""
backup_plans = {}
try:
backup_client = boto3.client('backup', region_name=self.region)
# 创建备份计划
backup_plan = {
'BackupPlanName': 'comprehensive-backup-plan',
'Rules': [
{
'RuleName': 'daily-backup',
'TargetBackupVault': 'default',
'ScheduleExpression': 'cron(0 2 * * ? *)', # 每天凌晨2点
'StartWindowMinutes': 60,
'CompletionWindowMinutes': 120,
'Lifecycle': {
'MoveToColdStorageAfterDays': 30,
'DeleteAfterDays': 365
},
'RecoveryPointTags': {
'BackupType': 'Daily',
'Environment': 'Production'
}
},
{
'RuleName': 'weekly-backup',
'TargetBackupVault': 'default',
'ScheduleExpression': 'cron(0 3 ? * SUN *)', # 每周日凌晨3点
'StartWindowMinutes': 60,
'CompletionWindowMinutes': 180,
'Lifecycle': {
'MoveToColdStorageAfterDays': 7,
'DeleteAfterDays': 2555 # 7年
},
'RecoveryPointTags': {
'BackupType': 'Weekly',
'Environment': 'Production'
}
}
]
}
plan_response = backup_client.create_backup_plan(BackupPlan=backup_plan)
backup_plan_id = plan_response['BackupPlanId']
# 创建备份选择
backup_selection = {
'SelectionName': 'production-resources',
'IamRoleArn': 'arn:aws:iam::123456789012:role/aws-backup-service-role', # 替换为实际的IAM角色
'Resources': [],
'Conditions': {
'StringEquals': {
'aws:ResourceTag/Environment': ['Production']
}
}
}
# 添加资源到备份选择
for resource_type, resource_list in resources.items():
if resource_type == 'ec2_instances':
for instance_id in resource_list:
backup_selection['Resources'].append(f'arn:aws:ec2:{self.region}:*:instance/{instance_id}')
elif resource_type == 'ebs_volumes':
for volume_id in resource_list:
backup_selection['Resources'].append(f'arn:aws:ec2:{self.region}:*:volume/{volume_id}')
elif resource_type == 'efs_file_systems':
for fs_id in resource_list:
backup_selection['Resources'].append(f'arn:aws:elasticfilesystem:{self.region}:*:file-system/{fs_id}')
selection_response = backup_client.create_backup_selection(
BackupPlanId=backup_plan_id,
BackupSelection=backup_selection
)
backup_plans['comprehensive'] = {
'backup_plan_id': backup_plan_id,
'backup_selection_id': selection_response['SelectionId']
}
print("备份策略创建成功")
except Exception as e:
print(f"创建备份策略失败: {str(e)}")
return backup_plans
def monitor_storage_costs(self, bucket_names: List[str]) -> Dict:
"""监控存储成本"""
cost_data = {}
try:
cloudwatch = boto3.client('cloudwatch', region_name=self.region)
for bucket_name in bucket_names:
# 获取存储桶大小指标
response = cloudwatch.get_metric_statistics(
Namespace='AWS/S3',
MetricName='BucketSizeBytes',
Dimensions=[
{'Name': 'BucketName', 'Value': bucket_name},
{'Name': 'StorageType', 'Value': 'StandardStorage'}
],
StartTime=datetime.utcnow() - timedelta(days=1),
EndTime=datetime.utcnow(),
Period=86400, # 1天
Statistics=['Average']
)
if response['Datapoints']:
size_bytes = response['Datapoints'][-1]['Average']
size_gb = size_bytes / (1024 ** 3)
# 估算成本(基于标准存储定价)
estimated_monthly_cost = size_gb * 0.023 # $0.023 per GB/month for Standard storage
cost_data[bucket_name] = {
'size_gb': round(size_gb, 2),
'estimated_monthly_cost': round(estimated_monthly_cost, 2)
}
except Exception as e:
print(f"获取存储成本数据失败: {str(e)}")
return cost_data
def main():
"""主函数 - 演示存储架构创建"""
# 存储管理器
storage_manager = CloudStorageManager()
# S3存储桶配置
s3_bucket_configs = [
{
'name': 'production-app-data',
'versioning': True,
'encryption': True,
'access_logging': True,
'lifecycle_rules': [
LifecycleRule(
rule_id='data-lifecycle',
prefix='data/',
status='Enabled',
transitions=[
StorageClass('IA', 'STANDARD_IA', 30, 'Infrequent Access'),
StorageClass('Glacier', 'GLACIER', 90, 'Glacier'),
StorageClass('DeepArchive', 'DEEP_ARCHIVE', 365, 'Deep Archive')
],
expiration_days=2555 # 7年
)
],
'tags': [
{'Key': 'Environment', 'Value': 'Production'},
{'Key': 'Application', 'Value': 'WebApp'},
{'Key': 'DataClassification', 'Value': 'Sensitive'}
]
},
{
'name': 'production-backup-data',
'versioning': True,
'encryption': True,
'lifecycle_rules': [
LifecycleRule(
rule_id='backup-lifecycle',
prefix='backups/',
status='Enabled',
transitions=[
StorageClass('Glacier', 'GLACIER', 1, 'Immediate Glacier'),
StorageClass('DeepArchive', 'DEEP_ARCHIVE', 30, 'Deep Archive')
]
)
],
'tags': [
{'Key': 'Environment', 'Value': 'Production'},
{'Key': 'Purpose', 'Value': 'Backup'}
]
}
]
# 创建S3存储架构
print("创建S3存储架构...")
s3_buckets = storage_manager.create_s3_storage_architecture(s3_bucket_configs)
# EFS文件系统配置
efs_config = {
'performance_mode': 'generalPurpose',
'throughput_mode': 'bursting',  # for 'provisioned' mode, create_file_system must also pass ProvisionedThroughputInMibps
'encrypted': True,
'environment': 'production',
'mount_targets': [
{
'subnet_id': 'subnet-12345678', # 替换为实际的子网ID
'security_groups': ['sg-12345678'] # 替换为实际的安全组ID
}
],
'lifecycle_policy': {
'transition_to_ia': 'AFTER_30_DAYS'
}
}
# 创建EFS文件系统
print("创建EFS文件系统...")
efs_result = storage_manager.create_efs_file_system('production-efs', efs_config)
# FSx文件系统配置
fsx_config = {
'file_system_type': 'LUSTRE',
'storage_capacity': 1200, # GB
'subnet_ids': ['subnet-12345678'], # 替换为实际的子网ID
'security_group_ids': ['sg-12345678'], # 替换为实际的安全组ID
'deployment_type': 'PERSISTENT_1',
'throughput': 50
}
# 创建FSx文件系统
print("创建FSx文件系统...")
fsx_result = storage_manager.create_fsx_file_system('production-fsx', fsx_config)
# 创建备份策略
backup_resources = {
'ec2_instances': ['i-1234567890abcdef0'], # 替换为实际的实例ID
'ebs_volumes': ['vol-1234567890abcdef0'], # 替换为实际的卷ID
'efs_file_systems': [efs_result['file_system_id']] if efs_result else []
}
print("创建备份策略...")
backup_plans = storage_manager.create_backup_strategy(backup_resources)
# 监控存储成本
print("监控存储成本...")
cost_data = storage_manager.monitor_storage_costs(list(s3_buckets.keys()))
print("\n=== 存储架构创建完成 ===")
print(f"S3存储桶: {list(s3_buckets.keys())}")
if efs_result:
print(f"EFS文件系统: {efs_result['file_system_id']}")
if fsx_result:
print(f"FSx文件系统: {fsx_result['file_system_id']}")
print(f"备份计划: {list(backup_plans.keys())}")
if cost_data:
print("\n存储成本分析:")
for bucket, data in cost_data.items():
print(f"{bucket}: {data['size_gb']} GB, 预估月成本: ${data['estimated_monthly_cost']}")
if __name__ == "__main__":
main()
Security Architecture Implementation
A cloud security architecture needs multiple layers of defense, covering network security, identity and authentication, data protection, and compliance management.
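A useful complement to the deployment script below is a recurring audit that looks for overly permissive inbound rules. The sketch below scans security groups for sensitive ports open to 0.0.0.0/0; the port list and region are assumptions to adjust to your environment.
#!/usr/bin/env python3
"""Sketch: audit security groups for sensitive ports open to the whole internet.
The port list and region are assumptions; adjust them to your environment."""
import boto3

SENSITIVE_PORTS = {22, 3306, 5432, 6379}  # SSH plus common database/cache ports

def find_exposed_rules(region: str = 'us-west-2'):
    ec2 = boto3.client('ec2', region_name=region)
    findings = []
    paginator = ec2.get_paginator('describe_security_groups')
    for page in paginator.paginate():
        for sg in page['SecurityGroups']:
            for rule in sg.get('IpPermissions', []):
                open_to_world = any(r.get('CidrIp') == '0.0.0.0/0'
                                    for r in rule.get('IpRanges', []))
                if not open_to_world:
                    continue
                from_port, to_port = rule.get('FromPort'), rule.get('ToPort')
                if from_port is None:  # protocol -1 means all traffic
                    findings.append((sg['GroupId'], 'ALL'))
                elif any(from_port <= p <= to_port for p in SENSITIVE_PORTS):
                    findings.append((sg['GroupId'], f'{from_port}-{to_port}'))
    return findings

if __name__ == "__main__":
    for group_id, ports in find_exposed_rules():
        print(f"Security group {group_id} exposes sensitive ports to 0.0.0.0/0: {ports}")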
Zero Trust Security Model
#!/bin/bash
# 云安全架构部署脚本
# 实现零信任安全模型的自动化部署
set -euo pipefail
# 配置变量
REGION="us-west-2"
ENVIRONMENT="production"
PROJECT_NAME="secure-cloud-platform"
# 日志函数
log() {
echo "[$(date +'%Y-%m-%d %H:%M:%S')] $1"
}
# 错误处理
error_exit() {
log "错误: $1"
exit 1
}
# 创建IAM角色和策略
create_iam_security_roles() {
log "创建IAM安全角色和策略..."
# 创建零信任访问策略
cat > zero-trust-policy.json << 'EOF'
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Deny",
"Action": "*",
"Resource": "*",
"Condition": {
"Bool": {
"aws:SecureTransport": "false"
}
}
},
{
"Effect": "Deny",
"Action": "*",
"Resource": "*",
"Condition": {
"IpAddressIfExists": {
"aws:SourceIp": [
"0.0.0.0/0"
]
},
"StringNotEquals": {
"aws:RequestedRegion": [
"us-west-2",
"us-east-1"
]
}
}
},
{
"Effect": "Allow",
"Action": [
"logs:CreateLogGroup",
"logs:CreateLogStream",
"logs:PutLogEvents"
],
"Resource": "arn:aws:logs:*:*:*"
}
]
}
EOF
# 创建策略
aws iam create-policy \
--policy-name ZeroTrustSecurityPolicy \
--policy-document file://zero-trust-policy.json \
--description "Zero Trust Security Policy" || true
# 创建安全审计角色
cat > security-audit-trust-policy.json << 'EOF'
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"Service": "lambda.amazonaws.com"
},
"Action": "sts:AssumeRole"
}
]
}
EOF
aws iam create-role \
--role-name SecurityAuditRole \
--assume-role-policy-document file://security-audit-trust-policy.json \
--description "Role for security audit functions" || true
# 附加策略到角色
aws iam attach-role-policy \
--role-name SecurityAuditRole \
--policy-arn arn:aws:iam::aws:policy/SecurityAudit || true
log "IAM安全角色创建完成"
}
# 配置VPC安全组
configure_security_groups() {
log "配置VPC安全组..."
# 获取默认VPC ID
VPC_ID=$(aws ec2 describe-vpcs \
--filters "Name=is-default,Values=true" \
--query 'Vpcs[0].VpcId' \
--output text)
if [ "$VPC_ID" = "None" ]; then
error_exit "未找到默认VPC"
fi
# 创建Web层安全组
WEB_SG_ID=$(aws ec2 create-security-group \
--group-name "${PROJECT_NAME}-web-sg" \
--description "Web tier security group with zero trust principles" \
--vpc-id "$VPC_ID" \
--query 'GroupId' \
--output text 2>/dev/null || echo "exists")
if [ "$WEB_SG_ID" != "exists" ]; then
# 添加HTTPS入站规则
aws ec2 authorize-security-group-ingress \
--group-id "$WEB_SG_ID" \
--protocol tcp \
--port 443 \
--cidr 0.0.0.0/0 || true
# 添加HTTP重定向规则
aws ec2 authorize-security-group-ingress \
--group-id "$WEB_SG_ID" \
--protocol tcp \
--port 80 \
--cidr 0.0.0.0/0 || true
log "Web层安全组创建完成: $WEB_SG_ID"
fi
# 创建应用层安全组
APP_SG_ID=$(aws ec2 create-security-group \
--group-name "${PROJECT_NAME}-app-sg" \
--description "Application tier security group" \
--vpc-id "$VPC_ID" \
--query 'GroupId' \
--output text 2>/dev/null || echo "exists")
if [ "$APP_SG_ID" != "exists" ]; then
# 仅允许来自Web层的流量
aws ec2 authorize-security-group-ingress \
--group-id "$APP_SG_ID" \
--protocol tcp \
--port 8080 \
--source-group "$WEB_SG_ID" || true
log "应用层安全组创建完成: $APP_SG_ID"
fi
# 创建数据库层安全组
DB_SG_ID=$(aws ec2 create-security-group \
--group-name "${PROJECT_NAME}-db-sg" \
--description "Database tier security group" \
--vpc-id "$VPC_ID" \
--query 'GroupId' \
--output text 2>/dev/null || echo "exists")
if [ "$DB_SG_ID" != "exists" ]; then
# 仅允许来自应用层的数据库连接
aws ec2 authorize-security-group-ingress \
--group-id "$DB_SG_ID" \
--protocol tcp \
--port 3306 \
--source-group "$APP_SG_ID" || true
aws ec2 authorize-security-group-ingress \
--group-id "$DB_SG_ID" \
--protocol tcp \
--port 5432 \
--source-group "$APP_SG_ID" || true
log "数据库层安全组创建完成: $DB_SG_ID"
fi
}
# 配置WAF和Shield
configure_waf_protection() {
log "配置WAF和DDoS防护..."
# 创建WAF Web ACL
cat > waf-rules.json << 'EOF'
{
"Name": "ProductionWebACL",
"Scope": "CLOUDFRONT",
"DefaultAction": {
"Allow": {}
},
"Rules": [
{
"Name": "AWSManagedRulesCommonRuleSet",
"Priority": 1,
"OverrideAction": {
"None": {}
},
"Statement": {
"ManagedRuleGroupStatement": {
"VendorName": "AWS",
"Name": "AWSManagedRulesCommonRuleSet"
}
},
"VisibilityConfig": {
"SampledRequestsEnabled": true,
"CloudWatchMetricsEnabled": true,
"MetricName": "CommonRuleSetMetric"
}
},
{
"Name": "AWSManagedRulesKnownBadInputsRuleSet",
"Priority": 2,
"OverrideAction": {
"None": {}
},
"Statement": {
"ManagedRuleGroupStatement": {
"VendorName": "AWS",
"Name": "AWSManagedRulesKnownBadInputsRuleSet"
}
},
"VisibilityConfig": {
"SampledRequestsEnabled": true,
"CloudWatchMetricsEnabled": true,
"MetricName": "KnownBadInputsMetric"
}
}
],
"VisibilityConfig": {
"SampledRequestsEnabled": true,
"CloudWatchMetricsEnabled": true,
"MetricName": "ProductionWebACLMetric"
}
}
EOF
# 创建WAF Web ACL
aws wafv2 create-web-acl \
--scope CLOUDFRONT \
--cli-input-json file://waf-rules.json \
--region us-east-1 || true
log "WAF配置完成"
}
# 配置CloudTrail审计
configure_cloudtrail() {
log "配置CloudTrail审计..."
# 创建CloudTrail
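# Note: the S3 bucket named below must already exist and carry a bucket policy that
# allows CloudTrail log delivery, otherwise create-trail will fail.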
aws cloudtrail create-trail \
--name "${PROJECT_NAME}-audit-trail" \
--s3-bucket-name "${PROJECT_NAME}-audit-logs" \
--include-global-service-events \
--is-multi-region-trail \
--enable-log-file-validation || true
# 启动CloudTrail
aws cloudtrail start-logging \
--name "${PROJECT_NAME}-audit-trail" || true
log "CloudTrail审计配置完成"
}
# 主函数
main() {
log "开始部署云安全架构..."
create_iam_security_roles
configure_security_groups
configure_waf_protection
configure_cloudtrail
log "云安全架构部署完成"
}
# 执行主函数
main "$@"
Monitoring and Observability
A comprehensive monitoring system is key to keeping the cloud infrastructure running reliably.
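Beyond the per-resource alarms created below, composite alarms help cut noise by alerting only when several conditions hold at once. The sketch below is a minimal example; the child alarm names and the SNS topic ARN are assumptions that must already exist (for instance, the *-high-cpu and *-status-check alarms created later in this section).
#!/usr/bin/env python3
"""Sketch: a CloudWatch composite alarm that fires only when both child alarms are in ALARM.
The child alarm names and the SNS topic ARN are assumptions and must already exist."""
import boto3

def create_composite_alarm(region: str = 'us-west-2') -> None:
    cloudwatch = boto3.client('cloudwatch', region_name=region)
    cloudwatch.put_composite_alarm(
        AlarmName='instance-degraded-composite',
        AlarmDescription='Alert only when high CPU and a failed status check coincide',
        # The referenced child alarms must exist before the composite alarm is created
        AlarmRule=(
            'ALARM("i-1234567890abcdef0-high-cpu") AND '
            'ALARM("i-1234567890abcdef0-status-check")'
        ),
        ActionsEnabled=True,
        AlarmActions=['arn:aws:sns:us-west-2:123456789012:infrastructure-alerts']
    )

if __name__ == "__main__":
    create_composite_alarm()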
Comprehensive Monitoring System
#!/usr/bin/env python3
"""
云基础设施监控系统
提供全方位的监控、告警和可观测性功能
"""
import boto3
import json
import time
from typing import Dict, List, Optional
from dataclasses import dataclass
from datetime import datetime, timedelta
@dataclass
class MetricConfig:
"""监控指标配置"""
name: str
namespace: str
metric_name: str
dimensions: List[Dict]
statistic: str
threshold: float
comparison_operator: str
evaluation_periods: int = 2
period: int = 300
@dataclass
class AlarmConfig:
"""告警配置"""
alarm_name: str
alarm_description: str
metric_config: MetricConfig
alarm_actions: List[str]
ok_actions: List[str] = None
treat_missing_data: str = 'breaching'
class CloudMonitoringSystem:
"""云监控系统"""
def __init__(self, region: str = 'us-west-2'):
self.cloudwatch = boto3.client('cloudwatch', region_name=region)
self.sns = boto3.client('sns', region_name=region)
self.logs = boto3.client('logs', region_name=region)
self.region = region
def create_monitoring_dashboard(self, dashboard_name: str, widgets: List[Dict]) -> bool:
"""创建监控仪表板"""
try:
dashboard_body = {
"widgets": widgets
}
self.cloudwatch.put_dashboard(
DashboardName=dashboard_name,
DashboardBody=json.dumps(dashboard_body)
)
print(f"监控仪表板 {dashboard_name} 创建成功")
return True
except Exception as e:
print(f"创建监控仪表板失败: {str(e)}")
return False
def create_comprehensive_alarms(self, resource_configs: Dict) -> Dict:
"""创建综合告警"""
created_alarms = {}
# 创建SNS主题用于告警通知
sns_topic_arn = self._create_sns_topic('infrastructure-alerts')
# EC2实例告警
if 'ec2_instances' in resource_configs:
for instance_id in resource_configs['ec2_instances']:
alarms = self._create_ec2_alarms(instance_id, sns_topic_arn)
created_alarms[f'ec2-{instance_id}'] = alarms
# RDS数据库告警
if 'rds_instances' in resource_configs:
for db_instance_id in resource_configs['rds_instances']:
alarms = self._create_rds_alarms(db_instance_id, sns_topic_arn)
created_alarms[f'rds-{db_instance_id}'] = alarms
# 负载均衡器告警
if 'load_balancers' in resource_configs:
for lb_arn in resource_configs['load_balancers']:
alarms = self._create_alb_alarms(lb_arn, sns_topic_arn)
created_alarms[f'alb-{lb_arn.split("/")[-1]}'] = alarms
return created_alarms
def _create_sns_topic(self, topic_name: str) -> str:
"""创建SNS主题"""
try:
response = self.sns.create_topic(Name=topic_name)
topic_arn = response['TopicArn']
# 订阅邮件通知(需要确认)
self.sns.subscribe(
TopicArn=topic_arn,
Protocol='email',
Endpoint='admin@example.com' # 替换为实际邮箱
)
return topic_arn
except Exception as e:
print(f"创建SNS主题失败: {str(e)}")
return None
def _create_ec2_alarms(self, instance_id: str, sns_topic_arn: str) -> List[str]:
"""创建EC2实例告警"""
alarm_names = []
# CPU使用率告警
cpu_alarm_config = AlarmConfig(
alarm_name=f'{instance_id}-high-cpu',
alarm_description=f'High CPU utilization for instance {instance_id}',
metric_config=MetricConfig(
name='CPUUtilization',
namespace='AWS/EC2',
metric_name='CPUUtilization',
dimensions=[{'Name': 'InstanceId', 'Value': instance_id}],
statistic='Average',
threshold=80.0,
comparison_operator='GreaterThanThreshold'
),
alarm_actions=[sns_topic_arn]
)
if self._create_alarm(cpu_alarm_config):
alarm_names.append(cpu_alarm_config.alarm_name)
# 状态检查告警
status_alarm_config = AlarmConfig(
alarm_name=f'{instance_id}-status-check',
alarm_description=f'Instance status check failed for {instance_id}',
metric_config=MetricConfig(
name='StatusCheckFailed',
namespace='AWS/EC2',
metric_name='StatusCheckFailed',
dimensions=[{'Name': 'InstanceId', 'Value': instance_id}],
statistic='Maximum',
threshold=0.0,
comparison_operator='GreaterThanThreshold'
),
alarm_actions=[sns_topic_arn]
)
if self._create_alarm(status_alarm_config):
alarm_names.append(status_alarm_config.alarm_name)
return alarm_names
def _create_alarm(self, config: AlarmConfig) -> bool:
"""创建CloudWatch告警"""
try:
self.cloudwatch.put_metric_alarm(
AlarmName=config.alarm_name,
AlarmDescription=config.alarm_description,
ActionsEnabled=True,
AlarmActions=config.alarm_actions,
OKActions=config.ok_actions or [],
MetricName=config.metric_config.metric_name,
Namespace=config.metric_config.namespace,
Statistic=config.metric_config.statistic,
Dimensions=config.metric_config.dimensions,
Period=config.metric_config.period,
EvaluationPeriods=config.metric_config.evaluation_periods,
Threshold=config.metric_config.threshold,
ComparisonOperator=config.metric_config.comparison_operator,
TreatMissingData=config.treat_missing_data
)
print(f"告警 {config.alarm_name} 创建成功")
return True
except Exception as e:
print(f"创建告警 {config.alarm_name} 失败: {str(e)}")
return False
def create_log_groups(self, log_group_configs: List[Dict]) -> Dict:
"""创建日志组"""
created_log_groups = {}
for config in log_group_configs:
try:
log_group_name = config['name']
self.logs.create_log_group(
logGroupName=log_group_name,
tags=config.get('tags', {})
)
# 设置保留期
if 'retention_days' in config:
self.logs.put_retention_policy(
logGroupName=log_group_name,
retentionInDays=config['retention_days']
)
created_log_groups[log_group_name] = {
'retention_days': config.get('retention_days'),
'tags': config.get('tags', {})
}
print(f"日志组 {log_group_name} 创建成功")
except Exception as e:
print(f"创建日志组 {config['name']} 失败: {str(e)}")
return created_log_groups
def setup_custom_metrics(self) -> bool:
"""设置自定义指标"""
try:
# 发送自定义指标示例
self.cloudwatch.put_metric_data(
Namespace='CustomApp/Performance',
MetricData=[
{
'MetricName': 'ApplicationResponseTime',
'Value': 150.0,
'Unit': 'Milliseconds',
'Timestamp': datetime.utcnow(),
'Dimensions': [
{'Name': 'Environment', 'Value': 'Production'},
{'Name': 'Service', 'Value': 'WebAPI'}
]
},
{
'MetricName': 'ActiveUserSessions',
'Value': 1250,
'Unit': 'Count',
'Timestamp': datetime.utcnow(),
'Dimensions': [
{'Name': 'Environment', 'Value': 'Production'},
{'Name': 'Application', 'Value': 'WebApp'}
]
}
]
)
print("自定义指标设置成功")
return True
except Exception as e:
print(f"设置自定义指标失败: {str(e)}")
return False
def main():
"""主函数 - 演示监控系统创建"""
# 监控系统
monitoring = CloudMonitoringSystem()
# 创建仪表板组件
dashboard_widgets = [
{
"type": "metric",
"x": 0, "y": 0,
"width": 12, "height": 6,
"properties": {
"metrics": [
["AWS/EC2", "CPUUtilization", "InstanceId", "i-1234567890abcdef0"],
["AWS/ApplicationELB", "TargetResponseTime", "LoadBalancer", "app/my-load-balancer/50dc6c495c0c9188"]
],
"period": 300,
"stat": "Average",
"region": "us-west-2",
"title": "EC2 and ALB Performance"
}
},
{
"type": "log",
"x": 0, "y": 6,
"width": 24, "height": 6,
"properties": {
"query": "SOURCE '/aws/lambda/my-function' | fields @timestamp, @message | sort @timestamp desc | limit 20",
"region": "us-west-2",
"title": "Recent Application Logs"
}
}
]
# 创建监控仪表板
print("创建监控仪表板...")
monitoring.create_monitoring_dashboard('infrastructure-overview', dashboard_widgets)
# 资源配置
resource_configs = {
'ec2_instances': ['i-1234567890abcdef0'], # 替换为实际实例ID
'rds_instances': ['mydb-instance'], # 替换为实际RDS实例ID
'load_balancers': ['arn:aws:elasticloadbalancing:us-west-2:123456789012:loadbalancer/app/my-load-balancer/50dc6c495c0c9188']
}
# 创建告警
print("创建综合告警...")
alarms = monitoring.create_comprehensive_alarms(resource_configs)
# 日志组配置
log_group_configs = [
{
'name': '/aws/ec2/application',
'retention_days': 30,
'tags': {'Environment': 'Production', 'Service': 'Application'}
},
{
'name': '/aws/lambda/security-audit',
'retention_days': 90,
'tags': {'Environment': 'Production', 'Service': 'Security'}
}
]
# 创建日志组
print("创建日志组...")
log_groups = monitoring.create_log_groups(log_group_configs)
# 设置自定义指标
print("设置自定义指标...")
monitoring.setup_custom_metrics()
print("\n=== 监控系统创建完成 ===")
print(f"告警数量: {sum(len(alarms) for alarms in alarms.values())}")
print(f"日志组: {list(log_groups.keys())}")
if __name__ == "__main__":
main()
Infrastructure as Code
Use Terraform to manage the infrastructure as versioned code and automate its deployment.
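In a CI/CD pipeline the Terraform configuration below is typically driven by a thin wrapper that runs init, validate, and plan, and only applies a reviewed plan file. The sketch below shows one such wrapper; the working directory and flags are assumptions, and it requires the terraform CLI to be installed.
#!/usr/bin/env python3
"""Sketch: wrap the Terraform workflow for a pipeline.
Assumes the terraform CLI is installed; the working directory is an assumption."""
import subprocess
import sys

def run(cmd: list, cwd: str) -> None:
    print(f"$ {' '.join(cmd)}")
    subprocess.run(cmd, cwd=cwd, check=True)

def terraform_pipeline(workdir: str = "./infrastructure", auto_apply: bool = False) -> None:
    run(["terraform", "init", "-input=false"], workdir)
    run(["terraform", "validate"], workdir)
    # Write the plan to a file so the reviewed plan is exactly what gets applied
    run(["terraform", "plan", "-input=false", "-out=tfplan"], workdir)
    if auto_apply:
        run(["terraform", "apply", "-input=false", "tfplan"], workdir)
    else:
        print("Plan written to tfplan; review it, then run: terraform apply tfplan")

if __name__ == "__main__":
    terraform_pipeline(auto_apply="--apply" in sys.argv)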
Terraform Configuration Template
# main.tf - 主要的基础设施配置
terraform {
required_version = ">= 1.0"
required_providers {
aws = {
source = "hashicorp/aws"
version = "~> 5.0"
}
}
backend "s3" {
bucket = "terraform-state-bucket"
key = "infrastructure/terraform.tfstate"
region = "us-west-2"
encrypt = true
dynamodb_table = "terraform-locks"
}
}
provider "aws" {
region = var.aws_region
default_tags {
tags = {
Environment = var.environment
Project = var.project_name
ManagedBy = "Terraform"
}
}
}
# 变量定义
variable "aws_region" {
description = "AWS region"
type = string
default = "us-west-2"
}
variable "environment" {
description = "Environment name"
type = string
default = "production"
}
variable "project_name" {
description = "Project name"
type = string
default = "cloud-infrastructure"
}
variable "vpc_cidr" {
description = "VPC CIDR block"
type = string
default = "10.0.0.0/16"
}
# VPC模块
module "vpc" {
source = "./modules/vpc"
name = "${var.project_name}-vpc"
cidr = var.vpc_cidr
availability_zones = ["${var.aws_region}a", "${var.aws_region}b", "${var.aws_region}c"]
public_subnets = ["10.0.1.0/24", "10.0.2.0/24", "10.0.3.0/24"]
private_subnets = ["10.0.10.0/24", "10.0.11.0/24", "10.0.12.0/24"]
database_subnets = ["10.0.20.0/24", "10.0.21.0/24", "10.0.22.0/24"]
enable_nat_gateway = true
enable_vpn_gateway = false
enable_dns_hostnames = true
enable_dns_support = true
tags = {
Environment = var.environment
Project = var.project_name
}
}
# 安全组模块
module "security_groups" {
source = "./modules/security-groups"
vpc_id = module.vpc.vpc_id
web_ingress_rules = [
{
from_port = 80
to_port = 80
protocol = "tcp"
cidr_blocks = ["0.0.0.0/0"]
},
{
from_port = 443
to_port = 443
protocol = "tcp"
cidr_blocks = ["0.0.0.0/0"]
}
]
tags = {
Environment = var.environment
Project = var.project_name
}
}
# 计算资源模块
module "compute" {
source = "./modules/compute"
vpc_id = module.vpc.vpc_id
private_subnet_ids = module.vpc.private_subnets
public_subnet_ids = module.vpc.public_subnets
web_security_group_id = module.security_groups.web_security_group_id
app_security_group_id = module.security_groups.app_security_group_id
instance_type = "t3.medium"
min_size = 2
max_size = 10
desired_size = 3
tags = {
Environment = var.environment
Project = var.project_name
}
}
# 数据库模块
module "database" {
source = "./modules/database"
vpc_id = module.vpc.vpc_id
database_subnet_ids = module.vpc.database_subnets
database_security_group_id = module.security_groups.database_security_group_id
engine = "mysql"
engine_version = "8.0"
instance_class = "db.t3.medium"
allocated_storage = 100
db_name = "production_db"
username = "admin"
backup_retention_period = 7
backup_window = "03:00-04:00"
maintenance_window = "sun:04:00-sun:05:00"
multi_az = true
storage_encrypted = true
deletion_protection = true
tags = {
Environment = var.environment
Project = var.project_name
}
}
# 监控模块
module "monitoring" {
source = "./modules/monitoring"
project_name = var.project_name
environment = var.environment
ec2_instance_ids = module.compute.instance_ids
rds_instance_id = module.database.db_instance_id
alb_arn = module.compute.load_balancer_arn
notification_email = "admin@example.com"
tags = {
Environment = var.environment
Project = var.project_name
}
}
# 输出
output "vpc_id" {
description = "VPC ID"
value = module.vpc.vpc_id
}
output "load_balancer_dns" {
description = "Load balancer DNS name"
value = module.compute.load_balancer_dns_name
}
output "database_endpoint" {
description = "Database endpoint"
value = module.database.db_instance_endpoint
sensitive = true
}
output "monitoring_dashboard_url" {
description = "CloudWatch dashboard URL"
value = module.monitoring.dashboard_url
}
Cost Optimization Strategies
Implement effective cost optimization strategies to keep the cloud infrastructure economical.
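A quick win that complements the optimizer below is quantifying obvious waste, such as unattached EBS volumes. The sketch below estimates their monthly cost; the per-GB prices are rough assumptions, so verify against official pricing.
#!/usr/bin/env python3
"""Sketch: estimate the monthly cost of unattached EBS volumes.
The per-GB prices are rough assumptions; verify against official pricing."""
import boto3

# Assumed USD per GB-month by volume type (illustrative only)
EBS_PRICE_PER_GB_MONTH = {"gp2": 0.10, "gp3": 0.08, "io1": 0.125}

def estimate_unattached_ebs_waste(region: str = 'us-west-2') -> float:
    ec2 = boto3.client('ec2', region_name=region)
    total = 0.0
    paginator = ec2.get_paginator('describe_volumes')
    for page in paginator.paginate(Filters=[{'Name': 'status', 'Values': ['available']}]):
        for volume in page['Volumes']:
            price = EBS_PRICE_PER_GB_MONTH.get(volume['VolumeType'], 0.10)
            total += volume['Size'] * price
    return round(total, 2)

if __name__ == "__main__":
    print(f"Estimated monthly spend on unattached EBS volumes: ${estimate_unattached_ebs_waste()}")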
Cost Optimization Automation
#!/usr/bin/env python3
"""
云成本优化管理器
提供成本分析、资源优化建议和自动化成本控制功能
"""
import boto3
import json
from typing import Dict, List, Optional
from dataclasses import dataclass
from datetime import datetime, timedelta
import pandas as pd
@dataclass
class CostOptimizationRule:
"""成本优化规则"""
name: str
resource_type: str
condition: str
action: str
threshold: float
enabled: bool = True
class CloudCostOptimizer:
"""云成本优化器"""
def __init__(self, region: str = 'us-west-2'):
self.ce = boto3.client('ce', region_name='us-east-1') # Cost Explorer只在us-east-1可用
self.ec2 = boto3.client('ec2', region_name=region)
self.rds = boto3.client('rds', region_name=region)
self.cloudwatch = boto3.client('cloudwatch', region_name=region)
self.region = region
def analyze_cost_trends(self, days: int = 30) -> Dict:
"""分析成本趋势"""
try:
end_date = datetime.now().strftime('%Y-%m-%d')
start_date = (datetime.now() - timedelta(days=days)).strftime('%Y-%m-%d')
# 获取总成本
response = self.ce.get_cost_and_usage(
TimePeriod={
'Start': start_date,
'End': end_date
},
Granularity='DAILY',
Metrics=['BlendedCost'],
GroupBy=[
{'Type': 'DIMENSION', 'Key': 'SERVICE'}
]
)
cost_data = {}
total_cost = 0
for result in response['ResultsByTime']:
date = result['TimePeriod']['Start']
daily_total = 0
for group in result['Groups']:
service = group['Keys'][0]
cost = float(group['Metrics']['BlendedCost']['Amount'])
daily_total += cost
if service not in cost_data:
cost_data[service] = []
cost_data[service].append({'date': date, 'cost': cost})
total_cost += daily_total
return {
'total_cost': round(total_cost, 2),
'daily_average': round(total_cost / days, 2),
'service_breakdown': cost_data
}
except Exception as e:
print(f"分析成本趋势失败: {str(e)}")
return {}
def identify_unused_resources(self) -> Dict:
"""识别未使用的资源"""
unused_resources = {
'ec2_instances': [],
'ebs_volumes': [],
'elastic_ips': [],
'load_balancers': []
}
try:
# 检查停止的EC2实例
ec2_response = self.ec2.describe_instances(
Filters=[
{'Name': 'instance-state-name', 'Values': ['stopped']}
]
)
for reservation in ec2_response['Reservations']:
for instance in reservation['Instances']:
# 检查实例停止时间
if 'StateTransitionReason' in instance:
unused_resources['ec2_instances'].append({
'instance_id': instance['InstanceId'],
'instance_type': instance['InstanceType'],
'state': instance['State']['Name'],
'launch_time': instance['LaunchTime']
})
# 检查未附加的EBS卷
ebs_response = self.ec2.describe_volumes(
Filters=[
{'Name': 'status', 'Values': ['available']}
]
)
for volume in ebs_response['Volumes']:
unused_resources['ebs_volumes'].append({
'volume_id': volume['VolumeId'],
'size': volume['Size'],
'volume_type': volume['VolumeType'],
'create_time': volume['CreateTime']
})
# 检查未关联的弹性IP
eip_response = self.ec2.describe_addresses()
for address in eip_response['Addresses']:
if 'InstanceId' not in address and 'NetworkInterfaceId' not in address:
unused_resources['elastic_ips'].append({
'allocation_id': address['AllocationId'],
'public_ip': address['PublicIp']
})
print("未使用资源识别完成")
except Exception as e:
print(f"识别未使用资源失败: {str(e)}")
return unused_resources
def get_rightsizing_recommendations(self) -> List[Dict]:
"""获取实例规格优化建议"""
recommendations = []
try:
# 获取过去14天的CPU利用率数据
end_time = datetime.utcnow()
start_time = end_time - timedelta(days=14)
# 获取所有运行中的实例
instances_response = self.ec2.describe_instances(
Filters=[
{'Name': 'instance-state-name', 'Values': ['running']}
]
)
for reservation in instances_response['Reservations']:
for instance in reservation['Instances']:
instance_id = instance['InstanceId']
instance_type = instance['InstanceType']
# 获取CPU利用率指标
cpu_response = self.cloudwatch.get_metric_statistics(
Namespace='AWS/EC2',
MetricName='CPUUtilization',
Dimensions=[
{'Name': 'InstanceId', 'Value': instance_id}
],
StartTime=start_time,
EndTime=end_time,
Period=3600, # 1小时
Statistics=['Average', 'Maximum']
)
if cpu_response['Datapoints']:
avg_cpu = sum(dp['Average'] for dp in cpu_response['Datapoints']) / len(cpu_response['Datapoints'])
max_cpu = max(dp['Maximum'] for dp in cpu_response['Datapoints'])
# 生成优化建议
recommendation = self._generate_rightsizing_recommendation(
instance_id, instance_type, avg_cpu, max_cpu
)
if recommendation:
recommendations.append(recommendation)
except Exception as e:
print(f"获取规格优化建议失败: {str(e)}")
return recommendations
def _generate_rightsizing_recommendation(self, instance_id: str, current_type: str,
avg_cpu: float, max_cpu: float) -> Optional[Dict]:
"""生成实例规格优化建议"""
# 实例类型映射(简化版)
instance_families = {
't3.nano': {'cpu': 2, 'memory': 0.5, 'cost_factor': 1},
't3.micro': {'cpu': 2, 'memory': 1, 'cost_factor': 2},
't3.small': {'cpu': 2, 'memory': 2, 'cost_factor': 4},
't3.medium': {'cpu': 2, 'memory': 4, 'cost_factor': 8},
't3.large': {'cpu': 2, 'memory': 8, 'cost_factor': 16},
't3.xlarge': {'cpu': 4, 'memory': 16, 'cost_factor': 32}
}
if current_type not in instance_families:
return None
current_specs = instance_families[current_type]
# 优化逻辑
if avg_cpu < 10 and max_cpu < 30:
# CPU使用率很低,建议降级
for instance_type, specs in instance_families.items():
if (specs['cost_factor'] < current_specs['cost_factor'] and
specs['cpu'] >= 2 and specs['memory'] >= 1):
potential_savings = (current_specs['cost_factor'] - specs['cost_factor']) / current_specs['cost_factor'] * 100
return {
'instance_id': instance_id,
'current_type': current_type,
'recommended_type': instance_type,
'avg_cpu_utilization': round(avg_cpu, 2),
'max_cpu_utilization': round(max_cpu, 2),
'potential_savings_percent': round(potential_savings, 2),
'reason': 'Low CPU utilization detected'
}
elif avg_cpu > 70 or max_cpu > 90:
# CPU使用率很高,建议升级
for instance_type, specs in instance_families.items():
if specs['cost_factor'] > current_specs['cost_factor']:
return {
'instance_id': instance_id,
'current_type': current_type,
'recommended_type': instance_type,
'avg_cpu_utilization': round(avg_cpu, 2),
'max_cpu_utilization': round(max_cpu, 2),
'reason': 'High CPU utilization detected - performance risk'
}
return None
def create_cost_budget(self, budget_name: str, budget_amount: float,
notification_email: str) -> bool:
"""创建成本预算"""
try:
budgets_client = boto3.client('budgets', region_name='us-east-1')
budget = {
'BudgetName': budget_name,
'BudgetLimit': {
'Amount': str(budget_amount),
'Unit': 'USD'
},
'TimeUnit': 'MONTHLY',
'BudgetType': 'COST',
'CostFilters': {
'Service': ['Amazon Elastic Compute Cloud - Compute']
}
}
# 创建预算通知
notifications = [
{
'Notification': {
'NotificationType': 'ACTUAL',
'ComparisonOperator': 'GREATER_THAN',
'Threshold': 80.0,
'ThresholdType': 'PERCENTAGE'
},
'Subscribers': [
{
'SubscriptionType': 'EMAIL',
'Address': notification_email
}
]
},
{
'Notification': {
'NotificationType': 'FORECASTED',
'ComparisonOperator': 'GREATER_THAN',
'Threshold': 100.0,
'ThresholdType': 'PERCENTAGE'
},
'Subscribers': [
{
'SubscriptionType': 'EMAIL',
'Address': notification_email
}
]
}
]
budgets_client.create_budget(
AccountId='123456789012', # 替换为实际账户ID
Budget=budget,
NotificationsWithSubscribers=notifications
)
print(f"成本预算 {budget_name} 创建成功")
return True
except Exception as e:
print(f"创建成本预算失败: {str(e)}")
return False
def main():
"""主函数 - 演示成本优化"""
# 成本优化器
cost_optimizer = CloudCostOptimizer()
# 分析成本趋势
print("分析成本趋势...")
cost_trends = cost_optimizer.analyze_cost_trends(30)
if cost_trends:
print(f"过去30天总成本: ${cost_trends['total_cost']}")
print(f"日均成本: ${cost_trends['daily_average']}")
# 识别未使用资源
print("\n识别未使用资源...")
unused_resources = cost_optimizer.identify_unused_resources()
for resource_type, resources in unused_resources.items():
if resources:
print(f"{resource_type}: {len(resources)} 个未使用资源")
# 获取规格优化建议
print("\n获取实例规格优化建议...")
recommendations = cost_optimizer.get_rightsizing_recommendations()
for rec in recommendations:
print(f"实例 {rec['instance_id']}: {rec['current_type']} -> {rec['recommended_type']}")
if 'potential_savings_percent' in rec:
print(f" 潜在节省: {rec['potential_savings_percent']}%")
print(f" 原因: {rec['reason']}")
# 创建成本预算
print("\n创建成本预算...")
cost_optimizer.create_cost_budget(
'monthly-infrastructure-budget',
1000.0, # $1000/月
'admin@example.com'
)
print("\n=== 成本优化分析完成 ===")
if __name__ == "__main__":
main()
Summary
Designing a cloud infrastructure architecture is a complex, systems-level effort that has to weigh the following core elements:
Core Architecture Design Elements
- Network architecture
  - VPC design and subnet planning
  - Security group and network ACL configuration
  - Load balancing and traffic distribution
  - NAT Gateway and Internet Gateway management
- Compute resource management
  - Auto scaling strategies
  - Instance type selection and optimization
  - Launch templates and configuration management
  - Containerization and orchestration platforms
- Storage architecture
  - Multi-tier storage strategy
  - Data lifecycle management
  - Backup and disaster recovery
  - Performance optimization and cost control
- Security architecture
  - Zero trust security model
  - Identity authentication and access control
  - Data encryption and network security
  - Auditing and compliance management
- Monitoring and observability
  - Comprehensive monitoring metrics
  - Intelligent alerting and notification
  - Log aggregation and analysis
  - Performance optimization recommendations
- Infrastructure as code
  - Modular Terraform design
  - Version control and change management
  - Automated deployment pipelines
  - Environment consistency guarantees
Best-Practice Recommendations
- Design principles
  - Adopt modular, reusable architecture designs
  - Implement layered protection and redundancy mechanisms
  - Favor automation and standardization
  - Optimize and iterate continuously
- Operations management
  - Build a complete monitoring and alerting system
  - Implement automated failure-recovery mechanisms
  - Run regular security audits and compliance checks
  - Continuously optimize cost and right-size resources
- Team collaboration
  - Define clear roles and responsibilities
  - Enforce code review and change-control processes
  - Provide adequate documentation and training
  - Establish effective communication and collaboration practices
With systematic architecture design and implementation, you can build a modern cloud infrastructure platform that is highly available, performant, secure, reliable, and cost optimized, giving the business strong technical support.
The code samples and configuration templates in this article can serve as a starting point for real projects, but adjust and test them against your specific requirements before using them in production.