跳转到主要内容

微服务架构在云环境中的设计模式:构建可扩展、弹性的分布式系统

博主
39 分钟
8126 字
--

AI 导读

本文系统介绍微服务架构在云环境中的核心设计模式:从服务拆分原则与耦合度分析入手,依次讨论同步通信与熔断机制、基于 Istio 的服务网格部署(流量管理与安全策略),以及监控与可观测性体系,并配有可运行的 Python 与 Shell 示例代码。

内容由AI智能生成

目录

  1. 微服务架构概述
  2. 服务拆分与设计原则
  3. 服务通信模式
  4. 数据管理策略
  5. 服务发现与注册
  6. API网关设计
  7. 服务网格架构
  8. 监控与可观测性
  9. 部署与运维
  10. 总结

微服务架构概述

微服务架构是一种将单一应用程序开发为一套小型服务的方法,每个服务运行在自己的进程中,并使用轻量级机制(通常是HTTP API)进行通信。

微服务架构图

graph TB
    subgraph "客户端层"
        Web[Web应用]
        Mobile[移动应用]
        API[第三方API]
    end
    
    subgraph "API网关层"
        Gateway[API网关]
        LB[负载均衡器]
    end
    
    subgraph "微服务层"
        UserService[用户服务]
        OrderService[订单服务]
        PaymentService[支付服务]
        ProductService[商品服务]
        NotificationService[通知服务]
    end
    
    subgraph "数据层"
        UserDB[(用户数据库)]
        OrderDB[(订单数据库)]
        PaymentDB[(支付数据库)]
        ProductDB[(商品数据库)]
        Cache[(缓存层)]
    end
    
    subgraph "基础设施层"
        ServiceRegistry[服务注册中心]
        ConfigCenter[配置中心]
        MessageQueue[消息队列]
        Monitoring[监控系统]
    end
    
    Web --> Gateway
    Mobile --> Gateway
    API --> Gateway
    
    Gateway --> LB
    LB --> UserService
    LB --> OrderService
    LB --> PaymentService
    LB --> ProductService
    LB --> NotificationService
    
    UserService --> UserDB
    OrderService --> OrderDB
    PaymentService --> PaymentDB
    ProductService --> ProductDB
    
    UserService --> Cache
    OrderService --> Cache
    ProductService --> Cache
    
    UserService --> ServiceRegistry
    OrderService --> ServiceRegistry
    PaymentService --> ServiceRegistry
    ProductService --> ServiceRegistry
    NotificationService --> ServiceRegistry
    
    OrderService --> MessageQueue
    PaymentService --> MessageQueue
    NotificationService --> MessageQueue

微服务架构优势

  1. 技术多样性:每个服务可以使用最适合的技术栈
  2. 独立部署:服务可以独立开发、测试和部署
  3. 故障隔离:单个服务的故障不会影响整个系统
  4. 团队自治:小团队可以独立负责特定服务
  5. 可扩展性:可以根据需求独立扩展特定服务

服务拆分与设计原则

合理的服务拆分是微服务架构成功的关键。

服务拆分策略

#!/usr/bin/env python3
"""
微服务拆分分析器
基于业务领域、数据依赖和团队结构分析服务拆分策略
"""

from typing import Dict, List, Set, Tuple
from dataclasses import dataclass
from enum import Enum
import json

class CouplingType(Enum):
    """Kinds of coupling between business capabilities.

    NOTE(review): this enum is declared but not referenced by the visible
    decomposer code, which hard-codes data/functional/team scoring instead.
    """
    DATA = "data"              # shared data entities
    FUNCTIONAL = "functional"  # call dependencies / overlapping operations
    TEMPORAL = "temporal"      # declared but unused in this file
    SPATIAL = "spatial"        # declared but unused in this file

@dataclass
class BusinessCapability:
    """One business capability — the unit of decomposition analysis."""
    name: str                  # unique identifier, e.g. "user_management"
    description: str           # human-readable description
    data_entities: List[str]   # data entities this capability touches (data coupling)
    operations: List[str]      # operations it exposes (functional coupling, API paths)
    dependencies: List[str]    # names of other capabilities it depends on
    team_ownership: str        # owning team; shared team lowers coupling
    complexity_score: int      # relative complexity weight used in boundary scoring

@dataclass
class ServiceBoundary:
    """A suggested microservice: a capability group plus derived metadata."""
    service_name: str                       # derived from the seed capability's name
    capabilities: List[BusinessCapability]  # capabilities grouped into this service
    data_ownership: List[str]               # union of the group's data entities
    api_contracts: List[str]                # REST-style paths derived from operations
    coupling_score: float                   # score reported for this boundary

class MicroserviceDecomposer:
    """Suggests microservice boundaries for a set of business capabilities.

    Pairwise coupling is scored on three weighted axes (data 0.4,
    functional 0.4, team 0.2); capabilities whose coupling to a seed
    capability is at or below a threshold are greedily grouped into one
    service boundary.
    """

    def __init__(self):
        self.business_capabilities = []  # registered BusinessCapability objects
        self.coupling_matrix = {}        # cache of the last analyze_coupling() result
        self.team_structure = {}         # reserved; unused by the visible code

    def add_business_capability(self, capability: BusinessCapability):
        """Register one business capability for analysis."""
        self.business_capabilities.append(capability)

    def analyze_coupling(self) -> Dict[str, Dict[str, float]]:
        """Compute the pairwise coupling matrix for all capabilities.

        Returns:
            Nested dict where matrix[a][b] is the weighted coupling of
            capability ``a`` with ``b`` in [0, 1]; the diagonal is 0.0.
            The result is also cached on ``self.coupling_matrix``.
        """
        matrix = {}
        for cap1 in self.business_capabilities:
            row = matrix.setdefault(cap1.name, {})
            for cap2 in self.business_capabilities:
                if cap1.name == cap2.name:
                    row[cap2.name] = 0.0
                    continue
                row[cap2.name] = (
                    self._calculate_data_coupling(cap1, cap2) * 0.4
                    + self._calculate_functional_coupling(cap1, cap2) * 0.4
                    + self._calculate_team_coupling(cap1, cap2) * 0.2
                )
        # Fix: the attribute was initialized in __init__ but never updated.
        self.coupling_matrix = matrix
        return matrix

    def _calculate_data_coupling(self, cap1: BusinessCapability,
                                 cap2: BusinessCapability) -> float:
        """Jaccard similarity of the two capabilities' data entities."""
        shared = set(cap1.data_entities) & set(cap2.data_entities)
        combined = set(cap1.data_entities) | set(cap2.data_entities)
        if not combined:
            return 0.0
        return len(shared) / len(combined)

    def _calculate_functional_coupling(self, cap1: BusinessCapability,
                                       cap2: BusinessCapability) -> float:
        """0.8 for a declared dependency either way, else Jaccard similarity
        of the operation sets."""
        if cap2.name in cap1.dependencies or cap1.name in cap2.dependencies:
            return 0.8
        shared = set(cap1.operations) & set(cap2.operations)
        combined = set(cap1.operations) | set(cap2.operations)
        if not combined:
            return 0.0
        return len(shared) / len(combined)

    def _calculate_team_coupling(self, cap1: BusinessCapability,
                                 cap2: BusinessCapability) -> float:
        """Low (0.2) when both capabilities share an owning team, else 0.8."""
        return 0.2 if cap1.team_ownership == cap2.team_ownership else 0.8

    def suggest_service_boundaries(self, max_coupling_threshold: float = 0.3) -> List[ServiceBoundary]:
        """Greedily group capabilities into suggested services.

        Each not-yet-assigned capability seeds a group; every remaining
        capability whose coupling to the seed is <= the threshold joins it.
        Coupling is only checked against the seed, not between all members —
        a known simplification of this greedy approach.
        """
        matrix = self.analyze_coupling()
        boundaries = []
        assigned = set()

        for seed in self.business_capabilities:
            if seed.name in assigned:
                continue

            group = [seed]
            assigned.add(seed.name)

            for candidate in self.business_capabilities:
                if (candidate.name not in assigned
                        and matrix[seed.name][candidate.name] <= max_coupling_threshold):
                    group.append(candidate)
                    assigned.add(candidate.name)

            boundaries.append(self._create_service_boundary(group, matrix))

        return boundaries

    def _create_service_boundary(self, capabilities: List[BusinessCapability],
                                 coupling_matrix=None) -> ServiceBoundary:
        """Build a ServiceBoundary for one group of capabilities.

        Bug fix: ``coupling_score`` is now the mean pairwise coupling inside
        the group (0.0 for a single-capability service). The original code
        averaged ``complexity_score`` here, which matched neither the field
        name nor the ``> 0.7`` coupling check in _generate_recommendations.
        """
        if coupling_matrix is None:
            coupling_matrix = self.coupling_matrix or self.analyze_coupling()

        service_name = f"{capabilities[0].name.replace('_', '-')}-service"

        # Union of owned data entities, sorted for deterministic reports.
        data_ownership = sorted({entity for cap in capabilities
                                 for entity in cap.data_entities})

        api_contracts = [f"/{cap.name.lower()}/{operation}"
                         for cap in capabilities
                         for operation in cap.operations]

        pair_scores = [coupling_matrix[a.name][b.name]
                       for i, a in enumerate(capabilities)
                       for b in capabilities[i + 1:]]
        coupling_score = sum(pair_scores) / len(pair_scores) if pair_scores else 0.0

        return ServiceBoundary(
            service_name=service_name,
            capabilities=capabilities,
            data_ownership=data_ownership,
            api_contracts=api_contracts,
            coupling_score=coupling_score
        )

    def generate_decomposition_report(self) -> Dict:
        """Produce a JSON-serializable report of the suggested decomposition."""
        service_boundaries = self.suggest_service_boundaries()
        # suggest_service_boundaries() already computed and cached the matrix,
        # so reuse it instead of recomputing (the original analyzed twice).
        coupling_matrix = self.coupling_matrix

        return {
            "decomposition_summary": {
                "total_capabilities": len(self.business_capabilities),
                "suggested_services": len(service_boundaries),
                "average_coupling": self._calculate_average_coupling(coupling_matrix)
            },
            "service_boundaries": [
                {
                    "service_name": boundary.service_name,
                    "capabilities": [cap.name for cap in boundary.capabilities],
                    "data_ownership": boundary.data_ownership,
                    "api_contracts": boundary.api_contracts,
                    "coupling_score": boundary.coupling_score
                }
                for boundary in service_boundaries
            ],
            "coupling_analysis": coupling_matrix,
            "recommendations": self._generate_recommendations(service_boundaries)
        }

    def _calculate_average_coupling(self, coupling_matrix: Dict) -> float:
        """Mean of all off-diagonal entries of the coupling matrix."""
        off_diagonal = [value
                        for row_name, row in coupling_matrix.items()
                        for col_name, value in row.items()
                        if row_name != col_name]
        if not off_diagonal:
            return 0.0
        return sum(off_diagonal) / len(off_diagonal)

    def _generate_recommendations(self, service_boundaries: List[ServiceBoundary]) -> List[str]:
        """Heuristic review notes for the suggested decomposition."""
        recommendations = []

        # Sanity-check the overall service count.
        if len(service_boundaries) > 15:
            recommendations.append("服务数量较多,考虑进一步合并相关服务")
        elif len(service_boundaries) < 3:
            recommendations.append("服务数量较少,可能存在过度聚合")

        # Flag tightly coupled groups; meaningful now that coupling_score is
        # an actual coupling average in [0, 1].
        high_coupling_services = [b for b in service_boundaries if b.coupling_score > 0.7]
        if high_coupling_services:
            recommendations.append(f"以下服务耦合度较高,建议重新设计: {[s.service_name for s in high_coupling_services]}")

        # Each data entity should be owned by exactly one service.
        data_conflicts = self._check_data_ownership_conflicts(service_boundaries)
        if data_conflicts:
            recommendations.append(f"存在数据所有权冲突: {data_conflicts}")

        return recommendations

    def _check_data_ownership_conflicts(self, service_boundaries: List[ServiceBoundary]) -> List[str]:
        """Report data entities claimed by more than one suggested service."""
        owner_of = {}
        conflicts = []

        for boundary in service_boundaries:
            for data_entity in boundary.data_ownership:
                if data_entity in owner_of:
                    conflicts.append(f"{data_entity} 被多个服务拥有: {owner_of[data_entity]}, {boundary.service_name}")
                else:
                    owner_of[data_entity] = boundary.service_name

        return conflicts

def main():
    """Demo: run the decomposition analysis on a small e-commerce domain."""
    analyzer = MicroserviceDecomposer()

    # (name, description, data entities, operations, dependencies, team, complexity)
    capability_specs = [
        ("user_management", "用户管理",
         ["User", "Profile", "Authentication"],
         ["create_user", "update_profile", "authenticate"],
         [], "user_team", 3),
        ("order_processing", "订单处理",
         ["Order", "OrderItem", "OrderStatus"],
         ["create_order", "update_order", "cancel_order"],
         ["user_management", "product_catalog", "payment_processing"],
         "order_team", 5),
        ("product_catalog", "商品目录",
         ["Product", "Category", "Inventory"],
         ["add_product", "update_inventory", "search_products"],
         [], "product_team", 4),
        ("payment_processing", "支付处理",
         ["Payment", "Transaction", "PaymentMethod"],
         ["process_payment", "refund", "validate_payment"],
         ["user_management"], "payment_team", 6),
        ("notification_service", "通知服务",
         ["Notification", "Template", "DeliveryLog"],
         ["send_email", "send_sms", "push_notification"],
         ["user_management"], "platform_team", 2),
    ]

    # Register every capability with the analyzer.
    for name, desc, entities, ops, deps, team, score in capability_specs:
        analyzer.add_business_capability(BusinessCapability(
            name=name,
            description=desc,
            data_entities=entities,
            operations=ops,
            dependencies=deps,
            team_ownership=team,
            complexity_score=score
        ))

    print("生成微服务拆分报告...")
    report = analyzer.generate_decomposition_report()

    summary = report['decomposition_summary']
    print(f"\n=== 微服务拆分分析报告 ===")
    print(f"总业务能力数: {summary['total_capabilities']}")
    print(f"建议服务数: {summary['suggested_services']}")
    print(f"平均耦合度: {summary['average_coupling']:.2f}")

    print(f"\n=== 建议的服务边界 ===")
    for service in report['service_boundaries']:
        print(f"\n服务名: {service['service_name']}")
        print(f"  业务能力: {service['capabilities']}")
        print(f"  数据所有权: {service['data_ownership']}")
        print(f"  耦合度评分: {service['coupling_score']:.2f}")

    print(f"\n=== 优化建议 ===")
    for recommendation in report['recommendations']:
        print(f"- {recommendation}")

if __name__ == "__main__":
    main()

服务通信模式

微服务之间的通信是架构设计的核心考虑因素。

同步通信模式

#!/usr/bin/env python3
"""
微服务同步通信实现
包括HTTP REST API、gRPC等同步通信模式
"""

import asyncio
import aiohttp
import grpc
from typing import Dict, Any, Optional
import json
import time
from dataclasses import dataclass
from enum import Enum
import logging

class CommunicationProtocol(Enum):
    """Wire protocols a ServiceEndpoint can speak."""
    HTTP_REST = "http_rest"  # handled by ServiceCommunicator._call_http_service
    GRPC = "grpc"            # handled by _call_grpc_service (placeholder implementation)
    GRAPHQL = "graphql"      # declared but not implemented in this file

@dataclass
class ServiceEndpoint:
    """Network address and protocol of one downstream service."""
    service_name: str  # registry key used by ServiceCommunicator
    host: str          # hostname or IP, without scheme
    port: int
    protocol: CommunicationProtocol
    health_check_path: str = "/health"  # GET path probed by health_check()

@dataclass
class RequestContext:
    """Per-request metadata propagated to downstream calls as headers."""
    correlation_id: str              # forwarded as X-Correlation-ID
    user_id: Optional[str] = None    # forwarded as X-User-ID when set
    tenant_id: Optional[str] = None  # forwarded as X-Tenant-ID when set
    timeout: int = 30                # total request timeout in seconds
    retry_count: int = 3             # total attempts (including the first), not extra retries

class CircuitBreakerState(Enum):
    """States of the standard three-state circuit-breaker pattern."""
    CLOSED = "closed"        # normal operation: calls pass through
    OPEN = "open"            # failing fast: calls are rejected
    HALF_OPEN = "half_open"  # recovery probing: limited calls allowed

class CircuitBreaker:
    """Circuit breaker guarding calls to one downstream service.

    CLOSED -> OPEN after ``failure_threshold`` consecutive failures;
    OPEN -> HALF_OPEN once ``recovery_timeout`` seconds have elapsed since
    the last failure; HALF_OPEN -> CLOSED after ``success_threshold``
    consecutive probe successes, or back to OPEN on any probe failure.

    Not synchronized; intended for single-event-loop use.
    """

    def __init__(self, failure_threshold: int = 5,
                 recovery_timeout: int = 60,
                 success_threshold: int = 3):
        self.failure_threshold = failure_threshold  # failures before opening
        self.recovery_timeout = recovery_timeout    # seconds before a probe is allowed
        self.success_threshold = success_threshold  # probe successes before closing

        self.failure_count = 0
        self.success_count = 0
        self.last_failure_time = None  # time.time() of the most recent failure
        self.state = CircuitBreakerState.CLOSED

    def can_execute(self) -> bool:
        """Return True if a call may proceed; transitions OPEN -> HALF_OPEN
        once the recovery timeout has elapsed."""
        if self.state == CircuitBreakerState.CLOSED:
            return True
        if self.state == CircuitBreakerState.OPEN:
            if (time.time() - self.last_failure_time) > self.recovery_timeout:
                # Recovery window elapsed: allow probe traffic.
                self.state = CircuitBreakerState.HALF_OPEN
                self.success_count = 0
                # Fix: also reset the stale failure counter so the half-open
                # verdict depends only on probe results.
                self.failure_count = 0
                return True
            return False
        return True  # HALF_OPEN: let the probe through

    def record_success(self):
        """Note a successful call; may close the breaker."""
        if self.state == CircuitBreakerState.HALF_OPEN:
            self.success_count += 1
            if self.success_count >= self.success_threshold:
                self.state = CircuitBreakerState.CLOSED
                self.failure_count = 0
        elif self.state == CircuitBreakerState.CLOSED:
            # Any success resets the consecutive-failure streak.
            self.failure_count = 0

    def record_failure(self):
        """Note a failed call; may (re)open the breaker."""
        self.failure_count += 1
        self.last_failure_time = time.time()

        if self.state == CircuitBreakerState.HALF_OPEN:
            # Fix: a failed probe reopens immediately. The original only
            # reopened because the stale counter still exceeded the threshold.
            self.state = CircuitBreakerState.OPEN
        elif self.failure_count >= self.failure_threshold:
            self.state = CircuitBreakerState.OPEN

class ServiceCommunicator:
    """Client-side gateway for calling registered microservices.

    Wraps every call with a per-service circuit breaker and, for HTTP,
    a retry loop with exponential backoff. Use as an async context
    manager so the shared aiohttp session is closed properly.
    """

    def __init__(self):
        self.service_registry = {}  # service_name -> ServiceEndpoint
        self.circuit_breakers = {}  # service_name -> CircuitBreaker
        self.session = None         # aiohttp.ClientSession, created in __aenter__
        self.logger = logging.getLogger(__name__)

    async def __aenter__(self):
        """Create the shared HTTP client session."""
        self.session = aiohttp.ClientSession()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Close the shared HTTP client session."""
        if self.session:
            await self.session.close()

    def register_service(self, endpoint: ServiceEndpoint):
        """Register an endpoint and give it a fresh circuit breaker."""
        self.service_registry[endpoint.service_name] = endpoint
        self.circuit_breakers[endpoint.service_name] = CircuitBreaker()

    async def call_service(self, service_name: str, method: str,
                           path: str, context: RequestContext,
                           data: Optional[Dict] = None) -> Dict[str, Any]:
        """Call a registered service, honoring its circuit breaker.

        Raises:
            ValueError: unknown service or unsupported protocol.
            Exception: breaker open, or the call ultimately failed.
        """
        if service_name not in self.service_registry:
            raise ValueError(f"Service {service_name} not registered")

        endpoint = self.service_registry[service_name]
        circuit_breaker = self.circuit_breakers[service_name]

        # Fail fast while the breaker is open.
        if not circuit_breaker.can_execute():
            raise Exception(f"Circuit breaker is OPEN for service {service_name}")

        try:
            if endpoint.protocol == CommunicationProtocol.HTTP_REST:
                result = await self._call_http_service(endpoint, method, path, context, data)
            elif endpoint.protocol == CommunicationProtocol.GRPC:
                result = await self._call_grpc_service(endpoint, method, path, context, data)
            else:
                raise ValueError(f"Unsupported protocol: {endpoint.protocol}")

            circuit_breaker.record_success()
            return result

        except Exception as e:
            circuit_breaker.record_failure()
            self.logger.error(f"Service call failed: {service_name}.{method} - {str(e)}")
            raise

    def _build_headers(self, context: RequestContext) -> Dict[str, str]:
        """Translate the request context into propagation headers."""
        headers = {
            'Content-Type': 'application/json',
            'X-Correlation-ID': context.correlation_id,
            'X-Request-Timeout': str(context.timeout)
        }
        if context.user_id:
            headers['X-User-ID'] = context.user_id
        if context.tenant_id:
            headers['X-Tenant-ID'] = context.tenant_id
        return headers

    async def _call_http_service(self, endpoint: ServiceEndpoint, method: str,
                                 path: str, context: RequestContext,
                                 data: Optional[Dict] = None) -> Dict[str, Any]:
        """HTTP call with exponential-backoff retries.

        Bug fixes vs the original:
        - only retryable failures (HTTP 5xx, timeouts, transport errors)
          are retried; the old broad ``except Exception`` caught the very
          exception raised for 4xx responses and retried client errors;
        - any 2xx status is treated as success (201 from POST no longer
          raises); the body is still expected to be JSON;
        - ``retry_count <= 0`` now raises instead of silently returning None.
        """
        url = f"http://{endpoint.host}:{endpoint.port}{path}"
        headers = self._build_headers(context)
        timeout = aiohttp.ClientTimeout(total=context.timeout)

        last_error = None
        for attempt in range(context.retry_count):
            if attempt:
                # Exponential backoff before every retry: 1s, 2s, 4s, ...
                await asyncio.sleep(2 ** (attempt - 1))
            try:
                async with self.session.request(
                    method=method.upper(),
                    url=url,
                    headers=headers,
                    json=data,
                    timeout=timeout
                ) as response:
                    if 200 <= response.status < 300:
                        return await response.json()

                    error_text = await response.text()
                    if response.status >= 500:
                        # Server-side failure: retryable.
                        last_error = Exception(f"HTTP {response.status}: {error_text}")
                        continue
                    # Client error (4xx): not retryable, fail immediately.
                    raise Exception(f"HTTP {response.status}: {error_text}")

            except asyncio.TimeoutError:
                last_error = Exception(
                    f"Request timeout after {context.retry_count} attempts")
            except aiohttp.ClientError as e:
                # Transport-level failure (DNS, connection reset, ...): retryable.
                last_error = e

        if last_error is None:
            raise Exception("Request failed: no attempts were made (retry_count <= 0)")
        raise last_error

    async def _call_grpc_service(self, endpoint: ServiceEndpoint, method: str,
                                 path: str, context: RequestContext,
                                 data: Optional[Dict] = None) -> Dict[str, Any]:
        """gRPC call placeholder.

        A real implementation would build a stub from the generated protobuf
        classes; this demo only opens a channel and returns a canned response.
        """
        channel_address = f"{endpoint.host}:{endpoint.port}"

        try:
            async with grpc.aio.insecure_channel(channel_address) as channel:
                # With generated code this would be:
                #   stub = YourServiceStub(channel)
                #   response = await stub.YourMethod(request, timeout=context.timeout)
                #   return MessageToDict(response)
                return {"status": "success", "message": "gRPC call completed"}

        except grpc.RpcError as e:
            raise Exception(f"gRPC call failed: {e.code()} - {e.details()}")

    async def health_check(self, service_name: str) -> bool:
        """Probe one service's health endpoint; returns False on any failure."""
        if service_name not in self.service_registry:
            return False

        endpoint = self.service_registry[service_name]

        try:
            context = RequestContext(
                correlation_id=f"health-check-{int(time.time())}",
                timeout=5,
                retry_count=1
            )
            await self.call_service(
                service_name=service_name,
                method="GET",
                path=endpoint.health_check_path,
                context=context
            )
            return True
        except Exception:
            # Any failure (breaker open, timeout, non-2xx) means unhealthy.
            return False

    async def batch_health_check(self) -> Dict[str, bool]:
        """Probe all registered services concurrently."""
        names = list(self.service_registry)
        results = await asyncio.gather(
            *(self.health_check(name) for name in names),
            return_exceptions=True
        )
        return {name: result is True for name, result in zip(names, results)}

async def main():
    """Demo: register two services, call them, then run health checks."""
    async with ServiceCommunicator() as communicator:

        # Register the two demo HTTP endpoints.
        for name, port in (("user-service", 8001), ("order-service", 8002)):
            communicator.register_service(ServiceEndpoint(
                service_name=name,
                host="localhost",
                port=port,
                protocol=CommunicationProtocol.HTTP_REST
            ))

        # One context shared by both demo calls.
        ctx = RequestContext(
            correlation_id="req-12345",
            user_id="user-001",
            timeout=30
        )

        try:
            print("调用用户服务...")
            user_response = await communicator.call_service(
                service_name="user-service",
                method="GET",
                path="/users/user-001",
                context=ctx
            )
            print(f"用户服务响应: {user_response}")

            print("调用订单服务...")
            new_order = {
                "user_id": "user-001",
                "items": [
                    {"product_id": "prod-001", "quantity": 2},
                    {"product_id": "prod-002", "quantity": 1}
                ]
            }
            order_response = await communicator.call_service(
                service_name="order-service",
                method="POST",
                path="/orders",
                context=ctx,
                data=new_order
            )
            print(f"订单服务响应: {order_response}")

        except Exception as e:
            print(f"服务调用失败: {str(e)}")

        # Probe every registered service and report the result.
        print("\n执行健康检查...")
        statuses = await communicator.batch_health_check()
        for service_name, is_healthy in statuses.items():
            if is_healthy:
                status = "健康"
            else:
                status = "不健康"
            print(f"{service_name}: {status}")

if __name__ == "__main__":
    asyncio.run(main())

服务网格架构

服务网格提供了微服务间通信的基础设施层,处理服务发现、负载均衡、故障恢复、指标收集和安全等功能。

Istio服务网格部署

#!/bin/bash
# Istio service-mesh deployment script: install, configure traffic/security
# policies, deploy monitoring addons and sample applications.
#
# Bug fix: the original header used a Python-style """docstring""", which is
# not bash comment syntax — it expanded to a multi-line quoted string that
# bash tried to execute, printing "command not found" on every run.

set -e

# ANSI colour codes for log output
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
NC='\033[0m' # No Color

# Logging helpers: coloured, levelled output (colour codes defined above).
log_info() {
    echo -e "${GREEN}[INFO]${NC} $1"
}

log_warn() {
    echo -e "${YELLOW}[WARN]${NC} $1"
}

log_error() {
    echo -e "${RED}[ERROR]${NC} $1"
}

# Verify required CLI tools exist and the cluster is reachable; exit on failure.
check_dependencies() {
    log_info "检查依赖..."

    local tool
    for tool in kubectl curl; do
        if ! command -v "$tool" &> /dev/null; then
            log_error "$tool 未安装"
            exit 1
        fi
    done

    # Make sure kubectl can actually reach a cluster.
    if ! kubectl cluster-info &> /dev/null; then
        log_error "无法连接到Kubernetes集群"
        exit 1
    fi

    log_info "依赖检查完成"
}

# Download and install Istio, then enable sidecar auto-injection for the
# "default" namespace.
install_istio() {
    log_info "安装Istio..."
    
    # Fetch the pinned release via the official bootstrap script.
    # NOTE(review): piping curl to sh executes unverified remote code —
    # fine for a demo, pin and checksum the release for production.
    ISTIO_VERSION="1.19.0"
    curl -L https://istio.io/downloadIstio | ISTIO_VERSION=$ISTIO_VERSION sh -
    
    # Make istioctl from the downloaded release available on PATH.
    export PATH="$PWD/istio-$ISTIO_VERSION/bin:$PATH"
    
    # Install the default revision into the cluster.
    istioctl install --set values.defaultRevision=default -y
    
    # Label the namespace so new pods get the Envoy sidecar injected.
    kubectl label namespace default istio-injection=enabled
    
    log_info "Istio安装完成"
}

# Apply the ingress Gateway (HTTP on 80, HTTPS on 443 with TLS secret
# "microservices-tls") plus a VirtualService that routes /api/users (with
# a 5s delay injected into 0.1% of requests and 3 retries), /api/orders
# (80/20 canary split between order-service and order-service-v2, 10s
# timeout) and /api/payments (request-header propagation).
deploy_gateway() {
    log_info "部署Istio网关..."
    
    cat <<EOF | kubectl apply -f -
apiVersion: networking.istio.io/v1beta1
kind: Gateway
metadata:
  name: microservices-gateway
  namespace: default
spec:
  selector:
    istio: ingressgateway
  servers:
  - port:
      number: 80
      name: http
      protocol: HTTP
    hosts:
    - "*"
  - port:
      number: 443
      name: https
      protocol: HTTPS
    tls:
      mode: SIMPLE
      credentialName: microservices-tls
    hosts:
    - "*.example.com"
---
apiVersion: networking.istio.io/v1beta1
kind: VirtualService
metadata:
  name: microservices-vs
  namespace: default
spec:
  hosts:
  - "*"
  gateways:
  - microservices-gateway
  http:
  - match:
    - uri:
        prefix: /api/users
    route:
    - destination:
        host: user-service
        port:
          number: 8080
    fault:
      delay:
        percentage:
          value: 0.1
        fixedDelay: 5s
    retries:
      attempts: 3
      perTryTimeout: 2s
  - match:
    - uri:
        prefix: /api/orders
    route:
    - destination:
        host: order-service
        port:
          number: 8080
      weight: 80
    - destination:
        host: order-service-v2
        port:
          number: 8080
      weight: 20
    timeout: 10s
  - match:
    - uri:
        prefix: /api/payments
    route:
    - destination:
        host: payment-service
        port:
          number: 8080
    headers:
      request:
        add:
          x-request-id: "%REQ(x-request-id)%"
          x-trace-id: "%REQ(x-trace-id)%"
EOF

    log_info "Istio网关部署完成"
}

# Apply DestinationRules: connection pooling, load balancing and outlier
# detection (circuit breaking) for the user and order services.
#
# Bug fix: "circuitBreaker" is not a valid trafficPolicy field in the Istio
# API — circuit breaking is expressed via outlierDetection — and the
# "consecutiveErrors" field was replaced by consecutive5xxErrors.
configure_traffic_management() {
    log_info "配置流量管理..."
    
    cat <<EOF | kubectl apply -f -
apiVersion: networking.istio.io/v1beta1
kind: DestinationRule
metadata:
  name: user-service-dr
  namespace: default
spec:
  host: user-service
  trafficPolicy:
    connectionPool:
      tcp:
        maxConnections: 100
      http:
        http1MaxPendingRequests: 50
        maxRequestsPerConnection: 10
    loadBalancer:
      simple: LEAST_CONN
    outlierDetection:
      consecutive5xxErrors: 3
      interval: 30s
      baseEjectionTime: 30s
      maxEjectionPercent: 50
  subsets:
  - name: v1
    labels:
      version: v1
  - name: v2
    labels:
      version: v2
---
apiVersion: networking.istio.io/v1beta1
kind: DestinationRule
metadata:
  name: order-service-dr
  namespace: default
spec:
  host: order-service
  trafficPolicy:
    connectionPool:
      tcp:
        maxConnections: 50
      http:
        http1MaxPendingRequests: 25
        maxRequestsPerConnection: 5
    loadBalancer:
      simple: ROUND_ROBIN
    outlierDetection:
      consecutive5xxErrors: 5
      interval: 10s
      baseEjectionTime: 30s
      maxEjectionPercent: 50
      minHealthPercent: 30
EOF

    log_info "流量管理配置完成"
}

# Enforce strict mTLS mesh-wide (in this namespace) and least-privilege
# access to the user and order services.
#
# Bug fix: in each AuthorizationPolicy the from/to/when clauses were three
# SEPARATE rules. Istio ORs the entries of "rules", so e.g. any request
# carrying a Bearer header was allowed regardless of source. The clauses
# must live in ONE rule so they AND together.
configure_security() {
    log_info "配置安全策略..."
    
    cat <<EOF | kubectl apply -f -
apiVersion: security.istio.io/v1beta1
kind: PeerAuthentication
metadata:
  name: default
  namespace: default
spec:
  mtls:
    mode: STRICT
---
apiVersion: security.istio.io/v1beta1
kind: AuthorizationPolicy
metadata:
  name: user-service-authz
  namespace: default
spec:
  selector:
    matchLabels:
      app: user-service
  rules:
  - from:
    - source:
        principals: ["cluster.local/ns/default/sa/api-gateway"]
    to:
    - operation:
        methods: ["GET", "POST"]
        paths: ["/api/users/*"]
    when:
    - key: request.headers[authorization]
      values: ["Bearer *"]
---
apiVersion: security.istio.io/v1beta1
kind: AuthorizationPolicy
metadata:
  name: order-service-authz
  namespace: default
spec:
  selector:
    matchLabels:
      app: order-service
  rules:
  - from:
    - source:
        principals: ["cluster.local/ns/default/sa/api-gateway"]
    - source:
        principals: ["cluster.local/ns/default/sa/user-service"]
    to:
    - operation:
        methods: ["GET", "POST", "PUT", "DELETE"]
        paths: ["/api/orders/*"]
    when:
    - key: request.headers[x-user-id]
      notValues: [""]
EOF

    log_info "安全策略配置完成"
}

# Install the Istio observability addons and wait for each to become ready.
deploy_monitoring() {
    log_info "部署监控组件..."
    
    # Prometheus: metrics collection
    kubectl apply -f https://raw.githubusercontent.com/istio/istio/release-1.19/samples/addons/prometheus.yaml
    
    # Grafana: dashboards
    kubectl apply -f https://raw.githubusercontent.com/istio/istio/release-1.19/samples/addons/grafana.yaml
    
    # Jaeger: distributed tracing
    kubectl apply -f https://raw.githubusercontent.com/istio/istio/release-1.19/samples/addons/jaeger.yaml
    
    # Kiali: service-mesh topology UI
    kubectl apply -f https://raw.githubusercontent.com/istio/istio/release-1.19/samples/addons/kiali.yaml
    
    # Block (up to 5 minutes each) until the addon deployments are available.
    kubectl wait --for=condition=available --timeout=300s deployment/prometheus -n istio-system
    kubectl wait --for=condition=available --timeout=300s deployment/grafana -n istio-system
    kubectl wait --for=condition=available --timeout=300s deployment/jaeger -n istio-system
    kubectl wait --for=condition=available --timeout=300s deployment/kiali -n istio-system
    
    log_info "监控组件部署完成"
}

# Deploy two placeholder services (nginx images standing in for real apps);
# each Service maps port 8080 to the container's port 80.
deploy_sample_apps() {
    log_info "部署示例应用..."
    
    # user-service: Deployment (2 replicas, labelled v1) + Service
    cat <<EOF | kubectl apply -f -
apiVersion: apps/v1
kind: Deployment
metadata:
  name: user-service
  labels:
    app: user-service
    version: v1
spec:
  replicas: 2
  selector:
    matchLabels:
      app: user-service
      version: v1
  template:
    metadata:
      labels:
        app: user-service
        version: v1
    spec:
      containers:
      - name: user-service
        image: nginx:alpine
        ports:
        - containerPort: 80
        env:
        - name: SERVICE_NAME
          value: "user-service"
        - name: VERSION
          value: "v1"
---
apiVersion: v1
kind: Service
metadata:
  name: user-service
  labels:
    app: user-service
spec:
  ports:
  - port: 8080
    targetPort: 80
    name: http
  selector:
    app: user-service
EOF

    # order-service: Deployment (2 replicas, labelled v1) + Service
    cat <<EOF | kubectl apply -f -
apiVersion: apps/v1
kind: Deployment
metadata:
  name: order-service
  labels:
    app: order-service
    version: v1
spec:
  replicas: 2
  selector:
    matchLabels:
      app: order-service
      version: v1
  template:
    metadata:
      labels:
        app: order-service
        version: v1
    spec:
      containers:
      - name: order-service
        image: nginx:alpine
        ports:
        - containerPort: 80
        env:
        - name: SERVICE_NAME
          value: "order-service"
        - name: VERSION
          value: "v1"
---
apiVersion: v1
kind: Service
metadata:
  name: order-service
  labels:
    app: order-service
spec:
  ports:
  - port: 8080
    targetPort: 80
    name: http
  selector:
    app: order-service
EOF

    log_info "示例应用部署完成"
}

# Print deployment status and work out the ingress gateway address
# (LoadBalancer IP -> LoadBalancer hostname -> localhost+NodePort fallback).
verify_deployment() {
    log_info "验证部署..."
    
    # Istio control-plane pods
    kubectl get pods -n istio-system
    
    # Application pods
    kubectl get pods -n default
    
    # Mesh routing resources
    kubectl get gateway,virtualservice,destinationrule -n default
    
    # Prefer the LoadBalancer IP of the ingress gateway...
    INGRESS_HOST=$(kubectl -n istio-system get service istio-ingressgateway -o jsonpath='{.status.loadBalancer.ingress[0].ip}')
    INGRESS_PORT=$(kubectl -n istio-system get service istio-ingressgateway -o jsonpath='{.spec.ports[?(@.name=="http2")].port}')
    
    # ...fall back to its hostname (e.g. a cloud load balancer DNS name)...
    if [ -z "$INGRESS_HOST" ]; then
        INGRESS_HOST=$(kubectl -n istio-system get service istio-ingressgateway -o jsonpath='{.status.loadBalancer.ingress[0].hostname}')
    fi
    
    # ...and finally to localhost + NodePort (e.g. minikube/kind clusters).
    if [ -z "$INGRESS_HOST" ]; then
        INGRESS_HOST="localhost"
        INGRESS_PORT=$(kubectl -n istio-system get service istio-ingressgateway -o jsonpath='{.spec.ports[?(@.name=="http2")].nodePort}')
    fi
    
    GATEWAY_URL="$INGRESS_HOST:$INGRESS_PORT"
    
    log_info "网关地址: http://$GATEWAY_URL"
    log_info "Kiali仪表板: kubectl port-forward -n istio-system svc/kiali 20001:20001"
    log_info "Grafana仪表板: kubectl port-forward -n istio-system svc/grafana 3000:3000"
    log_info "Jaeger仪表板: kubectl port-forward -n istio-system svc/jaeger 16686:16686"
    
    log_info "部署验证完成"
}

# Tear everything down. Each step tolerates already-deleted resources
# ("2>/dev/null || true") so cleanup is idempotent under set -e.
cleanup() {
    log_info "清理资源..."
    
    # Sample applications
    kubectl delete deployment,service user-service order-service 2>/dev/null || true
    
    # Mesh routing and security resources
    kubectl delete gateway,virtualservice,destinationrule,peerauthentication,authorizationpolicy --all 2>/dev/null || true
    
    # Observability addons
    kubectl delete -f https://raw.githubusercontent.com/istio/istio/release-1.19/samples/addons/prometheus.yaml 2>/dev/null || true
    kubectl delete -f https://raw.githubusercontent.com/istio/istio/release-1.19/samples/addons/grafana.yaml 2>/dev/null || true
    kubectl delete -f https://raw.githubusercontent.com/istio/istio/release-1.19/samples/addons/jaeger.yaml 2>/dev/null || true
    kubectl delete -f https://raw.githubusercontent.com/istio/istio/release-1.19/samples/addons/kiali.yaml 2>/dev/null || true
    
    # Istio itself
    istioctl uninstall --purge -y 2>/dev/null || true
    
    log_info "清理完成"
}

# Entry point: dispatch on the first CLI argument; defaults to "deploy".
main() {
    local action="${1:-deploy}"

    case "$action" in
        deploy)
            # Full end-to-end rollout.
            check_dependencies
            install_istio
            deploy_gateway
            configure_traffic_management
            configure_security
            deploy_monitoring
            deploy_sample_apps
            verify_deployment
            ;;
        cleanup)
            cleanup
            ;;
        verify)
            verify_deployment
            ;;
        *)
            echo "用法: $0 [deploy|cleanup|verify]"
            echo "  deploy  - 部署Istio服务网格"
            echo "  cleanup - 清理所有资源"
            echo "  verify  - 验证部署状态"
            exit 1
            ;;
    esac
}

main "$@"

监控与可观测性

微服务架构需要全面的监控和可观测性来确保系统的健康运行。

分布式监控系统

#!/usr/bin/env python3
"""
微服务分布式监控系统
包括指标收集、日志聚合、链路追踪和告警
"""

import asyncio
import aiohttp
import json
import time
import logging
import uuid
from typing import Dict, List, Any, Optional
from dataclasses import dataclass, asdict
from datetime import datetime, timedelta
from enum import Enum
import prometheus_client
from prometheus_client import Counter, Histogram, Gauge, CollectorRegistry
import jaeger_client
from jaeger_client import Config
import structlog
from elasticsearch import AsyncElasticsearch
import redis.asyncio as redis
from opentelemetry import trace
from opentelemetry.exporter.jaeger.thrift import JaegerExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor

class MetricType(Enum):
    """Kinds of metrics supported by the collector (mirrors the Prometheus types)."""
    COUNTER = "counter"      # monotonically increasing value
    GAUGE = "gauge"          # value that can go up and down
    HISTOGRAM = "histogram"  # bucketed distribution of observations
    SUMMARY = "summary"      # quantile summary of observations

class LogLevel(Enum):
    """Severity levels for structured log entries, lowest to highest."""
    DEBUG = "debug"
    INFO = "info"
    WARNING = "warning"
    ERROR = "error"
    CRITICAL = "critical"

@dataclass
class MetricData:
    """A single metric sample ready to be exported."""
    name: str                 # metric name, e.g. "http_requests_total"
    type: MetricType          # Prometheus-style metric kind
    value: float              # sampled value
    labels: Dict[str, str]    # label set identifying the series
    timestamp: datetime       # when the sample was taken
    help_text: str = ""       # optional human-readable description

@dataclass
class LogEntry:
    """One structured log record, enriched with tracing/request context."""
    timestamp: datetime
    level: LogLevel
    service: str                       # name of the emitting service
    message: str
    trace_id: Optional[str] = None     # distributed-trace correlation id
    span_id: Optional[str] = None
    user_id: Optional[str] = None
    request_id: Optional[str] = None
    # Defaults to None rather than {} to avoid the shared-mutable-default
    # pitfall; consumers must treat None as "no extra fields".
    extra_fields: Optional[Dict[str, Any]] = None

@dataclass
class TraceSpan:
    """A single span in a distributed trace (one timed operation)."""
    trace_id: str
    span_id: str
    parent_span_id: Optional[str]       # None for a root span
    operation_name: str
    service_name: str
    start_time: datetime
    end_time: Optional[datetime]        # None while the span is still open
    duration: Optional[float]           # seconds; None until finished
    tags: Dict[str, Any]
    logs: List[Dict[str, Any]]
    status: str = "ok"

class MetricsCollector:
    """Collects Prometheus metrics for one service, in a private registry."""
    
    def __init__(self):
        # Isolated registry so these metrics do not collide with the
        # process-wide default registry.
        self.registry = CollectorRegistry()
        # NOTE(review): never populated in this class; presumably reserved
        # for dynamically created metrics — confirm before removing.
        self.metrics = {}
        
        # Register the predefined metric families below.
        self._setup_default_metrics()
    
    def _setup_default_metrics(self):
        """Create the standard HTTP, business and system metric families."""
        # HTTP request metrics
        self.http_requests_total = Counter(
            'http_requests_total',
            'Total HTTP requests',
            ['method', 'endpoint', 'status_code', 'service'],
            registry=self.registry
        )
        
        self.http_request_duration = Histogram(
            'http_request_duration_seconds',
            'HTTP request duration',
            ['method', 'endpoint', 'service'],
            registry=self.registry
        )
        
        # Business metrics
        self.active_users = Gauge(
            'active_users_total',
            'Number of active users',
            ['service'],
            registry=self.registry
        )
        
        self.database_connections = Gauge(
            'database_connections_active',
            'Active database connections',
            ['database', 'service'],
            registry=self.registry
        )
        
        # System metrics
        self.memory_usage = Gauge(
            'memory_usage_bytes',
            'Memory usage in bytes',
            ['service'],
            registry=self.registry
        )
        
        self.cpu_usage = Gauge(
            'cpu_usage_percent',
            'CPU usage percentage',
            ['service'],
            registry=self.registry
        )
    
    def record_http_request(self, method: str, endpoint: str, 
                          status_code: int, service: str, duration: float):
        """Count one HTTP request and observe its latency (seconds)."""
        self.http_requests_total.labels(
            method=method,
            endpoint=endpoint,
            status_code=str(status_code),  # label values must be strings
            service=service
        ).inc()
        
        self.http_request_duration.labels(
            method=method,
            endpoint=endpoint,
            service=service
        ).observe(duration)
    
    def set_active_users(self, count: int, service: str):
        """Set the active-user gauge for *service*."""
        self.active_users.labels(service=service).set(count)
    
    def set_database_connections(self, count: int, database: str, service: str):
        """Set the active DB connection gauge for (*database*, *service*)."""
        self.database_connections.labels(
            database=database,
            service=service
        ).set(count)
    
    def set_memory_usage(self, bytes_used: int, service: str):
        """Set the memory-usage gauge (bytes) for *service*."""
        self.memory_usage.labels(service=service).set(bytes_used)
    
    def set_cpu_usage(self, percentage: float, service: str):
        """Set the CPU-usage gauge (percent) for *service*."""
        self.cpu_usage.labels(service=service).set(percentage)
    
    def get_metrics(self) -> str:
        """Render all registered metrics in the Prometheus text exposition format."""
        return prometheus_client.generate_latest(self.registry).decode('utf-8')

class DistributedTracer:
    """Sets up Jaeger (opentracing) and OpenTelemetry tracing for one service
    and offers helpers for span lifecycle and HTTP-header context propagation."""
    
    def __init__(self, service_name: str, jaeger_endpoint: str = "http://localhost:14268/api/traces"):
        # NOTE(review): jaeger_endpoint is accepted but never used below —
        # reporting goes to the default local agent; confirm intent.
        self.service_name = service_name
        
        # Jaeger client configuration: a const sampler with param=1 samples
        # every trace (fine for demos, too expensive for production).
        config = Config(
            config={
                'sampler': {
                    'type': 'const',
                    'param': 1,
                },
                'logging': True,
                'reporter_batch_size': 1,
            },
            service_name=service_name,
            validate=True,
        )
        
        self.tracer = config.initialize_tracer()
        
        # Parallel OpenTelemetry pipeline exporting to the local Jaeger agent.
        trace.set_tracer_provider(TracerProvider())
        jaeger_exporter = JaegerExporter(
            agent_host_name="localhost",
            agent_port=6831,  # default Jaeger agent UDP (thrift compact) port
        )
        
        span_processor = BatchSpanProcessor(jaeger_exporter)
        trace.get_tracer_provider().add_span_processor(span_processor)
        
        self.otel_tracer = trace.get_tracer(__name__)
    
    def start_span(self, operation_name: str, parent_span=None, tags: Dict[str, Any] = None) -> Any:
        """Start and return a new span, optionally as a child of *parent_span*."""
        span = self.tracer.start_span(
            operation_name=operation_name,
            child_of=parent_span,
            tags=tags or {}
        )
        
        return span
    
    def finish_span(self, span: Any, tags: Dict[str, Any] = None, logs: List[Dict[str, Any]] = None):
        """Attach final tags/logs to *span*, then close it."""
        if tags:
            for key, value in tags.items():
                span.set_tag(key, value)
        
        if logs:
            for log_entry in logs:
                span.log_kv(log_entry)
        
        span.finish()
    
    def extract_span_context(self, headers: Dict[str, str]) -> Any:
        """Extract a propagated span context from HTTP headers; None on failure."""
        try:
            span_ctx = self.tracer.extract(
                # NOTE(review): the HTTP_HEADERS format constant normally lives
                # in opentracing.Format; verify jaeger_client re-exports Format.
                format=jaeger_client.Format.HTTP_HEADERS,
                carrier=headers
            )
            return span_ctx
        except Exception:
            return None
    
    def inject_span_context(self, span: Any, headers: Dict[str, str]) -> Dict[str, str]:
        """Inject *span*'s context into *headers* for downstream propagation."""
        self.tracer.inject(
            span_context=span.context,
            format=jaeger_client.Format.HTTP_HEADERS,
            carrier=headers
        )
        return headers

class LogAggregator:
    """Aggregates structured logs: ships each entry to Elasticsearch and
    mirrors it to the local structlog logger."""
    
    def __init__(self, elasticsearch_url: str = "http://localhost:9200"):
        self.es_client = AsyncElasticsearch([elasticsearch_url])
        self.logger = structlog.get_logger()
        
        # Configure structlog once so every local record is rendered as JSON
        # with ISO timestamps and exception/stack info included.
        structlog.configure(
            processors=[
                structlog.stdlib.filter_by_level,
                structlog.stdlib.add_logger_name,
                structlog.stdlib.add_log_level,
                structlog.stdlib.PositionalArgumentsFormatter(),
                structlog.processors.TimeStamper(fmt="iso"),
                structlog.processors.StackInfoRenderer(),
                structlog.processors.format_exc_info,
                structlog.processors.UnicodeDecoder(),
                structlog.processors.JSONRenderer()
            ],
            context_class=dict,
            logger_factory=structlog.stdlib.LoggerFactory(),
            wrapper_class=structlog.stdlib.BoundLogger,
            cache_logger_on_first_use=True,
        )
    
    async def log(self, entry: LogEntry):
        """Record *entry* remotely (Elasticsearch) and locally (structlog)."""
        await self._send_to_elasticsearch(entry)
        self._log_locally(entry)
    
    async def _send_to_elasticsearch(self, entry: LogEntry):
        """Index *entry* into a per-day index; failures are reported, never raised."""
        try:
            # One index per day, e.g. microservices-logs-2024.01.31.
            index_name = f"microservices-logs-{entry.timestamp.strftime('%Y.%m.%d')}"
            
            doc = {
                'timestamp': entry.timestamp.isoformat(),
                'level': entry.level.value,
                'service': entry.service,
                'message': entry.message,
                'trace_id': entry.trace_id,
                'span_id': entry.span_id,
                'user_id': entry.user_id,
                'request_id': entry.request_id,
                # BUG FIX: the original wrote `**entry.extra_fields or {}`,
                # which is a SyntaxError in a dict display (`**` there only
                # accepts a bitwise-or expression). Parenthesize the fallback.
                **(entry.extra_fields or {})
            }
            
            await self.es_client.index(
                index=index_name,
                body=doc
            )
            
        except Exception as e:
            # Logging must never take the service down; report and move on.
            print(f"Failed to send log to Elasticsearch: {e}")
    
    def _log_locally(self, entry: LogEntry):
        """Mirror *entry* to the local structlog logger with bound context."""
        logger = self.logger.bind(
            service=entry.service,
            trace_id=entry.trace_id,
            span_id=entry.span_id,
            user_id=entry.user_id,
            request_id=entry.request_id,
            **(entry.extra_fields or {})
        )
        
        # Dispatch on severity to the matching structlog method.
        if entry.level == LogLevel.DEBUG:
            logger.debug(entry.message)
        elif entry.level == LogLevel.INFO:
            logger.info(entry.message)
        elif entry.level == LogLevel.WARNING:
            logger.warning(entry.message)
        elif entry.level == LogLevel.ERROR:
            logger.error(entry.message)
        elif entry.level == LogLevel.CRITICAL:
            logger.critical(entry.message)
    
    async def search_logs(self, query: str, service: str = None, 
                         start_time: datetime = None, end_time: datetime = None,
                         limit: int = 100) -> List[Dict[str, Any]]:
        """Search the per-day log indices.

        Args:
            query: free-text query matched against message and service.
            service: optional exact service-name filter.
            start_time / end_time: optional inclusive timestamp bounds.
            limit: maximum number of hits to return (newest first).

        Returns:
            A list of matching log documents; empty on any failure.
        """
        try:
            search_body = {
                "query": {
                    "bool": {
                        "must": []
                    }
                },
                "sort": [
                    {"timestamp": {"order": "desc"}}
                ],
                "size": limit
            }
            
            # Build the bool query from whichever filters were supplied.
            if query:
                search_body["query"]["bool"]["must"].append({
                    "multi_match": {
                        "query": query,
                        "fields": ["message", "service"]
                    }
                })
            
            if service:
                search_body["query"]["bool"]["must"].append({
                    "term": {"service": service}
                })
            
            if start_time or end_time:
                time_range = {}
                if start_time:
                    time_range["gte"] = start_time.isoformat()
                if end_time:
                    time_range["lte"] = end_time.isoformat()
                
                search_body["query"]["bool"]["must"].append({
                    "range": {"timestamp": time_range}
                })
            
            response = await self.es_client.search(
                index="microservices-logs-*",
                body=search_body
            )
            
            return [hit["_source"] for hit in response["hits"]["hits"]]
            
        except Exception as e:
            print(f"Failed to search logs: {e}")
            return []

class AlertManager:
    """Evaluates alert rules against current metric values and fans out
    notifications, persisting fired alerts in a Redis list."""
    
    def __init__(self, redis_url: str = "redis://localhost:6379"):
        self.redis_client = redis.from_url(redis_url)
        self.alert_rules = []             # rule dicts, see add_alert_rule
        self.notification_channels = []   # channel dicts, see add_notification_channel
    
    def add_alert_rule(self, name: str, condition: str, threshold: float,
                      duration: int = 300, severity: str = "warning"):
        """Register a rule that fires when metric *condition* exceeds *threshold*.

        Note: *duration* acts as a re-notification cool-down (minimum seconds
        between two firings), not a "condition must hold for N seconds" window.
        """
        rule = {
            'name': name,
            'condition': condition,
            'threshold': threshold,
            'duration': duration,
            'severity': severity,
            'last_triggered': None  # set on first firing
        }
        self.alert_rules.append(rule)
    
    def add_notification_channel(self, channel_type: str, config: Dict[str, Any]):
        """Register a notification channel ('webhook', 'email' or 'slack')."""
        channel = {
            'type': channel_type,
            'config': config
        }
        self.notification_channels.append(channel)
    
    async def check_alerts(self, metrics: Dict[str, float]):
        """Evaluate every registered rule against *metrics* and fire due alerts."""
        current_time = datetime.utcnow()
        
        for rule in self.alert_rules:
            try:
                # Simplified condition check: metric value vs. threshold.
                if self._evaluate_condition(rule['condition'], metrics, rule['threshold']):
                    # Honor the cool-down before firing again.
                    if await self._should_trigger_alert(rule, current_time):
                        await self._trigger_alert(rule, metrics)
                        rule['last_triggered'] = current_time
                        
            except Exception as e:
                # One broken rule must not stop evaluation of the others.
                print(f"Error checking alert rule {rule['name']}: {e}")
    
    def _evaluate_condition(self, condition: str, metrics: Dict[str, float], threshold: float) -> bool:
        """Return True when the metric named *condition* exceeds *threshold*.

        Simplified implementation: *condition* is just a metric name; a real
        system would support full expressions.
        """
        if condition in metrics:
            value = metrics[condition]
            return value > threshold
        return False
    
    async def _should_trigger_alert(self, rule: Dict[str, Any], current_time: datetime) -> bool:
        """Return True if the rule never fired, or its cool-down has elapsed."""
        if rule['last_triggered'] is None:
            return True
        
        time_since_last = (current_time - rule['last_triggered']).total_seconds()
        return time_since_last >= rule['duration']
    
    async def _trigger_alert(self, rule: Dict[str, Any], metrics: Dict[str, float]):
        """Persist a fired alert to Redis and notify every channel."""
        alert = {
            'id': str(uuid.uuid4()),
            'rule_name': rule['name'],
            'severity': rule['severity'],
            'message': f"Alert: {rule['name']} triggered",
            'timestamp': datetime.utcnow().isoformat(),
            'metrics': metrics
        }
        
        # Push onto the 'alerts' list (newest first).
        await self.redis_client.lpush('alerts', json.dumps(alert))
        
        # Fan out to all configured channels.
        for channel in self.notification_channels:
            await self._send_notification(channel, alert)
    
    async def _send_notification(self, channel: Dict[str, Any], alert: Dict[str, Any]):
        """Dispatch *alert* to one channel by its type; failures are logged only."""
        try:
            if channel['type'] == 'webhook':
                await self._send_webhook_notification(channel['config'], alert)
            elif channel['type'] == 'email':
                await self._send_email_notification(channel['config'], alert)
            elif channel['type'] == 'slack':
                await self._send_slack_notification(channel['config'], alert)
                
        except Exception as e:
            print(f"Failed to send notification via {channel['type']}: {e}")
    
    async def _send_webhook_notification(self, config: Dict[str, Any], alert: Dict[str, Any]):
        """POST the alert JSON to the configured webhook URL."""
        async with aiohttp.ClientSession() as session:
            await session.post(config['url'], json=alert)
    
    async def _send_email_notification(self, config: Dict[str, Any], alert: Dict[str, Any]):
        """Send an e-mail notification (not implemented in this example)."""
        # Placeholder: implement e-mail delivery here.
        pass
    
    async def _send_slack_notification(self, config: Dict[str, Any], alert: Dict[str, Any]):
        """Send a Slack notification (not implemented in this example)."""
        # Placeholder: implement Slack delivery here.
        pass

class MonitoringSystem:
    """Facade wiring metrics, tracing, log aggregation and alerting together
    for one service, plus the background collection/alerting loops."""
    
    def __init__(self, service_name: str):
        self.service_name = service_name
        self.metrics_collector = MetricsCollector()
        self.tracer = DistributedTracer(service_name)
        self.log_aggregator = LogAggregator()
        self.alert_manager = AlertManager()
        
        # Install the baseline alert rules below.
        self._setup_default_alerts()
    
    def _setup_default_alerts(self):
        """Register baseline alerting rules for errors, latency and memory."""
        self.alert_manager.add_alert_rule(
            name="high_error_rate",
            condition="error_rate",
            threshold=0.05,  # 5%
            duration=300,    # 5 minutes
            severity="critical"
        )
        
        self.alert_manager.add_alert_rule(
            name="high_response_time",
            condition="avg_response_time",
            threshold=2.0,   # 2 seconds
            duration=600,    # 10 minutes
            severity="warning"
        )
        
        self.alert_manager.add_alert_rule(
            name="high_memory_usage",
            condition="memory_usage_percent",
            threshold=80.0,  # 80%
            duration=300,    # 5 minutes
            severity="warning"
        )
    
    async def start_monitoring(self):
        """Start the background metric-collection and alert-check loops.

        NOTE(review): the Task objects returned by create_task are discarded;
        the event loop keeps only weak references, so these tasks can be
        garbage-collected — consider retaining them on self.
        """
        # Metric-collection loop
        asyncio.create_task(self._metrics_collection_loop())
        
        # Alert-check loop
        asyncio.create_task(self._alert_check_loop())
    
    async def _metrics_collection_loop(self):
        """Collect system metrics forever, every 30 s; back off 10 s on error."""
        while True:
            try:
                # Sample CPU/memory via psutil.
                await self._collect_system_metrics()
                
                # Wait until the next sampling round.
                await asyncio.sleep(30)  # collect every 30 seconds
                
            except Exception as e:
                await self.log_aggregator.log(LogEntry(
                    timestamp=datetime.utcnow(),
                    level=LogLevel.ERROR,
                    service=self.service_name,
                    message=f"Metrics collection failed: {str(e)}"
                ))
                await asyncio.sleep(10)
    
    async def _alert_check_loop(self):
        """Evaluate alert rules forever, every 60 s; back off 30 s on error."""
        while True:
            try:
                # Snapshot the current metric values.
                metrics = await self._get_current_metrics()
                
                # Evaluate all rules against the snapshot.
                await self.alert_manager.check_alerts(metrics)
                
                # Wait until the next evaluation round.
                await asyncio.sleep(60)  # check every minute
                
            except Exception as e:
                await self.log_aggregator.log(LogEntry(
                    timestamp=datetime.utcnow(),
                    level=LogLevel.ERROR,
                    service=self.service_name,
                    message=f"Alert check failed: {str(e)}"
                ))
                await asyncio.sleep(30)
    
    async def _collect_system_metrics(self):
        """Sample CPU and memory and push them into the metrics collector."""
        # psutil is a third-party dependency; imported lazily so the module
        # loads even where it is absent until this loop actually runs.
        import psutil
        
        # CPU usage (the 1-second interval blocks this coroutine's thread)
        cpu_percent = psutil.cpu_percent(interval=1)
        self.metrics_collector.set_cpu_usage(cpu_percent, self.service_name)
        
        # Memory usage in bytes
        memory = psutil.virtual_memory()
        self.metrics_collector.set_memory_usage(memory.used, self.service_name)
    
    async def _get_current_metrics(self) -> Dict[str, float]:
        """Return a snapshot of CPU/memory/disk utilisation percentages."""
        import psutil
        
        return {
            'cpu_usage_percent': psutil.cpu_percent(),
            'memory_usage_percent': psutil.virtual_memory().percent,
            'disk_usage_percent': psutil.disk_usage('/').percent,
        }

# Usage example
async def main():
    """Demo entry point: wire up the monitoring system, emit sample data,
    then block forever while the background loops run."""
    monitoring = MonitoringSystem("user-service")
    
    # Kick off the background collection/alerting loops.
    await monitoring.start_monitoring()
    
    # Emit one sample HTTP-request metric.
    monitoring.metrics_collector.record_http_request(
        method="GET",
        endpoint="/api/users",
        status_code=200,
        service="user-service",
        duration=0.5,
    )
    
    # Emit one sample structured log record.
    startup_entry = LogEntry(
        timestamp=datetime.utcnow(),
        level=LogLevel.INFO,
        service="user-service",
        message="User service started successfully",
        extra_fields={"version": "1.0.0"},
    )
    await monitoring.log_aggregator.log(startup_entry)
    
    for line in (
        "监控系统启动成功",
        "指标端点: http://localhost:8080/metrics",
        "日志查询: http://localhost:9200/microservices-logs-*/_search",
    ):
        print(line)
    
    # Never returns; keeps the event loop (and background tasks) alive.
    await asyncio.Event().wait()

if __name__ == "__main__":
    asyncio.run(main())

总结

微服务架构在云环境中的设计模式涵盖了多个关键领域:

核心架构要素

  1. 服务拆分与设计

    • 基于业务能力的服务边界划分
    • 数据库分离和事务管理
    • 服务间通信协议设计
  2. 通信模式

    • 同步通信(REST、gRPC)
    • 异步通信(消息队列、事件驱动)
    • 服务发现与注册机制
  3. API网关

    • 统一入口和路由管理
    • 认证授权和限流控制
    • 负载均衡和熔断保护
  4. 服务网格

    • 服务间通信基础设施
    • 流量管理和安全策略
    • 可观测性和监控集成
  5. 监控与可观测性

    • 分布式追踪和指标收集
    • 日志聚合和分析
    • 告警和通知机制

最佳实践

  1. 架构设计原则

    • 单一职责和松耦合
    • 故障隔离和容错设计
    • 数据一致性和事务管理
  2. 运维管理

    • 自动化部署和扩缩容
    • 配置管理和版本控制
    • 灾难恢复和备份策略
  3. 团队协作

    • DevOps文化和实践
    • API设计和文档管理
    • 持续集成和交付

通过合理的架构设计和工具选择,可以构建出高可用、高性能、易维护的微服务系统,为业务发展提供强有力的技术支撑。

异步通信模式

#!/usr/bin/env python3
"""
微服务异步通信实现
包括消息队列、事件驱动架构等异步通信模式
"""

import asyncio
import json
import uuid
from typing import Dict, Any, Callable, List, Optional
from dataclasses import dataclass, asdict
from datetime import datetime
from enum import Enum
import logging

class MessageType(Enum):
    """Semantic category of a message."""
    COMMAND = "command"  # instructs a service to do something
    EVENT = "event"      # announces that something happened
    QUERY = "query"      # requests data without side effects

class DeliveryMode(Enum):
    """Delivery guarantee requested when publishing a message."""
    AT_MOST_ONCE = "at_most_once"    # may be lost, never duplicated
    AT_LEAST_ONCE = "at_least_once"  # never lost, may be duplicated
    EXACTLY_ONCE = "exactly_once"    # neither lost nor duplicated

@dataclass
class Message:
    """Envelope for inter-service messages (commands, events, queries)."""
    id: str
    type: MessageType
    topic: str                            # destination topic name
    payload: Dict[str, Any]
    correlation_id: Optional[str] = None  # ties together messages of one workflow
    reply_to: Optional[str] = None        # topic for request/reply responses
    timestamp: Optional[datetime] = None  # filled by __post_init__ when omitted
    headers: Optional[Dict[str, str]] = None  # filled with {} by __post_init__
    
    def __post_init__(self):
        # Lazily default the time-dependent and mutable fields so every
        # instance gets its own timestamp and header dict.
        if self.timestamp is None:
            self.timestamp = datetime.utcnow()
        if self.headers is None:
            self.headers = {}

@dataclass
class EventMessage(Message):
    """Domain-event message belonging to one aggregate's event stream.

    The base class declares fields with defaults, so every field added here
    must also carry a default — otherwise @dataclass raises
    ``TypeError: non-default argument 'event_type' follows default argument``
    at class-definition time (the original code had exactly that bug).
    Callers construct events with keyword arguments, so adding defaults is
    backward-compatible.
    """
    event_type: str = ""    # e.g. "OrderCancelled"
    aggregate_id: str = ""  # id of the aggregate this event belongs to
    version: int = 0        # position in the aggregate's event stream
    
    def __post_init__(self):
        super().__post_init__()
        # Events are always typed EVENT, regardless of what the caller passed.
        self.type = MessageType.EVENT

class MessageBroker:
    """Minimal in-memory pub/sub broker with topics, partitions and
    configurable delivery semantics."""
    
    def __init__(self):
        self.topics = {}          # topic -> {'partitions': int, 'messages': [list per partition]}
        self.subscribers = {}     # topic -> list of subscriber dicts
        self.message_store = {}   # message id -> Message (simple retention)
        self.logger = logging.getLogger(__name__)
    
    def create_topic(self, topic_name: str, partitions: int = 1):
        """Create *topic_name* with the given partition count (idempotent)."""
        if topic_name not in self.topics:
            self.topics[topic_name] = {
                'partitions': partitions,
                'messages': [[] for _ in range(partitions)]
            }
            self.subscribers[topic_name] = []
            self.logger.info(f"Created topic: {topic_name} with {partitions} partitions")
    
    def subscribe(self, topic_name: str, handler: Callable[[Message], None],
                 consumer_group: str = "default"):
        """Register *handler* for *topic_name*; returns the subscriber id.

        NOTE(review): consumer_group is stored but delivery fans out to every
        subscriber, so group-based load balancing is not actually implemented.
        """
        if topic_name not in self.topics:
            self.create_topic(topic_name)
        
        subscriber = {
            'handler': handler,
            'consumer_group': consumer_group,
            'id': str(uuid.uuid4())
        }
        
        self.subscribers[topic_name].append(subscriber)
        self.logger.info(f"Subscribed to topic: {topic_name}, group: {consumer_group}")
        
        return subscriber['id']
    
    async def publish(self, message: Message, delivery_mode: DeliveryMode = DeliveryMode.AT_LEAST_ONCE):
        """Store *message* in its topic's partition and fan it out to subscribers."""
        topic_name = message.topic
        
        if topic_name not in self.topics:
            self.create_topic(topic_name)
        
        # Pick a partition deterministically from the message id.
        partition_count = self.topics[topic_name]['partitions']
        partition = hash(message.id) % partition_count
        
        # Retain the message before delivery so it is never lost mid-flight.
        self.topics[topic_name]['messages'][partition].append(message)
        self.message_store[message.id] = message
        
        # Asynchronously deliver to all subscribers.
        await self._deliver_message(message, delivery_mode)
        
        self.logger.info(f"Published message {message.id} to topic {topic_name}")
    
    async def _deliver_message(self, message: Message, delivery_mode: DeliveryMode):
        """Deliver *message* to all subscribers of its topic concurrently."""
        topic_name = message.topic
        
        if topic_name not in self.subscribers:
            return
        
        delivery_tasks = [
            asyncio.create_task(
                self._deliver_to_subscriber(message, subscriber, delivery_mode)
            )
            for subscriber in self.subscribers[topic_name]
        ]
        
        # Gather with return_exceptions so one failing subscriber cannot
        # block or abort delivery to the others.
        if delivery_tasks:
            await asyncio.gather(*delivery_tasks, return_exceptions=True)
    
    async def _deliver_to_subscriber(self, message: Message, subscriber: Dict,
                                   delivery_mode: DeliveryMode):
        """Apply the requested delivery-mode semantics for one subscriber."""
        try:
            if delivery_mode == DeliveryMode.AT_LEAST_ONCE:
                # At-least-once: errors propagate (duplicates possible on retry).
                await self._invoke_handler(message, subscriber)
            elif delivery_mode == DeliveryMode.AT_MOST_ONCE:
                # At-most-once: swallow handler errors, never retry.
                try:
                    await self._invoke_handler(message, subscriber)
                except Exception:
                    pass
            else:  # EXACTLY_ONCE
                # True exactly-once needs dedup/acks; this is a simplified
                # single-attempt implementation.
                await self._invoke_handler(message, subscriber)
                
        except Exception as e:
            self.logger.error(f"Failed to deliver message {message.id} to subscriber {subscriber['id']}: {str(e)}")
    
    async def _invoke_handler(self, message: Message, subscriber: Dict):
        """Call the subscriber's handler; sync handlers run in the default executor."""
        handler = subscriber['handler']
        
        if asyncio.iscoroutinefunction(handler):
            await handler(message)
        else:
            # FIX: asyncio.get_event_loop() is deprecated for use inside
            # coroutines; get_running_loop() is the correct, unambiguous call.
            loop = asyncio.get_running_loop()
            await loop.run_in_executor(None, handler, message)

class EventStore:
    """In-memory event store keeping one ordered event stream per aggregate,
    plus optional point-in-time snapshots."""
    
    def __init__(self):
        self.events = {}     # aggregate_id -> ordered list of EventMessage
        self.snapshots = {}  # aggregate_id -> {'data', 'version', 'timestamp'}
        self.logger = logging.getLogger(__name__)
    
    async def append_events(self, aggregate_id: str, events: List[EventMessage],
                          expected_version: int = -1):
        """Append *events* to the aggregate's stream with optimistic locking.

        When *expected_version* is not -1 it must equal the current stream
        length, otherwise a concurrency conflict is raised. Returns the new
        stream version.
        """
        stream = self.events.setdefault(aggregate_id, [])
        current_version = len(stream)
        
        if expected_version != -1 and current_version != expected_version:
            raise Exception(f"Concurrency conflict: expected version {expected_version}, actual {current_version}")
        
        # Stamp each event with its 1-based position in the stream.
        next_version = current_version
        for event in events:
            next_version += 1
            event.version = next_version
        
        stream.extend(events)
        self.logger.info(f"Appended {len(events)} events to aggregate {aggregate_id}")
        
        return next_version
    
    async def get_events(self, aggregate_id: str, from_version: int = 0) -> List[EventMessage]:
        """Return the aggregate's events with version greater than *from_version*."""
        stream = self.events.get(aggregate_id)
        if stream is None:
            return []
        
        return [evt for evt in stream if evt.version > from_version]
    
    async def save_snapshot(self, aggregate_id: str, snapshot: Dict[str, Any], version: int):
        """Store a point-in-time snapshot of the aggregate's state."""
        self.snapshots[aggregate_id] = {
            'data': snapshot,
            'version': version,
            'timestamp': datetime.utcnow()
        }
        self.logger.info(f"Saved snapshot for aggregate {aggregate_id} at version {version}")
    
    async def get_snapshot(self, aggregate_id: str) -> Optional[Dict[str, Any]]:
        """Return the latest snapshot for the aggregate, or None if absent."""
        return self.snapshots.get(aggregate_id)

class SagaOrchestrator:
    """Orchestration-style saga coordinator: drives each saga step by
    publishing command messages, and runs compensations in reverse order
    when a step fails."""
    
    def __init__(self, message_broker: MessageBroker):
        self.message_broker = message_broker
        self.saga_instances = {}  # saga_id -> mutable saga state dict
        self.logger = logging.getLogger(__name__)
    
    async def start_saga(self, saga_id: str, saga_definition: Dict[str, Any],
                        initial_data: Dict[str, Any]):
        """Create a saga instance from *saga_definition* and run its first step.

        saga_definition must contain a 'steps' list; each step has 'service',
        'action' and optionally 'compensation'.
        """
        saga_instance = {
            'id': saga_id,
            'definition': saga_definition,
            'current_step': 0,
            'data': initial_data,          # accumulated workflow data
            'status': 'running',           # running | completed | compensating | compensated
            'completed_steps': [],
            'failed_steps': []
        }
        
        self.saga_instances[saga_id] = saga_instance
        
        # Kick off the first step immediately.
        await self._execute_next_step(saga_instance)
        
        self.logger.info(f"Started saga {saga_id}")
    
    async def _execute_next_step(self, saga_instance: Dict[str, Any]):
        """Publish the command for the current step, or finish the saga."""
        definition = saga_instance['definition']
        current_step = saga_instance['current_step']
        
        if current_step >= len(definition['steps']):
            # All steps done — the saga completed successfully.
            saga_instance['status'] = 'completed'
            self.logger.info(f"Saga {saga_instance['id']} completed successfully")
            return
        
        step = definition['steps'][current_step]
        
        try:
            # Build the command message for the step's target service.
            command_message = Message(
                id=str(uuid.uuid4()),
                type=MessageType.COMMAND,
                topic=step['service'],
                payload={
                    'command': step['action'],
                    'data': saga_instance['data'],
                    'saga_id': saga_instance['id'],
                    'step_id': current_step
                },
                correlation_id=saga_instance['id']  # correlate replies to this saga
            )
            
            # Publish the command to the service's topic.
            await self.message_broker.publish(command_message)
            
            self.logger.info(f"Executed step {current_step} of saga {saga_instance['id']}")
            
        except Exception as e:
            # Publishing failed — record the failure and start compensating.
            saga_instance['failed_steps'].append(current_step)
            await self._start_compensation(saga_instance)
            self.logger.error(f"Step {current_step} of saga {saga_instance['id']} failed: {str(e)}")
    
    async def handle_step_completion(self, saga_id: str, step_id: int, 
                                   success: bool, result_data: Dict[str, Any]):
        """Callback for step results: advance the saga on success, otherwise
        compensate. Unknown saga ids are ignored."""
        if saga_id not in self.saga_instances:
            return
        
        saga_instance = self.saga_instances[saga_id]
        
        if success:
            saga_instance['completed_steps'].append(step_id)
            # Merge the step's output into the shared workflow data.
            saga_instance['data'].update(result_data)
            saga_instance['current_step'] += 1
            
            # Continue with the following step.
            await self._execute_next_step(saga_instance)
        else:
            saga_instance['failed_steps'].append(step_id)
            await self._start_compensation(saga_instance)
    
    async def _start_compensation(self, saga_instance: Dict[str, Any]):
        """Undo completed steps by publishing their compensation commands in
        reverse completion order.

        NOTE(review): the status is set to 'compensated' as soon as the
        commands are published — completion of the compensations themselves
        is not awaited or tracked.
        """
        saga_instance['status'] = 'compensating'
        
        # Compensate in reverse order of completion.
        for step_id in reversed(saga_instance['completed_steps']):
            step = saga_instance['definition']['steps'][step_id]
            
            # Steps without a 'compensation' action are skipped.
            if 'compensation' in step:
                compensation_message = Message(
                    id=str(uuid.uuid4()),
                    type=MessageType.COMMAND,
                    topic=step['service'],
                    payload={
                        'command': step['compensation'],
                        'data': saga_instance['data'],
                        'saga_id': saga_instance['id'],
                        'step_id': step_id
                    },
                    correlation_id=saga_instance['id']
                )
                
                await self.message_broker.publish(compensation_message)
        
        saga_instance['status'] = 'compensated'
        self.logger.info(f"Saga {saga_instance['id']} compensated")

async def main():
    """Demo driver for asynchronous inter-service communication.

    Wires a message broker, an event store and a saga orchestrator, then
    simulates an order flow: an OrderCreated event triggers an inventory
    check, whose result triggers either a payment command or a cancellation
    event.  Finally a three-step saga (reserve -> charge -> ship) with
    compensations is started.
    """
    # Create the in-process message broker.
    broker = MessageBroker()
    
    # Create the event store.
    event_store = EventStore()
    
    # Create the saga orchestrator on top of the broker.
    saga_orchestrator = SagaOrchestrator(broker)
    
    # Define message handlers.
    async def order_created_handler(message: Message):
        print(f"处理订单创建事件: {message.payload}")
        
        # React by publishing an inventory-check command.
        inventory_command = Message(
            id=str(uuid.uuid4()),
            type=MessageType.COMMAND,
            topic="inventory-service",
            payload={
                "command": "check_inventory",
                "order_id": message.payload["order_id"],
                "items": message.payload["items"]
            },
            correlation_id=message.correlation_id
        )
        
        await broker.publish(inventory_command)
    
    async def inventory_checked_handler(message: Message):
        print(f"处理库存检查结果: {message.payload}")
        
        if message.payload["available"]:
            # Stock available: publish the payment command.
            payment_command = Message(
                id=str(uuid.uuid4()),
                type=MessageType.COMMAND,
                topic="payment-service",
                payload={
                    "command": "process_payment",
                    "order_id": message.payload["order_id"],
                    "amount": message.payload["total_amount"]
                },
                correlation_id=message.correlation_id
            )
            
            await broker.publish(payment_command)
        else:
            # Out of stock: publish an order-cancelled event instead.
            order_cancelled_event = EventMessage(
                id=str(uuid.uuid4()),
                topic="order-events",
                payload={
                    "order_id": message.payload["order_id"],
                    "reason": "insufficient_inventory"
                },
                event_type="OrderCancelled",
                aggregate_id=message.payload["order_id"],
                version=1
            )
            
            await broker.publish(order_cancelled_event)
    
    # Subscribe the handlers to their topics.
    # NOTE(review): OrderCancelled is also published on "order-events", so
    # order_created_handler would fire for it as well -- confirm intended.
    broker.subscribe("order-events", order_created_handler)
    broker.subscribe("inventory-events", inventory_checked_handler)
    
    # Simulate an order-created event.
    order_created_event = EventMessage(
        id=str(uuid.uuid4()),
        topic="order-events",
        payload={
            "order_id": "order-12345",
            "user_id": "user-001",
            "items": [
                {"product_id": "prod-001", "quantity": 2},
                {"product_id": "prod-002", "quantity": 1}
            ],
            "total_amount": 299.99
        },
        event_type="OrderCreated",
        aggregate_id="order-12345",
        version=1
    )
    
    print("发布订单创建事件...")
    await broker.publish(order_created_event)
    
    # Persist the event in the event store.
    await event_store.append_events("order-12345", [order_created_event])
    
    # Simulate the inventory-check result arriving a moment later.
    await asyncio.sleep(1)
    
    inventory_checked_event = Message(
        id=str(uuid.uuid4()),
        type=MessageType.EVENT,
        topic="inventory-events",
        payload={
            "order_id": "order-12345",
            "available": True,
            "total_amount": 299.99
        },
        correlation_id=order_created_event.id
    )
    
    print("发布库存检查结果...")
    await broker.publish(inventory_checked_event)
    
    # Demonstrate the saga pattern.
    print("\n演示Saga编排...")
    
    saga_definition = {
        "steps": [
            {
                "service": "inventory-service",
                "action": "reserve_inventory",
                "compensation": "release_inventory"
            },
            {
                "service": "payment-service",
                "action": "charge_payment",
                "compensation": "refund_payment"
            },
            {
                "service": "shipping-service",
                "action": "create_shipment",
                "compensation": "cancel_shipment"
            }
        ]
    }
    
    await saga_orchestrator.start_saga(
        saga_id="saga-order-12345",
        saga_definition=saga_definition,
        initial_data={
            "order_id": "order-12345",
            "user_id": "user-001",
            "total_amount": 299.99
        }
    )
    
    # Give the background handlers time to finish.
    await asyncio.sleep(2)
    
    print("\n=== 异步通信演示完成 ===")

if __name__ == "__main__":
    # Run the async demo only when executed directly as a script.
    asyncio.run(main())

数据管理策略

微服务架构中的数据管理是一个复杂的挑战,需要平衡数据一致性、性能和服务自治性。

数据库分离模式

#!/usr/bin/env python3
"""
微服务数据管理策略
包括数据库分离、数据同步、分布式事务等
"""

import asyncio
import json
import uuid
from typing import Dict, Any, List, Optional, Union
from dataclasses import dataclass, asdict
from datetime import datetime
from enum import Enum
import logging
from abc import ABC, abstractmethod

class ConsistencyLevel(Enum):
    """How strictly a cross-service data sync must be observed by the caller."""
    STRONG = "strong"      # sync is awaited before the triggering call returns
    EVENTUAL = "eventual"  # sync runs asynchronously in the background
    WEAK = "weak"          # no guarantee (not referenced in the code shown)

class TransactionStatus(Enum):
    """Lifecycle states of a distributed (2PC) transaction."""
    PENDING = "pending"          # started, outcome not yet decided
    COMMITTED = "committed"      # all prepared participants were told to commit
    ABORTED = "aborted"          # rolled back during or before commit
    COMPENSATED = "compensated"  # undone via compensating actions

@dataclass
class DataChangeEvent:
    """A single change to an aggregate, as propagated between services."""
    id: str               # unique event id
    aggregate_id: str     # id of the entity this event belongs to
    event_type: str       # "Created"/"Updated"/"Deleted" or domain names like "UserCreated"
    data: Dict[str, Any]  # changed fields (full or partial entity state)
    timestamp: datetime   # when the change happened
    version: int          # per-aggregate sequence number (stamped by the event store)
    service_name: str     # service that produced the event

class DatabaseRepository(ABC):
    """Abstract persistence interface each service implements for its own store."""
    
    @abstractmethod
    async def save(self, entity: Dict[str, Any]) -> str:
        """Persist an entity and return its id."""
        pass
    
    @abstractmethod
    async def find_by_id(self, entity_id: str) -> Optional[Dict[str, Any]]:
        """Return the entity with the given id, or None when it does not exist."""
        pass
    
    @abstractmethod
    async def find_by_criteria(self, criteria: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Return every entity whose fields equal all key/value pairs in criteria."""
        pass
    
    @abstractmethod
    async def update(self, entity_id: str, updates: Dict[str, Any]) -> bool:
        """Apply field updates; return True if the entity existed."""
        pass
    
    @abstractmethod
    async def delete(self, entity_id: str) -> bool:
        """Delete the entity; return True if it existed."""
        pass

class InMemoryRepository(DatabaseRepository):
    """In-memory ``DatabaseRepository`` for demos and tests.

    Entities are plain dicts keyed by id in a process-local dictionary.
    Defensive copies are made on every write *and* read so callers can never
    mutate stored state through a shared reference.
    """
    
    def __init__(self, service_name: str):
        self.service_name = service_name  # used for log attribution only
        self.data: Dict[str, Dict[str, Any]] = {}
        self.logger = logging.getLogger(__name__)
    
    async def save(self, entity: Dict[str, Any]) -> str:
        """Insert or replace an entity and return its id (generated if absent).

        ``updated_at`` always reflects this call; ``created_at`` is preserved
        when an entity with the same id already exists.
        """
        entity_id = entity.get('id', str(uuid.uuid4()))
        entity['id'] = entity_id
        
        now = datetime.utcnow().isoformat()
        existing = self.data.get(entity_id)
        # Bug fix: re-saving an existing entity used to clobber created_at.
        entity['created_at'] = existing['created_at'] if existing else now
        entity['updated_at'] = now
        
        self.data[entity_id] = entity.copy()
        self.logger.info(f"Saved entity {entity_id} in {self.service_name}")
        
        return entity_id
    
    async def find_by_id(self, entity_id: str) -> Optional[Dict[str, Any]]:
        """Return a copy of the entity, or None when it does not exist."""
        entity = self.data.get(entity_id)
        # Bug fix: return a copy (find_by_criteria already did), otherwise
        # callers could corrupt the store by mutating the returned dict.
        return entity.copy() if entity is not None else None
    
    async def find_by_criteria(self, criteria: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Return copies of every entity matching all key/value criteria."""
        return [
            entity.copy()
            for entity in self.data.values()
            if all(key in entity and entity[key] == value
                   for key, value in criteria.items())
        ]
    
    async def update(self, entity_id: str, updates: Dict[str, Any]) -> bool:
        """Merge field updates into an entity; True if it existed."""
        if entity_id not in self.data:
            return False
        
        self.data[entity_id].update(updates)
        self.data[entity_id]['updated_at'] = datetime.utcnow().isoformat()
        
        self.logger.info(f"Updated entity {entity_id} in {self.service_name}")
        return True
    
    async def delete(self, entity_id: str) -> bool:
        """Remove an entity; True if it existed."""
        if entity_id in self.data:
            del self.data[entity_id]
            self.logger.info(f"Deleted entity {entity_id} from {self.service_name}")
            return True
        return False

class EventSourcingRepository:
    """Event-sourcing store: current state is the fold of an append-only log.

    Events are kept per aggregate in append order; optional snapshots let
    ``rebuild_aggregate`` avoid replaying the full history.
    """
    
    def __init__(self, service_name: str):
        self.service_name = service_name
        self.event_store = {}  # aggregate_id -> ordered list of DataChangeEvent
        self.snapshots = {}    # aggregate_id -> {'data', 'version', 'timestamp'}
        self.logger = logging.getLogger(__name__)
    
    async def append_events(self, aggregate_id: str, events: List[DataChangeEvent],
                          expected_version: int = -1) -> int:
        """Append events to an aggregate's stream and return the new version.

        Optimistic concurrency: when expected_version is not -1 the current
        stream length must match it, otherwise a conflict is raised.  Incoming
        events have their version re-stamped as consecutive integers starting
        at current_version + 1.
        """
        if aggregate_id not in self.event_store:
            self.event_store[aggregate_id] = []
        
        current_version = len(self.event_store[aggregate_id])
        
        if expected_version != -1 and current_version != expected_version:
            raise Exception(f"Concurrency conflict: expected {expected_version}, got {current_version}")
        
        # Stamp each incoming event with its position in the stream.
        for i, event in enumerate(events):
            event.version = current_version + i + 1
        
        self.event_store[aggregate_id].extend(events)
        self.logger.info(f"Appended {len(events)} events to aggregate {aggregate_id}")
        
        return current_version + len(events)
    
    async def get_events(self, aggregate_id: str, from_version: int = 0) -> List[DataChangeEvent]:
        """Return the aggregate's events with version > from_version."""
        if aggregate_id not in self.event_store:
            return []
        
        events = self.event_store[aggregate_id]
        return [event for event in events if event.version > from_version]
    
    async def rebuild_aggregate(self, aggregate_id: str) -> Optional[Dict[str, Any]]:
        """Reconstruct current aggregate state, or None when no events exist.

        Starts from the latest snapshot when one is available and replays only
        the events recorded after it.
        """
        events = await self.get_events(aggregate_id)
        
        if not events:
            return None
        
        # Seed from the snapshot (if any) and drop already-applied events.
        # NOTE: the full event list is fetched above and filtered again here;
        # passing snapshot['version'] to get_events would avoid the rescan.
        snapshot = self.snapshots.get(aggregate_id)
        if snapshot:
            aggregate = snapshot['data'].copy()
            from_version = snapshot['version']
            events = [e for e in events if e.version > from_version]
        else:
            aggregate = {'id': aggregate_id}
        
        # Replay the remaining events in stream order.
        for event in events:
            aggregate = self._apply_event(aggregate, event)
        
        return aggregate
    
    def _apply_event(self, aggregate: Dict[str, Any], event: DataChangeEvent) -> Dict[str, Any]:
        """Fold one event into the aggregate dict and return it.

        "Deleted" is a soft delete: the entry is flagged, not removed.
        """
        if event.event_type == "Created":
            aggregate.update(event.data)
        elif event.event_type == "Updated":
            aggregate.update(event.data)
        elif event.event_type == "Deleted":
            aggregate['deleted'] = True
            aggregate['deleted_at'] = event.timestamp.isoformat()
        
        aggregate['version'] = event.version
        aggregate['last_modified'] = event.timestamp.isoformat()
        
        return aggregate
    
    async def save_snapshot(self, aggregate_id: str, aggregate: Dict[str, Any], version: int):
        """Store a point-in-time copy of the aggregate at the given version."""
        self.snapshots[aggregate_id] = {
            'data': aggregate.copy(),
            'version': version,
            'timestamp': datetime.utcnow()
        }
        self.logger.info(f"Saved snapshot for aggregate {aggregate_id} at version {version}")

class DataSynchronizer:
    """Replicates data changes from one service's store into another's.

    A sync rule maps (source service, entity type) to a target repository and
    a mapping function.  STRONG rules are awaited before ``sync_data_change``
    returns; EVENTUAL/WEAK rules run as background tasks.
    """
    
    def __init__(self):
        self.sync_rules = {}    # rule_id -> rule definition dict
        self.repositories = {}  # service name -> DatabaseRepository
        self.logger = logging.getLogger(__name__)
    
    def register_repository(self, service_name: str, repository: DatabaseRepository):
        """Register the repository that receives synced data for a service."""
        self.repositories[service_name] = repository
    
    def add_sync_rule(self, source_service: str, target_service: str,
                     entity_type: str, mapping_function: callable,
                     consistency_level: ConsistencyLevel = ConsistencyLevel.EVENTUAL):
        """Register a replication rule.

        mapping_function transforms the source event's data dict into the
        shape stored by the target service.
        """
        rule_id = f"{source_service}->{target_service}:{entity_type}"
        
        self.sync_rules[rule_id] = {
            'source_service': source_service,
            'target_service': target_service,
            'entity_type': entity_type,
            'mapping_function': mapping_function,
            'consistency_level': consistency_level
        }
        
        self.logger.info(f"Added sync rule: {rule_id}")
    
    async def sync_data_change(self, source_service: str, event: DataChangeEvent):
        """Apply every rule matching this event.

        Bug fix: the consistency level is now honoured per rule.  The old
        code read the loop variable ``rule`` *after* the loop, so the level
        of whichever matching rule came last silently governed all tasks.
        """
        relevant_rules = [
            rule for rule in self.sync_rules.values()
            if rule['source_service'] == source_service and
               rule['entity_type'] == event.event_type
        ]
        
        strong_tasks = []
        for rule in relevant_rules:
            task = asyncio.create_task(self._execute_sync_rule(rule, event))
            if rule['consistency_level'] == ConsistencyLevel.STRONG:
                # Strong consistency: the caller must observe this sync.
                strong_tasks.append(task)
            # EVENTUAL/WEAK tasks keep running in the background;
            # _execute_sync_rule catches and logs its own failures.
        
        if strong_tasks:
            await asyncio.gather(*strong_tasks)
    
    async def _execute_sync_rule(self, rule: Dict[str, Any], event: DataChangeEvent):
        """Map the event payload and upsert/delete it in the target repository."""
        try:
            target_service = rule['target_service']
            mapping_function = rule['mapping_function']
            
            if target_service not in self.repositories:
                self.logger.error(f"Target repository not found: {target_service}")
                return
            
            target_repo = self.repositories[target_service]
            
            # Translate the source entity into the target service's schema.
            mapped_data = mapping_function(event.data)
            
            # Bug fix: rules use domain event types such as "UserCreated", but
            # the old exact match on "Created"/"Updated" made those a no-op.
            # Match by suffix so both plain and domain-prefixed types work.
            if event.event_type.endswith(("Created", "Updated")):
                # Upsert: update when present, insert otherwise.
                existing_entity = await target_repo.find_by_id(event.aggregate_id)
                
                if existing_entity:
                    await target_repo.update(event.aggregate_id, mapped_data)
                else:
                    mapped_data['id'] = event.aggregate_id
                    await target_repo.save(mapped_data)
                    
            elif event.event_type.endswith("Deleted"):
                await target_repo.delete(event.aggregate_id)
            
            self.logger.info(f"Synced data from {rule['source_service']} to {target_service}")
            
        except Exception as e:
            self.logger.error(f"Sync failed: {str(e)}")
            # TODO: retry with backoff or route to a dead-letter queue.

class DistributedTransactionManager:
    """Two-phase-commit (2PC) coordinator across service repositories.

    Demo-grade: the prepare step only simulates resource locking, and commit
    and abort outcomes are logged but never retried.
    """
    
    def __init__(self):
        self.transactions = {}  # transaction_id -> transaction state dict
        self.participants = {}  # service name -> DatabaseRepository
        self.logger = logging.getLogger(__name__)
    
    def register_participant(self, service_name: str, repository: DatabaseRepository):
        """Register a service repository as a potential transaction participant."""
        self.participants[service_name] = repository
    
    async def begin_transaction(self, transaction_id: str, operations: List[Dict[str, Any]]) -> bool:
        """Run a distributed transaction; return True when it commits.

        Each operation dict names its 'service', a 'type' (create/update/
        delete) and the data involved.
        """
        transaction = {
            'id': transaction_id,
            'status': TransactionStatus.PENDING,
            'operations': operations,
            'participants': set(),
            'prepared_participants': set(),
            'start_time': datetime.utcnow()
        }
        
        # Collect the distinct services touched by the operations.
        for operation in operations:
            service_name = operation['service']
            transaction['participants'].add(service_name)
        
        self.transactions[transaction_id] = transaction
        
        try:
            # Phase 1: Prepare
            prepare_success = await self._prepare_phase(transaction)
            
            if prepare_success:
                # Phase 2: Commit
                await self._commit_phase(transaction)
                transaction['status'] = TransactionStatus.COMMITTED
                self.logger.info(f"Transaction {transaction_id} committed successfully")
                return True
            else:
                # Phase 2: Abort
                await self._abort_phase(transaction)
                transaction['status'] = TransactionStatus.ABORTED
                self.logger.info(f"Transaction {transaction_id} aborted")
                return False
                
        except Exception as e:
            await self._abort_phase(transaction)
            transaction['status'] = TransactionStatus.ABORTED
            self.logger.error(f"Transaction {transaction_id} failed: {str(e)}")
            return False
    
    async def _prepare_phase(self, transaction: Dict[str, Any]) -> bool:
        """Ask every registered participant to prepare; True only if all do."""
        prepare_tasks = []
        
        for participant in transaction['participants']:
            if participant in self.participants:
                task = asyncio.create_task(
                    self._prepare_participant(transaction, participant)
                )
                prepare_tasks.append(task)
        
        results = await asyncio.gather(*prepare_tasks, return_exceptions=True)
        
        # Every participant must have voted "prepared".
        for result in results:
            if isinstance(result, Exception) or not result:
                return False
        
        return True
    
    async def _prepare_participant(self, transaction: Dict[str, Any], participant: str) -> bool:
        """Prepare one participant; True on success."""
        try:
            # A real implementation would lock resources and validate the
            # pending operations here; this demo only simulates the delay.
            await asyncio.sleep(0.1)
            
            transaction['prepared_participants'].add(participant)
            self.logger.info(f"Participant {participant} prepared for transaction {transaction['id']}")
            return True
            
        except Exception as e:
            self.logger.error(f"Participant {participant} failed to prepare: {str(e)}")
            return False
    
    async def _commit_phase(self, transaction: Dict[str, Any]):
        """Commit all prepared participants concurrently.

        NOTE(review): _commit_participant swallows its own errors and this
        gather uses return_exceptions=True, so a partial commit is possible
        here -- acceptable for a demo, not for production.
        """
        commit_tasks = []
        
        for participant in transaction['prepared_participants']:
            task = asyncio.create_task(
                self._commit_participant(transaction, participant)
            )
            commit_tasks.append(task)
        
        await asyncio.gather(*commit_tasks, return_exceptions=True)
    
    async def _commit_participant(self, transaction: Dict[str, Any], participant: str):
        """Apply this participant's share of the transaction's operations."""
        try:
            repository = self.participants[participant]
            
            # Execute only the operations addressed to this participant.
            for operation in transaction['operations']:
                if operation['service'] == participant:
                    if operation['type'] == 'create':
                        await repository.save(operation['data'])
                    elif operation['type'] == 'update':
                        await repository.update(operation['entity_id'], operation['data'])
                    elif operation['type'] == 'delete':
                        await repository.delete(operation['entity_id'])
            
            self.logger.info(f"Participant {participant} committed transaction {transaction['id']}")
            
        except Exception as e:
            self.logger.error(f"Participant {participant} failed to commit: {str(e)}")
    
    async def _abort_phase(self, transaction: Dict[str, Any]):
        """Tell every prepared participant to abort."""
        abort_tasks = []
        
        for participant in transaction['prepared_participants']:
            task = asyncio.create_task(
                self._abort_participant(transaction, participant)
            )
            abort_tasks.append(task)
        
        await asyncio.gather(*abort_tasks, return_exceptions=True)
    
    async def _abort_participant(self, transaction: Dict[str, Any], participant: str):
        """Abort one participant (rollback is a logged no-op in this demo)."""
        try:
            # A real implementation would release locks / undo staged work here.
            
            self.logger.info(f"Participant {participant} aborted transaction {transaction['id']}")
            
        except Exception as e:
            self.logger.error(f"Participant {participant} failed to abort: {str(e)}")

async def main():
    """Demo driver for the data-management strategies.

    Exercises, in order: per-service repositories with cross-service data
    sync, event sourcing with aggregate rebuild, a 2PC distributed
    transaction, and a final cross-service consistency check.
    """
    
    # One repository per service (database-per-service pattern).
    user_repo = InMemoryRepository("user-service")
    order_repo = InMemoryRepository("order-service")
    billing_repo = InMemoryRepository("billing-service")
    
    # Event-sourcing repository.
    event_repo = EventSourcingRepository("event-store")
    
    # Create the synchronizer and register every repository with it.
    synchronizer = DataSynchronizer()
    synchronizer.register_repository("user-service", user_repo)
    synchronizer.register_repository("order-service", order_repo)
    synchronizer.register_repository("billing-service", billing_repo)
    
    # Data-mapping functions between service schemas.
    def user_to_billing_mapping(user_data: Dict[str, Any]) -> Dict[str, Any]:
        """Map a user entity to the billing service's customer schema."""
        return {
            'customer_id': user_data['id'],
            'customer_name': user_data['name'],
            'customer_email': user_data['email'],
            'billing_address': user_data.get('address', '')
        }
    
    def order_to_billing_mapping(order_data: Dict[str, Any]) -> Dict[str, Any]:
        """Map an order entity to the billing service's invoice schema."""
        return {
            'order_id': order_data['id'],
            'customer_id': order_data['user_id'],
            'amount': order_data['total_amount'],
            'items': order_data['items']
        }
    
    # Register the sync rules.
    synchronizer.add_sync_rule(
        source_service="user-service",
        target_service="billing-service",
        entity_type="UserCreated",
        mapping_function=user_to_billing_mapping,
        consistency_level=ConsistencyLevel.EVENTUAL
    )
    
    synchronizer.add_sync_rule(
        source_service="order-service",
        target_service="billing-service",
        entity_type="OrderCreated",
        mapping_function=order_to_billing_mapping,
        consistency_level=ConsistencyLevel.STRONG
    )
    
    # Walk through the data operations.
    print("=== 数据管理策略演示 ===")
    
    # 1. Create a user.
    user_data = {
        'name': '张三',
        'email': 'zhangsan@example.com',
        'address': '北京市朝阳区'
    }
    
    user_id = await user_repo.save(user_data)
    print(f"创建用户: {user_id}")
    
    # Trigger the cross-service sync for the new user.
    user_event = DataChangeEvent(
        id=str(uuid.uuid4()),
        aggregate_id=user_id,
        event_type="UserCreated",
        data=user_data,
        timestamp=datetime.utcnow(),
        version=1,
        service_name="user-service"
    )
    
    await synchronizer.sync_data_change("user-service", user_event)
    
    # 2. Create an order.
    order_data = {
        'user_id': user_id,
        'items': [
            {'product_id': 'prod-001', 'quantity': 2, 'price': 99.99},
            {'product_id': 'prod-002', 'quantity': 1, 'price': 199.99}
        ],
        'total_amount': 399.97,
        'status': 'pending'
    }
    
    order_id = await order_repo.save(order_data)
    print(f"创建订单: {order_id}")
    
    # 3. Demonstrate event sourcing.
    print("\n=== 事件溯源演示 ===")
    
    # The order's event sequence: created -> confirmed -> shipped.
    events = [
        DataChangeEvent(
            id=str(uuid.uuid4()),
            aggregate_id=order_id,
            event_type="Created",
            data=order_data,
            timestamp=datetime.utcnow(),
            version=1,
            service_name="order-service"
        ),
        DataChangeEvent(
            id=str(uuid.uuid4()),
            aggregate_id=order_id,
            event_type="Updated",
            data={'status': 'confirmed'},
            timestamp=datetime.utcnow(),
            version=2,
            service_name="order-service"
        ),
        DataChangeEvent(
            id=str(uuid.uuid4()),
            aggregate_id=order_id,
            event_type="Updated",
            data={'status': 'shipped'},
            timestamp=datetime.utcnow(),
            version=3,
            service_name="order-service"
        )
    ]
    
    # Persist the events.
    final_version = await event_repo.append_events(order_id, events)
    print(f"存储了 {len(events)} 个事件,最终版本: {final_version}")
    
    # Rebuild the aggregate from its event stream.
    rebuilt_order = await event_repo.rebuild_aggregate(order_id)
    print(f"重建的订单状态: {rebuilt_order['status']}")
    
    # 4. Demonstrate a distributed transaction.
    print("\n=== 分布式事务演示 ===")
    
    tx_manager = DistributedTransactionManager()
    tx_manager.register_participant("user-service", user_repo)
    tx_manager.register_participant("order-service", order_repo)
    tx_manager.register_participant("billing-service", billing_repo)
    
    # One operation per service, committed together via 2PC.
    transaction_operations = [
        {
            'service': 'user-service',
            'type': 'update',
            'entity_id': user_id,
            'data': {'last_order_date': datetime.utcnow().isoformat()}
        },
        {
            'service': 'order-service',
            'type': 'update',
            'entity_id': order_id,
            'data': {'status': 'processing'}
        },
        {
            'service': 'billing-service',
            'type': 'create',
            'data': {
                'order_id': order_id,
                'customer_id': user_id,
                'amount': 399.97,
                'status': 'pending'
            }
        }
    ]
    
    # Run the distributed transaction.
    transaction_id = str(uuid.uuid4())
    success = await tx_manager.begin_transaction(transaction_id, transaction_operations)
    
    if success:
        print(f"分布式事务 {transaction_id} 执行成功")
    else:
        print(f"分布式事务 {transaction_id} 执行失败")
    
    # Verify the data ended up consistent across services.
    print("\n=== 数据一致性验证 ===")
    
    updated_user = await user_repo.find_by_id(user_id)
    updated_order = await order_repo.find_by_id(order_id)
    billing_records = await billing_repo.find_by_criteria({'customer_id': user_id})
    
    print(f"用户最后订单日期: {updated_user.get('last_order_date', 'N/A')}")
    print(f"订单状态: {updated_order.get('status', 'N/A')}")
    print(f"账单记录数: {len(billing_records)}")

if __name__ == "__main__":
    # Run the async demo only when executed directly as a script.
    asyncio.run(main())

服务发现与注册

服务发现是微服务架构中的关键组件,确保服务能够动态地找到和通信。

服务发现基础设施部署脚本

#!/bin/bash
# Service-discovery infrastructure deployment script: deploys a Consul or
# Eureka registry, an Nginx load balancer, and optionally an Istio mesh.

set -e  # abort on the first failing command

echo "部署服务发现基础设施..."

# 1. Deploy a three-node Consul cluster in Docker
deploy_consul() {
    echo "部署Consul服务注册中心..."
    
    # Shared server configuration: 3-node quorum, UI and Connect enabled,
    # gRPC on 8502 for service-mesh integrations.
    cat > consul-config.json << EOF
{
  "datacenter": "dc1",
  "data_dir": "/opt/consul/data",
  "log_level": "INFO",
  "server": true,
  "bootstrap_expect": 3,
  "bind_addr": "0.0.0.0",
  "client_addr": "0.0.0.0",
  "retry_join": ["consul-1", "consul-2", "consul-3"],
  "ui_config": {
    "enabled": true
  },
  "connect": {
    "enabled": true
  },
  "ports": {
    "grpc": 8502
  }
}
EOF

    # Dedicated bridge network; "|| true" keeps the script idempotent
    # when the network already exists (set -e is active).
    docker network create consul-network || true
    
    # Three server agents: HTTP/UI on host ports 8500-8502, DNS on 8600-8602/udp.
    for i in {1..3}; do
        docker run -d \
            --name consul-$i \
            --network consul-network \
            -p $((8500 + i - 1)):8500 \
            -p $((8600 + i - 1)):8600/udp \
            -v $(pwd)/consul-config.json:/consul/config/config.json \
            consul:latest agent -config-file=/consul/config/config.json
    done
    
    echo "Consul集群部署完成"
}

# 2. Deploy a standalone Eureka service registry
deploy_eureka() {
    echo "部署Eureka服务注册中心..."
    
    # Standalone-mode config: the server neither registers with nor fetches
    # from itself; self-preservation is off for faster instance eviction.
    # The \${...} escapes keep the placeholders for Spring, not the shell.
    cat > eureka-application.yml << EOF
server:
  port: 8761

eureka:
  instance:
    hostname: localhost
  client:
    registerWithEureka: false
    fetchRegistry: false
    serviceUrl:
      defaultZone: http://\${eureka.instance.hostname}:\${server.port}/eureka/
  server:
    enableSelfPreservation: false
    evictionIntervalTimerInMs: 5000

spring:
  application:
    name: eureka-server

management:
  endpoints:
    web:
      exposure:
        include: health,info,metrics
EOF

    # Run Eureka in Docker with the generated config mounted in.
    docker run -d \
        --name eureka-server \
        -p 8761:8761 \
        -v $(pwd)/eureka-application.yml:/app/application.yml \
        springcloud/eureka:latest
    
    echo "Eureka服务注册中心部署完成"
}

# 3. Configure the Nginx load balancer / API gateway
configure_load_balancer() {
    echo "配置负载均衡器..."
    
    # Nginx config: least-connections upstreams per service, path-based
    # routing, and failover to the next upstream on errors/timeouts.
    # The \$ escapes keep nginx variables from being expanded by the heredoc.
    cat > nginx.conf << EOF
upstream user-service {
    least_conn;
    server user-service-1:8080;
    server user-service-2:8080;
    server user-service-3:8080;
}

upstream order-service {
    least_conn;
    server order-service-1:8080;
    server order-service-2:8080;
}

server {
    listen 80;
    server_name api.example.com;
    
    location /api/users/ {
        proxy_pass http://user-service/;
        proxy_set_header Host \$host;
        proxy_set_header X-Real-IP \$remote_addr;
        proxy_set_header X-Forwarded-For \$proxy_add_x_forwarded_for;
        
        # 健康检查
        proxy_next_upstream error timeout invalid_header http_500 http_502 http_503;
        proxy_connect_timeout 5s;
        proxy_send_timeout 10s;
        proxy_read_timeout 10s;
    }
    
    location /api/orders/ {
        proxy_pass http://order-service/;
        proxy_set_header Host \$host;
        proxy_set_header X-Real-IP \$remote_addr;
        proxy_set_header X-Forwarded-For \$proxy_add_x_forwarded_for;
        
        proxy_next_upstream error timeout invalid_header http_500 http_502 http_503;
        proxy_connect_timeout 5s;
        proxy_send_timeout 10s;
        proxy_read_timeout 10s;
    }
    
    # 健康检查端点
    location /health {
        access_log off;
        return 200 "healthy\n";
        add_header Content-Type text/plain;
    }
}
EOF

    # Serve the generated config from an Nginx container on host port 80.
    docker run -d \
        --name api-gateway \
        -p 80:80 \
        -v $(pwd)/nginx.conf:/etc/nginx/conf.d/default.conf \
        nginx:alpine
    
    echo "负载均衡器配置完成"
}

# 4. Deploy the Istio service mesh (requires a kubectl-reachable cluster)
deploy_istio() {
    echo "部署Istio服务网格..."
    
    # Download the latest Istio release and put istioctl on PATH.
    curl -L https://istio.io/downloadIstio | sh -
    cd istio-*
    export PATH=$PWD/bin:$PATH
    
    # Install Istio into the cluster.
    istioctl install --set values.defaultRevision=default -y
    
    # Enable automatic sidecar injection for the default namespace.
    kubectl label namespace default istio-injection=enabled
    
    # Gateway + VirtualService: route /api/users and /api/orders to the
    # corresponding services through the ingress gateway.
    cat > gateway.yaml << EOF
apiVersion: networking.istio.io/v1alpha3
kind: Gateway
metadata:
  name: microservices-gateway
spec:
  selector:
    istio: ingressgateway
  servers:
  - port:
      number: 80
      name: http
      protocol: HTTP
    hosts:
    - "*"
---
apiVersion: networking.istio.io/v1alpha3
kind: VirtualService
metadata:
  name: microservices-vs
spec:
  hosts:
  - "*"
  gateways:
  - microservices-gateway
  http:
  - match:
    - uri:
        prefix: /api/users
    route:
    - destination:
        host: user-service
        port:
          number: 8080
  - match:
    - uri:
        prefix: /api/orders
    route:
    - destination:
        host: order-service
        port:
          number: 8080
EOF

    kubectl apply -f gateway.yaml
    
    echo "Istio服务网格部署完成"
}

# Entry point: interactively choose which stack to deploy.
main() {
    echo "开始部署微服务基础设施..."
    
    # Prompt for the deployment option.
    read -p "选择部署方案 (1: Consul, 2: Eureka, 3: Istio): " choice
    
    case $choice in
        1)
            deploy_consul
            configure_load_balancer
            ;;
        2)
            deploy_eureka
            configure_load_balancer
            ;;
        3)
            deploy_istio
            ;;
        *)
            # Any other input: deploy every non-mesh component.
            echo "无效选择,部署所有组件..."
            deploy_consul
            deploy_eureka
            configure_load_balancer
            ;;
    esac
    
    echo "微服务基础设施部署完成!"
    echo "访问地址:"
    echo "- Consul UI: http://localhost:8500"
    echo "- Eureka UI: http://localhost:8761"
    echo "- API Gateway: http://localhost"
}

# Invoke the entry point, forwarding all CLI arguments.
main "$@"

API网关设计

API网关是微服务架构的入口点,负责请求路由、认证授权、限流等功能。

API网关实现

#!/usr/bin/env python3
"""
微服务API网关实现
包括路由、认证、限流、监控等功能
"""

import asyncio
import aiohttp
import jwt
import time
import json
from typing import Dict, Any, Optional, List
from dataclasses import dataclass, asdict
from datetime import datetime, timedelta
from enum import Enum
import logging
import hashlib
import redis
from aiohttp import web, ClientSession
import yaml

class AuthenticationMethod(Enum):
    """Authentication schemes the gateway can apply to a route."""
    JWT = "jwt"          # JSON Web Token
    API_KEY = "api_key"  # static API key
    OAUTH2 = "oauth2"    # OAuth 2.0
    BASIC = "basic"      # HTTP Basic credentials

class RateLimitStrategy(Enum):
    """Rate-limiting algorithms supported by the gateway."""
    FIXED_WINDOW = "fixed_window"      # counter reset at fixed intervals
    SLIDING_WINDOW = "sliding_window"  # rolling time-window counter
    TOKEN_BUCKET = "token_bucket"      # refillable token allowance
    LEAKY_BUCKET = "leaky_bucket"      # constant drain rate

@dataclass
class RouteConfig:
    """Gateway routing rule for one (path, method) pair."""
    path: str           # request path to match
    method: str         # HTTP method to match
    service_name: str   # logical backend service name
    service_url: str    # base URL of the backend service
    timeout: int = 30   # upstream timeout (presumably seconds -- confirm against client usage)
    retries: int = 3    # retry attempts on upstream failure
    auth_required: bool = True  # whether authentication is enforced on this route
    rate_limit: Optional[Dict[str, Any]] = None  # per-route rate-limit settings, if any
    circuit_breaker: bool = True  # whether the circuit breaker applies

@dataclass
class ServiceInstance:
    """One addressable instance of a backend service."""
    id: str          # instance identifier
    host: str        # host name or IP address
    port: int        # TCP port
    weight: int = 1  # relative weight for weighted balancing
    healthy: bool = True  # last known health state
    last_health_check: Optional[datetime] = None  # time of the most recent health check

class LoadBalancer:
    """Client-side load balancer choosing one healthy instance per request.

    Supported strategies: "round_robin" (default), "weighted_round_robin",
    "least_connections" (placeholder). Any other strategy falls back to the
    first healthy instance.
    """

    def __init__(self, strategy: str = "round_robin"):
        self.strategy = strategy
        self.current_index = 0  # monotonically increasing rotation counter

    def select_instance(self, instances: List[ServiceInstance]) -> Optional[ServiceInstance]:
        """Return an instance among the healthy ones, or None if all are down."""
        candidates = [candidate for candidate in instances if candidate.healthy]
        if not candidates:
            return None
        picker = {
            "round_robin": self._round_robin,
            "weighted_round_robin": self._weighted_round_robin,
            "least_connections": self._least_connections,
        }.get(self.strategy)
        if picker is None:
            # Unknown strategy: degrade to "first healthy".
            return candidates[0]
        return picker(candidates)

    def _round_robin(self, instances: List[ServiceInstance]) -> ServiceInstance:
        """Plain rotation over the healthy instance list."""
        chosen = instances[self.current_index % len(instances)]
        self.current_index += 1
        return chosen

    def _weighted_round_robin(self, instances: List[ServiceInstance]) -> ServiceInstance:
        """Weight-proportional rotation via a cumulative-weight scan."""
        total = sum(candidate.weight for candidate in instances)
        position = self.current_index % total
        running = 0
        for candidate in instances:
            running += candidate.weight
            if position < running:
                self.current_index += 1
                return candidate
        return instances[0]

    def _least_connections(self, instances: List[ServiceInstance]) -> ServiceInstance:
        """Least-connections placeholder: per-instance connection counts are
        not tracked here, so the first healthy instance is returned."""
        return instances[0]

class RateLimiter:
    """Redis-backed request rate limiter.

    Every Redis call in this class is awaited, so ``redis_client`` must be an
    asyncio-capable client (``redis.asyncio.Redis``). With the synchronous
    ``redis.Redis`` the awaits raise ``TypeError``, which the fail-open
    handlers below swallow — the limiter would then silently allow every
    request. All strategies deliberately fail open on Redis errors so an
    outage of the limiter never blocks traffic.
    """

    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client

    async def is_allowed(self, key: str, limit: int, window: int,
                        strategy: RateLimitStrategy = RateLimitStrategy.FIXED_WINDOW) -> bool:
        """Return True when the request identified by ``key`` is within quota.

        For TOKEN_BUCKET the (limit, window) pair is reinterpreted as
        (bucket capacity, refill rate in tokens per second).
        """
        if strategy == RateLimitStrategy.FIXED_WINDOW:
            return await self._fixed_window(key, limit, window)
        if strategy == RateLimitStrategy.SLIDING_WINDOW:
            return await self._sliding_window(key, limit, window)
        if strategy == RateLimitStrategy.TOKEN_BUCKET:
            return await self._token_bucket(key, limit, window)
        # Unimplemented strategies (e.g. LEAKY_BUCKET) allow the request.
        return True

    async def _fixed_window(self, key: str, limit: int, window: int) -> bool:
        """Fixed-window counter: at most ``limit`` requests per ``window`` seconds."""
        current_window = int(time.time()) // window
        window_key = f"{key}:{current_window}"

        try:
            current_count = await self.redis.get(window_key)
            current_count = int(current_count) if current_count else 0

            if current_count >= limit:
                return False

            # Increment and (re)arm expiry atomically in one round trip.
            pipe = self.redis.pipeline()
            pipe.incr(window_key)
            pipe.expire(window_key, window)
            await pipe.execute()

            return True

        except Exception:
            # Fail open: never block traffic because Redis is unavailable.
            return True

    async def _sliding_window(self, key: str, limit: int, window: int) -> bool:
        """Sliding-window log: request timestamps kept in a sorted set."""
        now = time.time()
        window_start = now - window

        try:
            # Drop entries that fell out of the window.
            await self.redis.zremrangebyscore(key, 0, window_start)

            # Count requests still inside the window.
            current_count = await self.redis.zcard(key)

            if current_count >= limit:
                return False

            # Record the current request, scored by its timestamp.
            await self.redis.zadd(key, {str(now): now})
            await self.redis.expire(key, window)

            return True

        except Exception:
            return True

    async def _token_bucket(self, key: str, capacity: int, refill_rate: int) -> bool:
        """Token bucket: refills at ``refill_rate`` tokens/second up to ``capacity``."""
        now = time.time()

        try:
            bucket_data = await self.redis.hmget(key, 'tokens', 'last_refill')

            # A missing bucket starts full.
            tokens = float(bucket_data[0]) if bucket_data[0] else capacity
            last_refill = float(bucket_data[1]) if bucket_data[1] else now

            # Refill proportionally to elapsed time, capped at capacity.
            time_passed = now - last_refill
            tokens_to_add = time_passed * refill_rate
            tokens = min(capacity, tokens + tokens_to_add)

            if tokens < 1:
                return False

            # Consume one token for this request.
            tokens -= 1

            # Fix: HMSET was removed in redis-py 4.x; HSET with mapping= is
            # the supported equivalent.
            await self.redis.hset(key, mapping={
                'tokens': tokens,
                'last_refill': now
            })
            await self.redis.expire(key, 3600)  # drop idle buckets after 1 hour

            return True

        except Exception:
            return True

class CircuitBreaker:
    """Three-state circuit breaker (CLOSED -> OPEN -> HALF_OPEN).

    Opens after ``failure_threshold`` *consecutive* failures; once
    ``recovery_timeout`` seconds have elapsed a single trial request is
    allowed (HALF_OPEN), and a success closes the breaker again.
    """

    def __init__(self, failure_threshold: int = 5, recovery_timeout: int = 60):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.failure_count = 0
        self.last_failure_time = None
        self.state = "CLOSED"  # CLOSED, OPEN, HALF_OPEN

    def can_execute(self) -> bool:
        """Return True if a request may be attempted in the current state."""
        if self.state == "CLOSED":
            return True
        if self.state == "OPEN":
            # Allow one trial request once the recovery window has elapsed.
            if time.time() - self.last_failure_time > self.recovery_timeout:
                self.state = "HALF_OPEN"
                return True
            return False
        # HALF_OPEN: let the trial request through.
        return True

    def record_success(self):
        """Record a successful call; a HALF_OPEN trial success closes the breaker."""
        # Fix: reset the counter on *every* success, not only in HALF_OPEN.
        # Otherwise non-consecutive failures accumulate over the breaker's
        # lifetime and eventually trip it even while the service is healthy.
        self.failure_count = 0
        if self.state == "HALF_OPEN":
            self.state = "CLOSED"

    def record_failure(self):
        """Record a failed call; trips the breaker at the failure threshold."""
        self.failure_count += 1
        self.last_failure_time = time.time()

        if self.failure_count >= self.failure_threshold:
            self.state = "OPEN"

class APIGateway:
    """Asynchronous API gateway.

    Forwards incoming HTTP requests to configured backend service instances,
    applying JWT authentication, Redis-backed rate limiting, per-route
    circuit breaking, client-side load balancing and periodic health checks.
    Configuration comes from a YAML file (routes, services, redis, jwt_secret).
    """
    
    def __init__(self, config_file: str):
        self.config = self._load_config(config_file)
        self.routes = {}            # "METHOD:path" -> RouteConfig
        self.services = {}          # service name -> list of ServiceInstance
        self.load_balancer = LoadBalancer()
        self.rate_limiter = None    # set only when a 'redis' section is configured
        self.circuit_breakers = {}  # "METHOD:path" -> CircuitBreaker
        self.session = None         # shared outbound ClientSession, created in start()
        self.logger = logging.getLogger(__name__)
        
        # Initialize the Redis connection; without it rate limiting is disabled.
        # NOTE(review): redis.Redis is the synchronous client, yet RateLimiter
        # awaits its methods — this looks like it needs redis.asyncio.Redis.
        # Confirm against the redis-py version in use.
        if 'redis' in self.config:
            redis_config = self.config['redis']
            self.redis_client = redis.Redis(
                host=redis_config.get('host', 'localhost'),
                port=redis_config.get('port', 6379),
                db=redis_config.get('db', 0)
            )
            self.rate_limiter = RateLimiter(self.redis_client)
    
    def _load_config(self, config_file: str) -> Dict[str, Any]:
        """Parse the YAML configuration file into a dict."""
        with open(config_file, 'r', encoding='utf-8') as f:
            return yaml.safe_load(f)
    
    async def start(self):
        """Build and return the aiohttp application, starting background health checks."""
        self.session = ClientSession()
        
        # Load route configuration.
        self._load_routes()
        
        # Load backend service instances.
        self._load_services()
        
        # Create the web application.
        # NOTE(review): aiohttp middlewares are normally coroutines decorated
        # with @web.middleware; plain (request, handler) coroutines like these
        # are treated by aiohttp as old-style middleware factories — verify
        # this wiring against the aiohttp version in use.
        app = web.Application(middlewares=[
            self._auth_middleware,
            self._rate_limit_middleware,
            self._logging_middleware
        ])
        
        # Catch-all route: every method/path is dispatched by _handle_request.
        app.router.add_route('*', '/{path:.*}', self._handle_request)
        
        # Start the background health-check loop.
        asyncio.create_task(self._health_check_loop())
        
        return app
    
    def _load_routes(self) -> None:
        """Build the route table and a circuit breaker per protected route."""
        for route_config in self.config.get('routes', []):
            route = RouteConfig(**route_config)
            route_key = f"{route.method}:{route.path}"
            self.routes[route_key] = route
            
            # Attach a circuit breaker when the route opts in.
            if route.circuit_breaker:
                self.circuit_breakers[route_key] = CircuitBreaker()
    
    def _load_services(self) -> None:
        """Build the service -> instances map from the 'services' config section."""
        for service_name, service_config in self.config.get('services', {}).items():
            instances = []
            for instance_config in service_config.get('instances', []):
                instance = ServiceInstance(**instance_config)
                instances.append(instance)
            
            self.services[service_name] = instances
    
    async def _auth_middleware(self, request: web.Request, handler):
        """Authentication middleware: validates the JWT bearer token on
        routes that require auth; stores the decoded payload in request['user']."""
        # Find the matching route; unmatched or auth-exempt routes pass through.
        route = self._find_route(request)
        
        if not route or not route.auth_required:
            return await handler(request)
        
        # Check the Authorization header.
        auth_header = request.headers.get('Authorization')
        if not auth_header:
            return web.json_response(
                {'error': 'Missing authorization header'}, 
                status=401
            )
        
        try:
            # JWT bearer-token validation.
            if auth_header.startswith('Bearer '):
                token = auth_header[7:]
                payload = jwt.decode(
                    token, 
                    self.config.get('jwt_secret', 'secret'), 
                    algorithms=['HS256']
                )
                request['user'] = payload
            else:
                return web.json_response(
                    {'error': 'Invalid authorization header'}, 
                    status=401
                )
                
        except jwt.InvalidTokenError:
            return web.json_response(
                {'error': 'Invalid token'}, 
                status=401
            )
        
        return await handler(request)
    
    async def _rate_limit_middleware(self, request: web.Request, handler):
        """Rate-limiting middleware: rejects requests over quota with HTTP 429."""
        if not self.rate_limiter:
            return await handler(request)
        
        route = self._find_route(request)
        if not route or not route.rate_limit:
            return await handler(request)
        
        # Key the quota per service and per authenticated user
        # (anonymous requests share a single bucket).
        user_id = request.get('user', {}).get('user_id', 'anonymous')
        rate_limit_key = f"rate_limit:{route.service_name}:{user_id}"
        
        # Check the quota using the route's configured strategy.
        limit_config = route.rate_limit
        allowed = await self.rate_limiter.is_allowed(
            key=rate_limit_key,
            limit=limit_config['requests'],
            window=limit_config['window'],
            strategy=RateLimitStrategy(limit_config.get('strategy', 'fixed_window'))
        )
        
        if not allowed:
            return web.json_response(
                {'error': 'Rate limit exceeded'}, 
                status=429
            )
        
        return await handler(request)
    
    async def _logging_middleware(self, request: web.Request, handler):
        """Logging middleware: records method, path, status and latency;
        errors are logged and re-raised."""
        start_time = time.time()
        
        try:
            response = await handler(request)
            duration = time.time() - start_time
            
            self.logger.info(
                f"{request.method} {request.path} - "
                f"Status: {response.status} - "
                f"Duration: {duration:.3f}s"
            )
            
            return response
            
        except Exception as e:
            duration = time.time() - start_time
            
            self.logger.error(
                f"{request.method} {request.path} - "
                f"Error: {str(e)} - "
                f"Duration: {duration:.3f}s"
            )
            
            raise
    
    def _find_route(self, request: web.Request) -> Optional[RouteConfig]:
        """Resolve the request to a RouteConfig: exact "METHOD:path" match
        first, then path-prefix match in route-insertion order."""
        method = request.method
        path = request.path
        
        # Exact match.
        route_key = f"{method}:{path}"
        if route_key in self.routes:
            return self.routes[route_key]
        
        # Prefix match — first configured matching prefix wins.
        for route_key, route in self.routes.items():
            route_method, route_path = route_key.split(':', 1)
            if method == route_method and path.startswith(route_path):
                return route
        
        return None
    
    async def _handle_request(self, request: web.Request) -> web.Response:
        """Main request handler: route lookup, circuit-breaker gate,
        instance selection, then proxying to the backend."""
        route = self._find_route(request)
        
        if not route:
            return web.json_response(
                {'error': 'Route not found'}, 
                status=404
            )
        
        # Gate on the route's circuit breaker, if it has one.
        route_key = f"{request.method}:{route.path}"
        circuit_breaker = self.circuit_breakers.get(route_key)
        
        if circuit_breaker and not circuit_breaker.can_execute():
            return web.json_response(
                {'error': 'Service temporarily unavailable'}, 
                status=503
            )
        
        # Pick a healthy backend instance via the load balancer.
        instances = self.services.get(route.service_name, [])
        instance = self.load_balancer.select_instance(instances)
        
        if not instance:
            return web.json_response(
                {'error': 'No healthy service instances available'}, 
                status=503
            )
        
        # Proxy the request and feed the outcome back to the circuit breaker.
        try:
            response = await self._forward_request(request, route, instance)
            
            if circuit_breaker:
                circuit_breaker.record_success()
            
            return response
            
        except Exception as e:
            if circuit_breaker:
                circuit_breaker.record_failure()
            
            self.logger.error(f"Request forwarding failed: {str(e)}")
            return web.json_response(
                {'error': 'Internal server error'}, 
                status=500
            )
    
    async def _forward_request(self, request: web.Request, 
                             route: RouteConfig, 
                             instance: ServiceInstance) -> web.Response:
        """Proxy the request to the chosen backend instance and relay its response."""
        # Build the target URL, preserving path and query string.
        target_url = f"http://{instance.host}:{instance.port}{request.path_qs}"
        
        # Copy headers and add the standard X-Forwarded-* proxy headers.
        headers = dict(request.headers)
        headers['X-Forwarded-For'] = request.remote
        headers['X-Forwarded-Proto'] = request.scheme
        headers['X-Forwarded-Host'] = request.host
        
        # Read the request body, if any.
        body = await request.read() if request.can_read_body else None
        
        # Send the upstream request with the route's timeout.
        timeout = aiohttp.ClientTimeout(total=route.timeout)
        
        async with self.session.request(
            method=request.method,
            url=target_url,
            headers=headers,
            data=body,
            timeout=timeout
        ) as response:
            
            # Read the whole upstream response body.
            response_body = await response.read()
            
            # Relay status, body and headers to the client.
            # NOTE(review): this copies upstream hop-by-hop headers
            # (Content-Length, Transfer-Encoding, Connection) verbatim,
            # which can conflict with aiohttp's own framing — verify.
            return web.Response(
                body=response_body,
                status=response.status,
                headers=response.headers
            )
    
    async def _health_check_loop(self) -> None:
        """Background loop: probe all instances every 30s; on errors retry after 10s."""
        while True:
            try:
                await self._perform_health_checks()
                await asyncio.sleep(30)  # probe interval
            except Exception as e:
                self.logger.error(f"Health check failed: {str(e)}")
                await asyncio.sleep(10)
    
    async def _perform_health_checks(self) -> None:
        """Probe GET /health on every instance; mark it healthy on HTTP 200."""
        for service_name, instances in self.services.items():
            for instance in instances:
                try:
                    health_url = f"http://{instance.host}:{instance.port}/health"
                    timeout = aiohttp.ClientTimeout(total=5)
                    
                    async with self.session.get(health_url, timeout=timeout) as response:
                        instance.healthy = response.status == 200
                        # NOTE(review): datetime.utcnow() is deprecated since
                        # Python 3.12 — consider datetime.now(timezone.utc).
                        instance.last_health_check = datetime.utcnow()
                        
                except Exception:
                    # Any connection error counts as an unhealthy instance.
                    instance.healthy = False
                    instance.last_health_check = datetime.utcnow()
    
    async def stop(self) -> None:
        """Shut down the gateway's outbound HTTP session."""
        if self.session:
            await self.session.close()

# Example gateway configuration (YAML): JWT secret, Redis connection,
# two routes with per-route rate limits, and the backend instance pools
# (order-service uses 2:1 weighting for weighted load balancing).
GATEWAY_CONFIG = """
jwt_secret: "your-secret-key"

redis:
  host: localhost
  port: 6379
  db: 0

routes:
  - path: "/api/users"
    method: "GET"
    service_name: "user-service"
    service_url: "http://user-service:8080"
    auth_required: true
    rate_limit:
      requests: 100
      window: 60
      strategy: "sliding_window"
    circuit_breaker: true
  
  - path: "/api/orders"
    method: "POST"
    service_name: "order-service"
    service_url: "http://order-service:8080"
    auth_required: true
    rate_limit:
      requests: 50
      window: 60
      strategy: "token_bucket"
    circuit_breaker: true

services:
  user-service:
    instances:
      - id: "user-1"
        host: "user-service-1"
        port: 8080
        weight: 1
      - id: "user-2"
        host: "user-service-2"
        port: 8080
        weight: 1
  
  order-service:
    instances:
      - id: "order-1"
        host: "order-service-1"
        port: 8080
        weight: 2
      - id: "order-2"
        host: "order-service-2"
        port: 8080
        weight: 1
"""

async def main():
    """Entry point: write the sample config, start the gateway, serve until cancelled."""
    # Materialize the bundled sample configuration on disk.
    with open('gateway-config.yaml', 'w') as f:
        f.write(GATEWAY_CONFIG)

    # Build the gateway application from the config file.
    gateway = APIGateway('gateway-config.yaml')
    app = await gateway.start()

    # Bring up the aiohttp server on localhost:8080.
    runner = web.AppRunner(app)
    await runner.setup()
    site = web.TCPSite(runner, 'localhost', 8080)
    await site.start()

    print("API网关启动成功,监听端口: 8080")
    print("访问地址: http://localhost:8080")

    try:
        # Park on an event that is never set; cancellation breaks us out.
        await asyncio.Event().wait()
    finally:
        await gateway.stop()
        await runner.cleanup()

if __name__ == "__main__":
    asyncio.run(main())

分享文章