微服务架构概述
微服务架构是一种将单一应用程序开发为一套小型服务的方法,每个服务运行在自己的进程中,并使用轻量级机制(通常是HTTP API)进行通信。
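下面先给出一个最小示意(假设性示例:端口8001、路径/users/{uid}和返回数据均为演示假设):“用户服务”在独立进程中通过HTTP暴露能力,调用方(例如订单服务)用HTTP客户端访问它。
#!/usr/bin/env python3
"""
最小微服务通信示意(假设性示例)
一个进程中的"用户服务"暴露HTTP API,另一个进程中的调用方通过HTTP访问它
"""
import aiohttp
from aiohttp import web

async def get_user(request: web.Request) -> web.Response:
    """用户服务的HTTP端点:返回演示数据"""
    return web.json_response({"id": request.match_info["uid"], "name": "demo"})

def make_user_service() -> web.Application:
    app = web.Application()
    app.router.add_get("/users/{uid}", get_user)
    return app

async def fetch_user(uid: str) -> dict:
    """调用方(例如订单服务)通过轻量级HTTP API获取用户信息"""
    async with aiohttp.ClientSession() as session:
        async with session.get(f"http://localhost:8001/users/{uid}") as resp:
            return await resp.json()

if __name__ == "__main__":
    # 启动"用户服务"进程;调用方可在另一个进程中执行 asyncio.run(fetch_user("1"))
    web.run_app(make_user_service(), port=8001)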
微服务架构图
graph TB
subgraph "客户端层"
Web[Web应用]
Mobile[移动应用]
API[第三方API]
end
subgraph "API网关层"
Gateway[API网关]
LB[负载均衡器]
end
subgraph "微服务层"
UserService[用户服务]
OrderService[订单服务]
PaymentService[支付服务]
ProductService[商品服务]
NotificationService[通知服务]
end
subgraph "数据层"
UserDB[(用户数据库)]
OrderDB[(订单数据库)]
PaymentDB[(支付数据库)]
ProductDB[(商品数据库)]
Cache[(缓存层)]
end
subgraph "基础设施层"
ServiceRegistry[服务注册中心]
ConfigCenter[配置中心]
MessageQueue[消息队列]
Monitoring[监控系统]
end
Web --> Gateway
Mobile --> Gateway
API --> Gateway
Gateway --> LB
LB --> UserService
LB --> OrderService
LB --> PaymentService
LB --> ProductService
LB --> NotificationService
UserService --> UserDB
OrderService --> OrderDB
PaymentService --> PaymentDB
ProductService --> ProductDB
UserService --> Cache
OrderService --> Cache
ProductService --> Cache
UserService --> ServiceRegistry
OrderService --> ServiceRegistry
PaymentService --> ServiceRegistry
ProductService --> ServiceRegistry
NotificationService --> ServiceRegistry
OrderService --> MessageQueue
PaymentService --> MessageQueue
NotificationService --> MessageQueue
微服务架构优势
- 技术多样性:每个服务可以使用最适合的技术栈
- 独立部署:服务可以独立开发、测试和部署
- 故障隔离:单个服务的故障不会影响整个系统
- 团队自治:小团队可以独立负责特定服务
- 可扩展性:可以根据需求独立扩展特定服务
服务拆分与设计原则
合理的服务拆分是微服务架构成功的关键。
服务拆分策略
#!/usr/bin/env python3
"""
微服务拆分分析器
基于业务领域、数据依赖和团队结构分析服务拆分策略
"""
from typing import Dict, List, Set, Tuple
from dataclasses import dataclass
from enum import Enum
import json
class CouplingType(Enum):
"""耦合类型"""
DATA = "data"
FUNCTIONAL = "functional"
TEMPORAL = "temporal"
SPATIAL = "spatial"
@dataclass
class BusinessCapability:
"""业务能力"""
name: str
description: str
data_entities: List[str]
operations: List[str]
dependencies: List[str]
team_ownership: str
complexity_score: int
@dataclass
class ServiceBoundary:
"""服务边界"""
service_name: str
capabilities: List[BusinessCapability]
data_ownership: List[str]
api_contracts: List[str]
coupling_score: float
class MicroserviceDecomposer:
"""微服务拆分器"""
def __init__(self):
self.business_capabilities = []
self.coupling_matrix = {}
self.team_structure = {}
def add_business_capability(self, capability: BusinessCapability):
"""添加业务能力"""
self.business_capabilities.append(capability)
def analyze_coupling(self) -> Dict[str, Dict[str, float]]:
"""分析服务间耦合度"""
coupling_matrix = {}
for cap1 in self.business_capabilities:
coupling_matrix[cap1.name] = {}
for cap2 in self.business_capabilities:
if cap1.name == cap2.name:
coupling_matrix[cap1.name][cap2.name] = 0.0
continue
# 计算数据耦合
data_coupling = self._calculate_data_coupling(cap1, cap2)
# 计算功能耦合
functional_coupling = self._calculate_functional_coupling(cap1, cap2)
# 计算团队耦合
team_coupling = self._calculate_team_coupling(cap1, cap2)
# 综合耦合度
total_coupling = (data_coupling * 0.4 +
functional_coupling * 0.4 +
team_coupling * 0.2)
coupling_matrix[cap1.name][cap2.name] = total_coupling
return coupling_matrix
def _calculate_data_coupling(self, cap1: BusinessCapability,
cap2: BusinessCapability) -> float:
"""计算数据耦合度"""
shared_entities = set(cap1.data_entities) & set(cap2.data_entities)
total_entities = set(cap1.data_entities) | set(cap2.data_entities)
if not total_entities:
return 0.0
return len(shared_entities) / len(total_entities)
def _calculate_functional_coupling(self, cap1: BusinessCapability,
cap2: BusinessCapability) -> float:
"""计算功能耦合度"""
# 检查依赖关系
if cap2.name in cap1.dependencies or cap1.name in cap2.dependencies:
return 0.8
# 检查操作相似性
shared_operations = set(cap1.operations) & set(cap2.operations)
total_operations = set(cap1.operations) | set(cap2.operations)
if not total_operations:
return 0.0
return len(shared_operations) / len(total_operations)
def _calculate_team_coupling(self, cap1: BusinessCapability,
cap2: BusinessCapability) -> float:
"""计算团队耦合度"""
if cap1.team_ownership == cap2.team_ownership:
return 0.2 # 同一团队,低耦合
else:
return 0.8 # 不同团队,高耦合
def suggest_service_boundaries(self, max_coupling_threshold: float = 0.3) -> List[ServiceBoundary]:
"""建议服务边界"""
coupling_matrix = self.analyze_coupling()
service_boundaries = []
processed_capabilities = set()
for capability in self.business_capabilities:
if capability.name in processed_capabilities:
continue
# 找到低耦合的相关能力
related_capabilities = [capability]
processed_capabilities.add(capability.name)
for other_cap in self.business_capabilities:
if (other_cap.name not in processed_capabilities and
coupling_matrix[capability.name][other_cap.name] <= max_coupling_threshold):
related_capabilities.append(other_cap)
processed_capabilities.add(other_cap.name)
# 创建服务边界
service_boundary = self._create_service_boundary(related_capabilities)
service_boundaries.append(service_boundary)
return service_boundaries
def _create_service_boundary(self, capabilities: List[BusinessCapability]) -> ServiceBoundary:
"""创建服务边界"""
# 生成服务名称
service_name = f"{capabilities[0].name.replace('_', '-')}-service"
# 收集数据所有权
data_ownership = []
for cap in capabilities:
data_ownership.extend(cap.data_entities)
data_ownership = list(set(data_ownership))
# 生成API契约
api_contracts = []
for cap in capabilities:
for operation in cap.operations:
api_contracts.append(f"/{cap.name.lower()}/{operation}")
        # 计算边界内各能力之间的平均成对耦合度
        matrix = self.analyze_coupling()
        pairs = [(a, b) for a in capabilities for b in capabilities if a.name != b.name]
        coupling_score = (sum(matrix[a.name][b.name] for a, b in pairs) / len(pairs)) if pairs else 0.0
return ServiceBoundary(
service_name=service_name,
capabilities=capabilities,
data_ownership=data_ownership,
api_contracts=api_contracts,
coupling_score=coupling_score
)
def generate_decomposition_report(self) -> Dict:
"""生成拆分报告"""
service_boundaries = self.suggest_service_boundaries()
coupling_matrix = self.analyze_coupling()
report = {
"decomposition_summary": {
"total_capabilities": len(self.business_capabilities),
"suggested_services": len(service_boundaries),
"average_coupling": self._calculate_average_coupling(coupling_matrix)
},
"service_boundaries": [],
"coupling_analysis": coupling_matrix,
"recommendations": self._generate_recommendations(service_boundaries)
}
for boundary in service_boundaries:
report["service_boundaries"].append({
"service_name": boundary.service_name,
"capabilities": [cap.name for cap in boundary.capabilities],
"data_ownership": boundary.data_ownership,
"api_contracts": boundary.api_contracts,
"coupling_score": boundary.coupling_score
})
return report
def _calculate_average_coupling(self, coupling_matrix: Dict) -> float:
"""计算平均耦合度"""
total_coupling = 0
count = 0
for service1, couplings in coupling_matrix.items():
for service2, coupling in couplings.items():
if service1 != service2:
total_coupling += coupling
count += 1
return total_coupling / count if count > 0 else 0.0
def _generate_recommendations(self, service_boundaries: List[ServiceBoundary]) -> List[str]:
"""生成建议"""
recommendations = []
# 检查服务数量
if len(service_boundaries) > 15:
recommendations.append("服务数量较多,考虑进一步合并相关服务")
elif len(service_boundaries) < 3:
recommendations.append("服务数量较少,可能存在过度聚合")
# 检查耦合度
high_coupling_services = [b for b in service_boundaries if b.coupling_score > 0.7]
if high_coupling_services:
recommendations.append(f"以下服务耦合度较高,建议重新设计: {[s.service_name for s in high_coupling_services]}")
# 检查数据所有权
data_conflicts = self._check_data_ownership_conflicts(service_boundaries)
if data_conflicts:
recommendations.append(f"存在数据所有权冲突: {data_conflicts}")
return recommendations
def _check_data_ownership_conflicts(self, service_boundaries: List[ServiceBoundary]) -> List[str]:
"""检查数据所有权冲突"""
data_ownership_map = {}
conflicts = []
for boundary in service_boundaries:
for data_entity in boundary.data_ownership:
if data_entity in data_ownership_map:
conflicts.append(f"{data_entity} 被多个服务拥有: {data_ownership_map[data_entity]}, {boundary.service_name}")
else:
data_ownership_map[data_entity] = boundary.service_name
return conflicts
def main():
"""主函数 - 演示微服务拆分分析"""
decomposer = MicroserviceDecomposer()
# 定义业务能力
user_management = BusinessCapability(
name="user_management",
description="用户管理",
data_entities=["User", "Profile", "Authentication"],
operations=["create_user", "update_profile", "authenticate"],
dependencies=[],
team_ownership="user_team",
complexity_score=3
)
order_processing = BusinessCapability(
name="order_processing",
description="订单处理",
data_entities=["Order", "OrderItem", "OrderStatus"],
operations=["create_order", "update_order", "cancel_order"],
dependencies=["user_management", "product_catalog", "payment_processing"],
team_ownership="order_team",
complexity_score=5
)
product_catalog = BusinessCapability(
name="product_catalog",
description="商品目录",
data_entities=["Product", "Category", "Inventory"],
operations=["add_product", "update_inventory", "search_products"],
dependencies=[],
team_ownership="product_team",
complexity_score=4
)
payment_processing = BusinessCapability(
name="payment_processing",
description="支付处理",
data_entities=["Payment", "Transaction", "PaymentMethod"],
operations=["process_payment", "refund", "validate_payment"],
dependencies=["user_management"],
team_ownership="payment_team",
complexity_score=6
)
notification_service = BusinessCapability(
name="notification_service",
description="通知服务",
data_entities=["Notification", "Template", "DeliveryLog"],
operations=["send_email", "send_sms", "push_notification"],
dependencies=["user_management"],
team_ownership="platform_team",
complexity_score=2
)
# 添加业务能力
for capability in [user_management, order_processing, product_catalog,
payment_processing, notification_service]:
decomposer.add_business_capability(capability)
# 生成拆分报告
print("生成微服务拆分报告...")
report = decomposer.generate_decomposition_report()
print(f"\n=== 微服务拆分分析报告 ===")
print(f"总业务能力数: {report['decomposition_summary']['total_capabilities']}")
print(f"建议服务数: {report['decomposition_summary']['suggested_services']}")
print(f"平均耦合度: {report['decomposition_summary']['average_coupling']:.2f}")
print(f"\n=== 建议的服务边界 ===")
for service in report['service_boundaries']:
print(f"\n服务名: {service['service_name']}")
print(f" 业务能力: {service['capabilities']}")
print(f" 数据所有权: {service['data_ownership']}")
print(f" 耦合度评分: {service['coupling_score']:.2f}")
print(f"\n=== 优化建议 ===")
for recommendation in report['recommendations']:
print(f"- {recommendation}")
if __name__ == "__main__":
main()
服务通信模式
微服务之间的通信是架构设计的核心考虑因素。
同步通信模式
#!/usr/bin/env python3
"""
微服务同步通信实现
包括HTTP REST API、gRPC等同步通信模式
"""
import asyncio
import aiohttp
import grpc
from typing import Dict, Any, Optional
import json
import time
from dataclasses import dataclass
from enum import Enum
import logging
class CommunicationProtocol(Enum):
"""通信协议"""
HTTP_REST = "http_rest"
GRPC = "grpc"
GRAPHQL = "graphql"
@dataclass
class ServiceEndpoint:
"""服务端点"""
service_name: str
host: str
port: int
protocol: CommunicationProtocol
health_check_path: str = "/health"
@dataclass
class RequestContext:
"""请求上下文"""
correlation_id: str
user_id: Optional[str] = None
tenant_id: Optional[str] = None
timeout: int = 30
retry_count: int = 3
class CircuitBreakerState(Enum):
"""熔断器状态"""
CLOSED = "closed"
OPEN = "open"
HALF_OPEN = "half_open"
class CircuitBreaker:
"""熔断器实现"""
def __init__(self, failure_threshold: int = 5,
recovery_timeout: int = 60,
success_threshold: int = 3):
self.failure_threshold = failure_threshold
self.recovery_timeout = recovery_timeout
self.success_threshold = success_threshold
self.failure_count = 0
self.success_count = 0
self.last_failure_time = None
self.state = CircuitBreakerState.CLOSED
def can_execute(self) -> bool:
"""检查是否可以执行请求"""
if self.state == CircuitBreakerState.CLOSED:
return True
elif self.state == CircuitBreakerState.OPEN:
if (time.time() - self.last_failure_time) > self.recovery_timeout:
self.state = CircuitBreakerState.HALF_OPEN
self.success_count = 0
return True
return False
else: # HALF_OPEN
return True
def record_success(self):
"""记录成功"""
if self.state == CircuitBreakerState.HALF_OPEN:
self.success_count += 1
if self.success_count >= self.success_threshold:
self.state = CircuitBreakerState.CLOSED
self.failure_count = 0
elif self.state == CircuitBreakerState.CLOSED:
self.failure_count = 0
def record_failure(self):
"""记录失败"""
self.failure_count += 1
self.last_failure_time = time.time()
if self.failure_count >= self.failure_threshold:
self.state = CircuitBreakerState.OPEN
class ServiceCommunicator:
"""服务通信器"""
def __init__(self):
self.service_registry = {}
self.circuit_breakers = {}
self.session = None
self.logger = logging.getLogger(__name__)
async def __aenter__(self):
"""异步上下文管理器入口"""
self.session = aiohttp.ClientSession()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
"""异步上下文管理器出口"""
if self.session:
await self.session.close()
def register_service(self, endpoint: ServiceEndpoint):
"""注册服务端点"""
self.service_registry[endpoint.service_name] = endpoint
self.circuit_breakers[endpoint.service_name] = CircuitBreaker()
async def call_service(self, service_name: str, method: str,
path: str, context: RequestContext,
data: Optional[Dict] = None) -> Dict[str, Any]:
"""调用服务"""
if service_name not in self.service_registry:
raise ValueError(f"Service {service_name} not registered")
endpoint = self.service_registry[service_name]
circuit_breaker = self.circuit_breakers[service_name]
# 检查熔断器状态
if not circuit_breaker.can_execute():
raise Exception(f"Circuit breaker is OPEN for service {service_name}")
try:
if endpoint.protocol == CommunicationProtocol.HTTP_REST:
result = await self._call_http_service(endpoint, method, path, context, data)
elif endpoint.protocol == CommunicationProtocol.GRPC:
result = await self._call_grpc_service(endpoint, method, path, context, data)
else:
raise ValueError(f"Unsupported protocol: {endpoint.protocol}")
circuit_breaker.record_success()
return result
except Exception as e:
circuit_breaker.record_failure()
self.logger.error(f"Service call failed: {service_name}.{method} - {str(e)}")
raise
async def _call_http_service(self, endpoint: ServiceEndpoint, method: str,
path: str, context: RequestContext,
data: Optional[Dict] = None) -> Dict[str, Any]:
"""HTTP服务调用"""
url = f"http://{endpoint.host}:{endpoint.port}{path}"
headers = {
'Content-Type': 'application/json',
'X-Correlation-ID': context.correlation_id,
'X-Request-Timeout': str(context.timeout)
}
if context.user_id:
headers['X-User-ID'] = context.user_id
if context.tenant_id:
headers['X-Tenant-ID'] = context.tenant_id
timeout = aiohttp.ClientTimeout(total=context.timeout)
for attempt in range(context.retry_count):
try:
async with self.session.request(
method=method.upper(),
url=url,
headers=headers,
json=data,
timeout=timeout
) as response:
if response.status == 200:
return await response.json()
elif response.status >= 500:
# 服务器错误,可以重试
if attempt < context.retry_count - 1:
await asyncio.sleep(2 ** attempt) # 指数退避
continue
# 客户端错误或最后一次重试失败
error_text = await response.text()
raise Exception(f"HTTP {response.status}: {error_text}")
            except asyncio.TimeoutError:
                if attempt < context.retry_count - 1:
                    await asyncio.sleep(2 ** attempt)
                    continue
                raise Exception(f"Request timeout after {context.retry_count} attempts")
            except aiohttp.ClientError:
                # 仅对网络层错误重试;上面抛出的HTTP业务错误(如4xx)直接向上传播
                if attempt < context.retry_count - 1:
                    await asyncio.sleep(2 ** attempt)
                    continue
                raise
async def _call_grpc_service(self, endpoint: ServiceEndpoint, method: str,
path: str, context: RequestContext,
data: Optional[Dict] = None) -> Dict[str, Any]:
"""gRPC服务调用"""
# 这里是gRPC调用的示例实现
# 实际实现需要根据具体的protobuf定义
channel_address = f"{endpoint.host}:{endpoint.port}"
try:
async with grpc.aio.insecure_channel(channel_address) as channel:
# 这里需要根据实际的gRPC服务定义来实现
# stub = YourServiceStub(channel)
# response = await stub.YourMethod(request, timeout=context.timeout)
# return MessageToDict(response)
# 示例返回
return {"status": "success", "message": "gRPC call completed"}
except grpc.RpcError as e:
raise Exception(f"gRPC call failed: {e.code()} - {e.details()}")
async def health_check(self, service_name: str) -> bool:
"""健康检查"""
if service_name not in self.service_registry:
return False
endpoint = self.service_registry[service_name]
try:
context = RequestContext(
correlation_id=f"health-check-{int(time.time())}",
timeout=5,
retry_count=1
)
await self.call_service(
service_name=service_name,
method="GET",
path=endpoint.health_check_path,
context=context
)
return True
except Exception:
return False
async def batch_health_check(self) -> Dict[str, bool]:
"""批量健康检查"""
health_status = {}
tasks = []
for service_name in self.service_registry.keys():
task = asyncio.create_task(self.health_check(service_name))
tasks.append((service_name, task))
for service_name, task in tasks:
try:
health_status[service_name] = await task
except Exception:
health_status[service_name] = False
return health_status
async def main():
"""主函数 - 演示服务通信"""
async with ServiceCommunicator() as communicator:
# 注册服务端点
user_service = ServiceEndpoint(
service_name="user-service",
host="localhost",
port=8001,
protocol=CommunicationProtocol.HTTP_REST
)
order_service = ServiceEndpoint(
service_name="order-service",
host="localhost",
port=8002,
protocol=CommunicationProtocol.HTTP_REST
)
communicator.register_service(user_service)
communicator.register_service(order_service)
# 创建请求上下文
context = RequestContext(
correlation_id="req-12345",
user_id="user-001",
timeout=30
)
try:
# 调用用户服务
print("调用用户服务...")
user_response = await communicator.call_service(
service_name="user-service",
method="GET",
path="/users/user-001",
context=context
)
print(f"用户服务响应: {user_response}")
# 调用订单服务
print("调用订单服务...")
order_data = {
"user_id": "user-001",
"items": [
{"product_id": "prod-001", "quantity": 2},
{"product_id": "prod-002", "quantity": 1}
]
}
order_response = await communicator.call_service(
service_name="order-service",
method="POST",
path="/orders",
context=context,
data=order_data
)
print(f"订单服务响应: {order_response}")
except Exception as e:
print(f"服务调用失败: {str(e)}")
# 批量健康检查
print("\n执行健康检查...")
health_status = await communicator.batch_health_check()
for service_name, is_healthy in health_status.items():
status = "健康" if is_healthy else "不健康"
print(f"{service_name}: {status}")
if __name__ == "__main__":
asyncio.run(main())
服务网格架构
服务网格提供了微服务间通信的基础设施层,处理服务发现、负载均衡、故障恢复、指标收集和安全等功能。
Istio服务网格部署
#!/bin/bash
# Istio服务网格部署脚本
# 包括安装、配置和示例应用部署
set -e
# 颜色定义
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
NC='\033[0m' # No Color
# 日志函数
log_info() {
echo -e "${GREEN}[INFO]${NC} $1"
}
log_warn() {
echo -e "${YELLOW}[WARN]${NC} $1"
}
log_error() {
echo -e "${RED}[ERROR]${NC} $1"
}
# 检查依赖
check_dependencies() {
log_info "检查依赖..."
if ! command -v kubectl &> /dev/null; then
log_error "kubectl 未安装"
exit 1
fi
if ! command -v curl &> /dev/null; then
log_error "curl 未安装"
exit 1
fi
# 检查Kubernetes集群连接
if ! kubectl cluster-info &> /dev/null; then
log_error "无法连接到Kubernetes集群"
exit 1
fi
log_info "依赖检查完成"
}
# 安装Istio
install_istio() {
log_info "安装Istio..."
# 下载Istio
ISTIO_VERSION="1.19.0"
curl -L https://istio.io/downloadIstio | ISTIO_VERSION=$ISTIO_VERSION sh -
# 添加到PATH
export PATH="$PWD/istio-$ISTIO_VERSION/bin:$PATH"
# 安装Istio
istioctl install --set values.defaultRevision=default -y
# 启用自动注入
kubectl label namespace default istio-injection=enabled
log_info "Istio安装完成"
}
# 部署Istio网关
deploy_gateway() {
log_info "部署Istio网关..."
cat <<EOF | kubectl apply -f -
apiVersion: networking.istio.io/v1beta1
kind: Gateway
metadata:
name: microservices-gateway
namespace: default
spec:
selector:
istio: ingressgateway
servers:
- port:
number: 80
name: http
protocol: HTTP
hosts:
- "*"
- port:
number: 443
name: https
protocol: HTTPS
tls:
mode: SIMPLE
credentialName: microservices-tls
hosts:
- "*.example.com"
---
apiVersion: networking.istio.io/v1beta1
kind: VirtualService
metadata:
name: microservices-vs
namespace: default
spec:
hosts:
- "*"
gateways:
- microservices-gateway
http:
- match:
- uri:
prefix: /api/users
route:
- destination:
host: user-service
port:
number: 8080
fault:
delay:
percentage:
value: 0.1
fixedDelay: 5s
retries:
attempts: 3
perTryTimeout: 2s
- match:
- uri:
prefix: /api/orders
route:
- destination:
host: order-service
port:
number: 8080
weight: 80
- destination:
host: order-service-v2
port:
number: 8080
weight: 20
timeout: 10s
- match:
- uri:
prefix: /api/payments
route:
- destination:
host: payment-service
port:
number: 8080
headers:
request:
add:
x-request-id: "%REQ(x-request-id)%"
x-trace-id: "%REQ(x-trace-id)%"
EOF
log_info "Istio网关部署完成"
}
# 配置流量管理
configure_traffic_management() {
log_info "配置流量管理..."
# 目标规则
cat <<EOF | kubectl apply -f -
apiVersion: networking.istio.io/v1beta1
kind: DestinationRule
metadata:
name: user-service-dr
namespace: default
spec:
host: user-service
trafficPolicy:
connectionPool:
tcp:
maxConnections: 100
http:
http1MaxPendingRequests: 50
maxRequestsPerConnection: 10
loadBalancer:
simple: LEAST_CONN
outlierDetection:
      consecutive5xxErrors: 3
interval: 30s
baseEjectionTime: 30s
maxEjectionPercent: 50
subsets:
- name: v1
labels:
version: v1
- name: v2
labels:
version: v2
---
apiVersion: networking.istio.io/v1beta1
kind: DestinationRule
metadata:
name: order-service-dr
namespace: default
spec:
host: order-service
trafficPolicy:
connectionPool:
tcp:
maxConnections: 50
http:
http1MaxPendingRequests: 25
maxRequestsPerConnection: 5
loadBalancer:
simple: ROUND_ROBIN
    outlierDetection:
      consecutive5xxErrors: 5
      interval: 10s
      baseEjectionTime: 30s
      maxEjectionPercent: 50
      minHealthPercent: 30
EOF
log_info "流量管理配置完成"
}
# 配置安全策略
configure_security() {
log_info "配置安全策略..."
# 认证策略
cat <<EOF | kubectl apply -f -
apiVersion: security.istio.io/v1beta1
kind: PeerAuthentication
metadata:
name: default
namespace: default
spec:
mtls:
mode: STRICT
---
apiVersion: security.istio.io/v1beta1
kind: AuthorizationPolicy
metadata:
name: user-service-authz
namespace: default
spec:
selector:
matchLabels:
app: user-service
rules:
- from:
- source:
principals: ["cluster.local/ns/default/sa/api-gateway"]
- to:
- operation:
methods: ["GET", "POST"]
paths: ["/api/users/*"]
- when:
- key: request.headers[authorization]
values: ["Bearer *"]
---
apiVersion: security.istio.io/v1beta1
kind: AuthorizationPolicy
metadata:
name: order-service-authz
namespace: default
spec:
selector:
matchLabels:
app: order-service
rules:
- from:
- source:
principals: ["cluster.local/ns/default/sa/api-gateway"]
- source:
principals: ["cluster.local/ns/default/sa/user-service"]
- to:
- operation:
methods: ["GET", "POST", "PUT", "DELETE"]
paths: ["/api/orders/*"]
- when:
- key: request.headers[x-user-id]
notValues: [""]
EOF
log_info "安全策略配置完成"
}
# 部署监控组件
deploy_monitoring() {
log_info "部署监控组件..."
# 安装Prometheus
kubectl apply -f https://raw.githubusercontent.com/istio/istio/release-1.19/samples/addons/prometheus.yaml
# 安装Grafana
kubectl apply -f https://raw.githubusercontent.com/istio/istio/release-1.19/samples/addons/grafana.yaml
# 安装Jaeger
kubectl apply -f https://raw.githubusercontent.com/istio/istio/release-1.19/samples/addons/jaeger.yaml
# 安装Kiali
kubectl apply -f https://raw.githubusercontent.com/istio/istio/release-1.19/samples/addons/kiali.yaml
# 等待部署完成
kubectl wait --for=condition=available --timeout=300s deployment/prometheus -n istio-system
kubectl wait --for=condition=available --timeout=300s deployment/grafana -n istio-system
kubectl wait --for=condition=available --timeout=300s deployment/jaeger -n istio-system
kubectl wait --for=condition=available --timeout=300s deployment/kiali -n istio-system
log_info "监控组件部署完成"
}
# 部署示例应用
deploy_sample_apps() {
log_info "部署示例应用..."
# 用户服务
cat <<EOF | kubectl apply -f -
apiVersion: apps/v1
kind: Deployment
metadata:
name: user-service
labels:
app: user-service
version: v1
spec:
replicas: 2
selector:
matchLabels:
app: user-service
version: v1
template:
metadata:
labels:
app: user-service
version: v1
spec:
containers:
- name: user-service
image: nginx:alpine
ports:
- containerPort: 80
env:
- name: SERVICE_NAME
value: "user-service"
- name: VERSION
value: "v1"
---
apiVersion: v1
kind: Service
metadata:
name: user-service
labels:
app: user-service
spec:
ports:
- port: 8080
targetPort: 80
name: http
selector:
app: user-service
EOF
# 订单服务
cat <<EOF | kubectl apply -f -
apiVersion: apps/v1
kind: Deployment
metadata:
name: order-service
labels:
app: order-service
version: v1
spec:
replicas: 2
selector:
matchLabels:
app: order-service
version: v1
template:
metadata:
labels:
app: order-service
version: v1
spec:
containers:
- name: order-service
image: nginx:alpine
ports:
- containerPort: 80
env:
- name: SERVICE_NAME
value: "order-service"
- name: VERSION
value: "v1"
---
apiVersion: v1
kind: Service
metadata:
name: order-service
labels:
app: order-service
spec:
ports:
- port: 8080
targetPort: 80
name: http
selector:
app: order-service
EOF
log_info "示例应用部署完成"
}
# 验证部署
verify_deployment() {
log_info "验证部署..."
# 检查Istio组件
kubectl get pods -n istio-system
# 检查应用
kubectl get pods -n default
# 检查网关
kubectl get gateway,virtualservice,destinationrule -n default
# 获取Ingress Gateway地址
INGRESS_HOST=$(kubectl -n istio-system get service istio-ingressgateway -o jsonpath='{.status.loadBalancer.ingress[0].ip}')
INGRESS_PORT=$(kubectl -n istio-system get service istio-ingressgateway -o jsonpath='{.spec.ports[?(@.name=="http2")].port}')
if [ -z "$INGRESS_HOST" ]; then
INGRESS_HOST=$(kubectl -n istio-system get service istio-ingressgateway -o jsonpath='{.status.loadBalancer.ingress[0].hostname}')
fi
if [ -z "$INGRESS_HOST" ]; then
INGRESS_HOST="localhost"
INGRESS_PORT=$(kubectl -n istio-system get service istio-ingressgateway -o jsonpath='{.spec.ports[?(@.name=="http2")].nodePort}')
fi
GATEWAY_URL="$INGRESS_HOST:$INGRESS_PORT"
log_info "网关地址: http://$GATEWAY_URL"
log_info "Kiali仪表板: kubectl port-forward -n istio-system svc/kiali 20001:20001"
log_info "Grafana仪表板: kubectl port-forward -n istio-system svc/grafana 3000:3000"
log_info "Jaeger仪表板: kubectl port-forward -n istio-system svc/jaeger 16686:16686"
log_info "部署验证完成"
}
# 清理资源
cleanup() {
log_info "清理资源..."
# 删除应用
kubectl delete deployment,service user-service order-service 2>/dev/null || true
# 删除Istio配置
kubectl delete gateway,virtualservice,destinationrule,peerauthentication,authorizationpolicy --all 2>/dev/null || true
# 删除监控组件
kubectl delete -f https://raw.githubusercontent.com/istio/istio/release-1.19/samples/addons/prometheus.yaml 2>/dev/null || true
kubectl delete -f https://raw.githubusercontent.com/istio/istio/release-1.19/samples/addons/grafana.yaml 2>/dev/null || true
kubectl delete -f https://raw.githubusercontent.com/istio/istio/release-1.19/samples/addons/jaeger.yaml 2>/dev/null || true
kubectl delete -f https://raw.githubusercontent.com/istio/istio/release-1.19/samples/addons/kiali.yaml 2>/dev/null || true
# 卸载Istio
istioctl uninstall --purge -y 2>/dev/null || true
log_info "清理完成"
}
# 主函数
main() {
case "${1:-deploy}" in
"deploy")
check_dependencies
install_istio
deploy_gateway
configure_traffic_management
configure_security
deploy_monitoring
deploy_sample_apps
verify_deployment
;;
"cleanup")
cleanup
;;
"verify")
verify_deployment
;;
*)
echo "用法: $0 [deploy|cleanup|verify]"
echo " deploy - 部署Istio服务网格"
echo " cleanup - 清理所有资源"
echo " verify - 验证部署状态"
exit 1
;;
esac
}
main "$@"
监控与可观测性
微服务架构需要全面的监控和可观测性来确保系统的健康运行。
分布式监控系统
#!/usr/bin/env python3
"""
微服务分布式监控系统
包括指标收集、日志聚合、链路追踪和告警
"""
import asyncio
import aiohttp
import json
import time
import logging
import uuid
from typing import Dict, List, Any, Optional
from dataclasses import dataclass, asdict
from datetime import datetime, timedelta
from enum import Enum
import prometheus_client
from prometheus_client import Counter, Histogram, Gauge, CollectorRegistry
import jaeger_client
from jaeger_client import Config
from opentracing.propagation import Format
import structlog
from elasticsearch import AsyncElasticsearch
import redis.asyncio as redis
from opentelemetry import trace
from opentelemetry.exporter.jaeger.thrift import JaegerExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
class MetricType(Enum):
"""指标类型"""
COUNTER = "counter"
GAUGE = "gauge"
HISTOGRAM = "histogram"
SUMMARY = "summary"
class LogLevel(Enum):
"""日志级别"""
DEBUG = "debug"
INFO = "info"
WARNING = "warning"
ERROR = "error"
CRITICAL = "critical"
@dataclass
class MetricData:
"""指标数据"""
name: str
type: MetricType
value: float
labels: Dict[str, str]
timestamp: datetime
help_text: str = ""
@dataclass
class LogEntry:
"""日志条目"""
timestamp: datetime
level: LogLevel
service: str
message: str
trace_id: Optional[str] = None
span_id: Optional[str] = None
user_id: Optional[str] = None
request_id: Optional[str] = None
extra_fields: Dict[str, Any] = None
@dataclass
class TraceSpan:
"""追踪跨度"""
trace_id: str
span_id: str
parent_span_id: Optional[str]
operation_name: str
service_name: str
start_time: datetime
end_time: Optional[datetime]
duration: Optional[float]
tags: Dict[str, Any]
logs: List[Dict[str, Any]]
status: str = "ok"
class MetricsCollector:
"""指标收集器"""
def __init__(self):
self.registry = CollectorRegistry()
self.metrics = {}
# 预定义指标
self._setup_default_metrics()
def _setup_default_metrics(self):
"""设置默认指标"""
# HTTP请求指标
self.http_requests_total = Counter(
'http_requests_total',
'Total HTTP requests',
['method', 'endpoint', 'status_code', 'service'],
registry=self.registry
)
self.http_request_duration = Histogram(
'http_request_duration_seconds',
'HTTP request duration',
['method', 'endpoint', 'service'],
registry=self.registry
)
# 业务指标
self.active_users = Gauge(
'active_users_total',
'Number of active users',
['service'],
registry=self.registry
)
self.database_connections = Gauge(
'database_connections_active',
'Active database connections',
['database', 'service'],
registry=self.registry
)
# 系统指标
self.memory_usage = Gauge(
'memory_usage_bytes',
'Memory usage in bytes',
['service'],
registry=self.registry
)
self.cpu_usage = Gauge(
'cpu_usage_percent',
'CPU usage percentage',
['service'],
registry=self.registry
)
def record_http_request(self, method: str, endpoint: str,
status_code: int, service: str, duration: float):
"""记录HTTP请求指标"""
self.http_requests_total.labels(
method=method,
endpoint=endpoint,
status_code=str(status_code),
service=service
).inc()
self.http_request_duration.labels(
method=method,
endpoint=endpoint,
service=service
).observe(duration)
def set_active_users(self, count: int, service: str):
"""设置活跃用户数"""
self.active_users.labels(service=service).set(count)
def set_database_connections(self, count: int, database: str, service: str):
"""设置数据库连接数"""
self.database_connections.labels(
database=database,
service=service
).set(count)
def set_memory_usage(self, bytes_used: int, service: str):
"""设置内存使用量"""
self.memory_usage.labels(service=service).set(bytes_used)
def set_cpu_usage(self, percentage: float, service: str):
"""设置CPU使用率"""
self.cpu_usage.labels(service=service).set(percentage)
def get_metrics(self) -> str:
"""获取Prometheus格式的指标"""
return prometheus_client.generate_latest(self.registry).decode('utf-8')
class DistributedTracer:
"""分布式追踪器"""
def __init__(self, service_name: str, jaeger_endpoint: str = "http://localhost:14268/api/traces"):
self.service_name = service_name
# 配置Jaeger
config = Config(
config={
'sampler': {
'type': 'const',
'param': 1,
},
'logging': True,
'reporter_batch_size': 1,
},
service_name=service_name,
validate=True,
)
self.tracer = config.initialize_tracer()
# 配置OpenTelemetry
trace.set_tracer_provider(TracerProvider())
jaeger_exporter = JaegerExporter(
agent_host_name="localhost",
agent_port=6831,
)
span_processor = BatchSpanProcessor(jaeger_exporter)
trace.get_tracer_provider().add_span_processor(span_processor)
self.otel_tracer = trace.get_tracer(__name__)
def start_span(self, operation_name: str, parent_span=None, tags: Dict[str, Any] = None) -> Any:
"""开始一个新的跨度"""
span = self.tracer.start_span(
operation_name=operation_name,
child_of=parent_span,
tags=tags or {}
)
return span
def finish_span(self, span: Any, tags: Dict[str, Any] = None, logs: List[Dict[str, Any]] = None):
"""结束跨度"""
if tags:
for key, value in tags.items():
span.set_tag(key, value)
if logs:
for log_entry in logs:
span.log_kv(log_entry)
span.finish()
def extract_span_context(self, headers: Dict[str, str]) -> Any:
"""从HTTP头中提取跨度上下文"""
try:
span_ctx = self.tracer.extract(
                format=Format.HTTP_HEADERS,
carrier=headers
)
return span_ctx
except Exception:
return None
def inject_span_context(self, span: Any, headers: Dict[str, str]) -> Dict[str, str]:
"""将跨度上下文注入HTTP头"""
self.tracer.inject(
span_context=span.context,
            format=Format.HTTP_HEADERS,
carrier=headers
)
return headers
class LogAggregator:
"""日志聚合器"""
def __init__(self, elasticsearch_url: str = "http://localhost:9200"):
self.es_client = AsyncElasticsearch([elasticsearch_url])
self.logger = structlog.get_logger()
# 配置结构化日志
structlog.configure(
processors=[
structlog.stdlib.filter_by_level,
structlog.stdlib.add_logger_name,
structlog.stdlib.add_log_level,
structlog.stdlib.PositionalArgumentsFormatter(),
structlog.processors.TimeStamper(fmt="iso"),
structlog.processors.StackInfoRenderer(),
structlog.processors.format_exc_info,
structlog.processors.UnicodeDecoder(),
structlog.processors.JSONRenderer()
],
context_class=dict,
logger_factory=structlog.stdlib.LoggerFactory(),
wrapper_class=structlog.stdlib.BoundLogger,
cache_logger_on_first_use=True,
)
async def log(self, entry: LogEntry):
"""记录日志"""
# 发送到Elasticsearch
await self._send_to_elasticsearch(entry)
# 本地日志
self._log_locally(entry)
async def _send_to_elasticsearch(self, entry: LogEntry):
"""发送日志到Elasticsearch"""
try:
index_name = f"microservices-logs-{entry.timestamp.strftime('%Y.%m.%d')}"
doc = {
'timestamp': entry.timestamp.isoformat(),
'level': entry.level.value,
'service': entry.service,
'message': entry.message,
'trace_id': entry.trace_id,
'span_id': entry.span_id,
'user_id': entry.user_id,
'request_id': entry.request_id,
**entry.extra_fields or {}
}
await self.es_client.index(
index=index_name,
body=doc
)
except Exception as e:
print(f"Failed to send log to Elasticsearch: {e}")
def _log_locally(self, entry: LogEntry):
"""本地日志记录"""
logger = self.logger.bind(
service=entry.service,
trace_id=entry.trace_id,
span_id=entry.span_id,
user_id=entry.user_id,
request_id=entry.request_id,
**(entry.extra_fields or {})
)
if entry.level == LogLevel.DEBUG:
logger.debug(entry.message)
elif entry.level == LogLevel.INFO:
logger.info(entry.message)
elif entry.level == LogLevel.WARNING:
logger.warning(entry.message)
elif entry.level == LogLevel.ERROR:
logger.error(entry.message)
elif entry.level == LogLevel.CRITICAL:
logger.critical(entry.message)
async def search_logs(self, query: str, service: str = None,
start_time: datetime = None, end_time: datetime = None,
limit: int = 100) -> List[Dict[str, Any]]:
"""搜索日志"""
try:
search_body = {
"query": {
"bool": {
"must": []
}
},
"sort": [
{"timestamp": {"order": "desc"}}
],
"size": limit
}
# 添加查询条件
if query:
search_body["query"]["bool"]["must"].append({
"multi_match": {
"query": query,
"fields": ["message", "service"]
}
})
if service:
search_body["query"]["bool"]["must"].append({
"term": {"service": service}
})
if start_time or end_time:
time_range = {}
if start_time:
time_range["gte"] = start_time.isoformat()
if end_time:
time_range["lte"] = end_time.isoformat()
search_body["query"]["bool"]["must"].append({
"range": {"timestamp": time_range}
})
response = await self.es_client.search(
index="microservices-logs-*",
body=search_body
)
return [hit["_source"] for hit in response["hits"]["hits"]]
except Exception as e:
print(f"Failed to search logs: {e}")
return []
class AlertManager:
"""告警管理器"""
def __init__(self, redis_url: str = "redis://localhost:6379"):
self.redis_client = redis.from_url(redis_url)
self.alert_rules = []
self.notification_channels = []
def add_alert_rule(self, name: str, condition: str, threshold: float,
duration: int = 300, severity: str = "warning"):
"""添加告警规则"""
rule = {
'name': name,
'condition': condition,
'threshold': threshold,
'duration': duration,
'severity': severity,
'last_triggered': None
}
self.alert_rules.append(rule)
def add_notification_channel(self, channel_type: str, config: Dict[str, Any]):
"""添加通知渠道"""
channel = {
'type': channel_type,
'config': config
}
self.notification_channels.append(channel)
async def check_alerts(self, metrics: Dict[str, float]):
"""检查告警条件"""
current_time = datetime.utcnow()
for rule in self.alert_rules:
try:
# 简化的条件检查
if self._evaluate_condition(rule['condition'], metrics, rule['threshold']):
# 检查是否需要触发告警
if await self._should_trigger_alert(rule, current_time):
await self._trigger_alert(rule, metrics)
rule['last_triggered'] = current_time
except Exception as e:
print(f"Error checking alert rule {rule['name']}: {e}")
def _evaluate_condition(self, condition: str, metrics: Dict[str, float], threshold: float) -> bool:
"""评估告警条件"""
# 简化实现,实际应该支持更复杂的表达式
if condition in metrics:
value = metrics[condition]
return value > threshold
return False
async def _should_trigger_alert(self, rule: Dict[str, Any], current_time: datetime) -> bool:
"""检查是否应该触发告警"""
if rule['last_triggered'] is None:
return True
time_since_last = (current_time - rule['last_triggered']).total_seconds()
return time_since_last >= rule['duration']
async def _trigger_alert(self, rule: Dict[str, Any], metrics: Dict[str, float]):
"""触发告警"""
alert = {
'id': str(uuid.uuid4()),
'rule_name': rule['name'],
'severity': rule['severity'],
'message': f"Alert: {rule['name']} triggered",
'timestamp': datetime.utcnow().isoformat(),
'metrics': metrics
}
# 存储告警
await self.redis_client.lpush('alerts', json.dumps(alert))
# 发送通知
for channel in self.notification_channels:
await self._send_notification(channel, alert)
async def _send_notification(self, channel: Dict[str, Any], alert: Dict[str, Any]):
"""发送通知"""
try:
if channel['type'] == 'webhook':
await self._send_webhook_notification(channel['config'], alert)
elif channel['type'] == 'email':
await self._send_email_notification(channel['config'], alert)
elif channel['type'] == 'slack':
await self._send_slack_notification(channel['config'], alert)
except Exception as e:
print(f"Failed to send notification via {channel['type']}: {e}")
async def _send_webhook_notification(self, config: Dict[str, Any], alert: Dict[str, Any]):
"""发送Webhook通知"""
async with aiohttp.ClientSession() as session:
await session.post(config['url'], json=alert)
async def _send_email_notification(self, config: Dict[str, Any], alert: Dict[str, Any]):
"""发送邮件通知"""
# 实现邮件发送逻辑
pass
async def _send_slack_notification(self, config: Dict[str, Any], alert: Dict[str, Any]):
"""发送Slack通知"""
# 实现Slack通知逻辑
pass
class MonitoringSystem:
"""监控系统"""
def __init__(self, service_name: str):
self.service_name = service_name
self.metrics_collector = MetricsCollector()
self.tracer = DistributedTracer(service_name)
self.log_aggregator = LogAggregator()
self.alert_manager = AlertManager()
# 设置默认告警规则
self._setup_default_alerts()
def _setup_default_alerts(self):
"""设置默认告警规则"""
self.alert_manager.add_alert_rule(
name="high_error_rate",
condition="error_rate",
threshold=0.05, # 5%
duration=300, # 5分钟
severity="critical"
)
self.alert_manager.add_alert_rule(
name="high_response_time",
condition="avg_response_time",
threshold=2.0, # 2秒
duration=600, # 10分钟
severity="warning"
)
self.alert_manager.add_alert_rule(
name="high_memory_usage",
condition="memory_usage_percent",
threshold=80.0, # 80%
duration=300, # 5分钟
severity="warning"
)
async def start_monitoring(self):
"""启动监控"""
# 启动指标收集循环
asyncio.create_task(self._metrics_collection_loop())
# 启动告警检查循环
asyncio.create_task(self._alert_check_loop())
async def _metrics_collection_loop(self):
"""指标收集循环"""
while True:
try:
# 收集系统指标
await self._collect_system_metrics()
# 等待下一次收集
await asyncio.sleep(30) # 每30秒收集一次
except Exception as e:
await self.log_aggregator.log(LogEntry(
timestamp=datetime.utcnow(),
level=LogLevel.ERROR,
service=self.service_name,
message=f"Metrics collection failed: {str(e)}"
))
await asyncio.sleep(10)
async def _alert_check_loop(self):
"""告警检查循环"""
while True:
try:
# 获取当前指标
metrics = await self._get_current_metrics()
# 检查告警
await self.alert_manager.check_alerts(metrics)
# 等待下一次检查
await asyncio.sleep(60) # 每分钟检查一次
except Exception as e:
await self.log_aggregator.log(LogEntry(
timestamp=datetime.utcnow(),
level=LogLevel.ERROR,
service=self.service_name,
message=f"Alert check failed: {str(e)}"
))
await asyncio.sleep(30)
async def _collect_system_metrics(self):
"""收集系统指标"""
import psutil
# CPU使用率
cpu_percent = psutil.cpu_percent(interval=1)
self.metrics_collector.set_cpu_usage(cpu_percent, self.service_name)
# 内存使用量
memory = psutil.virtual_memory()
self.metrics_collector.set_memory_usage(memory.used, self.service_name)
async def _get_current_metrics(self) -> Dict[str, float]:
"""获取当前指标"""
import psutil
return {
'cpu_usage_percent': psutil.cpu_percent(),
'memory_usage_percent': psutil.virtual_memory().percent,
'disk_usage_percent': psutil.disk_usage('/').percent,
}
# 使用示例
async def main():
"""主函数 - 监控系统示例"""
monitoring = MonitoringSystem("user-service")
# 启动监控
await monitoring.start_monitoring()
# 模拟一些指标
monitoring.metrics_collector.record_http_request(
method="GET",
endpoint="/api/users",
status_code=200,
service="user-service",
duration=0.5
)
# 记录日志
await monitoring.log_aggregator.log(LogEntry(
timestamp=datetime.utcnow(),
level=LogLevel.INFO,
service="user-service",
message="User service started successfully",
extra_fields={"version": "1.0.0"}
))
print("监控系统启动成功")
print("指标端点: http://localhost:8080/metrics")
print("日志查询: http://localhost:9200/microservices-logs-*/_search")
# 保持运行
await asyncio.Event().wait()
if __name__ == "__main__":
asyncio.run(main())
总结
微服务架构在云环境中的设计模式涵盖了多个关键领域:
核心架构要素
- 服务拆分与设计
  - 基于业务能力的服务边界划分
  - 数据库分离和事务管理
  - 服务间通信协议设计
- 通信模式
  - 同步通信(REST、gRPC)
  - 异步通信(消息队列、事件驱动)
  - 服务发现与注册机制
- API网关(见下方的最小示意)
  - 统一入口和路由管理
  - 认证授权和限流控制
  - 负载均衡和熔断保护
- 服务网格
  - 服务间通信基础设施
  - 流量管理和安全策略
  - 可观测性和监控集成
- 监控与可观测性
  - 分布式追踪和指标收集
  - 日志聚合和分析
  - 告警和通知机制
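前文以完整代码覆盖了服务拆分、同步/异步通信、服务网格与监控,唯独API网关只在架构图中出现。下面给出一个极简的网关示意(假设性实现:路由表、端口与限流阈值均为演示假设;认证仅检查是否携带Bearer头,真实网关应校验JWT签名并使用分布式限流):
#!/usr/bin/env python3
"""
极简API网关示意(假设性实现)
演示统一入口、Bearer认证检查与内存滑动窗口限流
"""
import time
import aiohttp
from aiohttp import web

# 假设的路由表:路径前缀 -> 后端服务地址(均为演示值)
ROUTES = {
    "/api/users": "http://localhost:8001",
    "/api/orders": "http://localhost:8002",
}
RATE_LIMIT = 100    # 每个客户端每分钟允许的请求数(演示阈值)
_request_log = {}   # client_ip -> 最近请求时间戳列表(单机内存限流,非分布式)

def _allow(client_ip: str) -> bool:
    """滑动窗口限流:只统计最近60秒内的请求"""
    now = time.time()
    window = [t for t in _request_log.get(client_ip, []) if now - t < 60]
    window.append(now)
    _request_log[client_ip] = window
    return len(window) <= RATE_LIMIT

async def proxy(request: web.Request) -> web.Response:
    # 认证:仅检查是否携带Bearer头,真实网关应校验JWT签名
    if not request.headers.get("Authorization", "").startswith("Bearer "):
        return web.json_response({"error": "unauthorized"}, status=401)
    if not _allow(request.remote or "unknown"):
        return web.json_response({"error": "rate limited"}, status=429)
    # 按路径前缀路由到后端服务并转发请求
    for prefix, backend in ROUTES.items():
        if request.path.startswith(prefix):
            async with aiohttp.ClientSession() as session:
                async with session.request(
                    request.method,
                    backend + request.path_qs,
                    headers={"X-Forwarded-For": request.remote or ""},
                    data=await request.read(),
                ) as resp:
                    return web.Response(body=await resp.read(), status=resp.status)
    return web.json_response({"error": "no route"}, status=404)

if __name__ == "__main__":
    app = web.Application()
    app.router.add_route("*", "/{tail:.*}", proxy)
    web.run_app(app, port=8000)  # 端口为演示假设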
最佳实践
- 架构设计原则
  - 单一职责和松耦合
  - 故障隔离和容错设计
  - 数据一致性和事务管理
- 运维管理
  - 自动化部署和扩缩容
  - 配置管理和版本控制
  - 灾难恢复和备份策略
- 团队协作
  - DevOps文化和实践
  - API设计和文档管理
  - 持续集成和交付
通过合理的架构设计和工具选择,可以构建出高可用、高性能、易维护的微服务系统,为业务发展提供强有力的技术支撑。
异步通信模式
#!/usr/bin/env python3
"""
微服务异步通信实现
包括消息队列、事件驱动架构等异步通信模式
"""
import asyncio
import json
import uuid
from typing import Dict, Any, Callable, List, Optional
from dataclasses import dataclass, asdict
from datetime import datetime
from enum import Enum
import logging
class MessageType(Enum):
"""消息类型"""
COMMAND = "command"
EVENT = "event"
QUERY = "query"
class DeliveryMode(Enum):
"""投递模式"""
AT_MOST_ONCE = "at_most_once"
AT_LEAST_ONCE = "at_least_once"
EXACTLY_ONCE = "exactly_once"
@dataclass
class Message:
"""消息"""
id: str
type: MessageType
topic: str
payload: Dict[str, Any]
correlation_id: Optional[str] = None
reply_to: Optional[str] = None
timestamp: datetime = None
headers: Dict[str, str] = None
def __post_init__(self):
if self.timestamp is None:
self.timestamp = datetime.utcnow()
if self.headers is None:
self.headers = {}
@dataclass
class EventMessage(Message):
    """事件消息"""
    # 父类字段已带默认值,dataclass要求子类新增字段同样提供默认值
    event_type: str = ""
    aggregate_id: str = ""
    version: int = 0
    def __post_init__(self):
        super().__post_init__()
        self.type = MessageType.EVENT
class MessageBroker:
"""消息代理"""
def __init__(self):
self.topics = {}
self.subscribers = {}
self.message_store = {}
self.logger = logging.getLogger(__name__)
def create_topic(self, topic_name: str, partitions: int = 1):
"""创建主题"""
if topic_name not in self.topics:
self.topics[topic_name] = {
'partitions': partitions,
'messages': [[] for _ in range(partitions)]
}
self.subscribers[topic_name] = []
self.logger.info(f"Created topic: {topic_name} with {partitions} partitions")
def subscribe(self, topic_name: str, handler: Callable[[Message], None],
consumer_group: str = "default"):
"""订阅主题"""
if topic_name not in self.topics:
self.create_topic(topic_name)
subscriber = {
'handler': handler,
'consumer_group': consumer_group,
'id': str(uuid.uuid4())
}
self.subscribers[topic_name].append(subscriber)
self.logger.info(f"Subscribed to topic: {topic_name}, group: {consumer_group}")
return subscriber['id']
async def publish(self, message: Message, delivery_mode: DeliveryMode = DeliveryMode.AT_LEAST_ONCE):
"""发布消息"""
topic_name = message.topic
if topic_name not in self.topics:
self.create_topic(topic_name)
        # 根据消息ID的哈希值选择分区
        partition_count = self.topics[topic_name]['partitions']
        partition = hash(message.id) % partition_count
# 存储消息
self.topics[topic_name]['messages'][partition].append(message)
self.message_store[message.id] = message
# 异步投递给订阅者
await self._deliver_message(message, delivery_mode)
self.logger.info(f"Published message {message.id} to topic {topic_name}")
async def _deliver_message(self, message: Message, delivery_mode: DeliveryMode):
"""投递消息给订阅者"""
topic_name = message.topic
if topic_name not in self.subscribers:
return
delivery_tasks = []
for subscriber in self.subscribers[topic_name]:
task = asyncio.create_task(
self._deliver_to_subscriber(message, subscriber, delivery_mode)
)
delivery_tasks.append(task)
# 等待所有投递完成
if delivery_tasks:
await asyncio.gather(*delivery_tasks, return_exceptions=True)
async def _deliver_to_subscriber(self, message: Message, subscriber: Dict,
delivery_mode: DeliveryMode):
"""投递消息给特定订阅者"""
try:
if delivery_mode == DeliveryMode.AT_LEAST_ONCE:
# 至少一次投递,可能重复
await self._invoke_handler(message, subscriber)
elif delivery_mode == DeliveryMode.AT_MOST_ONCE:
# 至多一次投递,可能丢失
try:
await self._invoke_handler(message, subscriber)
except Exception:
# 忽略错误,不重试
pass
else: # EXACTLY_ONCE
# 精确一次投递(简化实现)
await self._invoke_handler(message, subscriber)
except Exception as e:
self.logger.error(f"Failed to deliver message {message.id} to subscriber {subscriber['id']}: {str(e)}")
async def _invoke_handler(self, message: Message, subscriber: Dict):
"""调用处理器"""
handler = subscriber['handler']
if asyncio.iscoroutinefunction(handler):
await handler(message)
else:
# 在线程池中执行同步处理器
loop = asyncio.get_event_loop()
await loop.run_in_executor(None, handler, message)
class EventStore:
"""事件存储"""
def __init__(self):
self.events = {}
self.snapshots = {}
self.logger = logging.getLogger(__name__)
async def append_events(self, aggregate_id: str, events: List[EventMessage],
expected_version: int = -1):
"""追加事件"""
if aggregate_id not in self.events:
self.events[aggregate_id] = []
current_version = len(self.events[aggregate_id])
if expected_version != -1 and current_version != expected_version:
raise Exception(f"Concurrency conflict: expected version {expected_version}, actual {current_version}")
# 设置事件版本
for i, event in enumerate(events):
event.version = current_version + i + 1
self.events[aggregate_id].extend(events)
self.logger.info(f"Appended {len(events)} events to aggregate {aggregate_id}")
return current_version + len(events)
async def get_events(self, aggregate_id: str, from_version: int = 0) -> List[EventMessage]:
"""获取事件"""
if aggregate_id not in self.events:
return []
events = self.events[aggregate_id]
return [event for event in events if event.version > from_version]
async def save_snapshot(self, aggregate_id: str, snapshot: Dict[str, Any], version: int):
"""保存快照"""
self.snapshots[aggregate_id] = {
'data': snapshot,
'version': version,
'timestamp': datetime.utcnow()
}
self.logger.info(f"Saved snapshot for aggregate {aggregate_id} at version {version}")
async def get_snapshot(self, aggregate_id: str) -> Optional[Dict[str, Any]]:
"""获取快照"""
return self.snapshots.get(aggregate_id)
class SagaOrchestrator:
"""Saga编排器"""
def __init__(self, message_broker: MessageBroker):
self.message_broker = message_broker
self.saga_instances = {}
self.logger = logging.getLogger(__name__)
async def start_saga(self, saga_id: str, saga_definition: Dict[str, Any],
initial_data: Dict[str, Any]):
"""启动Saga"""
saga_instance = {
'id': saga_id,
'definition': saga_definition,
'current_step': 0,
'data': initial_data,
'status': 'running',
'completed_steps': [],
'failed_steps': []
}
self.saga_instances[saga_id] = saga_instance
# 执行第一步
await self._execute_next_step(saga_instance)
self.logger.info(f"Started saga {saga_id}")
async def _execute_next_step(self, saga_instance: Dict[str, Any]):
"""执行下一步"""
definition = saga_instance['definition']
current_step = saga_instance['current_step']
if current_step >= len(definition['steps']):
# Saga完成
saga_instance['status'] = 'completed'
self.logger.info(f"Saga {saga_instance['id']} completed successfully")
return
step = definition['steps'][current_step]
try:
# 创建命令消息
command_message = Message(
id=str(uuid.uuid4()),
type=MessageType.COMMAND,
topic=step['service'],
payload={
'command': step['action'],
'data': saga_instance['data'],
'saga_id': saga_instance['id'],
'step_id': current_step
},
correlation_id=saga_instance['id']
)
# 发布命令
await self.message_broker.publish(command_message)
self.logger.info(f"Executed step {current_step} of saga {saga_instance['id']}")
except Exception as e:
# 步骤失败,开始补偿
saga_instance['failed_steps'].append(current_step)
await self._start_compensation(saga_instance)
self.logger.error(f"Step {current_step} of saga {saga_instance['id']} failed: {str(e)}")
async def handle_step_completion(self, saga_id: str, step_id: int,
success: bool, result_data: Dict[str, Any]):
"""处理步骤完成"""
if saga_id not in self.saga_instances:
return
saga_instance = self.saga_instances[saga_id]
if success:
saga_instance['completed_steps'].append(step_id)
saga_instance['data'].update(result_data)
saga_instance['current_step'] += 1
# 执行下一步
await self._execute_next_step(saga_instance)
else:
saga_instance['failed_steps'].append(step_id)
await self._start_compensation(saga_instance)
async def _start_compensation(self, saga_instance: Dict[str, Any]):
"""开始补偿"""
saga_instance['status'] = 'compensating'
# 按相反顺序执行补偿动作
for step_id in reversed(saga_instance['completed_steps']):
step = saga_instance['definition']['steps'][step_id]
if 'compensation' in step:
compensation_message = Message(
id=str(uuid.uuid4()),
type=MessageType.COMMAND,
topic=step['service'],
payload={
'command': step['compensation'],
'data': saga_instance['data'],
'saga_id': saga_instance['id'],
'step_id': step_id
},
correlation_id=saga_instance['id']
)
await self.message_broker.publish(compensation_message)
saga_instance['status'] = 'compensated'
self.logger.info(f"Saga {saga_instance['id']} compensated")
async def main():
"""主函数 - 演示异步通信"""
# 创建消息代理
broker = MessageBroker()
# 创建事件存储
event_store = EventStore()
# 创建Saga编排器
saga_orchestrator = SagaOrchestrator(broker)
# 定义消息处理器
async def order_created_handler(message: Message):
print(f"处理订单创建事件: {message.payload}")
# 发布库存检查命令
inventory_command = Message(
id=str(uuid.uuid4()),
type=MessageType.COMMAND,
topic="inventory-service",
payload={
"command": "check_inventory",
"order_id": message.payload["order_id"],
"items": message.payload["items"]
},
correlation_id=message.correlation_id
)
await broker.publish(inventory_command)
async def inventory_checked_handler(message: Message):
print(f"处理库存检查结果: {message.payload}")
if message.payload["available"]:
# 发布支付处理命令
payment_command = Message(
id=str(uuid.uuid4()),
type=MessageType.COMMAND,
topic="payment-service",
payload={
"command": "process_payment",
"order_id": message.payload["order_id"],
"amount": message.payload["total_amount"]
},
correlation_id=message.correlation_id
)
await broker.publish(payment_command)
else:
# 发布订单取消事件
            order_cancelled_event = EventMessage(
                id=str(uuid.uuid4()),
                type=MessageType.EVENT,
                topic="order-events",
payload={
"order_id": message.payload["order_id"],
"reason": "insufficient_inventory"
},
event_type="OrderCancelled",
aggregate_id=message.payload["order_id"],
version=1
)
await broker.publish(order_cancelled_event)
# 订阅事件
broker.subscribe("order-events", order_created_handler)
broker.subscribe("inventory-events", inventory_checked_handler)
# 模拟订单创建事件
    order_created_event = EventMessage(
        id=str(uuid.uuid4()),
        type=MessageType.EVENT,
        topic="order-events",
payload={
"order_id": "order-12345",
"user_id": "user-001",
"items": [
{"product_id": "prod-001", "quantity": 2},
{"product_id": "prod-002", "quantity": 1}
],
"total_amount": 299.99
},
event_type="OrderCreated",
aggregate_id="order-12345",
version=1
)
print("发布订单创建事件...")
await broker.publish(order_created_event)
# 存储事件
await event_store.append_events("order-12345", [order_created_event])
# 模拟库存检查结果
await asyncio.sleep(1)
inventory_checked_event = Message(
id=str(uuid.uuid4()),
type=MessageType.EVENT,
topic="inventory-events",
payload={
"order_id": "order-12345",
"available": True,
"total_amount": 299.99
},
correlation_id=order_created_event.id
)
print("发布库存检查结果...")
await broker.publish(inventory_checked_event)
# 演示Saga模式
print("\n演示Saga编排...")
saga_definition = {
"steps": [
{
"service": "inventory-service",
"action": "reserve_inventory",
"compensation": "release_inventory"
},
{
"service": "payment-service",
"action": "charge_payment",
"compensation": "refund_payment"
},
{
"service": "shipping-service",
"action": "create_shipment",
"compensation": "cancel_shipment"
}
]
}
await saga_orchestrator.start_saga(
saga_id="saga-order-12345",
saga_definition=saga_definition,
initial_data={
"order_id": "order-12345",
"user_id": "user-001",
"total_amount": 299.99
}
)
# 等待异步处理完成
await asyncio.sleep(2)
print("\n=== 异步通信演示完成 ===")
if __name__ == "__main__":
asyncio.run(main())
数据管理策略
微服务架构中的数据管理是一个复杂的挑战,需要平衡数据一致性、性能和服务自治性。
数据库分离模式
#!/usr/bin/env python3
"""
微服务数据管理策略
包括数据库分离、数据同步、分布式事务等
"""
import asyncio
import json
import uuid
from typing import Dict, Any, List, Optional, Union
from dataclasses import dataclass, asdict
from datetime import datetime
from enum import Enum
import logging
from abc import ABC, abstractmethod
class ConsistencyLevel(Enum):
"""一致性级别"""
STRONG = "strong"
EVENTUAL = "eventual"
WEAK = "weak"
class TransactionStatus(Enum):
"""事务状态"""
PENDING = "pending"
COMMITTED = "committed"
ABORTED = "aborted"
COMPENSATED = "compensated"
@dataclass
class DataChangeEvent:
"""数据变更事件"""
id: str
aggregate_id: str
event_type: str
data: Dict[str, Any]
timestamp: datetime
version: int
service_name: str
class DatabaseRepository(ABC):
"""数据库仓储抽象基类"""
@abstractmethod
async def save(self, entity: Dict[str, Any]) -> str:
"""保存实体"""
pass
@abstractmethod
async def find_by_id(self, entity_id: str) -> Optional[Dict[str, Any]]:
"""根据ID查找实体"""
pass
@abstractmethod
async def find_by_criteria(self, criteria: Dict[str, Any]) -> List[Dict[str, Any]]:
"""根据条件查找实体"""
pass
@abstractmethod
async def update(self, entity_id: str, updates: Dict[str, Any]) -> bool:
"""更新实体"""
pass
@abstractmethod
async def delete(self, entity_id: str) -> bool:
"""删除实体"""
pass
class InMemoryRepository(DatabaseRepository):
"""内存数据库仓储实现"""
def __init__(self, service_name: str):
self.service_name = service_name
self.data = {}
self.logger = logging.getLogger(__name__)
async def save(self, entity: Dict[str, Any]) -> str:
"""保存实体"""
entity_id = entity.get('id', str(uuid.uuid4()))
entity['id'] = entity_id
entity['created_at'] = datetime.utcnow().isoformat()
entity['updated_at'] = datetime.utcnow().isoformat()
self.data[entity_id] = entity.copy()
self.logger.info(f"Saved entity {entity_id} in {self.service_name}")
return entity_id
async def find_by_id(self, entity_id: str) -> Optional[Dict[str, Any]]:
"""根据ID查找实体"""
return self.data.get(entity_id)
async def find_by_criteria(self, criteria: Dict[str, Any]) -> List[Dict[str, Any]]:
"""根据条件查找实体"""
results = []
for entity in self.data.values():
match = True
for key, value in criteria.items():
if key not in entity or entity[key] != value:
match = False
break
if match:
results.append(entity.copy())
return results
async def update(self, entity_id: str, updates: Dict[str, Any]) -> bool:
"""更新实体"""
if entity_id not in self.data:
return False
self.data[entity_id].update(updates)
self.data[entity_id]['updated_at'] = datetime.utcnow().isoformat()
self.logger.info(f"Updated entity {entity_id} in {self.service_name}")
return True
async def delete(self, entity_id: str) -> bool:
"""删除实体"""
if entity_id in self.data:
del self.data[entity_id]
self.logger.info(f"Deleted entity {entity_id} from {self.service_name}")
return True
return False
class EventSourcingRepository:
    """Event-sourcing repository"""
    def __init__(self, service_name: str):
        self.service_name = service_name
        self.event_store = {}
        self.snapshots = {}
        self.logger = logging.getLogger(__name__)
    async def append_events(self, aggregate_id: str, events: List[DataChangeEvent],
                            expected_version: int = -1) -> int:
        """Append events with optimistic concurrency control"""
        if aggregate_id not in self.event_store:
            self.event_store[aggregate_id] = []
        current_version = len(self.event_store[aggregate_id])
        if expected_version != -1 and current_version != expected_version:
            raise Exception(f"Concurrency conflict: expected {expected_version}, got {current_version}")
        # Assign monotonically increasing versions
        for i, event in enumerate(events):
            event.version = current_version + i + 1
        self.event_store[aggregate_id].extend(events)
        self.logger.info(f"Appended {len(events)} events to aggregate {aggregate_id}")
        return current_version + len(events)
    async def get_events(self, aggregate_id: str, from_version: int = 0) -> List[DataChangeEvent]:
        """Fetch events newer than from_version"""
        if aggregate_id not in self.event_store:
            return []
        events = self.event_store[aggregate_id]
        return [event for event in events if event.version > from_version]
    async def rebuild_aggregate(self, aggregate_id: str) -> Optional[Dict[str, Any]]:
        """Rebuild an aggregate by replaying its events"""
        # Start from the latest snapshot, if one exists
        snapshot = self.snapshots.get(aggregate_id)
        if snapshot:
            aggregate = snapshot['data'].copy()
            events = await self.get_events(aggregate_id, snapshot['version'])
        else:
            aggregate = {'id': aggregate_id}
            events = await self.get_events(aggregate_id)
            if not events:
                return None
        # Replay the remaining events in order
        for event in events:
            aggregate = self._apply_event(aggregate, event)
        return aggregate
    def _apply_event(self, aggregate: Dict[str, Any], event: DataChangeEvent) -> Dict[str, Any]:
        """Apply a single event to the aggregate"""
        if event.event_type == "Created":
            aggregate.update(event.data)
        elif event.event_type == "Updated":
            aggregate.update(event.data)
        elif event.event_type == "Deleted":
            aggregate['deleted'] = True
            aggregate['deleted_at'] = event.timestamp.isoformat()
        aggregate['version'] = event.version
        aggregate['last_modified'] = event.timestamp.isoformat()
        return aggregate
    async def save_snapshot(self, aggregate_id: str, aggregate: Dict[str, Any], version: int):
        """Persist a snapshot"""
        self.snapshots[aggregate_id] = {
            'data': aggregate.copy(),
            'version': version,
            'timestamp': datetime.utcnow()
        }
        self.logger.info(f"Saved snapshot for aggregate {aggregate_id} at version {version}")
class DataSynchronizer:
    """Cross-service data synchronizer"""
    def __init__(self):
        self.sync_rules = {}
        self.repositories = {}
        self.logger = logging.getLogger(__name__)
    def register_repository(self, service_name: str, repository: DatabaseRepository):
        """Register a repository"""
        self.repositories[service_name] = repository
    def add_sync_rule(self, source_service: str, target_service: str,
                      entity_type: str, mapping_function: callable,
                      consistency_level: ConsistencyLevel = ConsistencyLevel.EVENTUAL):
        """Register a synchronization rule"""
        rule_id = f"{source_service}->{target_service}:{entity_type}"
        self.sync_rules[rule_id] = {
            'source_service': source_service,
            'target_service': target_service,
            'entity_type': entity_type,
            'mapping_function': mapping_function,
            'consistency_level': consistency_level
        }
        self.logger.info(f"Added sync rule: {rule_id}")
    async def sync_data_change(self, source_service: str, event: DataChangeEvent):
        """Propagate a data change event to every matching sync target"""
        # Find the sync rules that apply to this event
        relevant_rules = [
            rule for rule in self.sync_rules.values()
            if rule['source_service'] == source_service and
            rule['entity_type'] == event.event_type
        ]
        # Honour each rule's consistency level individually
        strong_tasks = []
        for rule in relevant_rules:
            task = asyncio.create_task(self._execute_sync_rule(rule, event))
            if rule['consistency_level'] == ConsistencyLevel.STRONG:
                # Strong consistency: this target must be updated before we return
                strong_tasks.append(task)
            # Eventual consistency: the task keeps running in the background;
            # _execute_sync_rule handles its own errors
        if strong_tasks:
            await asyncio.gather(*strong_tasks)
    async def _execute_sync_rule(self, rule: Dict[str, Any], event: DataChangeEvent):
        """Apply a single sync rule to an event"""
        try:
            target_service = rule['target_service']
            mapping_function = rule['mapping_function']
            if target_service not in self.repositories:
                self.logger.error(f"Target repository not found: {target_service}")
                return
            target_repo = self.repositories[target_service]
            # Apply the mapping function
            mapped_data = mapping_function(event.data)
            # Event types may be namespaced (e.g. "UserCreated"), so match on suffix
            if event.event_type.endswith(("Created", "Updated")):
                # Check whether the target entity already exists
                existing_entity = await target_repo.find_by_id(event.aggregate_id)
                if existing_entity:
                    await target_repo.update(event.aggregate_id, mapped_data)
                else:
                    mapped_data['id'] = event.aggregate_id
                    await target_repo.save(mapped_data)
            elif event.event_type.endswith("Deleted"):
                await target_repo.delete(event.aggregate_id)
            self.logger.info(f"Synced data from {rule['source_service']} to {target_service}")
        except Exception as e:
            self.logger.error(f"Sync failed: {str(e)}")
            # A retry mechanism or dead-letter queue could hook in here
            # (see the sketch below)
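    async def _with_retries(self, coro_factory: callable, attempts: int = 3,
                            base_delay: float = 0.5) -> bool:
        # Hedged sketch of the retry mechanism mentioned above (a hypothetical
        # helper, not wired into sync_data_change): await the coroutine built
        # by coro_factory, backing off exponentially between failed attempts.
        # A production version would push exhausted events to a dead-letter
        # queue for manual replay.
        for attempt in range(attempts):
            try:
                await coro_factory()
                return True
            except Exception:
                if attempt == attempts - 1:
                    return False
                await asyncio.sleep(base_delay * (2 ** attempt))
        return False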
class DistributedTransactionManager:
    """Distributed transaction manager (two-phase commit)"""
    def __init__(self):
        self.transactions = {}
        self.participants = {}
        self.logger = logging.getLogger(__name__)
    def register_participant(self, service_name: str, repository: DatabaseRepository):
        """Register a transaction participant"""
        self.participants[service_name] = repository
    async def begin_transaction(self, transaction_id: str, operations: List[Dict[str, Any]]) -> bool:
        """Run a distributed transaction through both 2PC phases"""
        transaction = {
            'id': transaction_id,
            'status': TransactionStatus.PENDING,
            'operations': operations,
            'participants': set(),
            'prepared_participants': set(),
            'start_time': datetime.utcnow()
        }
        # Collect the participants
        for operation in operations:
            service_name = operation['service']
            transaction['participants'].add(service_name)
        self.transactions[transaction_id] = transaction
        try:
            # Phase 1: Prepare
            prepare_success = await self._prepare_phase(transaction)
            if prepare_success:
                # Phase 2: Commit
                await self._commit_phase(transaction)
                transaction['status'] = TransactionStatus.COMMITTED
                self.logger.info(f"Transaction {transaction_id} committed successfully")
                return True
            else:
                # Phase 2: Abort
                await self._abort_phase(transaction)
                transaction['status'] = TransactionStatus.ABORTED
                self.logger.info(f"Transaction {transaction_id} aborted")
                return False
        except Exception as e:
            await self._abort_phase(transaction)
            transaction['status'] = TransactionStatus.ABORTED
            self.logger.error(f"Transaction {transaction_id} failed: {str(e)}")
            return False
    async def _prepare_phase(self, transaction: Dict[str, Any]) -> bool:
        """Prepare phase: ask every participant to vote"""
        prepare_tasks = []
        for participant in transaction['participants']:
            if participant in self.participants:
                task = asyncio.create_task(
                    self._prepare_participant(transaction, participant)
                )
                prepare_tasks.append(task)
        results = await asyncio.gather(*prepare_tasks, return_exceptions=True)
        # The transaction proceeds only if every participant voted yes
        for result in results:
            if isinstance(result, Exception) or not result:
                return False
        return True
    async def _prepare_participant(self, transaction: Dict[str, Any], participant: str) -> bool:
        """Prepare a single participant"""
        try:
            # A real implementation would lock resources and validate the
            # pending operations here; this demo just simulates the latency
            await asyncio.sleep(0.1)
            transaction['prepared_participants'].add(participant)
            self.logger.info(f"Participant {participant} prepared for transaction {transaction['id']}")
            return True
        except Exception as e:
            self.logger.error(f"Participant {participant} failed to prepare: {str(e)}")
            return False
    async def _commit_phase(self, transaction: Dict[str, Any]):
        """Commit phase"""
        commit_tasks = []
        for participant in transaction['prepared_participants']:
            task = asyncio.create_task(
                self._commit_participant(transaction, participant)
            )
            commit_tasks.append(task)
        await asyncio.gather(*commit_tasks, return_exceptions=True)
    async def _commit_participant(self, transaction: Dict[str, Any], participant: str):
        """Commit a single participant's operations"""
        try:
            repository = self.participants[participant]
            # Execute this participant's operations
            for operation in transaction['operations']:
                if operation['service'] == participant:
                    if operation['type'] == 'create':
                        await repository.save(operation['data'])
                    elif operation['type'] == 'update':
                        await repository.update(operation['entity_id'], operation['data'])
                    elif operation['type'] == 'delete':
                        await repository.delete(operation['entity_id'])
            self.logger.info(f"Participant {participant} committed transaction {transaction['id']}")
        except Exception as e:
            self.logger.error(f"Participant {participant} failed to commit: {str(e)}")
    async def _abort_phase(self, transaction: Dict[str, Any]):
        """Abort phase"""
        abort_tasks = []
        for participant in transaction['prepared_participants']:
            task = asyncio.create_task(
                self._abort_participant(transaction, participant)
            )
            abort_tasks.append(task)
        await asyncio.gather(*abort_tasks, return_exceptions=True)
    async def _abort_participant(self, transaction: Dict[str, Any], participant: str):
        """Abort a single participant"""
        try:
            # A real implementation would roll back changes and release locks here
            self.logger.info(f"Participant {participant} aborted transaction {transaction['id']}")
        except Exception as e:
            self.logger.error(f"Participant {participant} failed to abort: {str(e)}")
async def main():
    """Entry point demonstrating the data management strategies"""
    # Create repositories
    user_repo = InMemoryRepository("user-service")
    order_repo = InMemoryRepository("order-service")
    billing_repo = InMemoryRepository("billing-service")
    # Create the event-sourcing repository
    event_repo = EventSourcingRepository("event-store")
    # Create the data synchronizer
    synchronizer = DataSynchronizer()
    synchronizer.register_repository("user-service", user_repo)
    synchronizer.register_repository("order-service", order_repo)
    synchronizer.register_repository("billing-service", billing_repo)
    # Define the data mapping functions
    def user_to_billing_mapping(user_data: Dict[str, Any]) -> Dict[str, Any]:
        """Map user data into the billing service's schema"""
        return {
            'customer_id': user_data['id'],
            'customer_name': user_data['name'],
            'customer_email': user_data['email'],
            'billing_address': user_data.get('address', '')
        }
    def order_to_billing_mapping(order_data: Dict[str, Any]) -> Dict[str, Any]:
        """Map order data into the billing service's schema"""
        return {
            'order_id': order_data['id'],
            'customer_id': order_data['user_id'],
            'amount': order_data['total_amount'],
            'items': order_data['items']
        }
    # Register the sync rules
    synchronizer.add_sync_rule(
        source_service="user-service",
        target_service="billing-service",
        entity_type="UserCreated",
        mapping_function=user_to_billing_mapping,
        consistency_level=ConsistencyLevel.EVENTUAL
    )
    synchronizer.add_sync_rule(
        source_service="order-service",
        target_service="billing-service",
        entity_type="OrderCreated",
        mapping_function=order_to_billing_mapping,
        consistency_level=ConsistencyLevel.STRONG
    )
    # Demonstrate the data operations
    print("=== Data Management Strategy Demo ===")
    # 1. Create a user
    user_data = {
        'name': '张三',
        'email': 'zhangsan@example.com',
        'address': '北京市朝阳区'
    }
    user_id = await user_repo.save(user_data)
    print(f"Created user: {user_id}")
    # Trigger data synchronization
    user_event = DataChangeEvent(
        id=str(uuid.uuid4()),
        aggregate_id=user_id,
        event_type="UserCreated",
        data=user_data,
        timestamp=datetime.utcnow(),
        version=1,
        service_name="user-service"
    )
    await synchronizer.sync_data_change("user-service", user_event)
    # 2. Create an order
    order_data = {
        'user_id': user_id,
        'items': [
            {'product_id': 'prod-001', 'quantity': 2, 'price': 99.99},
            {'product_id': 'prod-002', 'quantity': 1, 'price': 199.99}
        ],
        'total_amount': 399.97,
        'status': 'pending'
    }
    order_id = await order_repo.save(order_data)
    print(f"Created order: {order_id}")
    # 3. Demonstrate event sourcing
    print("\n=== Event Sourcing Demo ===")
    # Build the event sequence
    events = [
        DataChangeEvent(
            id=str(uuid.uuid4()),
            aggregate_id=order_id,
            event_type="Created",
            data=order_data,
            timestamp=datetime.utcnow(),
            version=1,
            service_name="order-service"
        ),
        DataChangeEvent(
            id=str(uuid.uuid4()),
            aggregate_id=order_id,
            event_type="Updated",
            data={'status': 'confirmed'},
            timestamp=datetime.utcnow(),
            version=2,
            service_name="order-service"
        ),
        DataChangeEvent(
            id=str(uuid.uuid4()),
            aggregate_id=order_id,
            event_type="Updated",
            data={'status': 'shipped'},
            timestamp=datetime.utcnow(),
            version=3,
            service_name="order-service"
        )
    ]
    # Store the events
    final_version = await event_repo.append_events(order_id, events)
    print(f"Stored {len(events)} events, final version: {final_version}")
    # Rebuild the aggregate
    rebuilt_order = await event_repo.rebuild_aggregate(order_id)
    print(f"Rebuilt order status: {rebuilt_order['status']}")
    # 4. Demonstrate the distributed transaction
    print("\n=== Distributed Transaction Demo ===")
    tx_manager = DistributedTransactionManager()
    tx_manager.register_participant("user-service", user_repo)
    tx_manager.register_participant("order-service", order_repo)
    tx_manager.register_participant("billing-service", billing_repo)
    # Define the operations that make up the distributed transaction
    transaction_operations = [
        {
            'service': 'user-service',
            'type': 'update',
            'entity_id': user_id,
            'data': {'last_order_date': datetime.utcnow().isoformat()}
        },
        {
            'service': 'order-service',
            'type': 'update',
            'entity_id': order_id,
            'data': {'status': 'processing'}
        },
        {
            'service': 'billing-service',
            'type': 'create',
            'data': {
                'order_id': order_id,
                'customer_id': user_id,
                'amount': 399.97,
                'status': 'pending'
            }
        }
    ]
    # Execute the distributed transaction
    transaction_id = str(uuid.uuid4())
    success = await tx_manager.begin_transaction(transaction_id, transaction_operations)
    if success:
        print(f"Distributed transaction {transaction_id} committed")
    else:
        print(f"Distributed transaction {transaction_id} failed")
    # Verify data consistency
    print("\n=== Data Consistency Check ===")
    updated_user = await user_repo.find_by_id(user_id)
    updated_order = await order_repo.find_by_id(order_id)
    billing_records = await billing_repo.find_by_criteria({'customer_id': user_id})
    print(f"User's last order date: {updated_user.get('last_order_date', 'N/A')}")
    print(f"Order status: {updated_order.get('status', 'N/A')}")
    print(f"Billing records: {len(billing_records)}")
if __name__ == "__main__":
    asyncio.run(main())
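The TransactionStatus enum above also defines a COMPENSATED state that the 2PC manager never reaches. That state belongs to the saga pattern, the usual alternative when 2PC's blocking prepare phase is too costly: each step commits locally and registers a compensating action that undoes it if a later step fails. A minimal sketch of that flow, with hypothetical step and compensation callables (not part of the demo above):
#!/usr/bin/env python3
"""Minimal saga sketch: run steps in order, compensate in reverse on failure."""
import asyncio
from typing import Awaitable, Callable, List, Tuple

# A saga step pairs a forward action with the compensation that undoes it.
SagaStep = Tuple[Callable[[], Awaitable[None]], Callable[[], Awaitable[None]]]

async def run_saga(steps: List[SagaStep]) -> str:
    completed: List[SagaStep] = []
    for action, compensation in steps:
        try:
            await action()
            completed.append((action, compensation))
        except Exception:
            # Undo the committed steps in reverse order; the saga as a whole
            # ends in the COMPENSATED state rather than ABORTED
            for _, undo in reversed(completed):
                await undo()
            return "compensated"
    return "committed"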
Service Discovery and Registration
Service discovery is a key component of a microservice architecture: it lets services locate and communicate with one another dynamically as instances come and go.
Service Registry Implementation
#!/bin/bash
# Service discovery infrastructure deployment script
set -e
echo "Deploying service discovery infrastructure..."
# 1. Deploy the Consul cluster
deploy_consul() {
    echo "Deploying the Consul service registry..."
    # Create the Consul configuration
    cat > consul-config.json << EOF
{
"datacenter": "dc1",
"data_dir": "/opt/consul/data",
"log_level": "INFO",
"server": true,
"bootstrap_expect": 3,
"bind_addr": "0.0.0.0",
"client_addr": "0.0.0.0",
"retry_join": ["consul-1", "consul-2", "consul-3"],
"ui_config": {
"enabled": true
},
"connect": {
"enabled": true
},
"ports": {
"grpc": 8502
}
}
EOF
    # Start the three-node Consul cluster
    docker network create consul-network || true
    for i in {1..3}; do
        docker run -d \
            --name consul-$i \
            --network consul-network \
            -p $((8500 + i - 1)):8500 \
            -p $((8600 + i - 1)):8600/udp \
            -v $(pwd)/consul-config.json:/consul/config/config.json \
            consul:latest agent -config-file=/consul/config/config.json
    done
    echo "Consul cluster deployed"
}
# 2. Deploy the Eureka service registry
deploy_eureka() {
    echo "Deploying the Eureka service registry..."
    # Create the Eureka configuration
    cat > eureka-application.yml << EOF
server:
port: 8761
eureka:
instance:
hostname: localhost
client:
registerWithEureka: false
fetchRegistry: false
serviceUrl:
defaultZone: http://\${eureka.instance.hostname}:\${server.port}/eureka/
server:
enableSelfPreservation: false
evictionIntervalTimerInMs: 5000
spring:
application:
name: eureka-server
management:
endpoints:
web:
exposure:
include: health,info,metrics
EOF
    # Run Eureka in Docker
    docker run -d \
        --name eureka-server \
        -p 8761:8761 \
        -v $(pwd)/eureka-application.yml:/app/application.yml \
        springcloud/eureka:latest
    echo "Eureka service registry deployed"
}
# 3. Configure the load balancer
configure_load_balancer() {
    echo "Configuring the load balancer..."
    # Create the Nginx configuration
    cat > nginx.conf << EOF
upstream user-service {
least_conn;
server user-service-1:8080;
server user-service-2:8080;
server user-service-3:8080;
}
upstream order-service {
least_conn;
server order-service-1:8080;
server order-service-2:8080;
}
server {
listen 80;
server_name api.example.com;
location /api/users/ {
proxy_pass http://user-service/;
proxy_set_header Host \$host;
proxy_set_header X-Real-IP \$remote_addr;
proxy_set_header X-Forwarded-For \$proxy_add_x_forwarded_for;
        # Passive health checks: fail over on errors and timeouts
proxy_next_upstream error timeout invalid_header http_500 http_502 http_503;
proxy_connect_timeout 5s;
proxy_send_timeout 10s;
proxy_read_timeout 10s;
}
location /api/orders/ {
proxy_pass http://order-service/;
proxy_set_header Host \$host;
proxy_set_header X-Real-IP \$remote_addr;
proxy_set_header X-Forwarded-For \$proxy_add_x_forwarded_for;
proxy_next_upstream error timeout invalid_header http_500 http_502 http_503;
proxy_connect_timeout 5s;
proxy_send_timeout 10s;
proxy_read_timeout 10s;
}
    # Health check endpoint
location /health {
access_log off;
return 200 "healthy\n";
add_header Content-Type text/plain;
}
}
EOF
    # Run the Nginx load balancer
    docker run -d \
        --name api-gateway \
        -p 80:80 \
        -v $(pwd)/nginx.conf:/etc/nginx/conf.d/default.conf \
        nginx:alpine
    echo "Load balancer configured"
}
# 4. Deploy the service mesh (Istio)
deploy_istio() {
    echo "Deploying the Istio service mesh..."
    # Download and install Istio
    curl -L https://istio.io/downloadIstio | sh -
    cd istio-*
    export PATH=$PWD/bin:$PATH
    # Install Istio
    istioctl install --set values.defaultRevision=default -y
    # Enable automatic sidecar injection
    kubectl label namespace default istio-injection=enabled
    # Create the Gateway configuration
    cat > gateway.yaml << EOF
apiVersion: networking.istio.io/v1alpha3
kind: Gateway
metadata:
name: microservices-gateway
spec:
selector:
istio: ingressgateway
servers:
- port:
number: 80
name: http
protocol: HTTP
hosts:
- "*"
---
apiVersion: networking.istio.io/v1alpha3
kind: VirtualService
metadata:
name: microservices-vs
spec:
hosts:
- "*"
gateways:
- microservices-gateway
http:
- match:
- uri:
prefix: /api/users
route:
- destination:
host: user-service
port:
number: 8080
- match:
- uri:
prefix: /api/orders
route:
- destination:
host: order-service
port:
number: 8080
EOF
    kubectl apply -f gateway.yaml
    echo "Istio service mesh deployed"
}
# Main entry point
main() {
    echo "Starting microservice infrastructure deployment..."
    # Choose a deployment option
    read -p "Choose a deployment option (1: Consul, 2: Eureka, 3: Istio): " choice
    case $choice in
        1)
            deploy_consul
            configure_load_balancer
            ;;
        2)
            deploy_eureka
            configure_load_balancer
            ;;
        3)
            deploy_istio
            ;;
        *)
            echo "Invalid choice, deploying all components..."
            deploy_consul
            deploy_eureka
            configure_load_balancer
            ;;
    esac
    echo "Microservice infrastructure deployment complete!"
    echo "Access URLs:"
    echo "- Consul UI: http://localhost:8500"
    echo "- Eureka UI: http://localhost:8761"
    echo "- API Gateway: http://localhost"
}
main "$@"
API Gateway Design
The API gateway is the single entry point of a microservice architecture, responsible for request routing, authentication and authorization, rate limiting, and related cross-cutting concerns.
API Gateway Implementation
#!/usr/bin/env python3
"""
Microservice API gateway implementation,
covering routing, authentication, rate limiting, and monitoring.
"""
import asyncio
import aiohttp
import jwt
import time
import json
from typing import Dict, Any, Optional, List
from dataclasses import dataclass, asdict
from datetime import datetime, timedelta
from enum import Enum
import logging
import hashlib
import redis.asyncio as redis  # async Redis client (redis-py >= 4.2)
from aiohttp import web, ClientSession
import yaml
class AuthenticationMethod(Enum):
    """Authentication methods"""
    JWT = "jwt"
    API_KEY = "api_key"
    OAUTH2 = "oauth2"
    BASIC = "basic"
class RateLimitStrategy(Enum):
    """Rate limiting strategies"""
    FIXED_WINDOW = "fixed_window"
    SLIDING_WINDOW = "sliding_window"
    TOKEN_BUCKET = "token_bucket"
    LEAKY_BUCKET = "leaky_bucket"
@dataclass
class RouteConfig:
    """Route configuration"""
    path: str
    method: str
    service_name: str
    service_url: str
    timeout: int = 30
    retries: int = 3
    auth_required: bool = True
    rate_limit: Optional[Dict[str, Any]] = None
    circuit_breaker: bool = True
@dataclass
class ServiceInstance:
    """Service instance"""
    id: str
    host: str
    port: int
    weight: int = 1
    healthy: bool = True
    last_health_check: Optional[datetime] = None
class LoadBalancer:
    """Load balancer"""
    def __init__(self, strategy: str = "round_robin"):
        self.strategy = strategy
        self.current_index = 0
    def select_instance(self, instances: List[ServiceInstance]) -> Optional[ServiceInstance]:
        """Pick a service instance according to the configured strategy"""
        healthy_instances = [inst for inst in instances if inst.healthy]
        if not healthy_instances:
            return None
        if self.strategy == "round_robin":
            return self._round_robin(healthy_instances)
        elif self.strategy == "weighted_round_robin":
            return self._weighted_round_robin(healthy_instances)
        elif self.strategy == "least_connections":
            return self._least_connections(healthy_instances)
        else:
            return healthy_instances[0]
    def _round_robin(self, instances: List[ServiceInstance]) -> ServiceInstance:
        """Round-robin selection"""
        instance = instances[self.current_index % len(instances)]
        self.current_index += 1
        return instance
    def _weighted_round_robin(self, instances: List[ServiceInstance]) -> ServiceInstance:
        """Weighted round-robin selection"""
        total_weight = sum(inst.weight for inst in instances)
        current_weight = self.current_index % total_weight
        cumulative_weight = 0
        for instance in instances:
            cumulative_weight += instance.weight
            if current_weight < cumulative_weight:
                self.current_index += 1
                return instance
        return instances[0]
    def _least_connections(self, instances: List[ServiceInstance]) -> ServiceInstance:
        """Least-connections selection (simplified)"""
        # A full implementation would track live connection counts per
        # instance; this placeholder just returns the first healthy one
        return instances[0]
class RateLimiter:
    """Rate limiter backed by Redis"""
    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client
    async def is_allowed(self, key: str, limit: int, window: int,
                         strategy: RateLimitStrategy = RateLimitStrategy.FIXED_WINDOW) -> bool:
        """Check whether the request is allowed under the given strategy"""
        if strategy == RateLimitStrategy.FIXED_WINDOW:
            return await self._fixed_window(key, limit, window)
        elif strategy == RateLimitStrategy.SLIDING_WINDOW:
            return await self._sliding_window(key, limit, window)
        elif strategy == RateLimitStrategy.TOKEN_BUCKET:
            return await self._token_bucket(key, limit, window)
        else:
            return True
    async def _fixed_window(self, key: str, limit: int, window: int) -> bool:
        """Fixed-window rate limiting"""
        current_window = int(time.time()) // window
        window_key = f"{key}:{current_window}"
        try:
            current_count = await self.redis.get(window_key)
            current_count = int(current_count) if current_count else 0
            if current_count >= limit:
                return False
            pipe = self.redis.pipeline()
            pipe.incr(window_key)
            pipe.expire(window_key, window)
            await pipe.execute()
            return True
        except Exception:
            # Fail open: let requests through if Redis is unavailable
            return True
    async def _sliding_window(self, key: str, limit: int, window: int) -> bool:
        """Sliding-window rate limiting"""
        now = time.time()
        window_start = now - window
        try:
            # Evict entries that have slid out of the window
            await self.redis.zremrangebyscore(key, 0, window_start)
            # Count the requests still inside the window
            current_count = await self.redis.zcard(key)
            if current_count >= limit:
                return False
            # Record the current request
            await self.redis.zadd(key, {str(now): now})
            await self.redis.expire(key, window)
            return True
        except Exception:
            return True
    async def _token_bucket(self, key: str, capacity: int, refill_rate: int) -> bool:
        """Token-bucket rate limiting"""
        now = time.time()
        try:
            bucket_data = await self.redis.hmget(key, 'tokens', 'last_refill')
            tokens = float(bucket_data[0]) if bucket_data[0] else capacity
            last_refill = float(bucket_data[1]) if bucket_data[1] else now
            # Refill tokens in proportion to the elapsed time
            time_passed = now - last_refill
            tokens_to_add = time_passed * refill_rate
            tokens = min(capacity, tokens + tokens_to_add)
            if tokens < 1:
                return False
            # Spend one token
            tokens -= 1
            # Persist the bucket state
            await self.redis.hset(key, mapping={
                'tokens': tokens,
                'last_refill': now
            })
            await self.redis.expire(key, 3600)  # expire after one hour
            return True
        except Exception:
            return True
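# Worked example for the token bucket above: with capacity=50 and
# refill_rate=5 (tokens per second), a bucket drained to 0 regains
# min(50, 0 + 4 * 5) = 20 tokens after 4 idle seconds, so a client may
# burst up to 50 requests but is throttled to ~5 requests/second sustained.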
class CircuitBreaker:
    """Circuit breaker"""
    def __init__(self, failure_threshold: int = 5, recovery_timeout: int = 60):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.failure_count = 0
        self.last_failure_time = None
        self.state = "CLOSED"  # CLOSED, OPEN, HALF_OPEN
    def can_execute(self) -> bool:
        """Check whether a request may go through"""
        if self.state == "CLOSED":
            return True
        elif self.state == "OPEN":
            # After the recovery timeout, allow a single probe request
            if time.time() - self.last_failure_time > self.recovery_timeout:
                self.state = "HALF_OPEN"
                return True
            return False
        else:  # HALF_OPEN
            return True
    def record_success(self):
        """Record a success"""
        if self.state == "HALF_OPEN":
            self.state = "CLOSED"
            self.failure_count = 0
    def record_failure(self):
        """Record a failure"""
        self.failure_count += 1
        self.last_failure_time = time.time()
        if self.failure_count >= self.failure_threshold:
            self.state = "OPEN"
class APIGateway:
    """API gateway"""
    def __init__(self, config_file: str):
        self.config = self._load_config(config_file)
        self.routes = {}
        self.services = {}
        self.load_balancer = LoadBalancer()
        self.rate_limiter = None
        self.circuit_breakers = {}
        self.session = None
        self.logger = logging.getLogger(__name__)
        # Initialize the Redis connection
        if 'redis' in self.config:
            redis_config = self.config['redis']
            self.redis_client = redis.Redis(
                host=redis_config.get('host', 'localhost'),
                port=redis_config.get('port', 6379),
                db=redis_config.get('db', 0)
            )
            self.rate_limiter = RateLimiter(self.redis_client)
    def _load_config(self, config_file: str) -> Dict[str, Any]:
        """Load the configuration file"""
        with open(config_file, 'r', encoding='utf-8') as f:
            return yaml.safe_load(f)
    async def start(self):
        """Start the gateway"""
        self.session = ClientSession()
        # Load the route configuration
        self._load_routes()
        # Load the service configuration
        self._load_services()
        # Create the web application
        app = web.Application(middlewares=[
            self._auth_middleware,
            self._rate_limit_middleware,
            self._logging_middleware
        ])
        # Register the catch-all route
        app.router.add_route('*', '/{path:.*}', self._handle_request)
        # Start the health checker
        asyncio.create_task(self._health_check_loop())
        return app
    def _load_routes(self):
        """Load the route configuration"""
        for route_config in self.config.get('routes', []):
            route = RouteConfig(**route_config)
            route_key = f"{route.method}:{route.path}"
            self.routes[route_key] = route
            # Initialize a circuit breaker per route
            if route.circuit_breaker:
                self.circuit_breakers[route_key] = CircuitBreaker()
    def _load_services(self):
        """Load the service configuration"""
        for service_name, service_config in self.config.get('services', {}).items():
            instances = []
            for instance_config in service_config.get('instances', []):
                instance = ServiceInstance(**instance_config)
                instances.append(instance)
            self.services[service_name] = instances
    @web.middleware
    async def _auth_middleware(self, request: web.Request, handler):
        """Authentication middleware"""
        # Find the matching route
        route = self._find_route(request)
        if not route or not route.auth_required:
            return await handler(request)
        # Check the credentials
        auth_header = request.headers.get('Authorization')
        if not auth_header:
            return web.json_response(
                {'error': 'Missing authorization header'},
                status=401
            )
        try:
            # JWT authentication
            if auth_header.startswith('Bearer '):
                token = auth_header[7:]
                payload = jwt.decode(
                    token,
                    self.config.get('jwt_secret', 'secret'),
                    algorithms=['HS256']
                )
                request['user'] = payload
            else:
                return web.json_response(
                    {'error': 'Invalid authorization header'},
                    status=401
                )
        except jwt.InvalidTokenError:
            return web.json_response(
                {'error': 'Invalid token'},
                status=401
            )
        return await handler(request)
    @web.middleware
    async def _rate_limit_middleware(self, request: web.Request, handler):
        """Rate limiting middleware"""
        if not self.rate_limiter:
            return await handler(request)
        route = self._find_route(request)
        if not route or not route.rate_limit:
            return await handler(request)
        # Build the rate-limit key per service and user
        user_id = request.get('user', {}).get('user_id', 'anonymous')
        rate_limit_key = f"rate_limit:{route.service_name}:{user_id}"
        # Apply the limit
        limit_config = route.rate_limit
        allowed = await self.rate_limiter.is_allowed(
            key=rate_limit_key,
            limit=limit_config['requests'],
            window=limit_config['window'],
            strategy=RateLimitStrategy(limit_config.get('strategy', 'fixed_window'))
        )
        if not allowed:
            return web.json_response(
                {'error': 'Rate limit exceeded'},
                status=429
            )
        return await handler(request)
    @web.middleware
    async def _logging_middleware(self, request: web.Request, handler):
        """Access logging middleware"""
        start_time = time.time()
        try:
            response = await handler(request)
            duration = time.time() - start_time
            self.logger.info(
                f"{request.method} {request.path} - "
                f"Status: {response.status} - "
                f"Duration: {duration:.3f}s"
            )
            return response
        except Exception as e:
            duration = time.time() - start_time
            self.logger.error(
                f"{request.method} {request.path} - "
                f"Error: {str(e)} - "
                f"Duration: {duration:.3f}s"
            )
            raise
    def _find_route(self, request: web.Request) -> Optional[RouteConfig]:
        """Find the route matching a request"""
        method = request.method
        path = request.path
        # Exact match first
        route_key = f"{method}:{path}"
        if route_key in self.routes:
            return self.routes[route_key]
        # Fall back to prefix matching
        for route_key, route in self.routes.items():
            route_method, route_path = route_key.split(':', 1)
            if method == route_method and path.startswith(route_path):
                return route
        return None
    async def _handle_request(self, request: web.Request) -> web.Response:
        """Handle an incoming request"""
        route = self._find_route(request)
        if not route:
            return web.json_response(
                {'error': 'Route not found'},
                status=404
            )
        # Consult the circuit breaker
        route_key = f"{request.method}:{route.path}"
        circuit_breaker = self.circuit_breakers.get(route_key)
        if circuit_breaker and not circuit_breaker.can_execute():
            return web.json_response(
                {'error': 'Service temporarily unavailable'},
                status=503
            )
        # Pick a service instance
        instances = self.services.get(route.service_name, [])
        instance = self.load_balancer.select_instance(instances)
        if not instance:
            return web.json_response(
                {'error': 'No healthy service instances available'},
                status=503
            )
        # Forward the request
        try:
            response = await self._forward_request(request, route, instance)
            if circuit_breaker:
                circuit_breaker.record_success()
            return response
        except Exception as e:
            if circuit_breaker:
                circuit_breaker.record_failure()
            self.logger.error(f"Request forwarding failed: {str(e)}")
            return web.json_response(
                {'error': 'Internal server error'},
                status=500
            )
    async def _forward_request(self, request: web.Request,
                               route: RouteConfig,
                               instance: ServiceInstance) -> web.Response:
        """Forward a request to the backend service"""
        # Build the target URL
        target_url = f"http://{instance.host}:{instance.port}{request.path_qs}"
        # Prepare the forwarded headers: drop the original Host so the backend
        # sees its own, and record the caller in the X-Forwarded-* headers
        headers = dict(request.headers)
        headers.pop('Host', None)
        headers['X-Forwarded-For'] = request.remote
        headers['X-Forwarded-Proto'] = request.scheme
        headers['X-Forwarded-Host'] = request.host
        # Read the request body
        body = await request.read() if request.can_read_body else None
        # Send the request
        timeout = aiohttp.ClientTimeout(total=route.timeout)
        async with self.session.request(
            method=request.method,
            url=target_url,
            headers=headers,
            data=body,
            timeout=timeout
        ) as response:
            # Read the response
            response_body = await response.read()
            # Rebuild the response, dropping hop-by-hop headers that no longer
            # describe the already-decoded body
            response_headers = {
                k: v for k, v in response.headers.items()
                if k.lower() not in ('transfer-encoding', 'content-encoding',
                                     'content-length', 'connection')
            }
            return web.Response(
                body=response_body,
                status=response.status,
                headers=response_headers
            )
    async def _health_check_loop(self):
        """Periodic health-check loop"""
        while True:
            try:
                await self._perform_health_checks()
                await asyncio.sleep(30)  # check every 30 seconds
            except Exception as e:
                self.logger.error(f"Health check failed: {str(e)}")
                await asyncio.sleep(10)
    async def _perform_health_checks(self):
        """Probe every registered instance's /health endpoint"""
        for service_name, instances in self.services.items():
            for instance in instances:
                try:
                    health_url = f"http://{instance.host}:{instance.port}/health"
                    timeout = aiohttp.ClientTimeout(total=5)
                    async with self.session.get(health_url, timeout=timeout) as response:
                        instance.healthy = response.status == 200
                        instance.last_health_check = datetime.utcnow()
                except Exception:
                    instance.healthy = False
                    instance.last_health_check = datetime.utcnow()
    async def stop(self):
        """Stop the gateway"""
        if self.session:
            await self.session.close()
# Example configuration file
GATEWAY_CONFIG = """
jwt_secret: "your-secret-key"
redis:
host: localhost
port: 6379
db: 0
routes:
- path: "/api/users"
method: "GET"
service_name: "user-service"
service_url: "http://user-service:8080"
auth_required: true
rate_limit:
requests: 100
window: 60
strategy: "sliding_window"
circuit_breaker: true
- path: "/api/orders"
method: "POST"
service_name: "order-service"
service_url: "http://order-service:8080"
auth_required: true
rate_limit:
requests: 50
window: 60
strategy: "token_bucket"
circuit_breaker: true
services:
user-service:
instances:
- id: "user-1"
host: "user-service-1"
port: 8080
weight: 1
- id: "user-2"
host: "user-service-2"
port: 8080
weight: 1
order-service:
instances:
- id: "order-1"
host: "order-service-1"
port: 8080
weight: 2
- id: "order-2"
host: "order-service-2"
port: 8080
weight: 1
"""
async def main():
    """Entry point: start the API gateway"""
    # Write the configuration file
    with open('gateway-config.yaml', 'w') as f:
        f.write(GATEWAY_CONFIG)
    # Create and start the gateway
    gateway = APIGateway('gateway-config.yaml')
    app = await gateway.start()
    # Start the web server
    runner = web.AppRunner(app)
    await runner.setup()
    site = web.TCPSite(runner, 'localhost', 8080)
    await site.start()
    print("API gateway started, listening on port 8080")
    print("URL: http://localhost:8080")
    try:
        # Keep running
        await asyncio.Event().wait()
    finally:
        await gateway.stop()
        await runner.cleanup()
if __name__ == "__main__":
    asyncio.run(main())
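To exercise the gateway end to end, a caller needs a token signed with the same jwt_secret the gateway verifies against. A minimal client sketch follows; the user_id claim and the demo secret mirror the sample configuration above, and in practice the secret would come from a secrets manager rather than source code:
#!/usr/bin/env python3
"""Minimal gateway client sketch: sign a JWT and call a routed endpoint."""
import asyncio
import aiohttp
import jwt

async def call_gateway():
    # Sign a token with the secret configured as jwt_secret in GATEWAY_CONFIG
    token = jwt.encode({'user_id': 'demo-user'}, 'your-secret-key', algorithm='HS256')
    headers = {'Authorization': f'Bearer {token}'}
    async with aiohttp.ClientSession() as session:
        # GET /api/users is routed to user-service per the sample config
        async with session.get('http://localhost:8080/api/users', headers=headers) as resp:
            print(resp.status, await resp.text())

if __name__ == "__main__":
    asyncio.run(call_gateway())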