
Cloud-Native Microservices Architecture: Design and Practice




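Introduction

As digital transformation deepens, traditional monolithic architectures increasingly struggle to meet modern enterprises' needs for agility, scalability, and high availability. Cloud-native microservices architecture, now a mainstream model for application development, splits a complex monolith into multiple independently deployable services. This improves maintainability and scalability and lets each team choose the technology that best fits its service.

This article examines the design principles, implementation approaches, and best practices of cloud-native microservices architecture to help organizations build efficient, reliable microservice systems.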

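Table of Contents

  1. Microservices Architecture Overview
  2. Service Decomposition Strategies
  3. Containerization and Orchestration
  4. Service Mesh Architecture
  5. API Gateway Design
  6. Data Management Strategies
  7. Monitoring and Observability
  8. Best Practices and Recommendations
  9. Summary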

Microservices Architecture Overview

Architecture Evolution Path

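graph TD
    A[Monolithic application] --> B[Layered architecture]
    B --> C[SOA]
    C --> D[Microservices architecture]
    D --> E[Cloud-native microservices]

    A1[Simple deployment<br/>High development efficiency<br/>Unified tech stack] --> A
    B1[Modularity<br/>Separation of concerns<br/>Better maintainability] --> B
    C1[Service reuse<br/>Loose coupling<br/>Standardized interfaces] --> C
    D1[Independent deployment<br/>Technology diversity<br/>Team autonomy] --> D
    E1[Containerization<br/>Automated operations<br/>Elastic scaling] --> E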

Microservices Architecture Analyzer

from dataclasses import dataclass
from typing import Any, List, Dict, Optional, Tuple
from enum import Enum
import json
import yaml

class ServiceType(Enum):
    BUSINESS = "business"
    INFRASTRUCTURE = "infrastructure"
    GATEWAY = "gateway"
    DATA = "data"

class CommunicationPattern(Enum):
    SYNCHRONOUS = "synchronous"
    ASYNCHRONOUS = "asynchronous"
    EVENT_DRIVEN = "event_driven"

@dataclass
class ServiceDefinition:
    name: str
    type: ServiceType
    responsibilities: List[str]
    dependencies: List[str]
    data_stores: List[str]
    communication_patterns: List[CommunicationPattern]
    scalability_requirements: Dict[str, Any]  # typing.Any, not the built-in any()
    
@dataclass
class ArchitectureRecommendation:
    service_count: int
    complexity_score: float
    recommended_patterns: List[str]
    technology_stack: Dict[str, str]
    deployment_strategy: str
    estimated_cost: Dict[str, float]

class MicroservicesArchitectureAnalyzer:
    def __init__(self):
        self.services = []
        self.domain_boundaries = {}
        
    def analyze_domain(self, business_capabilities: List[str], 
                      data_entities: List[str],
                      user_journeys: List[str]) -> Dict[str, any]:
        """Analyze the business domain and identify candidate service boundaries."""
        
        # Group capabilities by business function
        capability_groups = self._group_capabilities(business_capabilities)
        
        # Cluster data entities
        data_clusters = self._cluster_data_entities(data_entities)
        
        # Analyze user journeys
        journey_flows = self._analyze_user_journeys(user_journeys)
        
        # Generate service recommendations
        service_recommendations = self._generate_service_recommendations(
            capability_groups, data_clusters, journey_flows
        )
        
        return {
            "capability_groups": capability_groups,
            "data_clusters": data_clusters,
            "journey_flows": journey_flows,
            "service_recommendations": service_recommendations,
            "complexity_analysis": self._calculate_complexity(service_recommendations)
        }
    
    def _group_capabilities(self, capabilities: List[str]) -> Dict[str, List[str]]:
        """Group capabilities by keyword; capabilities that match no keyword are omitted."""
        groups = {
            "user_management": [],
            "order_processing": [],
            "payment_handling": [],
            "inventory_management": [],
            "notification_services": [],
            "analytics_reporting": []
        }
        
        capability_mapping = {
            "user": "user_management",
            "auth": "user_management",
            "order": "order_processing",
            "cart": "order_processing",
            "payment": "payment_handling",
            "billing": "payment_handling",
            "inventory": "inventory_management",
            "product": "inventory_management",
            "notification": "notification_services",
            "email": "notification_services",
            "analytics": "analytics_reporting",
            "report": "analytics_reporting"
        }
        
        for capability in capabilities:
            for keyword, group in capability_mapping.items():
                if keyword in capability.lower():
                    groups[group].append(capability)
                    break
        
        return {k: v for k, v in groups.items() if v}
    
    def _cluster_data_entities(self, entities: List[str]) -> Dict[str, List[str]]:
        """Cluster data entities by keyword."""
        clusters = {}
        
        # Simplified clustering: each cluster key maps to related keywords
        entity_relationships = {
            "user": ["profile", "preference", "authentication"],
            "order": ["item", "shipping", "status"],
            "product": ["catalog", "inventory", "pricing"],
            "payment": ["transaction", "billing", "refund"]
        }
        
        for entity in entities:
            for cluster_key, related_entities in entity_relationships.items():
                # Match the cluster key itself as well as its related keywords,
                # so that "User" or "Order" lands in its own cluster.
                keywords = [cluster_key] + related_entities
                if any(keyword in entity.lower() for keyword in keywords):
                    clusters.setdefault(cluster_key, []).append(entity)
                    break
        
        return clusters
    
    def _analyze_user_journeys(self, journeys: List[str]) -> Dict[str, List[str]]:
        """Map user journeys to flows of steps."""
        flows = {}
        
        for journey in journeys:
            # Simplified journey analysis based on keyword matching
            if "registration" in journey.lower():
                flows["user_onboarding"] = ["authentication", "profile_creation", "verification"]
            elif "purchase" in journey.lower():
                flows["purchase_flow"] = ["product_search", "cart_management", "payment", "order_fulfillment"]
            elif "support" in journey.lower():
                flows["customer_support"] = ["ticket_creation", "communication", "resolution"]
        
        return flows
    
    def _generate_service_recommendations(self, capability_groups: Dict, 
                                        data_clusters: Dict, 
                                        journey_flows: Dict) -> List[ServiceDefinition]:
        """Generate service recommendations."""
        services = []
        
        # Create one business service per capability group
        for group_name, capabilities in capability_groups.items():
            service = ServiceDefinition(
                name=f"{group_name}_service",
                type=ServiceType.BUSINESS,
                responsibilities=capabilities,
                dependencies=[],
                data_stores=[group_name + "_db"],
                communication_patterns=[CommunicationPattern.SYNCHRONOUS],
                scalability_requirements={"min_instances": 2, "max_instances": 10}
            )
            services.append(service)
        
        # Add infrastructure services
        infrastructure_services = [
            ServiceDefinition(
                name="api_gateway",
                type=ServiceType.GATEWAY,
                responsibilities=["request_routing", "authentication", "rate_limiting"],
                dependencies=[],
                data_stores=["gateway_cache"],
                communication_patterns=[CommunicationPattern.SYNCHRONOUS],
                scalability_requirements={"min_instances": 3, "max_instances": 20}
            ),
            ServiceDefinition(
                name="event_bus",
                type=ServiceType.INFRASTRUCTURE,
                responsibilities=["event_routing", "message_persistence", "delivery_guarantee"],
                dependencies=[],
                data_stores=["event_store"],
                communication_patterns=[CommunicationPattern.ASYNCHRONOUS],
                scalability_requirements={"min_instances": 3, "max_instances": 15}
            )
        ]
        
        services.extend(infrastructure_services)
        return services
    
    def _calculate_complexity(self, services: List[ServiceDefinition]) -> Dict[str, any]:
        """Compute an architecture complexity score."""
        service_count = len(services)
        
        # Weighted sum of service count, dependency edges, and data stores
        complexity_factors = {
            "service_count": service_count * 0.3,
            "communication_complexity": sum(len(s.dependencies) for s in services) * 0.4,
            "data_complexity": sum(len(s.data_stores) for s in services) * 0.3
        }
        
        total_complexity = sum(complexity_factors.values())
        
        return {
            "total_score": total_complexity,
            "factors": complexity_factors,
            "recommendation": self._get_complexity_recommendation(total_complexity)
        }
    
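    def _get_complexity_recommendation(self, score: float) -> str:
        """Return a recommendation based on the complexity score."""
        if score < 10:
            return "Low complexity: suitable for a small team; start with a simple service split"
        elif score < 25:
            return "Medium complexity: requires mature DevOps processes and a monitoring stack"
        else:
            return "High complexity: requires a dedicated microservices governance platform and an experienced team"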
    
    def generate_architecture_blueprint(self, services: List[ServiceDefinition]) -> Dict[str, any]:
        """Generate the architecture blueprint."""
        blueprint = {
            "version": "1.0",
            "services": {},
            "communication_matrix": {},
            "deployment_topology": {},
            "monitoring_strategy": {}
        }
        
        # Service definitions
        for service in services:
            blueprint["services"][service.name] = {
                "type": service.type.value,
                "responsibilities": service.responsibilities,
                "dependencies": service.dependencies,
                "data_stores": service.data_stores,
                "scaling": service.scalability_requirements
            }
        
        # Communication matrix
        blueprint["communication_matrix"] = self._build_communication_matrix(services)
        
        # Deployment topology
        blueprint["deployment_topology"] = self._design_deployment_topology(services)
        
        # Monitoring strategy
        blueprint["monitoring_strategy"] = self._design_monitoring_strategy(services)
        
        return blueprint
    
    def _build_communication_matrix(self, services: List[ServiceDefinition]) -> Dict[str, Dict[str, str]]:
        """Build the service-to-service communication matrix."""
        matrix = {}
        
        for service in services:
            matrix[service.name] = {}
            for dep in service.dependencies:
                # Simplified inference of the communication pattern
                if service.type == ServiceType.GATEWAY:
                    matrix[service.name][dep] = "http_sync"
                elif "event" in dep:
                    matrix[service.name][dep] = "event_async"
                else:
                    matrix[service.name][dep] = "http_sync"
        
        return matrix
    
    def _design_deployment_topology(self, services: List[ServiceDefinition]) -> Dict[str, any]:
        """Design the deployment topology."""
        topology = {
            "clusters": {
                "production": {
                    "nodes": 6,
                    "node_type": "m5.large",
                    "availability_zones": 3
                },
                "staging": {
                    "nodes": 3,
                    # The AWS M5 family has no "medium" size; t3.medium is an assumed substitute
                    "node_type": "t3.medium",
                    "availability_zones": 2
                }
            },
            "namespaces": {
                "gateway": ["api_gateway"],
                "business": [s.name for s in services if s.type == ServiceType.BUSINESS],
                "infrastructure": [s.name for s in services if s.type == ServiceType.INFRASTRUCTURE],
                "data": [s.name for s in services if s.type == ServiceType.DATA]
            },
            "ingress": {
                "load_balancer": "nginx",
                "ssl_termination": True,
                "rate_limiting": True
            }
        }
        
        return topology
    
    def _design_monitoring_strategy(self, services: List[ServiceDefinition]) -> Dict[str, any]:
        """Design the monitoring strategy."""
        strategy = {
            "metrics": {
                "application": ["request_rate", "error_rate", "response_time"],
                "infrastructure": ["cpu_usage", "memory_usage", "disk_io"],
                "business": ["transaction_volume", "user_activity", "revenue_metrics"]
            },
            "logging": {
                "centralized": True,
                "structured": True,
                "retention_days": 30
            },
            "tracing": {
                "distributed": True,
                "sampling_rate": 0.1,
                "trace_retention_hours": 72
            },
            "alerting": {
                "sla_violations": True,
                "error_rate_threshold": 0.05,
                "response_time_threshold": "2s"
            }
        }
        
        return strategy

# Usage example
def microservices_analysis_example():
    analyzer = MicroservicesArchitectureAnalyzer()
    
    # Business capabilities
    business_capabilities = [
        "User Registration", "User Authentication", "User Profile Management",
        "Product Catalog", "Inventory Management", "Order Processing",
        "Payment Processing", "Shipping Management", "Notification Service",
        "Analytics and Reporting", "Customer Support"
    ]
    
    # Data entities
    data_entities = [
        "User", "UserProfile", "Product", "Category", "Inventory",
        "Order", "OrderItem", "Payment", "Transaction", "Shipment",
        "Notification", "Report", "SupportTicket"
    ]
    
    # User journeys
    user_journeys = [
        "User Registration and Onboarding",
        "Product Discovery and Purchase",
        "Order Tracking and Support",
        "Account Management"
    ]
    
    # Analyze the domain
    analysis_result = analyzer.analyze_domain(
        business_capabilities, data_entities, user_journeys
    )
    
    print("=== Microservices Architecture Analysis ===")
    print(f"Capability groups: {list(analysis_result['capability_groups'].keys())}")
    print(f"Data clusters: {list(analysis_result['data_clusters'].keys())}")
    print(f"User journeys: {list(analysis_result['journey_flows'].keys())}")
    print(f"Complexity analysis: {analysis_result['complexity_analysis']['recommendation']}")
    
    # Generate the architecture blueprint
    services = analysis_result['service_recommendations']
    blueprint = analyzer.generate_architecture_blueprint(services)
    
    print("\n=== Architecture Blueprint ===")
    print(f"Service count: {len(blueprint['services'])}")
    print(f"Deployment clusters: {list(blueprint['deployment_topology']['clusters'].keys())}")
    print(f"Monitoring metrics: {blueprint['monitoring_strategy']['metrics']['application']}")
    
    return analysis_result, blueprint

if __name__ == "__main__":
    microservices_analysis_example()
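
To make the scoring in `_calculate_complexity` concrete, here is a minimal standalone sketch of the same weighted sum and thresholds. The function names `complexity_score` and `complexity_level` are hypothetical helpers introduced only for illustration; the weights (0.3, 0.4, 0.3) and the cut-offs (10 and 25) are taken from the analyzer above. With the example inputs, the analyzer produces eight services (six capability groups plus the gateway and the event bus), each with one data store and no declared dependencies.

```python
def complexity_score(service_count: int, dependency_edges: int, data_stores: int) -> float:
    """Weighted sum used by the analyzer: 0.3*services + 0.4*dependencies + 0.3*stores."""
    return service_count * 0.3 + dependency_edges * 0.4 + data_stores * 0.3


def complexity_level(score: float) -> str:
    """Map a score to the analyzer's three bands (cut-offs at 10 and 25)."""
    if score < 10:
        return "low"
    if score < 25:
        return "medium"
    return "high"


# Eight services, no declared dependencies, one data store each.
example_score = complexity_score(service_count=8, dependency_edges=0, data_stores=8)
print(round(example_score, 2), complexity_level(example_score))

# The same eight services with 20 dependency edges between them (an invented figure).
coupled_score = complexity_score(service_count=8, dependency_edges=20, data_stores=8)
print(round(coupled_score, 2), complexity_level(coupled_score))
```

The first case scores 4.8 (low); adding 20 dependency edges raises it to 12.8 (medium). Because every generated service has an empty `dependencies` list, the communication term contributes nothing until real dependencies are filled in, so the score understates coupling.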

Service Decomposition Strategies

Domain-Driven Design (DDD)

from abc import ABC, abstractmethod
from typing import List, Dict, Any, Optional
from dataclasses import dataclass, field
from enum import Enum
import uuid

class BoundedContextType(Enum):
    CORE = "core"
    SUPPORTING = "supporting"
    GENERIC = "generic"

@dataclass
class DomainEvent:
    event_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    event_type: str = ""
    aggregate_id: str = ""
    version: int = 1
    timestamp: str = ""
    data: Dict[str, Any] = field(default_factory=dict)

class AggregateRoot(ABC):
    def __init__(self, aggregate_id: str):
        self.aggregate_id = aggregate_id
        self.version = 0
        self.uncommitted_events: List[DomainEvent] = []
    
    def mark_events_as_committed(self):
        self.uncommitted_events.clear()
    
    def get_uncommitted_events(self) -> List[DomainEvent]:
        return self.uncommitted_events.copy()
    
    def apply_event(self, event: DomainEvent):
        self.uncommitted_events.append(event)
        self.version += 1

@dataclass
class BoundedContext:
    name: str
    type: BoundedContextType
    domain_services: List[str]
    aggregates: List[str]
    value_objects: List[str]
    repositories: List[str]
    domain_events: List[str]
    integration_events: List[str]
    
class ServiceDecompositionStrategy:
    def __init__(self):
        self.bounded_contexts = {}
        self.context_map = {}
    
    def define_bounded_context(self, context: BoundedContext):
        """Register a bounded context."""
        self.bounded_contexts[context.name] = context
        
    def analyze_context_relationships(self) -> Dict[str, Dict[str, str]]:
        """Analyze relationships between contexts."""
        relationships = {}
        
        # Relationship patterns between contexts (hard-coded for illustration)
        relationship_patterns = {
            ("user_management", "order_processing"): "customer_supplier",
            ("order_processing", "payment_processing"): "partnership",
            ("order_processing", "inventory_management"): "shared_kernel",
            ("notification_service", "order_processing"): "conformist",
            ("analytics", "order_processing"): "anticorruption_layer"
        }
        
        for (upstream, downstream), pattern in relationship_patterns.items():
            if upstream not in relationships:
                relationships[upstream] = {}
            relationships[upstream][downstream] = pattern
            
        return relationships
    
    def generate_service_boundaries(self) -> Dict[str, Dict[str, Any]]:
        """Derive service boundaries from the registered bounded contexts."""
        service_boundaries = {}
        
        for context_name, context in self.bounded_contexts.items():
            # Derive service boundaries from aggregate roots
            if context.type == BoundedContextType.CORE:
                # Core domain: one service per aggregate
                for aggregate in context.aggregates:
                    service_name = f"{context_name}_{aggregate}_service"
                    service_boundaries[service_name] = {
                        "bounded_context": context_name,
                        "primary_aggregate": aggregate,
                        "responsibilities": [aggregate],
                        "data_ownership": [aggregate.lower() + "_data"],
                        "api_contracts": self._generate_api_contract(aggregate),
                        "integration_patterns": ["event_sourcing", "cqrs"]
                    }
            else:
                # Supporting and generic domains: several aggregates may share one service
                service_name = f"{context_name}_service"
                service_boundaries[service_name] = {
                    "bounded_context": context_name,
                    "aggregates": context.aggregates,
                    "responsibilities": context.aggregates,
                    "data_ownership": [agg.lower() + "_data" for agg in context.aggregates],
                    "api_contracts": self._generate_composite_api_contract(context.aggregates),
                    "integration_patterns": ["crud", "shared_database"]
                }
        
        return service_boundaries
    
    def _generate_api_contract(self, aggregate: str) -> Dict[str, List[str]]:
        """Generate the API contract for a single aggregate."""
        base_operations = ["create", "update", "delete", "get", "list"]
        
        return {
            "commands": [f"{op}_{aggregate}" for op in base_operations[:3]],
            "queries": [f"{op}_{aggregate}" for op in base_operations[3:]],
            "events": [f"{aggregate}_created", f"{aggregate}_updated", f"{aggregate}_deleted"]
        }
    
    def _generate_composite_api_contract(self, aggregates: List[str]) -> Dict[str, List[str]]:
        """Generate a combined API contract for several aggregates."""
        commands = []
        queries = []
        events = []
        
        for aggregate in aggregates:
            contract = self._generate_api_contract(aggregate)
            commands.extend(contract["commands"])
            queries.extend(contract["queries"])
            events.extend(contract["events"])
        
        return {
            "commands": commands,
            "queries": queries,
            "events": events
        }
    
    def validate_service_design(self, service_boundaries: Dict[str, Dict[str, Any]]) -> Dict[str, List[str]]:
        """Validate the service design."""
        validation_results = {
            "warnings": [],
            "recommendations": [],
            "violations": []
        }
        
        # Check the number of services
        service_count = len(service_boundaries)
        if service_count > 20:
            validation_results["warnings"].append(f"Too many services ({service_count}); management overhead may become excessive")
        elif service_count < 3:
            validation_results["warnings"].append(f"Too few services ({service_count}); the benefits of microservices may not be realized")
        
        # Check data ownership: each data set should have exactly one owning service
        data_ownership = {}
        for service_name, service_info in service_boundaries.items():
            for data in service_info.get("data_ownership", []):
                if data in data_ownership:
                    validation_results["violations"].append(
                        f"Data ownership conflict: {data} is owned by both {data_ownership[data]} and {service_name}"
                    )
                data_ownership[data] = service_name
        
        # Check API design
        for service_name, service_info in service_boundaries.items():
            api_contract = service_info.get("api_contracts", {})
            if not api_contract.get("events"):
                validation_results["recommendations"].append(
                    f"{service_name} publishes no domain events; consider adding an event publishing mechanism"
                )
        
        return validation_results

# Usage example
def service_decomposition_example():
    strategy = ServiceDecompositionStrategy()
    
    # Define bounded contexts
    contexts = [
        BoundedContext(
            name="user_management",
            type=BoundedContextType.CORE,
            domain_services=["UserService", "AuthenticationService"],
            aggregates=["User", "UserProfile"],
            value_objects=["Email", "Password"],
            repositories=["UserRepository"],
            domain_events=["UserRegistered", "UserProfileUpdated"],
            integration_events=["UserCreated", "UserActivated"]
        ),
        BoundedContext(
            name="order_processing",
            type=BoundedContextType.CORE,
            domain_services=["OrderService", "OrderValidationService"],
            aggregates=["Order", "OrderItem"],
            value_objects=["Money", "Quantity"],
            repositories=["OrderRepository"],
            domain_events=["OrderPlaced", "OrderShipped"],
            integration_events=["OrderCreated", "OrderCompleted"]
        ),
        BoundedContext(
            name="payment_processing",
            type=BoundedContextType.SUPPORTING,
            domain_services=["PaymentService"],
            aggregates=["Payment", "Transaction"],
            value_objects=["CreditCard", "Amount"],
            repositories=["PaymentRepository"],
            domain_events=["PaymentProcessed", "PaymentFailed"],
            integration_events=["PaymentCompleted", "RefundIssued"]
        )
    ]
    
    # Register the contexts
    for context in contexts:
        strategy.define_bounded_context(context)
    
    # Analyze relationships
    relationships = strategy.analyze_context_relationships()
    print("=== Context Relationship Analysis ===")
    for upstream, downstreams in relationships.items():
        for downstream, pattern in downstreams.items():
            print(f"{upstream} -> {downstream}: {pattern}")
    
    # Derive service boundaries
    service_boundaries = strategy.generate_service_boundaries()
    print("\n=== Service Boundary Design ===")
    for service_name, service_info in service_boundaries.items():
        print(f"\nService: {service_name}")
        print(f"  Bounded context: {service_info['bounded_context']}")
        print(f"  Responsibilities: {service_info['responsibilities']}")
        print(f"  Data ownership: {service_info['data_ownership']}")
    
    # Validate the design
    validation = strategy.validate_service_design(service_boundaries)
    print("\n=== Design Validation ===")
    if validation["violations"]:
        print("Violations:")
        for violation in validation["violations"]:
            print(f"  - {violation}")
    
    if validation["warnings"]:
        print("Warnings:")
        for warning in validation["warnings"]:
            print(f"  - {warning}")
    
    if validation["recommendations"]:
        print("Recommendations:")
        for recommendation in validation["recommendations"]:
            print(f"  - {recommendation}")
    
    return service_boundaries, validation

if __name__ == "__main__":
    service_decomposition_example()
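
The `AggregateRoot` and `DomainEvent` classes above are defined but never exercised by the usage example. The following is a minimal sketch of how a concrete aggregate might record events. The event type `OrderPlaced` comes from the `order_processing` context above, while the `Order` class, its `status` field, its `place` method, and the UTC ISO timestamp format are assumptions made for illustration. Compact copies of the two base classes are repeated so that the block runs on its own.

```python
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any, Dict, List


@dataclass
class DomainEvent:
    event_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    event_type: str = ""
    aggregate_id: str = ""
    version: int = 1
    timestamp: str = ""
    data: Dict[str, Any] = field(default_factory=dict)


class AggregateRoot:
    """Compact copy of the base class above (ABC omitted for brevity)."""

    def __init__(self, aggregate_id: str):
        self.aggregate_id = aggregate_id
        self.version = 0
        self.uncommitted_events: List[DomainEvent] = []

    def apply_event(self, event: DomainEvent) -> None:
        self.uncommitted_events.append(event)
        self.version += 1

    def get_uncommitted_events(self) -> List[DomainEvent]:
        return self.uncommitted_events.copy()

    def mark_events_as_committed(self) -> None:
        self.uncommitted_events.clear()


class Order(AggregateRoot):
    """Hypothetical aggregate: every state change is recorded as an event."""

    def __init__(self, order_id: str):
        super().__init__(order_id)
        self.status = "new"

    def place(self, items: List[str]) -> None:
        if self.status != "new":
            raise ValueError("order has already been placed")
        self.status = "placed"
        self.apply_event(DomainEvent(
            event_type="OrderPlaced",
            aggregate_id=self.aggregate_id,
            version=self.version + 1,  # version the aggregate has after this event
            timestamp=datetime.now(timezone.utc).isoformat(),
            data={"items": items},
        ))


order = Order("order-1")
order.place(["sku-1", "sku-2"])
pending = order.get_uncommitted_events()  # events a repository would persist or publish
order.mark_events_as_committed()          # called once the events are safely stored
```

Keeping the events on the aggregate until a repository commits them is what allows the `event_sourcing` and `cqrs` integration patterns listed for core-domain services to work from a single source of state changes.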

Containerization and Orchestration

Kubernetes Deployment Configuration
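
The generator in this section emits, among other resources, a HorizontalPodAutoscaler with a CPU utilization target (70% by default) and replica bounds. As a rough sketch of what that target means at runtime, the function below applies the scaling rule documented for the Kubernetes HPA, desiredReplicas = ceil(currentReplicas × currentMetric / targetMetric), and clamps the result to the configured bounds. The function name `desired_replicas` is hypothetical, and the sketch deliberately ignores the controller's tolerance band, stabilization windows, and handling of unready pods.

```python
import math


def desired_replicas(current_replicas: int,
                     current_utilization: float,
                     target_utilization: float = 70,
                     min_replicas: int = 2,
                     max_replicas: int = 10) -> int:
    """Simplified HPA rule: scale proportionally to utilization, then clamp."""
    if target_utilization <= 0:
        raise ValueError("target_utilization must be positive")
    raw = math.ceil(current_replicas * current_utilization / target_utilization)
    return max(min_replicas, min(max_replicas, raw))


print(desired_replicas(3, 140))  # load is twice the target
print(desired_replicas(3, 35))   # load is half the target
print(desired_replicas(8, 210))  # raw result would exceed max_replicas
```

With the defaults, three pods at 140% average utilization scale to six, three pods at 35% scale down to two, and the result never leaves the range of 2 to 10 replicas.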

import yaml
from typing import Dict, List, Any, Optional
from dataclasses import dataclass
from enum import Enum

class DeploymentStrategy(Enum):
    ROLLING_UPDATE = "RollingUpdate"
    RECREATE = "Recreate"
    BLUE_GREEN = "BlueGreen"
    CANARY = "Canary"

class ServiceType(Enum):
    CLUSTER_IP = "ClusterIP"
    NODE_PORT = "NodePort"
    LOAD_BALANCER = "LoadBalancer"

@dataclass
class ResourceRequirements:
    cpu_request: str = "100m"
    cpu_limit: str = "500m"
    memory_request: str = "128Mi"
    memory_limit: str = "512Mi"

@dataclass
class HealthCheck:
    path: str = "/health"
    port: int = 8080
    initial_delay: int = 30
    period: int = 10
    timeout: int = 5
    failure_threshold: int = 3

class KubernetesManifestGenerator:
    def __init__(self):
        self.namespace = "microservices"
        self.app_version = "v1.0.0"
    
    def generate_namespace(self) -> Dict[str, Any]:
        """Generate the Namespace manifest."""
        return {
            "apiVersion": "v1",
            "kind": "Namespace",
            "metadata": {
                "name": self.namespace,
                "labels": {
                    "name": self.namespace,
                    "environment": "production"
                }
            }
        }
    
    def generate_deployment(self, 
                          service_name: str,
                          image: str,
                          replicas: int = 3,
                          resources: Optional[ResourceRequirements] = None,
                          health_check: Optional[HealthCheck] = None,
                          env_vars: Optional[Dict[str, str]] = None,
                          strategy: DeploymentStrategy = DeploymentStrategy.ROLLING_UPDATE) -> Dict[str, Any]:
        """Generate a Deployment manifest."""
        
        if resources is None:
            resources = ResourceRequirements()
        
        if health_check is None:
            health_check = HealthCheck()
        
        if env_vars is None:
            env_vars = {}
        
        deployment = {
            "apiVersion": "apps/v1",
            "kind": "Deployment",
            "metadata": {
                "name": service_name,
                "namespace": self.namespace,
                "labels": {
                    "app": service_name,
                    "version": self.app_version
                }
            },
            "spec": {
                "replicas": replicas,
                "strategy": {
                    "type": strategy.value
                },
                "selector": {
                    "matchLabels": {
                        "app": service_name
                    }
                },
                "template": {
                    "metadata": {
                        "labels": {
                            "app": service_name,
                            "version": self.app_version
                        }
                    },
                    "spec": {
                        "containers": [{
                            "name": service_name,
                            "image": image,
                            "ports": [{
                                "containerPort": health_check.port,
                                "name": "http"
                            }],
                            "env": [{"name": k, "value": v} for k, v in env_vars.items()],
                            "resources": {
                                "requests": {
                                    "cpu": resources.cpu_request,
                                    "memory": resources.memory_request
                                },
                                "limits": {
                                    "cpu": resources.cpu_limit,
                                    "memory": resources.memory_limit
                                }
                            },
                            "livenessProbe": {
                                "httpGet": {
                                    "path": health_check.path,
                                    "port": health_check.port
                                },
                                "initialDelaySeconds": health_check.initial_delay,
                                "periodSeconds": health_check.period,
                                "timeoutSeconds": health_check.timeout,
                                "failureThreshold": health_check.failure_threshold
                            },
                            "readinessProbe": {
                                "httpGet": {
                                    "path": health_check.path,
                                    "port": health_check.port
                                },
                                "initialDelaySeconds": 10,
                                "periodSeconds": 5,
                                "timeoutSeconds": health_check.timeout,
                                "failureThreshold": 2
                            }
                        }]
                    }
                }
            }
        }
        
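        # Add rolling-update parameters
        if strategy == DeploymentStrategy.ROLLING_UPDATE:
            deployment["spec"]["strategy"]["rollingUpdate"] = {
                "maxUnavailable": "25%",
                "maxSurge": "25%"
            }
        elif strategy != DeploymentStrategy.RECREATE:
            # A Deployment's strategy type accepts only RollingUpdate or Recreate;
            # blue-green and canary rollouts need tooling beyond a plain Deployment.
            raise ValueError(f"Unsupported Deployment strategy type: {strategy.value}")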
        
        return deployment
    
    def generate_service(self, 
                        service_name: str,
                        port: int = 80,
                        target_port: int = 8080,
                        service_type: ServiceType = ServiceType.CLUSTER_IP) -> Dict[str, Any]:
        """Generate a Service manifest."""
        
        service = {
            "apiVersion": "v1",
            "kind": "Service",
            "metadata": {
                "name": service_name,
                "namespace": self.namespace,
                "labels": {
                    "app": service_name
                }
            },
            "spec": {
                "type": service_type.value,
                "ports": [{
                    "port": port,
                    "targetPort": target_port,
                    "protocol": "TCP",
                    "name": "http"
                }],
                "selector": {
                    "app": service_name
                }
            }
        }
        
        return service
    
    def generate_configmap(self, 
                          name: str,
                          data: Dict[str, str]) -> Dict[str, Any]:
        """Generate a ConfigMap manifest."""
        
        return {
            "apiVersion": "v1",
            "kind": "ConfigMap",
            "metadata": {
                "name": name,
                "namespace": self.namespace
            },
            "data": data
        }
    
    def generate_secret(self, 
                       name: str,
                       data: Dict[str, str]) -> Dict[str, Any]:
        """Generate a Secret manifest."""
        
        import base64
        
        # Base64-encode the values (this is an encoding, not encryption)
        encoded_data = {k: base64.b64encode(v.encode()).decode() for k, v in data.items()}
        
        return {
            "apiVersion": "v1",
            "kind": "Secret",
            "metadata": {
                "name": name,
                "namespace": self.namespace
            },
            "type": "Opaque",
            "data": encoded_data
        }
    
    def generate_hpa(self, 
                     service_name: str,
                     min_replicas: int = 2,
                     max_replicas: int = 10,
                     cpu_threshold: int = 70) -> Dict[str, Any]:
        """Generate a HorizontalPodAutoscaler manifest."""
        
        return {
            "apiVersion": "autoscaling/v2",
            "kind": "HorizontalPodAutoscaler",
            "metadata": {
                "name": f"{service_name}-hpa",
                "namespace": self.namespace
            },
            "spec": {
                "scaleTargetRef": {
                    "apiVersion": "apps/v1",
                    "kind": "Deployment",
                    "name": service_name
                },
                "minReplicas": min_replicas,
                "maxReplicas": max_replicas,
                "metrics": [{
                    "type": "Resource",
                    "resource": {
                        "name": "cpu",
                        "target": {
                            "type": "Utilization",
                            "averageUtilization": cpu_threshold
                        }
                    }
                }]
            }
        }
    
    def generate_ingress(self, 
                        name: str,
                        rules: List[Dict[str, Any]],
                        tls_enabled: bool = True) -> Dict[str, Any]:
        """Generate an Ingress manifest."""
        
        ingress = {
            "apiVersion": "networking.k8s.io/v1",
            "kind": "Ingress",
            "metadata": {
                "name": name,
                "namespace": self.namespace,
                "annotations": {
                    # No rewrite-target annotation: rewriting every request to "/"
                    # would drop sub-paths such as /users/123 before they reach the backend.
                    "nginx.ingress.kubernetes.io/ssl-redirect": "true" if tls_enabled else "false"
                }
            },
            "spec": {
                "ingressClassName": "nginx",
                "rules": rules
            }
        }
        
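        if tls_enabled:
            # Several rules may share one host; list each host once, in order.
            tls_hosts = list(dict.fromkeys(rule["host"] for rule in rules if "host" in rule))
            ingress["spec"]["tls"] = [{
                "hosts": tls_hosts,
                "secretName": f"{name}-tls"
            }]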
        
        return ingress
    
    def generate_complete_manifest(self, service_configs: List[Dict[str, Any]]) -> str:
        """Generate the complete Kubernetes manifest as multi-document YAML."""
        
        manifests = []
        
        # Add the namespace
        manifests.append(self.generate_namespace())
        
        # Generate the resources for each service
        for config in service_configs:
            service_name = config["name"]
            image = config["image"]
            
            # Deployment
            deployment = self.generate_deployment(
                service_name=service_name,
                image=image,
                replicas=config.get("replicas", 3),
                resources=config.get("resources"),
                health_check=config.get("health_check"),
                env_vars=config.get("env_vars", {})
            )
            manifests.append(deployment)
            
            # Service
            service = self.generate_service(
                service_name=service_name,
                port=config.get("port", 80),
                target_port=config.get("target_port", 8080)
            )
            manifests.append(service)
            
            # HPA
            if config.get("auto_scaling", True):
                hpa = self.generate_hpa(
                    service_name=service_name,
                    min_replicas=config.get("min_replicas", 2),
                    max_replicas=config.get("max_replicas", 10)
                )
                manifests.append(hpa)
            
            # ConfigMap
            if config.get("config_data"):
                configmap = self.generate_configmap(
                    name=f"{service_name}-config",
                    data=config["config_data"]
                )
                manifests.append(configmap)
            
            # Secret
            if config.get("secret_data"):
                secret = self.generate_secret(
                    name=f"{service_name}-secret",
                    data=config["secret_data"]
                )
                manifests.append(secret)
        
        # Build Ingress rules for the services that are exposed externally
        ingress_rules = []
        for config in service_configs:
            if config.get("external_access"):
                ingress_rules.append({
                    "host": config.get("host", f"{config['name']}.example.com"),
                    "http": {
                        "paths": [{
                            "path": config.get("path", "/"),
                            "pathType": "Prefix",
                            "backend": {
                                "service": {
                                    "name": config["name"],
                                    "port": {
                                        "number": config.get("port", 80)
                                    }
                                }
                            }
                        }]
                    }
                })
        
        if ingress_rules:
            ingress = self.generate_ingress(
                name="microservices-ingress",
                rules=ingress_rules
            )
            manifests.append(ingress)
        
        # Serialize to YAML, one document per manifest
        yaml_content = ""
        for manifest in manifests:
            yaml_content += "---\n"
            yaml_content += yaml.dump(manifest, default_flow_style=False, allow_unicode=True)
            yaml_content += "\n"
        
        return yaml_content

# Usage example
def kubernetes_deployment_example():
    generator = KubernetesManifestGenerator()
    
    # Define the service configurations
    service_configs = [
        {
            "name": "user-service",
            "image": "myregistry/user-service:v1.0.0",
            "replicas": 3,
            "port": 80,
            "target_port": 8080,
            "external_access": True,
            "host": "api.example.com",
            "path": "/users",
            "resources": ResourceRequirements(
                cpu_request="200m",
                cpu_limit="1000m",
                memory_request="256Mi",
                memory_limit="1Gi"
            ),
            "health_check": HealthCheck(
                path="/health",
                port=8080
            ),
            "env_vars": {
                "SPRING_PROFILES_ACTIVE": "production",
                "DATABASE_URL": "postgresql://db:5432/userdb"
            },
            "config_data": {
                "application.yml": """
server:
  port: 8080
spring:
  datasource:
    url: ${DATABASE_URL}
    username: ${DB_USERNAME}
    password: ${DB_PASSWORD}
logging:
  level:
    com.example: INFO
                """.strip()
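            },
            # Placeholder credentials for illustration only; real values should
            # come from a secret manager rather than source code.
            "secret_data": {
                "DB_USERNAME": "user_service",
                "DB_PASSWORD": "secure_password_123"
            }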
        },
        {
            "name": "order-service",
            "image": "myregistry/order-service:v1.0.0",
            "replicas": 5,
            "port": 80,
            "target_port": 8080,
            "external_access": True,
            "host": "api.example.com",
            "path": "/orders",
            "min_replicas": 3,
            "max_replicas": 15,
            "env_vars": {
                "SPRING_PROFILES_ACTIVE": "production",
                "KAFKA_BROKERS": "kafka:9092"
            }
        },
        {
            "name": "payment-service",
            "image": "myregistry/payment-service:v1.0.0",
            "replicas": 2,
            "port": 80,
            "target_port": 8080,
            "external_access": False,
            "env_vars": {
                "PAYMENT_GATEWAY_URL": "https://payment-gateway.example.com",
                "ENCRYPTION_KEY": "payment_encryption_key"
            }
        }
    ]
    
    # Generate the complete manifest
    manifest_yaml = generator.generate_complete_manifest(service_configs)
    
    print("=== Kubernetes deployment manifest ===")
    print(manifest_yaml)
    
    # Save to a file
    with open("microservices-deployment.yaml", "w", encoding="utf-8") as f:
        f.write(manifest_yaml)
    
    print("Manifest saved to microservices-deployment.yaml")
    
    return manifest_yaml

if __name__ == "__main__":
    kubernetes_deployment_example()
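
The HorizontalPodAutoscaler produced by `generate_hpa` scales on average CPU utilization between `minReplicas` and `maxReplicas`. To make the effect of `cpu_threshold` more concrete, the sketch below applies the replica formula described in the Kubernetes documentation, desiredReplicas = ceil(currentReplicas × currentMetric / targetMetric), and then clamps the result. The function name `desired_replicas` is hypothetical, and the 10% tolerance is assumed to match the controller default. The real controller also considers pod readiness, missing metrics, and stabilization windows, so treat this as an approximation only.

```python
import math


def desired_replicas(current_replicas: int,
                     current_cpu_utilization: float,
                     target_cpu_utilization: float = 70,
                     min_replicas: int = 2,
                     max_replicas: int = 10,
                     tolerance: float = 0.1) -> int:
    """Approximate the HPA replica calculation for a single CPU utilization metric."""
    ratio = current_cpu_utilization / target_cpu_utilization

    # Inside the tolerance band the replica count is left unchanged.
    if abs(ratio - 1.0) <= tolerance:
        desired = current_replicas
    else:
        desired = math.ceil(current_replicas * ratio)

    # The result is always clamped to the configured bounds.
    return max(min_replicas, min(max_replicas, desired))


if __name__ == "__main__":
    # 3 pods at 140% average utilization against a 70% target
    print(desired_replicas(3, 140))
    # 3 pods at 72%: within tolerance, so no change
    print(desired_replicas(3, 72))
    # 8 pods at 210%: the raw result exceeds max_replicas and is clamped
    print(desired_replicas(8, 210))
```

With the defaults used in the example configuration (target 70%, 2 to 10 replicas), a lower `cpu_threshold` makes the service scale out earlier at the cost of more idle capacity.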

Service Mesh Architecture

Istio Service Mesh Configuration

import yaml
from typing import Dict, List, Any, Optional
from dataclasses import dataclass
from enum import Enum

class TrafficPolicy(Enum):
    ROUND_ROBIN = "ROUND_ROBIN"
    LEAST_REQUEST = "LEAST_REQUEST"
    LEAST_CONN = "LEAST_CONN"  # deprecated in Istio; prefer LEAST_REQUEST
    RANDOM = "RANDOM"
    PASSTHROUGH = "PASSTHROUGH"

class TLSMode(Enum):
    DISABLE = "DISABLE"
    SIMPLE = "SIMPLE"
    MUTUAL = "MUTUAL"
    ISTIO_MUTUAL = "ISTIO_MUTUAL"

@dataclass
class CircuitBreakerConfig:
    consecutive_errors: int = 5
    interval: str = "30s"
    base_ejection_time: str = "30s"
    max_ejection_percent: int = 50

@dataclass
class RetryConfig:
    attempts: int = 3
    per_try_timeout: str = "2s"
    retry_on: str = "5xx,reset,connect-failure,refused-stream"

class IstioServiceMeshManager:
    def __init__(self, namespace: str = "microservices"):
        self.namespace = namespace
    
    def generate_virtual_service(self, 
                                service_name: str,
                                hosts: List[str],
                                routes: List[Dict[str, Any]],
                                fault_injection: Optional[Dict[str, Any]] = None,
                                timeout: Optional[str] = None,
                                retry: Optional[RetryConfig] = None) -> Dict[str, Any]:
        """Generate a VirtualService manifest."""
        
        virtual_service = {
            "apiVersion": "networking.istio.io/v1beta1",
            "kind": "VirtualService",
            "metadata": {
                "name": f"{service_name}-vs",
                "namespace": self.namespace
            },
            "spec": {
                "hosts": hosts,
                "http": []
            }
        }
        
        for route in routes:
            http_route = {
                "match": route.get("match", [{"uri": {"prefix": "/"}}]),
                "route": route["destinations"]
            }
            
            # Add the overall request timeout
            if timeout:
                http_route["timeout"] = timeout
            
            # Add the retry policy
            if retry:
                http_route["retries"] = {
                    "attempts": retry.attempts,
                    "perTryTimeout": retry.per_try_timeout,
                    "retryOn": retry.retry_on
                }
            
            # Add fault injection
            if fault_injection:
                http_route["fault"] = fault_injection
            
            virtual_service["spec"]["http"].append(http_route)
        
        return virtual_service
    
    def generate_destination_rule(self, 
                                 service_name: str,
                                 host: str,
                                 traffic_policy: TrafficPolicy = TrafficPolicy.ROUND_ROBIN,
                                 circuit_breaker: Optional[CircuitBreakerConfig] = None,
                                 tls_mode: TLSMode = TLSMode.ISTIO_MUTUAL,
                                 subsets: Optional[List[Dict[str, Any]]] = None) -> Dict[str, Any]:
        """Generate a DestinationRule manifest."""
        
        destination_rule = {
            "apiVersion": "networking.istio.io/v1beta1",
            "kind": "DestinationRule",
            "metadata": {
                "name": f"{service_name}-dr",
                "namespace": self.namespace
            },
            "spec": {
                "host": host,
                "trafficPolicy": {
                    "loadBalancer": {
                        "simple": traffic_policy.value
                    },
                    "tls": {
                        "mode": tls_mode.value
                    }
                }
            }
        }
        
        # Add circuit breaking via outlier detection
        if circuit_breaker:
            destination_rule["spec"]["trafficPolicy"]["outlierDetection"] = {
                # `consecutiveErrors` is deprecated in Istio; `consecutive5xxErrors` replaces it.
                "consecutive5xxErrors": circuit_breaker.consecutive_errors,
                "interval": circuit_breaker.interval,
                "baseEjectionTime": circuit_breaker.base_ejection_time,
                "maxEjectionPercent": circuit_breaker.max_ejection_percent
            }
        
        # Add subsets (used for canary deployments)
        if subsets:
            destination_rule["spec"]["subsets"] = subsets
        
        return destination_rule
    
    def generate_gateway(self, 
                        name: str,
                        hosts: List[str],
                        port: int = 80,
                        https_port: int = 443,
                        tls_enabled: bool = True) -> Dict[str, Any]:
        """Generate a Gateway manifest."""
        
        servers = [{
            "port": {
                "number": port,
                "name": "http",
                "protocol": "HTTP"
            },
            "hosts": hosts
        }]
        
        if tls_enabled:
            servers.append({
                "port": {
                    "number": https_port,
                    "name": "https",
                    "protocol": "HTTPS"
                },
                "tls": {
                    "mode": "SIMPLE",
                    "credentialName": f"{name}-tls"
                },
                "hosts": hosts
            })
        
        return {
            "apiVersion": "networking.istio.io/v1beta1",
            "kind": "Gateway",
            "metadata": {
                "name": name,
                "namespace": self.namespace
            },
            "spec": {
                "selector": {
                    "istio": "ingressgateway"
                },
                "servers": servers
            }
        }
    
    def generate_service_entry(self, 
                              name: str,
                              hosts: List[str],
                              ports: List[Dict[str, Any]],
                              location: str = "MESH_EXTERNAL") -> Dict[str, Any]:
        """Generate a ServiceEntry manifest (used for external services)."""
        
        return {
            "apiVersion": "networking.istio.io/v1beta1",
            "kind": "ServiceEntry",
            "metadata": {
                "name": name,
                "namespace": self.namespace
            },
            "spec": {
                "hosts": hosts,
                "ports": ports,
                "location": location,
                "resolution": "DNS"
            }
        }
    
    def generate_authorization_policy(self, 
                                    name: str,
                                    selector: Dict[str, str],
                                    rules: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Generate an AuthorizationPolicy manifest."""
        
        return {
            "apiVersion": "security.istio.io/v1beta1",
            "kind": "AuthorizationPolicy",
            "metadata": {
                "name": name,
                "namespace": self.namespace
            },
            "spec": {
                "selector": {
                    "matchLabels": selector
                },
                "rules": rules
            }
        }
    
    def generate_peer_authentication(self, 
                                   name: str,
                                   selector: Optional[Dict[str, str]] = None,
                                   mtls_mode: str = "STRICT") -> Dict[str, Any]:
        """Generate a PeerAuthentication manifest."""
        
        policy = {
            "apiVersion": "security.istio.io/v1beta1",
            "kind": "PeerAuthentication",
            "metadata": {
                "name": name,
                "namespace": self.namespace
            },
            "spec": {
                "mtls": {
                    "mode": mtls_mode
                }
            }
        }
        
        if selector:
            policy["spec"]["selector"] = {
                "matchLabels": selector
            }
        
        return policy
    
    def generate_telemetry_config(self, 
                                 name: str,
                                 metrics_config: Optional[Dict[str, Any]] = None,
                                 tracing_config: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """Generate a Telemetry manifest."""
        
        telemetry = {
            "apiVersion": "telemetry.istio.io/v1alpha1",
            "kind": "Telemetry",
            "metadata": {
                "name": name,
                "namespace": self.namespace
            },
            "spec": {}
        }
        
        if metrics_config:
            telemetry["spec"]["metrics"] = [metrics_config]
        
        if tracing_config:
            telemetry["spec"]["tracing"] = [tracing_config]
        
        return telemetry
    
    def generate_canary_deployment_config(self, 
                                        service_name: str,
                                        stable_version: str,
                                        canary_version: str,
                                        canary_weight: int = 10) -> List[Dict[str, Any]]:
        """Generate the canary deployment configuration (DestinationRule plus VirtualService)."""
        
        configs = []
        
        # DestinationRule with subsets
        destination_rule = self.generate_destination_rule(
            service_name=service_name,
            host=service_name,
            subsets=[
                {
                    "name": "stable",
                    "labels": {"version": stable_version}
                },
                {
                    "name": "canary",
                    "labels": {"version": canary_version}
                }
            ]
        )
        configs.append(destination_rule)
        
        # VirtualService for traffic splitting
        virtual_service = self.generate_virtual_service(
            service_name=service_name,
            hosts=[service_name],
            routes=[{
                "match": [{"uri": {"prefix": "/"}}],
                "destinations": [
                    {
                        "destination": {
                            "host": service_name,
                            "subset": "stable"
                        },
                        "weight": 100 - canary_weight
                    },
                    {
                        "destination": {
                            "host": service_name,
                            "subset": "canary"
                        },
                        "weight": canary_weight
                    }
                ]
            }]
        )
        configs.append(virtual_service)
        
        return configs
    
    def generate_complete_service_mesh_config(self, services: List[Dict[str, Any]]) -> str:
        """Generate the complete service mesh configuration as multi-document YAML."""
        
        manifests = []
        
        # Generate the Gateway
        gateway = self.generate_gateway(
            name="microservices-gateway",
            hosts=["api.example.com", "*.example.com"]
        )
        manifests.append(gateway)
        
        # Generate the configuration for each service
        for service in services:
            service_name = service["name"]
            
            # VirtualService
            virtual_service = self.generate_virtual_service(
                service_name=service_name,
                hosts=["api.example.com"],
                routes=[{
                    "match": [{"uri": {"prefix": service.get("path", f"/{service_name}")}}],
                    "destinations": [{
                        "destination": {
                            "host": service_name,
                            "port": {"number": service.get("port", 80)}
                        }
                    }]
                }],
                timeout=service.get("timeout", "30s"),
                retry=RetryConfig(
                    attempts=service.get("retry_attempts", 3),
                    per_try_timeout=service.get("per_try_timeout", "2s")
                )
            )
            # Bind the routes to the ingress Gateway generated above. Without a
            # `gateways` field, a VirtualService applies only to mesh sidecars.
            virtual_service["spec"]["gateways"] = ["microservices-gateway"]
            manifests.append(virtual_service)
            
            # DestinationRule
            destination_rule = self.generate_destination_rule(
                service_name=service_name,
                host=service_name,
                traffic_policy=TrafficPolicy.ROUND_ROBIN,
                circuit_breaker=CircuitBreakerConfig(
                    consecutive_errors=service.get("circuit_breaker_errors", 5),
                    interval="30s",
                    base_ejection_time="30s",
                    max_ejection_percent=50
                )
            )
            manifests.append(destination_rule)
            
            # Authorization Policy
            if service.get("auth_required", True):
                auth_policy = self.generate_authorization_policy(
                    name=f"{service_name}-auth",
                    selector={"app": service_name},
                    rules=[{
                        "from": [{
                            "source": {
                                "principals": ["cluster.local/ns/istio-system/sa/istio-ingressgateway-service-account"]
                            }
                        }],
                        "to": [{
                            "operation": {
                                "methods": ["GET", "POST", "PUT", "DELETE"]
                            }
                        }]
                    }]
                )
                manifests.append(auth_policy)
        
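        # Namespace-wide mTLS policy: a PeerAuthentication without a selector applies
        # to every workload in self.namespace (a mesh-wide policy would have to live
        # in the Istio root namespace instead)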
        peer_auth = self.generate_peer_authentication(
            name="default",
            mtls_mode="STRICT"
        )
        manifests.append(peer_auth)
        
        # Telemetry configuration
        telemetry = self.generate_telemetry_config(
            name="default-metrics",
            metrics_config={
                "providers": [{
                    "name": "prometheus"
                }]
            },
            tracing_config={
                "providers": [{
                    "name": "jaeger"
                }]
            }
        )
        manifests.append(telemetry)
        
        # Serialize to YAML, one document per manifest
        yaml_content = ""
        for manifest in manifests:
            yaml_content += "---\n"
            yaml_content += yaml.dump(manifest, default_flow_style=False, allow_unicode=True)
            yaml_content += "\n"
        
        return yaml_content

# Usage example
def istio_service_mesh_example():
    mesh_manager = IstioServiceMeshManager()
    
    # Define the service configurations
    services = [
        {
            "name": "user-service",
            "path": "/users",
            "port": 80,
            "timeout": "10s",
            "retry_attempts": 3,
            "per_try_timeout": "3s",
            "circuit_breaker_errors": 5,
            "auth_required": True
        },
        {
            "name": "order-service",
            "path": "/orders",
            "port": 80,
            "timeout": "15s",
            "retry_attempts": 2,
            "per_try_timeout": "5s",
            "circuit_breaker_errors": 3,
            "auth_required": True
        },
        {
            "name": "payment-service",
            "path": "/payments",
            "port": 80,
            "timeout": "20s",
            "retry_attempts": 5,
            "per_try_timeout": "4s",
            "circuit_breaker_errors": 2,
            "auth_required": True
        }
    ]
    
    # Generate the complete configuration
    mesh_config = mesh_manager.generate_complete_service_mesh_config(services)
    
    print("=== Istio service mesh configuration ===")
    print(mesh_config)
    
    # Generate a canary deployment example
    print("\n=== Canary deployment configuration example ===")
    canary_configs = mesh_manager.generate_canary_deployment_config(
        service_name="user-service",
        stable_version="v1.0.0",
        canary_version="v1.1.0",
        canary_weight=20
    )
    
    for config in canary_configs:
        print("---")
        print(yaml.dump(config, default_flow_style=False, allow_unicode=True))
    
    # Save the configuration
    with open("istio-service-mesh.yaml", "w", encoding="utf-8") as f:
        f.write(mesh_config)
    
    print("Service mesh configuration saved to istio-service-mesh.yaml")
    
    return mesh_config

if __name__ == "__main__":
    istio_service_mesh_example()
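
The canary configuration above splits traffic between the `stable` and `canary` subsets by weight (80/20 in the usage example). The sketch below models what such weights mean using plain cumulative-weight selection. It is not how Envoy implements routing; the names `pick_subset` and `simulate` are hypothetical and exist only for this illustration.

```python
import random
from collections import Counter
from typing import Dict


def pick_subset(weights: Dict[str, int], point: float) -> str:
    """Map a point in [0, total weight) to a subset using cumulative weights."""
    cumulative = 0
    for subset, weight in weights.items():
        cumulative += weight
        if point < cumulative:
            return subset
    raise ValueError("point must be smaller than the sum of the weights")


def simulate(weights: Dict[str, int], requests: int, seed: int = 0) -> Counter:
    """Route `requests` simulated requests and count how many land on each subset."""
    rng = random.Random(seed)  # seeded so that runs are repeatable
    total = sum(weights.values())
    return Counter(pick_subset(weights, rng.random() * total) for _ in range(requests))


if __name__ == "__main__":
    weights = {"stable": 80, "canary": 20}
    counts = simulate(weights, requests=10_000)
    for subset, count in counts.items():
        print(f"{subset}: {count / 10_000:.1%}")
```

Over many requests the observed share converges toward the configured weights, which is why a small `canary_weight` limits the number of users exposed to a faulty release.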

API Gateway Design

Unified API Gateway Implementation

from typing import Dict, List, Any, Optional, Callable
from dataclasses import dataclass, field
from enum import Enum
import time
import json
import hashlib
import jwt
from abc import ABC, abstractmethod

class RateLimitStrategy(Enum):
    FIXED_WINDOW = "fixed_window"
    SLIDING_WINDOW = "sliding_window"
    TOKEN_BUCKET = "token_bucket"

class AuthenticationMethod(Enum):
    JWT = "jwt"
    API_KEY = "api_key"
    OAUTH2 = "oauth2"
    BASIC = "basic"

@dataclass
class RouteConfig:
    path: str
    method: str
    upstream_service: str
    upstream_path: str
    timeout: int = 30
    retries: int = 3
    auth_required: bool = True
    rate_limit: Optional[Dict[str, Any]] = None
    cache_ttl: Optional[int] = None
    transform_request: Optional[Callable] = None
    transform_response: Optional[Callable] = None

@dataclass
class RateLimitConfig:
    strategy: RateLimitStrategy
    requests_per_window: int
    window_size_seconds: int
    burst_capacity: Optional[int] = None

@dataclass
class CircuitBreakerConfig:
    failure_threshold: int = 5
    recovery_timeout: int = 60
    success_threshold: int = 3

class APIGateway:
    def __init__(self):
        self.routes: Dict[str, RouteConfig] = {}
        self.rate_limiters: Dict[str, 'RateLimiter'] = {}
        self.circuit_breakers: Dict[str, 'CircuitBreaker'] = {}
        self.cache: Dict[str, Any] = {}
        self.middleware_stack: List[Callable] = []
        self.auth_providers: Dict[AuthenticationMethod, 'AuthProvider'] = {}
        
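    def add_route(self, route: RouteConfig):
        """Register a route."""
        route_key = f"{route.method}:{route.path}"
        self.routes[route_key] = route
        
        # Initialize the rate limiter
        if route.rate_limit:
            rate_limit_config = RateLimitConfig(**route.rate_limit)
            self.rate_limiters[route_key] = RateLimiter(rate_limit_config)
        
        # Create one circuit breaker per upstream service and reuse it when several
        # routes share an upstream, so that its state is not reset
        if route.upstream_service not in self.circuit_breakers:
            self.circuit_breakers[route.upstream_service] = CircuitBreaker(
                CircuitBreakerConfig()
            )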
    
    def add_middleware(self, middleware: Callable):
        """Append a middleware to the stack."""
        self.middleware_stack.append(middleware)
    
    def add_auth_provider(self, method: AuthenticationMethod, provider: 'AuthProvider'):
        """Register an authentication provider."""
        self.auth_providers[method] = provider
    
    def process_request(self, request: Dict[str, Any]) -> Dict[str, Any]:
        """Run a request through the gateway pipeline and return the response."""
        route = None
        forwarding = False  # True only while the upstream call is in flight
        try:
            # Route matching
            route = self._match_route(request)
            if not route:
                return self._error_response(404, "Route not found")
            
            # Run middleware; a middleware may return None to continue
            for middleware in self.middleware_stack:
                result = middleware(request)
                if result and result.get("error"):
                    return result
            
            # Authentication
            if route.auth_required:
                auth_result = self._authenticate(request)
                if not auth_result.get("success"):
                    return self._error_response(401, "Authentication failed")
                request["user"] = auth_result.get("user")
            
            # Rate limiting
            if not self._check_rate_limit(route, request):
                return self._error_response(429, "Rate limit exceeded")
            
            # Cache lookup
            cache_key = self._generate_cache_key(route, request)
            if route.cache_ttl and cache_key in self.cache:
                cached_response = self.cache[cache_key]
                if time.time() - cached_response["timestamp"] < route.cache_ttl:
                    return cached_response["data"]
            
            # Circuit breaker check
            circuit_breaker = self.circuit_breakers.get(route.upstream_service)
            if circuit_breaker and not circuit_breaker.can_execute():
                return self._error_response(503, "Service temporarily unavailable")
            
            # Request transformation
            if route.transform_request:
                request = route.transform_request(request)
            
            # Forward the request to the upstream service
            forwarding = True
            response = self._forward_request(route, request)
            forwarding = False
            
            # Record the success on the circuit breaker
            if circuit_breaker:
                circuit_breaker.record_success()
            
            # Response transformation
            if route.transform_response:
                response = route.transform_response(response)
            
            # Cache successful responses
            if route.cache_ttl and response.get("status") == 200:
                self.cache[cache_key] = {
                    "data": response,
                    "timestamp": time.time()
                }
            
            return response
            
        except Exception as e:
            # Count a failure against the circuit breaker only when the upstream
            # call itself raised, not for errors inside the gateway
            if forwarding and route:
                circuit_breaker = self.circuit_breakers.get(route.upstream_service)
                if circuit_breaker:
                    circuit_breaker.record_failure()
            
            return self._error_response(500, f"Internal server error: {str(e)}")
    
    def _match_route(self, request: Dict[str, Any]) -> Optional[RouteConfig]:
        """Find the route that matches the request."""
        method = request.get("method", "GET")
        path = request.get("path", "/")
        
        # Exact match
        route_key = f"{method}:{path}"
        if route_key in self.routes:
            return self.routes[route_key]
        
        # Pattern match (simplified implementation)
        for key, route in self.routes.items():
            route_method, route_path = key.split(":", 1)
            if route_method == method and self._path_matches(path, route_path):
                return route
        
        return None
    
    def _path_matches(self, request_path: str, route_path: str) -> bool:
        """Match a path; a route containing "*" is treated as a prefix match."""
        if "*" in route_path:
            prefix = route_path.replace("*", "")
            return request_path.startswith(prefix)
        return request_path == route_path
    
    def _authenticate(self, request: Dict[str, Any]) -> Dict[str, Any]:
        """Authenticate the request using the registered providers."""
        auth_header = request.get("headers", {}).get("Authorization", "")
        
        if auth_header.startswith("Bearer "):
            token = auth_header[7:]
            jwt_provider = self.auth_providers.get(AuthenticationMethod.JWT)
            if jwt_provider:
                return jwt_provider.validate(token)
        
        elif auth_header.startswith("Basic "):
            basic_provider = self.auth_providers.get(AuthenticationMethod.BASIC)
            if basic_provider:
                return basic_provider.validate(auth_header[6:])
        
        api_key = request.get("headers", {}).get("X-API-Key")
        if api_key:
            api_key_provider = self.auth_providers.get(AuthenticationMethod.API_KEY)
            if api_key_provider:
                return api_key_provider.validate(api_key)
        
        return {"success": False, "error": "No valid authentication method found"}
    
    def _check_rate_limit(self, route: RouteConfig, request: Dict[str, Any]) -> bool:
        """Check the rate limit for this route and client."""
        route_key = f"{route.method}:{route.path}"
        rate_limiter = self.rate_limiters.get(route_key)
        
        if not rate_limiter:
            return True
        
        client_id = self._get_client_id(request)
        return rate_limiter.allow_request(client_id)
    
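    def _get_client_id(self, request: Dict[str, Any]) -> str:
        """Return an identifier for the client."""
        # Prefer the authenticated user ID; `user` may be missing or None
        user = request.get("user")
        if isinstance(user, dict) and "id" in user:
            return f"user:{user['id']}"
        
        # Fall back to the client IP address
        return f"ip:{request.get('client_ip', 'unknown')}"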
    
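    def _generate_cache_key(self, route: RouteConfig, request: Dict[str, Any]) -> str:
        """Build the cache key for a request."""
        key_parts = [
            route.method,
            # Use the concrete request path: with a wildcard route such as
            # /users/*, route.path alone would make different paths share one entry
            request.get("path", route.path),
            json.dumps(request.get("query_params", {}), sort_keys=True),
            json.dumps(request.get("body", {}), sort_keys=True)
        ]
        key_string = "|".join(key_parts)
        return hashlib.sha256(key_string.encode()).hexdigest()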
    
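    def _forward_request(self, route: RouteConfig, request: Dict[str, Any]) -> Dict[str, Any]:
        """Forward the request to the upstream service (mock implementation)."""
        # A real gateway would issue the HTTP request to the upstream here.
        # For demonstration purposes, return a mock response.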
        return {
            "status": 200,
            "headers": {"Content-Type": "application/json"},
            "body": {
                "message": f"Response from {route.upstream_service}",
                "path": route.upstream_path,
                "timestamp": time.time()
            }
        }
    
    def _error_response(self, status: int, message: str) -> Dict[str, Any]:
        """Build an error response."""
        return {
            "status": status,
            "headers": {"Content-Type": "application/json"},
            "body": {
                "error": message,
                "timestamp": time.time()
            }
        }

class RateLimiter:
    def __init__(self, config: RateLimitConfig):
        self.config = config
        self.windows: Dict[str, Dict[str, Any]] = {}
    
    def allow_request(self, client_id: str) -> bool:
        """检查是否允许请求"""
        current_time = time.time()
        
        if self.config.strategy == RateLimitStrategy.FIXED_WINDOW:
            return self._fixed_window_check(client_id, current_time)
        elif self.config.strategy == RateLimitStrategy.SLIDING_WINDOW:
            return self._sliding_window_check(client_id, current_time)
        elif self.config.strategy == RateLimitStrategy.TOKEN_BUCKET:
            return self._token_bucket_check(client_id, current_time)
        
        return True
    
    def _fixed_window_check(self, client_id: str, current_time: float) -> bool:
        """固定窗口算法"""
        window_start = int(current_time // self.config.window_size_seconds) * self.config.window_size_seconds
        
        if client_id not in self.windows:
            self.windows[client_id] = {"window_start": window_start, "count": 0}
        
        client_window = self.windows[client_id]
        
        # 重置窗口
        if client_window["window_start"] < window_start:
            client_window["window_start"] = window_start
            client_window["count"] = 0
        
        # 检查限制
        if client_window["count"] >= self.config.requests_per_window:
            return False
        
        client_window["count"] += 1
        return True
    
    def _sliding_window_check(self, client_id: str, current_time: float) -> bool:
        """滑动窗口算法"""
        if client_id not in self.windows:
            self.windows[client_id] = {"requests": []}
        
        client_window = self.windows[client_id]
        
        # 清理过期请求
        window_start = current_time - self.config.window_size_seconds
        client_window["requests"] = [
            req_time for req_time in client_window["requests"] 
            if req_time > window_start
        ]
        
        # 检查限制
        if len(client_window["requests"]) >= self.config.requests_per_window:
            return False
        
        client_window["requests"].append(current_time)
        return True
    
    def _token_bucket_check(self, client_id: str, current_time: float) -> bool:
        """Token bucket algorithm."""
        max_tokens = self.config.burst_capacity or self.config.requests_per_window
        if client_id not in self.windows:
            self.windows[client_id] = {
                "tokens": float(max_tokens),
                "last_refill": current_time
            }
        
        bucket = self.windows[client_id]
        
        # Refill continuously. Keeping fractional tokens avoids discarding the
        # partial credit that int() truncation would drop on every refill.
        time_passed = max(0.0, current_time - bucket["last_refill"])
        refill_rate = self.config.requests_per_window / self.config.window_size_seconds
        bucket["tokens"] = min(max_tokens, bucket["tokens"] + time_passed * refill_rate)
        bucket["last_refill"] = current_time
        
        # Reject when less than one whole token is available
        if bucket["tokens"] < 1:
            return False
        
        bucket["tokens"] -= 1
        return True

class CircuitBreaker:
    def __init__(self, config: CircuitBreakerConfig):
        self.config = config
        self.failure_count = 0
        self.success_count = 0
        self.last_failure_time = 0
        self.state = "CLOSED"  # CLOSED, OPEN, HALF_OPEN
    
    def can_execute(self) -> bool:
        """检查是否可以执行请求"""
        current_time = time.time()
        
        if self.state == "CLOSED":
            return True
        elif self.state == "OPEN":
            if current_time - self.last_failure_time > self.config.recovery_timeout:
                self.state = "HALF_OPEN"
                self.success_count = 0
                return True
            return False
        elif self.state == "HALF_OPEN":
            return True
        
        return False
    
    def record_success(self):
        """记录成功"""
        if self.state == "HALF_OPEN":
            self.success_count += 1
            if self.success_count >= self.config.success_threshold:
                self.state = "CLOSED"
                self.failure_count = 0
        elif self.state == "CLOSED":
            self.failure_count = max(0, self.failure_count - 1)
    
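    def record_failure(self):
        """Record a failed call."""
        self.failure_count += 1
        self.last_failure_time = time.time()
        
        # A failed probe in HALF_OPEN reopens the circuit immediately;
        # in CLOSED it opens once the failure threshold is reached.
        if self.state == "HALF_OPEN" or self.failure_count >= self.config.failure_threshold:
            self.state = "OPEN"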

class AuthProvider(ABC):
    @abstractmethod
    def validate(self, credentials: str) -> Dict[str, Any]:
        pass

class JWTAuthProvider(AuthProvider):
    def __init__(self, secret_key: str, algorithm: str = "HS256"):
        self.secret_key = secret_key
        self.algorithm = algorithm
    
    def validate(self, token: str) -> Dict[str, Any]:
        try:
            payload = jwt.decode(token, self.secret_key, algorithms=[self.algorithm])
            return {
                "success": True,
                "user": {
                    "id": payload.get("user_id"),
                    "username": payload.get("username"),
                    "roles": payload.get("roles", [])
                }
            }
        except jwt.ExpiredSignatureError:
            return {"success": False, "error": "Token expired"}
        except jwt.InvalidTokenError:
            return {"success": False, "error": "Invalid token"}

class APIKeyAuthProvider(AuthProvider):
    def __init__(self, valid_keys: Dict[str, Dict[str, Any]]):
        self.valid_keys = valid_keys
    
    def validate(self, api_key: str) -> Dict[str, Any]:
        if api_key in self.valid_keys:
            return {
                "success": True,
                "user": self.valid_keys[api_key]
            }
        return {"success": False, "error": "Invalid API key"}

# 中间件示例
def logging_middleware(request: Dict[str, Any]) -> Dict[str, Any]:
    """日志中间件"""
    print(f"Request: {request.get('method')} {request.get('path')} from {request.get('client_ip')}")
    return {"success": True}

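def cors_middleware(request: Dict[str, Any]) -> Dict[str, Any]:
    """CORS middleware."""
    # Note: browsers read CORS headers from the response. This demo attaches them
    # to the request dict; a real gateway would copy them onto the response and
    # usually restrict the allowed origin rather than using "*".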
    if "headers" not in request:
        request["headers"] = {}
    
    request["headers"]["Access-Control-Allow-Origin"] = "*"
    request["headers"]["Access-Control-Allow-Methods"] = "GET, POST, PUT, DELETE, OPTIONS"
    request["headers"]["Access-Control-Allow-Headers"] = "Content-Type, Authorization"
    
    return {"success": True}

# 使用示例
def api_gateway_example():
    # 创建API网关
    gateway = APIGateway()
    
    # 添加认证提供者
    jwt_provider = JWTAuthProvider("your-secret-key")
    api_key_provider = APIKeyAuthProvider({
        "key123": {"id": "user1", "username": "admin", "roles": ["admin"]},
        "key456": {"id": "user2", "username": "user", "roles": ["user"]}
    })
    
    gateway.add_auth_provider(AuthenticationMethod.JWT, jwt_provider)
    gateway.add_auth_provider(AuthenticationMethod.API_KEY, api_key_provider)
    
    # 添加中间件
    gateway.add_middleware(logging_middleware)
    gateway.add_middleware(cors_middleware)
    
    # 添加路由
    routes = [
        RouteConfig(
            path="/api/users/*",
            method="GET",
            upstream_service="user-service",
            upstream_path="/users",
            timeout=10,
            retries=3,
            auth_required=True,
            rate_limit={
                "strategy": RateLimitStrategy.SLIDING_WINDOW,
                "requests_per_window": 100,
                "window_size_seconds": 60
            },
            cache_ttl=300
        ),
        RouteConfig(
            path="/api/orders/*",
            method="POST",
            upstream_service="order-service",
            upstream_path="/orders",
            timeout=30,
            retries=2,
            auth_required=True,
            rate_limit={
                "strategy": RateLimitStrategy.TOKEN_BUCKET,
                "requests_per_window": 10,
                "window_size_seconds": 60,
                "burst_capacity": 20
            }
        ),
        RouteConfig(
            path="/api/health",
            method="GET",
            upstream_service="health-service",
            upstream_path="/health",
            timeout=5,
            retries=1,
            auth_required=False,
            cache_ttl=60
        )
    ]
    
    for route in routes:
        gateway.add_route(route)
    
    # 模拟请求处理
    test_requests = [
        {
            "method": "GET",
            "path": "/api/users/123",
            "headers": {"Authorization": "Bearer valid-jwt-token", "X-API-Key": "key123"},
            "client_ip": "192.168.1.100"
        },
        {
            "method": "POST",
            "path": "/api/orders/create",
            "headers": {"Authorization": "Bearer valid-jwt-token"},
            "body": {"product_id": "123", "quantity": 2},
            "client_ip": "192.168.1.101"
        },
        {
            "method": "GET",
            "path": "/api/health",
            "client_ip": "192.168.1.102"
        }
    ]
    
    print("=== API网关请求处理示例 ===")
    for i, request in enumerate(test_requests, 1):
        print(f"\n请求 {i}:")
        response = gateway.process_request(request)
        print(f"响应状态: {response.get('status')}")
        print(f"响应体: {json.dumps(response.get('body', {}), indent=2, ensure_ascii=False)}")
    
    return gateway

if __name__ == "__main__":
    api_gateway_example()
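
The fixed-window and sliding-window strategies above differ most at window boundaries. The following is a minimal, self-contained sketch (the function names `fixed_window_allowed` and `sliding_window_allowed` are illustrative and not part of the gateway code) that replays the same burst of timestamps through both algorithms. It assumes a limit of 10 requests per 60-second window.

```python
from collections import deque


def fixed_window_allowed(timestamps, limit, window):
    """Count requests admitted by a fixed-window counter."""
    allowed, window_start, count = 0, None, 0
    for t in timestamps:
        start = (t // window) * window
        if start != window_start:
            # A new window begins: the counter resets to zero.
            window_start, count = start, 0
        if count < limit:
            count += 1
            allowed += 1
    return allowed


def sliding_window_allowed(timestamps, limit, window):
    """Count requests admitted by a sliding-window log."""
    allowed, log = 0, deque()
    for t in timestamps:
        # Drop entries that have fallen out of the trailing window.
        while log and log[0] <= t - window:
            log.popleft()
        if len(log) < limit:
            log.append(t)
            allowed += 1
    return allowed


# 10 requests just before the 60 s boundary and 10 just after it.
burst = [59.0 + i * 0.05 for i in range(10)] + [60.0 + i * 0.05 for i in range(10)]
fixed = fixed_window_allowed(burst, limit=10, window=60)
sliding = sliding_window_allowed(burst, limit=10, window=60)
print(f"fixed window admitted {fixed}, sliding window admitted {sliding}")
```

In this replay the fixed window admits the whole burst because its counter resets at the boundary, while the sliding window holds the caller to the configured limit. The trade-off is that the sliding log stores one timestamp per admitted request.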

5. Configuration Management and Service Discovery

5.1 Configuration Center Design

import consul
import etcd3
from typing import Dict, Any, Optional, List, Callable
from dataclasses import dataclass
from enum import Enum
import json
import yaml
import threading
import time

class ConfigFormat(Enum):
    JSON = "json"
    YAML = "yaml"
    PROPERTIES = "properties"
    ENV = "env"

@dataclass
class ConfigItem:
    key: str
    value: Any
    version: int
    format: ConfigFormat
    encrypted: bool = False
    tags: Optional[List[str]] = None

class ConfigChangeEvent:
    def __init__(self, key: str, old_value: Any, new_value: Any, action: str):
        self.key = key
        self.old_value = old_value
        self.new_value = new_value
        self.action = action  # CREATE, UPDATE, DELETE
        self.timestamp = time.time()

class ConfigCenter:
    def __init__(self, backend: str = "consul", **kwargs):
        self.backend = backend
        self.watchers: Dict[str, List[Callable]] = {}
        self.cache: Dict[str, ConfigItem] = {}
        self.encryption_key = kwargs.get("encryption_key")
        
        if backend == "consul":
            self.client = consul.Consul(
                host=kwargs.get("host", "localhost"),
                port=kwargs.get("port", 8500)
            )
        elif backend == "etcd":
            self.client = etcd3.client(
                host=kwargs.get("host", "localhost"),
                port=kwargs.get("port", 2379)
            )
        else:
            raise ValueError(f"Unsupported backend: {backend}")
    
    def put(self, key: str, value: Any, format: ConfigFormat = ConfigFormat.JSON, 
            encrypted: bool = False, tags: Optional[List[str]] = None) -> bool:
        """Store a configuration item."""
        try:
            # Serialize the value
            serialized_value = self._serialize_value(value, format)
            
            # Encrypt if requested. Without an encryption_key the value is
            # written in plaintext even when encrypted=True.
            if encrypted and self.encryption_key:
                serialized_value = self._encrypt(serialized_value)
            
            # Write to the backend
            if self.backend == "consul":
                # python-consul's kv.put returns a plain bool, not a tuple
                success = self.client.kv.put(key, serialized_value)
            elif self.backend == "etcd":
                self.client.put(key, serialized_value)
                success = True
            
            if success:
                # 更新缓存
                old_item = self.cache.get(key)
                new_version = (old_item.version + 1) if old_item else 1
                
                config_item = ConfigItem(
                    key=key,
                    value=value,
                    version=new_version,
                    format=format,
                    encrypted=encrypted,
                    tags=tags or []
                )
                self.cache[key] = config_item
                
                # 触发变更事件
                action = "UPDATE" if old_item else "CREATE"
                old_value = old_item.value if old_item else None
                self._notify_watchers(key, old_value, value, action)
            
            return success
            
        except Exception as e:
            print(f"Failed to put config {key}: {e}")
            return False
    
    def get(self, key: str, default: Any = None) -> Any:
        """获取配置项"""
        try:
            # 先检查缓存
            if key in self.cache:
                return self.cache[key].value
            
            # 从后端获取
            if self.backend == "consul":
                _, data = self.client.kv.get(key)
                if data is None:
                    return default
                raw_value = data['Value'].decode('utf-8')
            elif self.backend == "etcd":
                result = self.client.get(key)
                if result[0] is None:
                    return default
                raw_value = result[0].decode('utf-8')
            
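            # Decrypt if needed
            if self._is_encrypted(raw_value):
                raw_value = self._decrypt(raw_value)
            
            # Deserialize. The backend stores no format metadata, so values read
            # from it are assumed to be JSON; other formats are only served from cache.
            value = self._deserialize_value(raw_value, ConfigFormat.JSON)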
            
            # 更新缓存
            config_item = ConfigItem(
                key=key,
                value=value,
                version=1,
                format=ConfigFormat.JSON
            )
            self.cache[key] = config_item
            
            return value
            
        except Exception as e:
            print(f"Failed to get config {key}: {e}")
            return default
    
    def delete(self, key: str) -> bool:
        """删除配置项"""
        try:
            old_item = self.cache.get(key)
            
            if self.backend == "consul":
                success = self.client.kv.delete(key)
            elif self.backend == "etcd":
                self.client.delete(key)
                success = True
            
            if success:
                # 从缓存中删除
                if key in self.cache:
                    del self.cache[key]
                
                # 触发变更事件
                if old_item:
                    self._notify_watchers(key, old_item.value, None, "DELETE")
            
            return success
            
        except Exception as e:
            print(f"Failed to delete config {key}: {e}")
            return False
    
    def watch(self, key_prefix: str, callback: Callable[[ConfigChangeEvent], None]):
        """监听配置变更"""
        if key_prefix not in self.watchers:
            self.watchers[key_prefix] = []
        
        self.watchers[key_prefix].append(callback)
        
        # 启动监听线程
        if self.backend == "consul":
            threading.Thread(
                target=self._consul_watch,
                args=(key_prefix,),
                daemon=True
            ).start()
        elif self.backend == "etcd":
            threading.Thread(
                target=self._etcd_watch,
                args=(key_prefix,),
                daemon=True
            ).start()
    
    def _consul_watch(self, key_prefix: str):
        """Consul监听实现"""
        index = None
        while True:
            try:
                index, data = self.client.kv.get(key_prefix, index=index, wait='30s', recurse=True)
                # 处理变更事件
                # 这里简化实现,实际需要比较前后状态
                time.sleep(1)
            except Exception as e:
                print(f"Consul watch error: {e}")
                time.sleep(5)
    
    def _etcd_watch(self, key_prefix: str):
        """etcd监听实现"""
        try:
            events_iterator, cancel = self.client.watch_prefix(key_prefix)
            for event in events_iterator:
                # 处理etcd事件
                pass
        except Exception as e:
            print(f"etcd watch error: {e}")
    
    def _serialize_value(self, value: Any, format: ConfigFormat) -> str:
        """序列化值"""
        if format == ConfigFormat.JSON:
            return json.dumps(value, ensure_ascii=False)
        elif format == ConfigFormat.YAML:
            return yaml.dump(value, allow_unicode=True)
        elif format == ConfigFormat.PROPERTIES:
            # 简化实现
            if isinstance(value, dict):
                return '\n'.join([f"{k}={v}" for k, v in value.items()])
            return str(value)
        else:
            return str(value)
    
    def _deserialize_value(self, raw_value: str, format: ConfigFormat) -> Any:
        """反序列化值"""
        if format == ConfigFormat.JSON:
            return json.loads(raw_value)
        elif format == ConfigFormat.YAML:
            return yaml.safe_load(raw_value)
        elif format == ConfigFormat.PROPERTIES:
            # 简化实现
            result = {}
            for line in raw_value.split('\n'):
                if '=' in line:
                    k, v = line.split('=', 1)
                    result[k.strip()] = v.strip()
            return result
        else:
            return raw_value
    
    def _encrypt(self, value: str) -> str:
        """加密值(简化实现)"""
        # 实际应该使用AES等加密算法
        return f"encrypted:{value}"
    
    def _decrypt(self, encrypted_value: str) -> str:
        """解密值(简化实现)"""
        if encrypted_value.startswith("encrypted:"):
            return encrypted_value[10:]
        return encrypted_value
    
    def _is_encrypted(self, value: str) -> bool:
        """检查是否加密"""
        return value.startswith("encrypted:")
    
    def _notify_watchers(self, key: str, old_value: Any, new_value: Any, action: str):
        """通知监听器"""
        event = ConfigChangeEvent(key, old_value, new_value, action)
        
        for prefix, callbacks in self.watchers.items():
            if key.startswith(prefix):
                for callback in callbacks:
                    try:
                        callback(event)
                    except Exception as e:
                        print(f"Watcher callback error: {e}")
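
The Consul watch loop above notes that a real implementation has to compare the state before and after each blocking query. One way to do that, sketched here under the assumption that each poll can be reduced to a plain `dict` of key to value, is to diff consecutive snapshots into CREATE / UPDATE / DELETE events. The helper name `diff_snapshots` and the tuple layout are illustrative and not part of `ConfigCenter`.

```python
from typing import Any, Dict, List, Tuple

Event = Tuple[str, str, Any, Any]  # (action, key, old_value, new_value)


def diff_snapshots(old: Dict[str, Any], new: Dict[str, Any]) -> List[Event]:
    """Compare two key/value snapshots and list the changes between them."""
    events: List[Event] = []
    for key in sorted(old.keys() | new.keys()):
        if key not in old:
            events.append(("CREATE", key, None, new[key]))
        elif key not in new:
            events.append(("DELETE", key, old[key], None))
        elif old[key] != new[key]:
            events.append(("UPDATE", key, old[key], new[key]))
    return events


before = {"app/db/host": "localhost", "app/db/port": 5432, "app/flag": True}
after = {"app/db/host": "db.internal", "app/db/port": 5432, "app/cache": "redis"}
events = diff_snapshots(before, after)
for event in events:
    print(event)
```

Each resulting tuple maps directly onto the arguments of `ConfigChangeEvent`, so a watch loop could keep the previous snapshot, diff it against the latest poll, and notify callbacks only for keys that actually changed.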

5.2 Service Discovery Mechanism

class ServiceInstance:
    def __init__(self, service_id: str, service_name: str, host: str, port: int,
                 metadata: Dict[str, str] = None, health_check_url: str = None):
        self.service_id = service_id
        self.service_name = service_name
        self.host = host
        self.port = port
        self.metadata = metadata or {}
        self.health_check_url = health_check_url
        self.last_heartbeat = time.time()
        self.status = "UP"
    
    def to_dict(self) -> Dict[str, Any]:
        return {
            "service_id": self.service_id,
            "service_name": self.service_name,
            "host": self.host,
            "port": self.port,
            "metadata": self.metadata,
            "health_check_url": self.health_check_url,
            "last_heartbeat": self.last_heartbeat,
            "status": self.status
        }

class ServiceRegistry:
    def __init__(self, backend: str = "consul", **kwargs):
        self.backend = backend
        self.services: Dict[str, ServiceInstance] = {}
        self.health_check_interval = kwargs.get("health_check_interval", 30)
        
        if backend == "consul":
            self.client = consul.Consul(
                host=kwargs.get("host", "localhost"),
                port=kwargs.get("port", 8500)
            )
        elif backend == "etcd":
            self.client = etcd3.client(
                host=kwargs.get("host", "localhost"),
                port=kwargs.get("port", 2379)
            )
        else:
            raise ValueError(f"Unsupported backend: {backend}")
        
        # 启动健康检查
        threading.Thread(target=self._health_check_loop, daemon=True).start()
    
    def register(self, instance: ServiceInstance) -> bool:
        """注册服务实例"""
        try:
            service_key = f"services/{instance.service_name}/{instance.service_id}"
            service_data = json.dumps(instance.to_dict())
            
            if self.backend == "consul":
                # 注册到Consul
                success = self.client.agent.service.register(
                    name=instance.service_name,
                    service_id=instance.service_id,
                    address=instance.host,
                    port=instance.port,
                    # Encode metadata as "key=value" tags so the values survive registration
                    tags=[f"{k}={v}" for k, v in instance.metadata.items()],
                    check=consul.Check.http(
                        instance.health_check_url,
                        interval="30s"
                    ) if instance.health_check_url else None
                )
            elif self.backend == "etcd":
                # 注册到etcd
                self.client.put(service_key, service_data, lease=self._create_lease())
                success = True
            
            if success:
                self.services[instance.service_id] = instance
                print(f"Service {instance.service_name}:{instance.service_id} registered")
            
            return success
            
        except Exception as e:
            print(f"Failed to register service: {e}")
            return False
    
    def deregister(self, service_id: str) -> bool:
        """注销服务实例"""
        try:
            if self.backend == "consul":
                self.client.agent.service.deregister(service_id)
            elif self.backend == "etcd":
                # 从etcd删除
                for service_name in self._get_all_service_names():
                    service_key = f"services/{service_name}/{service_id}"
                    self.client.delete(service_key)
            
            if service_id in self.services:
                service_name = self.services[service_id].service_name
                del self.services[service_id]
                print(f"Service {service_name}:{service_id} deregistered")
            
            return True
            
        except Exception as e:
            print(f"Failed to deregister service: {e}")
            return False
    
    def discover(self, service_name: str) -> List[ServiceInstance]:
        """发现服务实例"""
        try:
            instances = []
            
            if self.backend == "consul":
                _, services = self.client.health.service(service_name, passing=True)
                for service in services:
                    service_info = service['Service']
                    instance = ServiceInstance(
                        service_id=service_info['ID'],
                        service_name=service_info['Service'],
                        host=service_info['Address'],
                        port=service_info['Port'],
                        # Split "key=value" tags back into metadata; a tag without "=" maps to ""
                        metadata=dict(tag.partition("=")[::2] for tag in (service_info.get('Tags') or []))
                    )
                    instances.append(instance)
            
            elif self.backend == "etcd":
                service_prefix = f"services/{service_name}/"
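                for value, _ in self.client.get_prefix(service_prefix):
                    service_data = json.loads(value.decode('utf-8'))
                    # to_dict() also emits runtime fields that __init__ does not accept
                    last_heartbeat = service_data.pop("last_heartbeat", None)
                    status = service_data.pop("status", "UP")
                    instance = ServiceInstance(**service_data)
                    instance.status = status
                    if last_heartbeat is not None:
                        instance.last_heartbeat = last_heartbeat
                    if instance.status == "UP":
                        instances.append(instance)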
            
            return instances
            
        except Exception as e:
            print(f"Failed to discover service {service_name}: {e}")
            return []
    
    def heartbeat(self, service_id: str) -> bool:
        """发送心跳"""
        if service_id in self.services:
            self.services[service_id].last_heartbeat = time.time()
            self.services[service_id].status = "UP"
            return True
        return False
    
    def _create_lease(self) -> int:
        """创建etcd租约"""
        if self.backend == "etcd":
            lease = self.client.lease(ttl=self.health_check_interval * 2)
            return lease.id
        return 0
    
    def _get_all_service_names(self) -> List[str]:
        """获取所有服务名称"""
        service_names = set()
        for instance in self.services.values():
            service_names.add(instance.service_name)
        return list(service_names)
    
    def _health_check_loop(self):
        """健康检查循环"""
        while True:
            try:
                current_time = time.time()
                timeout_threshold = current_time - (self.health_check_interval * 2)
                
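                # Find instances whose heartbeat has timed out. Iterate over a snapshot
                # because register()/deregister() mutate the dict from other threads.
                timeout_services = []
                for service_id, instance in list(self.services.items()):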
                    if instance.last_heartbeat < timeout_threshold:
                        timeout_services.append(service_id)
                
                # 标记超时服务为DOWN
                for service_id in timeout_services:
                    self.services[service_id].status = "DOWN"
                    print(f"Service {service_id} marked as DOWN due to timeout")
                
                time.sleep(self.health_check_interval)
                
            except Exception as e:
                print(f"Health check error: {e}")
                time.sleep(5)

# 使用示例
def config_and_discovery_example():
    # 配置中心示例
    print("=== 配置中心示例 ===")
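    # encryption_key is needed for encrypted=True below to take effect;
    # "demo-only-key" is a placeholder, not a real secret
    config_center = ConfigCenter(backend="consul", encryption_key="demo-only-key")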
    
    # 存储配置
    app_config = {
        "database": {
            "host": "localhost",
            "port": 5432,
            "name": "myapp"
        },
        "redis": {
            "host": "localhost",
            "port": 6379
        },
        "feature_flags": {
            "new_ui": True,
            "beta_features": False
        }
    }
    
    config_center.put("app/config", app_config, ConfigFormat.JSON)
    config_center.put("app/secret", "sensitive-data", encrypted=True)
    
    # 读取配置
    retrieved_config = config_center.get("app/config")
    print(f"Retrieved config: {json.dumps(retrieved_config, indent=2, ensure_ascii=False)}")
    
    # 监听配置变更
    def config_change_handler(event: ConfigChangeEvent):
        print(f"Config changed: {event.key} -> {event.action}")
        print(f"Old value: {event.old_value}")
        print(f"New value: {event.new_value}")
    
    config_center.watch("app/", config_change_handler)
    
    # 服务发现示例
    print("\n=== 服务发现示例 ===")
    service_registry = ServiceRegistry(backend="consul")
    
    # 注册服务实例
    user_service_1 = ServiceInstance(
        service_id="user-service-1",
        service_name="user-service",
        host="192.168.1.10",
        port=8080,
        metadata={"version": "1.0", "zone": "us-east-1"},
        health_check_url="http://192.168.1.10:8080/health"
    )
    
    user_service_2 = ServiceInstance(
        service_id="user-service-2",
        service_name="user-service",
        host="192.168.1.11",
        port=8080,
        metadata={"version": "1.0", "zone": "us-east-1"},
        health_check_url="http://192.168.1.11:8080/health"
    )
    
    service_registry.register(user_service_1)
    service_registry.register(user_service_2)
    
    # 发现服务
    instances = service_registry.discover("user-service")
    print(f"Discovered {len(instances)} instances of user-service:")
    for instance in instances:
        print(f"  - {instance.host}:{instance.port} (ID: {instance.service_id})")
    
    # 模拟心跳
    service_registry.heartbeat("user-service-1")
    service_registry.heartbeat("user-service-2")
    
    return config_center, service_registry
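
The health-check loop in `ServiceRegistry` marks an instance DOWN once its last heartbeat is older than twice the check interval. The sketch below isolates that rule so it can be exercised without threads or a running Consul/etcd. `HeartbeatTable` and its injectable `clock` parameter are hypothetical names introduced for illustration only.

```python
import time
from typing import Callable, Dict, List


class HeartbeatTable:
    """Tracks the last heartbeat per instance and reports which are still live."""

    def __init__(self, interval: float = 30.0,
                 clock: Callable[[], float] = time.monotonic):
        self.interval = interval
        self.clock = clock  # injectable so the timeout can be simulated
        self._last_seen: Dict[str, float] = {}

    def beat(self, service_id: str) -> None:
        """Record a heartbeat for the given instance."""
        self._last_seen[service_id] = self.clock()

    def live(self) -> List[str]:
        """Return instances whose heartbeat is within 2x the interval."""
        cutoff = self.clock() - 2 * self.interval
        return sorted(sid for sid, seen in self._last_seen.items() if seen >= cutoff)


# Simulated timeline: both instances start at t=0, only one keeps beating.
now = [0.0]
table = HeartbeatTable(interval=30.0, clock=lambda: now[0])
table.beat("user-service-1")
table.beat("user-service-2")
now[0] = 50.0
table.beat("user-service-1")
now[0] = 70.0
live = table.live()
print(live)
```

Injecting the clock keeps the timeout logic deterministic in tests. With the etcd backend, the same effect would more commonly be achieved by refreshing the lease attached to the service key, which the registry above does not yet do.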

6. Monitoring and Logging

6.1 Distributed Tracing

import uuid
import time
import json
from typing import Dict, Any, Optional, List
from dataclasses import dataclass, asdict
from contextlib import contextmanager
import threading
from collections import defaultdict

@dataclass
class Span:
    trace_id: str
    span_id: str
    parent_span_id: Optional[str]
    operation_name: str
    start_time: float
    end_time: Optional[float] = None
    duration: Optional[float] = None
    tags: Dict[str, Any] = None
    logs: List[Dict[str, Any]] = None
    status: str = "OK"  # OK, ERROR, TIMEOUT
    
    def __post_init__(self):
        if self.tags is None:
            self.tags = {}
        if self.logs is None:
            self.logs = []
    
    def finish(self):
        """结束Span"""
        self.end_time = time.time()
        self.duration = self.end_time - self.start_time
    
    def set_tag(self, key: str, value: Any):
        """设置标签"""
        self.tags[key] = value
    
    def log(self, message: str, level: str = "INFO", **kwargs):
        """添加日志"""
        log_entry = {
            "timestamp": time.time(),
            "level": level,
            "message": message,
            **kwargs
        }
        self.logs.append(log_entry)
    
    def set_error(self, error: Exception):
        """设置错误状态"""
        self.status = "ERROR"
        self.set_tag("error", True)
        self.set_tag("error.kind", type(error).__name__)
        self.set_tag("error.message", str(error))
        self.log(f"Error occurred: {error}", level="ERROR")

class TraceContext:
    def __init__(self):
        self.trace_id: Optional[str] = None
        self.span_stack: List[Span] = []
    
    def get_current_span(self) -> Optional[Span]:
        """获取当前Span"""
        return self.span_stack[-1] if self.span_stack else None
    
    def push_span(self, span: Span):
        """压入Span"""
        self.span_stack.append(span)
        if not self.trace_id:
            self.trace_id = span.trace_id
    
    def pop_span(self) -> Optional[Span]:
        """弹出Span"""
        return self.span_stack.pop() if self.span_stack else None

class Tracer:
    def __init__(self, service_name: str):
        self.service_name = service_name
        self.local = threading.local()
        self.spans: Dict[str, List[Span]] = defaultdict(list)
        self.samplers = []
        self.reporters = []
    
    def _get_context(self) -> TraceContext:
        """获取当前线程的追踪上下文"""
        if not hasattr(self.local, 'context'):
            self.local.context = TraceContext()
        return self.local.context
    
    def start_span(self, operation_name: str, parent_span: Optional[Span] = None,
                   tags: Dict[str, Any] = None) -> Span:
        """开始一个新的Span"""
        context = self._get_context()
        
        # 确定父Span
        if parent_span is None:
            parent_span = context.get_current_span()
        
        # 生成ID
        if parent_span:
            trace_id = parent_span.trace_id
            parent_span_id = parent_span.span_id
        else:
            trace_id = str(uuid.uuid4())
            parent_span_id = None
        
        span_id = str(uuid.uuid4())
        
        # 创建Span
        span = Span(
            trace_id=trace_id,
            span_id=span_id,
            parent_span_id=parent_span_id,
            operation_name=operation_name,
            start_time=time.time(),
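            tags=dict(tags or {})  # copy so the caller's dict is not mutated
        )
        
        # Set service tags without overriding a span.kind supplied by the caller
        span.set_tag("service.name", self.service_name)
        span.tags.setdefault("span.kind", "internal")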
        
        # 压入上下文
        context.push_span(span)
        
        return span
    
    def finish_span(self, span: Span):
        """结束Span"""
        span.finish()
        context = self._get_context()
        
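        # Remove from the context. Compare by identity: dataclass equality
        # compares every field and could match a different span with equal values.
        if context.get_current_span() is span:
            context.pop_span()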
        
        # 存储Span
        self.spans[span.trace_id].append(span)
        
        # 报告Span
        for reporter in self.reporters:
            try:
                reporter.report(span)
            except Exception as e:
                print(f"Reporter error: {e}")
    
    @contextmanager
    def span(self, operation_name: str, **kwargs):
        """Span上下文管理器"""
        span = self.start_span(operation_name, **kwargs)
        try:
            yield span
        except Exception as e:
            span.set_error(e)
            raise
        finally:
            self.finish_span(span)
    
    def inject(self, span: Span, carrier: Dict[str, str]):
        """Inject trace context into a carrier (for example, HTTP headers)."""
        carrier["X-Trace-ID"] = span.trace_id
        carrier["X-Span-ID"] = span.span_id
    
    def extract(self, carrier: Dict[str, str]) -> Optional[Span]:
        """Extract trace context from a carrier."""
        trace_id = carrier.get("X-Trace-ID")
        span_id = carrier.get("X-Span-ID")
        
        if trace_id and span_id:
            # Build a placeholder parent span that only carries the remote context
            return Span(
                trace_id=trace_id,
                span_id=span_id,
                parent_span_id=None,
                operation_name="extracted",
                start_time=time.time()
            )
        return None
    
    def get_trace(self, trace_id: str) -> List[Span]:
        """Return every span recorded for a trace."""
        return self.spans.get(trace_id, [])
    
    def add_reporter(self, reporter: 'SpanReporter'):
        """Register a span reporter."""
        self.reporters.append(reporter)

class SpanReporter:
    def report(self, span: Span):
        """Report a finished span; subclasses must implement this."""
        raise NotImplementedError

class ConsoleReporter(SpanReporter):
    def report(self, span: Span):
        """Print the span to the console."""
        print(f"Span: {span.operation_name} [{span.trace_id}:{span.span_id}] "
              f"Duration: {span.duration:.3f}s Status: {span.status}")

class JaegerReporter(SpanReporter):
    def __init__(self, jaeger_endpoint: str):
        self.jaeger_endpoint = jaeger_endpoint
    
    def report(self, span: Span):
        """Jaeger reporter (simulated)."""
        # A real implementation would send the span to the Jaeger Collector
        jaeger_span = {
            "traceID": span.trace_id,
            "spanID": span.span_id,
            "parentSpanID": span.parent_span_id,
            "operationName": span.operation_name,
            "startTime": int(span.start_time * 1000000),  # microseconds
            "duration": int((span.duration or 0) * 1000000),
            "tags": [{"key": k, "value": v} for k, v in span.tags.items()],
            "logs": span.logs
        }
        print(f"Sending to Jaeger: {json.dumps(jaeger_span, indent=2)}")
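
The `inject` and `extract` methods above move trace context between services through carrier headers. The sketch below illustrates that round trip with plain dictionaries so it can run on its own. The helpers `inject_context`, `extract_context`, and `handle_downstream` are hypothetical stand-ins for the `Tracer` methods, not part of the code above; only the header names are taken from it.

```python
import uuid
from typing import Dict, Optional, Tuple

TRACE_HEADER = "X-Trace-ID"  # same header names as Tracer.inject above
SPAN_HEADER = "X-Span-ID"


def inject_context(trace_id: str, span_id: str, headers: Dict[str, str]) -> None:
    """Copy the caller's trace context into outgoing request headers."""
    headers[TRACE_HEADER] = trace_id
    headers[SPAN_HEADER] = span_id


def extract_context(headers: Dict[str, str]) -> Optional[Tuple[str, str]]:
    """Return (trace_id, parent_span_id) if both headers are present."""
    trace_id = headers.get(TRACE_HEADER)
    span_id = headers.get(SPAN_HEADER)
    if trace_id and span_id:
        return trace_id, span_id
    return None


def handle_downstream(headers: Dict[str, str]) -> Dict[str, Optional[str]]:
    """Simulated downstream service: continue the trace or start a new one."""
    context = extract_context(headers)
    if context:
        trace_id, parent_span_id = context
    else:
        trace_id, parent_span_id = str(uuid.uuid4()), None
    return {
        "trace_id": trace_id,
        "span_id": str(uuid.uuid4()),
        "parent_span_id": parent_span_id,
    }


# Simulated upstream service making an outgoing call
upstream_trace_id = str(uuid.uuid4())
upstream_span_id = str(uuid.uuid4())
outgoing_headers: Dict[str, str] = {}
inject_context(upstream_trace_id, upstream_span_id, outgoing_headers)

downstream_span = handle_downstream(outgoing_headers)
print(downstream_span["trace_id"] == upstream_trace_id)       # same trace
print(downstream_span["parent_span_id"] == upstream_span_id)  # linked to the caller
```

The downstream span keeps the caller's trace ID and records the caller's span ID as its parent, which is what lets a backend stitch spans from different services into one trace.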

### 6.2 Metrics Collection and Monitoring

class MetricType(Enum):
    COUNTER = "counter"
    GAUGE = "gauge"
    HISTOGRAM = "histogram"
    SUMMARY = "summary"

@dataclass
class Metric:
    name: str
    type: MetricType
    value: float
    labels: Dict[str, str]
    timestamp: float
    help_text: str = ""

class MetricsCollector:
    def __init__(self):
        self.metrics: Dict[str, Metric] = {}
        self.counters: Dict[str, float] = defaultdict(float)
        self.gauges: Dict[str, float] = {}
        self.histograms: Dict[str, List[float]] = defaultdict(list)
    
    def counter(self, name: str, labels: Optional[Dict[str, str]] = None, help_text: str = ""):
        """Create a counter."""
        return Counter(name, labels or {}, help_text, self)
    
    def gauge(self, name: str, labels: Optional[Dict[str, str]] = None, help_text: str = ""):
        """Create a gauge."""
        return Gauge(name, labels or {}, help_text, self)
    
    def histogram(self, name: str, buckets: Optional[List[float]] = None, 
                  labels: Optional[Dict[str, str]] = None, help_text: str = ""):
        """Create a histogram."""
        return Histogram(name, buckets or [0.1, 0.5, 1.0, 2.5, 5.0, 10.0], 
                        labels or {}, help_text, self)
    
    def record_metric(self, metric: Metric):
        """Record a metric sample, keyed by name and label set."""
        metric_key = f"{metric.name}:{json.dumps(metric.labels, sort_keys=True)}"
        self.metrics[metric_key] = metric
    
    def get_metrics(self) -> List[Metric]:
        """Return all recorded metrics."""
        return list(self.metrics.values())
    
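    def export_prometheus(self) -> str:
        """Export metrics in the Prometheus text exposition format."""
        lines = []
        seen_families = set()
        for metric in self.metrics.values():
            # Histogram samples (_bucket/_count/_sum) belong to one family
            # named after the base metric, so strip the suffix for HELP/TYPE
            family = metric.name
            if metric.type == MetricType.HISTOGRAM:
                for suffix in ("_bucket", "_count", "_sum"):
                    if family.endswith(suffix):
                        family = family[:-len(suffix)]
                        break
            
            # Emit HELP and TYPE only once per metric family
            if family not in seen_families:
                seen_families.add(family)
                if metric.help_text:
                    lines.append(f"# HELP {family} {metric.help_text}")
                lines.append(f"# TYPE {family} {metric.type.value}")
            
            labels_str = ""
            if metric.labels:
                label_pairs = [f'{k}="{v}"' for k, v in metric.labels.items()]
                labels_str = "{" + ",".join(label_pairs) + "}"
            
            lines.append(f"{metric.name}{labels_str} {metric.value}")
        
        return "\n".join(lines)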

class Counter:
    def __init__(self, name: str, labels: Dict[str, str], help_text: str, collector: MetricsCollector):
        self.name = name
        self.labels = labels
        self.help_text = help_text
        self.collector = collector
        self.value = 0
    
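    def inc(self, amount: float = 1):
        """Increase the counter; counters only go up, so negative amounts are rejected."""
        if amount < 0:
            raise ValueError("Counter can only be incremented by a non-negative amount")
        self.value += amount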
        metric = Metric(
            name=self.name,
            type=MetricType.COUNTER,
            value=self.value,
            labels=self.labels,
            timestamp=time.time(),
            help_text=self.help_text
        )
        self.collector.record_metric(metric)

class Gauge:
    def __init__(self, name: str, labels: Dict[str, str], help_text: str, collector: MetricsCollector):
        self.name = name
        self.labels = labels
        self.help_text = help_text
        self.collector = collector
        self.value = 0
    
    def set(self, value: float):
        """Set the gauge to a value."""
        self.value = value
        metric = Metric(
            name=self.name,
            type=MetricType.GAUGE,
            value=self.value,
            labels=self.labels,
            timestamp=time.time(),
            help_text=self.help_text
        )
        self.collector.record_metric(metric)
    
    def inc(self, amount: float = 1):
        """Increase the gauge."""
        self.set(self.value + amount)
    
    def dec(self, amount: float = 1):
        """Decrease the gauge."""
        self.set(self.value - amount)

class Histogram:
    def __init__(self, name: str, buckets: List[float], labels: Dict[str, str], 
                 help_text: str, collector: MetricsCollector):
        self.name = name
        self.buckets = sorted(buckets)
        self.labels = labels
        self.help_text = help_text
        self.collector = collector
        self.observations = []
        self.count = 0
        self.sum = 0
    
    def observe(self, value: float):
        """Record one observation."""
        self.observations.append(value)
        self.count += 1
        self.sum += value
        
        # Record cumulative bucket counts; the exposition format also expects a final +Inf bucket
        for bucket in [*self.buckets, float("inf")]:
            bucket_count = sum(1 for obs in self.observations if obs <= bucket)
            le = "+Inf" if bucket == float("inf") else str(bucket)
            bucket_labels = {**self.labels, "le": le}
            metric = Metric(
                name=f"{self.name}_bucket",
                type=MetricType.HISTOGRAM,
                value=bucket_count,
                labels=bucket_labels,
                timestamp=time.time(),
                help_text=self.help_text
            )
            self.collector.record_metric(metric)
        
        # Record the total count and the sum of observations
        count_metric = Metric(
            name=f"{self.name}_count",
            type=MetricType.HISTOGRAM,
            value=self.count,
            labels=self.labels,
            timestamp=time.time()
        )
        self.collector.record_metric(count_metric)
        
        sum_metric = Metric(
            name=f"{self.name}_sum",
            type=MetricType.HISTOGRAM,
            value=self.sum,
            labels=self.labels,
            timestamp=time.time()
        )
        self.collector.record_metric(sum_metric)

# Usage example
def monitoring_example():
    print("=== Distributed Tracing Example ===")
    
    # Create the tracer
    tracer = Tracer("user-service")
    tracer.add_reporter(ConsoleReporter())
    tracer.add_reporter(JaegerReporter("http://jaeger:14268/api/traces"))
    
    # Simulate a service call chain
    with tracer.span("handle_user_request") as root_span:
        root_span.set_tag("user.id", "12345")
        root_span.set_tag("http.method", "GET")
        root_span.set_tag("http.url", "/api/users/12345")
        
        # Simulate a database query
        with tracer.span("database_query", parent_span=root_span) as db_span:
            db_span.set_tag("db.type", "postgresql")
            db_span.set_tag("db.statement", "SELECT * FROM users WHERE id = $1")
            time.sleep(0.1)  # simulated query time
            db_span.log("Query executed successfully")
        
        # Simulate a cache lookup
        with tracer.span("cache_lookup", parent_span=root_span) as cache_span:
            cache_span.set_tag("cache.type", "redis")
            cache_span.set_tag("cache.key", "user:12345")
            time.sleep(0.02)  # simulated cache lookup time
            cache_span.log("Cache hit")
        
        # Simulate an external API call
        with tracer.span("external_api_call", parent_span=root_span) as api_span:
            api_span.set_tag("http.method", "POST")
            api_span.set_tag("http.url", "https://api.external.com/validate")
            api_span.set_tag("span.kind", "client")
            time.sleep(0.3)  # simulated API call time
            api_span.log("External API call completed")
        
        root_span.log("Request processed successfully")
    
    print("\n=== Metrics Collection Example ===")
    
    # Create the metrics collector
    metrics = MetricsCollector()
    
    # Create one metric of each kind
    request_counter = metrics.counter(
        "http_requests_total",
        labels={"method": "GET", "endpoint": "/api/users"},
        help_text="Total number of HTTP requests"
    )
    
    response_time_histogram = metrics.histogram(
        "http_request_duration_seconds",
        buckets=[0.1, 0.5, 1.0, 2.5, 5.0],
        labels={"method": "GET", "endpoint": "/api/users"},
        help_text="HTTP request duration in seconds"
    )
    
    active_connections_gauge = metrics.gauge(
        "active_connections",
        help_text="Number of active connections"
    )
    
    # Simulate metric collection
    for i in range(10):
        request_counter.inc()
        response_time = 0.1 + (i * 0.05)  # simulated response time
        response_time_histogram.observe(response_time)
        active_connections_gauge.set(50 + i)
        time.sleep(0.1)
    
    # Export in Prometheus format
    print("Metrics in Prometheus format:")
    print(metrics.export_prometheus())
    
    return tracer, metrics
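
The `Histogram` class above stores cumulative bucket counts, where each `le` bucket counts every observation less than or equal to its upper bound. As a rough illustration of why that layout is useful, the sketch below computes the cumulative counts for a small sample and estimates quantiles by linear interpolation inside the matching bucket. The functions `cumulative_buckets` and `estimate_quantile` and the sample values are hypothetical, and the interpolation is a simplification that assumes observations are spread evenly within a bucket.

```python
from bisect import bisect_right
from typing import Dict, List


def cumulative_buckets(observations: List[float], bounds: List[float]) -> Dict[str, int]:
    """Count observations <= each upper bound, plus a final +Inf bucket."""
    ordered = sorted(observations)
    counts = {str(bound): bisect_right(ordered, bound) for bound in sorted(bounds)}
    counts["+Inf"] = len(ordered)
    return counts


def estimate_quantile(q: float, counts: Dict[str, int], bounds: List[float]) -> float:
    """Estimate the q-quantile by linear interpolation inside the target bucket."""
    total = counts["+Inf"]
    if total == 0:
        raise ValueError("no observations")
    rank = q * total
    lower_bound, lower_count = 0.0, 0
    for bound in sorted(bounds):
        count = counts[str(bound)]
        if count >= rank:
            in_bucket = count - lower_count
            if in_bucket == 0:
                return bound
            fraction = (rank - lower_count) / in_bucket
            return lower_bound + (bound - lower_bound) * fraction
        lower_bound, lower_count = bound, count
    # The rank falls into the +Inf bucket; the best available answer is the
    # highest finite bound
    return max(bounds)


bounds = [0.1, 0.5, 1.0, 2.5, 5.0]
observations = [0.05, 0.2, 0.3, 0.4, 0.7, 0.8, 1.5, 3.0]
counts = cumulative_buckets(observations, bounds)
print(counts)
print(round(estimate_quantile(0.5, counts, bounds), 3))
print(round(estimate_quantile(0.9, counts, bounds), 3))
```

Because only bucket counts are kept, the estimate can be no more precise than the bucket boundaries allow, so bounds are usually chosen around the latencies that matter for alerting.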

## 7. Deployment Strategies

### 7.1 Blue-Green Deployment

```python
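import json
import random
import time
from dataclasses import asdict, dataclass
from enum import Enum
from typing import Any, Dict, List, Optional

# Mixing in str keeps the enum JSON-serializable when get_status() output is dumped
class DeploymentStatus(str, Enum):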
    PENDING = "pending"
    DEPLOYING = "deploying"
    DEPLOYED = "deployed"
    TESTING = "testing"
    ACTIVE = "active"
    FAILED = "failed"
    ROLLED_BACK = "rolled_back"

@dataclass
class Environment:
    name: str
    version: str
    replicas: int
    status: DeploymentStatus
    health_check_url: str
    traffic_percentage: float = 0
    deployment_time: Optional[float] = None

class BlueGreenDeployment:
    def __init__(self, service_name: str, load_balancer: 'LoadBalancer'):
        self.service_name = service_name
        self.load_balancer = load_balancer
        self.blue_env: Optional[Environment] = None
        self.green_env: Optional[Environment] = None
        self.active_env: Optional[str] = None
    
    def deploy(self, new_version: str, replicas: int = 3, 
               health_check_url: str = "/health") -> bool:
        """Run a blue-green deployment."""
        print(f"Starting blue-green deployment for {self.service_name} version {new_version}")
        
        # Pick the idle environment as the deployment target
        if self.active_env == "blue" or self.active_env is None:
            deploy_env = "green"
        else:
            deploy_env = "blue"
        
        # Create the new environment
        new_env = Environment(
            name=deploy_env,
            version=new_version,
            replicas=replicas,
            status=DeploymentStatus.PENDING,
            health_check_url=health_check_url,
            deployment_time=time.time()
        )
        
        try:
            # 1. Deploy the new version
            print(f"Deploying {new_version} to {deploy_env} environment")
            new_env.status = DeploymentStatus.DEPLOYING
            self._deploy_to_environment(new_env)
            new_env.status = DeploymentStatus.DEPLOYED
            
            # 2. Run health checks before any traffic is moved
            print(f"Running health checks for {deploy_env} environment")
            new_env.status = DeploymentStatus.TESTING
            if not self._health_check(new_env):
                new_env.status = DeploymentStatus.FAILED
                print(f"Health check failed for {deploy_env} environment")
                return False
            
            # 3. Switch traffic
            print(f"Switching traffic to {deploy_env} environment")
            self._switch_traffic(deploy_env)
            new_env.status = DeploymentStatus.ACTIVE
            new_env.traffic_percentage = 100
            
            # 4. Update environment state; the old environment stays deployed as the rollback target
            if deploy_env == "blue":
                self.blue_env = new_env
                if self.green_env:
                    self.green_env.traffic_percentage = 0
                    self.green_env.status = DeploymentStatus.DEPLOYED
            else:
                self.green_env = new_env
                if self.blue_env:
                    self.blue_env.traffic_percentage = 0
                    self.blue_env.status = DeploymentStatus.DEPLOYED
            
            self.active_env = deploy_env
            print("Blue-green deployment completed successfully")
            return True
            
        except Exception as e:
            print(f"Deployment failed: {e}")
            new_env.status = DeploymentStatus.FAILED
            return False
    
    def rollback(self) -> bool:
        """Roll back to the previous version."""
        if not self.active_env:
            print("No active environment to rollback from")
            return False
        
        # Determine the rollback target: the environment that is not active
        if self.active_env == "blue":
            rollback_env = "green"
            target_env = self.green_env
        else:
            rollback_env = "blue"
            target_env = self.blue_env
        
        if not target_env or target_env.status != DeploymentStatus.DEPLOYED:
            print("No valid environment to rollback to")
            return False
        
        try:
            print(f"Rolling back to {rollback_env} environment (version {target_env.version})")
            
            # Switch traffic
            self._switch_traffic(rollback_env)
            
            # Update state
            target_env.status = DeploymentStatus.ACTIVE
            target_env.traffic_percentage = 100
            
            current_env = self.blue_env if self.active_env == "blue" else self.green_env
            if current_env:
                current_env.status = DeploymentStatus.ROLLED_BACK
                current_env.traffic_percentage = 0
            
            self.active_env = rollback_env
            print("Rollback completed successfully")
            return True
            
        except Exception as e:
            print(f"Rollback failed: {e}")
            return False
    
    def _deploy_to_environment(self, env: Environment):
        """Deploy to the given environment (simulated)."""
        print(f"  Creating {env.replicas} replicas for version {env.version}")
        time.sleep(2)  # simulated deployment time
        print(f"  Deployment to {env.name} environment completed")
    
    def _health_check(self, env: Environment) -> bool:
        """Health check (simulated)."""
        print(f"  Checking health endpoint: {env.health_check_url}")
        time.sleep(1)  # simulated health check time
        
        # Simulated result with a 90% success rate
        success = random.random() > 0.1
        
        if success:
            print(f"  Health check passed for {env.name} environment")
        else:
            print(f"  Health check failed for {env.name} environment")
        
        return success
    
    def _switch_traffic(self, target_env: str):
        """Switch traffic (simulated)."""
        print(f"  Updating load balancer to route traffic to {target_env}")
        self.load_balancer.switch_target(self.service_name, target_env)
        time.sleep(0.5)  # simulated switch time
    
    def get_status(self) -> Dict[str, Any]:
        """Return the deployment status."""
        return {
            "service_name": self.service_name,
            "active_environment": self.active_env,
            "blue_environment": asdict(self.blue_env) if self.blue_env else None,
            "green_environment": asdict(self.green_env) if self.green_env else None
        }
```

### 7.2 Canary Deployment

class CanaryDeployment:
    def __init__(self, service_name: str, load_balancer: 'LoadBalancer'):
        self.service_name = service_name
        self.load_balancer = load_balancer
        self.stable_env: Optional[Environment] = None
        self.canary_env: Optional[Environment] = None
        self.traffic_split_steps = [5, 10, 25, 50, 75, 100]  # canary traffic percentage at each step
        self.current_step = 0
    
    def deploy(self, new_version: str, replicas: int = 1,
               health_check_url: str = "/health") -> bool:
        """Run a canary deployment."""
        print(f"Starting canary deployment for {self.service_name} version {new_version}")
        
        # Create the canary environment
        self.canary_env = Environment(
            name="canary",
            version=new_version,
            replicas=replicas,
            status=DeploymentStatus.PENDING,
            health_check_url=health_check_url,
            deployment_time=time.time()
        )
        
        try:
            # 1. Deploy the canary version
            print(f"Deploying canary version {new_version}")
            self.canary_env.status = DeploymentStatus.DEPLOYING
            self._deploy_to_environment(self.canary_env)
            self.canary_env.status = DeploymentStatus.DEPLOYED
            
            # 2. Health check
            if not self._health_check(self.canary_env):
                self.canary_env.status = DeploymentStatus.FAILED
                return False
            
            # 3. Increase traffic step by step
            self.current_step = 0
            for step_percentage in self.traffic_split_steps:
                print(f"Increasing canary traffic to {step_percentage}%")
                
                # Update the traffic split
                self._update_traffic_split(step_percentage)
                
                # Check metrics; roll back as soon as a threshold is exceeded
                if not self._monitor_canary_metrics():
                    print(f"Canary metrics failed at {step_percentage}% traffic")
                    self._rollback_canary()
                    return False
                
                # Wait for the observation period
                self._wait_observation_period()
                self.current_step += 1
            
            # 4. Finish the deployment
            self._promote_canary()
            print("Canary deployment completed successfully")
            return True
            
        except Exception as e:
            print(f"Canary deployment failed: {e}")
            self._rollback_canary()
            return False
    
    def _deploy_to_environment(self, env: Environment):
        """Deploy to the environment (simulated)."""
        print(f"  Creating {env.replicas} canary replicas")
        time.sleep(1)  # simulated deployment time
    
    def _health_check(self, env: Environment) -> bool:
        """Health check (simulated)."""
        print(f"  Running health check for canary environment")
        time.sleep(0.5)
        return True  # simplified: always succeeds
    
    def _update_traffic_split(self, canary_percentage: float):
        """Update the traffic split between stable and canary."""
        stable_percentage = 100 - canary_percentage
        
        if self.stable_env:
            self.stable_env.traffic_percentage = stable_percentage
        
        self.canary_env.traffic_percentage = canary_percentage
        
        # Update the load balancer
        self.load_balancer.update_traffic_split(
            self.service_name,
            {"stable": stable_percentage, "canary": canary_percentage}
        )
    
    def _monitor_canary_metrics(self) -> bool:
        """Monitor canary metrics (simulated)."""
        print("  Monitoring canary metrics...")
        time.sleep(1)  # simulated monitoring time
        
        # Simulated metric checks (error rate, response time, and so on)
        import random
        
        # Simulated metrics
        error_rate = random.uniform(0, 0.05)  # 0-5% error rate
        response_time = random.uniform(100, 500)  # 100-500 ms response time
        
        print(f"    Error rate: {error_rate:.2%}")
        print(f"    Response time: {response_time:.0f}ms")
        
        # Check thresholds
        if error_rate > 0.03:  # error rate above 3%
            print("    Error rate threshold exceeded")
            return False
        
        if response_time > 400:  # response time above 400 ms
            print("    Response time threshold exceeded")
            return False
        
        print("    Metrics within acceptable range")
        return True
    
    def _wait_observation_period(self):
        """Wait for the observation period."""
        observation_time = 30  # nominal 30-second observation period
        print(f"  Waiting {observation_time}s observation period...")
        time.sleep(2)  # shortened for the demo; a real rollout would wait the full 30 seconds
    
    def _rollback_canary(self):
        """Roll back the canary deployment."""
        print("Rolling back canary deployment")
        
        if self.canary_env:
            self.canary_env.status = DeploymentStatus.ROLLED_BACK
            self.canary_env.traffic_percentage = 0
        
        if self.stable_env:
            self.stable_env.traffic_percentage = 100
        
        # Update the load balancer
        self.load_balancer.update_traffic_split(
            self.service_name,
            {"stable": 100, "canary": 0}
        )
    
    def _promote_canary(self):
        """Promote the canary to the stable version."""
        print("Promoting canary to stable")
        
        # Make the canary environment the stable environment
        if self.canary_env:
            self.canary_env.status = DeploymentStatus.ACTIVE
            self.canary_env.traffic_percentage = 100
            
            # Replace the stable environment
            self.stable_env = Environment(
                name="stable",
                version=self.canary_env.version,
                replicas=self.canary_env.replicas * 3,  # scale out the replica count
                status=DeploymentStatus.ACTIVE,
                health_check_url=self.canary_env.health_check_url,
                traffic_percentage=100
            )
            
            # Clean up the canary environment
            self.canary_env = None
            
            # Route traffic back to the stable pool, which now runs the promoted version
            self.load_balancer.update_traffic_split(
                self.service_name,
                {"stable": 100, "canary": 0}
            )
    
    def get_status(self) -> Dict[str, Any]:
        """Return the deployment status."""
        return {
            "service_name": self.service_name,
            "current_step": self.current_step,
            "total_steps": len(self.traffic_split_steps),
            "stable_environment": asdict(self.stable_env) if self.stable_env else None,
            "canary_environment": asdict(self.canary_env) if self.canary_env else None
        }

class LoadBalancer:
    def __init__(self):
        self.routes: Dict[str, Dict[str, float]] = {}
    
    def switch_target(self, service_name: str, target_env: str):
        """Send all traffic for a service to one environment."""
        print(f"Load balancer: Switching {service_name} traffic to {target_env}")
        self.routes[service_name] = {target_env: 100}
    
    def update_traffic_split(self, service_name: str, traffic_split: Dict[str, float]):
        """Update the traffic split for a service."""
        print(f"Load balancer: Updating {service_name} traffic split: {traffic_split}")
        self.routes[service_name] = traffic_split

# Usage example
def deployment_example():
    print("=== Blue-Green Deployment Example ===")
    
    # Create the load balancer
    load_balancer = LoadBalancer()
    
    # Create the blue-green deployment manager
    blue_green = BlueGreenDeployment("user-service", load_balancer)
    
    # Run the first deployment
    blue_green.deploy("v1.0.0", replicas=3)
    print(f"Status: {json.dumps(blue_green.get_status(), indent=2, ensure_ascii=False)}")
    
    # Run the second deployment
    time.sleep(1)
    blue_green.deploy("v1.1.0", replicas=3)
    print(f"Status: {json.dumps(blue_green.get_status(), indent=2, ensure_ascii=False)}")
    
    # Simulate a rollback
    time.sleep(1)
    blue_green.rollback()
    print(f"Status after rollback: {json.dumps(blue_green.get_status(), indent=2, ensure_ascii=False)}")
    
    print("\n=== Canary Deployment Example ===")
    
    # Create the canary deployment manager
    canary = CanaryDeployment("order-service", load_balancer)
    
    # Set the stable version
    canary.stable_env = Environment(
        name="stable",
        version="v2.0.0",
        replicas=5,
        status=DeploymentStatus.ACTIVE,
        health_check_url="/health",
        traffic_percentage=100
    )
    
    # Run the canary deployment
    canary.deploy("v2.1.0", replicas=1)
    print(f"Canary status: {json.dumps(canary.get_status(), indent=2, ensure_ascii=False)}")
    
    return blue_green, canary
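
The `LoadBalancer` above only records the traffic split; it never routes a request. One way a split such as `{"stable": 95, "canary": 5}` could be applied per request is weighted random selection, sketched below. The function `pick_backend` is hypothetical and not part of the classes above, and real load balancers often use other strategies (for example, consistent hashing to keep a user on one version).

```python
import random
from typing import Dict, Optional


def pick_backend(traffic_split: Dict[str, float],
                 rng: Optional[random.Random] = None) -> str:
    """Pick a backend name with probability proportional to its weight."""
    rng = rng or random.Random()
    candidates = [(name, weight) for name, weight in traffic_split.items() if weight > 0]
    if not candidates:
        raise ValueError("traffic split has no backend with a positive weight")
    names, weights = zip(*candidates)
    return rng.choices(names, weights=weights, k=1)[0]


# Route 10,000 simulated requests through a 95/5 split
rng = random.Random(42)  # seeded so repeated runs give the same tally
split = {"stable": 95, "canary": 5}
hits: Dict[str, int] = {"stable": 0, "canary": 0}
for _ in range(10_000):
    hits[pick_backend(split, rng)] += 1
print(hits)
```

With random selection the observed share only approximates the configured percentage, which is one reason canary metrics are evaluated over an observation period rather than on a handful of requests.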

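## 8. Best Practices and Summary

### 8.1 Architecture Design Principles

1. **Single responsibility**: each microservice should own exactly one business capability.
2. **Service autonomy**: each service should be independently developable, deployable, and scalable.
3. **Decentralized governance**: avoid single points of failure and adopt distributed governance.
4. **Design for failure**: assume failures will happen and build fault-tolerance mechanisms.
5. **Observability**: make sure the system can be monitored, traced, and debugged.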
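
As an illustration of the fault-tolerance principle in the list above, here is a minimal circuit breaker sketch. It assumes a simple policy (open after N consecutive failures, allow one trial call after a timeout); the class `CircuitBreaker`, the exception `CircuitOpenError`, and the thresholds are hypothetical choices for this example, not a reference implementation.

```python
import time
from typing import Any, Callable, Optional


class CircuitOpenError(RuntimeError):
    """Raised when a call is rejected because the circuit is open."""


class CircuitBreaker:
    def __init__(self, failure_threshold: int = 3, reset_timeout: float = 30.0,
                 clock: Callable[[], float] = time.monotonic):
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.clock = clock  # injectable so the example can control time
        self.failure_count = 0
        self.opened_at: Optional[float] = None

    def call(self, func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
        if self.opened_at is not None:
            if self.clock() - self.opened_at < self.reset_timeout:
                raise CircuitOpenError("circuit is open; call rejected")
            # Timeout elapsed: let one trial call through (half-open state)
        try:
            result = func(*args, **kwargs)
        except Exception:
            self.failure_count += 1
            if self.opened_at is not None or self.failure_count >= self.failure_threshold:
                self.opened_at = self.clock()  # open (or re-open) the circuit
            raise
        # A success closes the circuit and clears the failure history
        self.failure_count = 0
        self.opened_at = None
        return result


# Usage with a fake clock so no real waiting is needed
now = [0.0]
breaker = CircuitBreaker(failure_threshold=2, reset_timeout=10.0, clock=lambda: now[0])


def flaky() -> None:
    raise ConnectionError("backend unavailable")


for _ in range(2):
    try:
        breaker.call(flaky)
    except ConnectionError:
        pass

try:
    breaker.call(flaky)
except CircuitOpenError as exc:
    print(f"rejected: {exc}")  # the failing backend is no longer called

now[0] = 11.0  # move past the reset timeout
print(breaker.call(lambda: "ok"))
```

Rejecting calls while the circuit is open gives a struggling dependency time to recover and keeps callers from piling up on timeouts.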

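### 8.2 Implementation Recommendations

1. **Incremental migration**: split the monolith into microservices step by step.
2. **Team organization**: structure teams in line with Conway's law.
3. **Technology stack consistency**: keep a moderate degree of stack consistency across the organization.
4. **Automation first**: invest in CI/CD and automated testing.
5. **Documentation and training**: build thorough documentation and training programs.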

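### 8.3 Common Pitfalls

1. **Over-splitting**: avoid creating too many microservices.
2. **Distributed transactions**: avoid cross-service transactions wherever possible.
3. **Network latency**: account for the network overhead of inter-service communication.
4. **Data consistency**: prefer eventual consistency over strong consistency across services.
5. **Operational complexity**: be prepared for the added operational complexity.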
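
One common way to avoid a cross-service transaction while still reaching eventual consistency is a saga: each step has a compensating action that undoes it if a later step fails. The sketch below shows the idea in a single process; `run_saga`, the step names, and the simulated order flow are hypothetical, and a production saga would also need persistence and retry handling, which are left out here.

```python
from typing import Callable, List, Tuple

# (name, action, compensation)
Step = Tuple[str, Callable[[], None], Callable[[], None]]


def run_saga(steps: List[Step]) -> bool:
    """Run steps in order; on failure, undo completed steps in reverse order."""
    completed: List[Step] = []
    for step in steps:
        name, action, _ = step
        try:
            action()
            completed.append(step)
        except Exception as exc:
            print(f"step '{name}' failed: {exc}; compensating")
            for _, _, compensate in reversed(completed):
                compensate()
            return False
    return True


log: List[str] = []


def fail_payment() -> None:
    raise RuntimeError("card declined")


steps: List[Step] = [
    ("reserve_inventory",
     lambda: log.append("inventory reserved"),
     lambda: log.append("inventory released")),
    ("charge_payment",
     fail_payment,
     lambda: log.append("payment refunded")),
    ("create_shipment",
     lambda: log.append("shipment created"),
     lambda: log.append("shipment cancelled")),
]

succeeded = run_saga(steps)
print(succeeded, log)  # False ['inventory reserved', 'inventory released']
```

Only the steps that actually completed are compensated, so the failed payment is not refunded and the shipment step never runs.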

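## Summary

Cloud-native microservices architecture is a major trend in modern application development. Through service decomposition, containerization, service mesh, and related techniques, it delivers high availability, high scalability, and efficient operations. This article has covered the core concepts and best practices of cloud-native microservices, from architecture design and technical implementation through to deployment strategies.

When adopting a microservices architecture, weigh business requirements, team capability, and technology maturity together, and build out the microservices platform incrementally. Non-functional requirements such as monitoring, logging, and security also deserve attention, because they determine how stable and maintainable the system will be.

With sound architecture design and engineering practice, a microservices architecture helps an organization build more flexible and scalable cloud-native applications that support rapid business growth and innovation.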

if __name__ == "__main__":
    # Run all examples
    print("=== Cloud-Native Microservices Architecture: Full Example ===\n")
    
    # Configuration management and service discovery
    config_center, service_registry = config_and_discovery_example()
    
    print("\n" + "="*50 + "\n")
    
    # Monitoring and tracing
    tracer, metrics = monitoring_example()
    
    print("\n" + "="*50 + "\n")
    
    # Deployment strategies
    blue_green, canary = deployment_example()
