Cloud Security Architecture Design: Implementing the Zero Trust Model

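Introduction

Traditional network security follows a "castle-and-moat" strategy: strong defenses at the network perimeter and relative trust for everything inside it. With the rapid growth of cloud computing, mobile work, and the Internet of Things, perimeter-based defense no longer meets the security needs of modern enterprises. The zero trust security model emerged in response. Built on the principle of "never trust, always verify," it offers a more secure and flexible approach to protecting cloud environments.

Table of Contents

  1. Zero Trust Security Model Overview
  2. Core Components of a Zero Trust Architecture
  3. Identity and Access Management
  4. Network Micro-Segmentation
  5. Data Protection and Encryption
  6. Device Trust and Management
  7. Application Security
  8. Monitoring and Analytics
  9. Implementation Strategy and Best Practices

Zero Trust Security Model Overview

The zero trust security model is a network security paradigm that assumes threats exist both inside and outside the network, so it never automatically trusts any user, device, or application. Every access request must be strictly authenticated and authorized, whether it originates inside or outside the network.

Zero Trust Architecture Diagram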

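```mermaid
graph TB
    subgraph "Zero Trust Cloud Security Architecture"
        subgraph "User Layer"
            U1[Internal Users]
            U2[External Users]
            U3[Partners]
            U4[Mobile Devices]
        end

        subgraph "Identity and Access Management Layer"
            IAM[Identity Authentication Center]
            MFA[Multi-Factor Authentication]
            PAM[Privileged Access Management]
            RBAC[Role-Based Access Control]
        end

        subgraph "Policy Engine Layer"
            PE[Policy Engine]
            RA[Risk Assessment]
            AD[Anomaly Detection]
            ML[Machine Learning Analytics]
        end

        subgraph "Network Security Layer"
            SDP[Software-Defined Perimeter]
            MSG[Micro-Segmentation Gateway]
            FW[Next-Generation Firewall]
            IPS[Intrusion Prevention System]
        end

        subgraph "Data Protection Layer"
            DLP[Data Loss Prevention]
            ENC[End-to-End Encryption]
            KMS[Key Management Service]
            CASB[Cloud Access Security Broker]
        end

        subgraph "Application and Service Layer"
            API[API Gateway]
            MS[Microservices]
            DB[(Database)]
            FS[File Storage]
        end

        subgraph "Monitoring and Analytics Layer"
            SIEM[Security Information and Event Management]
            SOAR[Security Orchestration, Automation and Response]
            UBA[User Behavior Analytics]
            TI[Threat Intelligence]
        end
    end

    U1 --> IAM
    U2 --> IAM
    U3 --> IAM
    U4 --> IAM

    IAM --> PE
    MFA --> PE
    PAM --> PE
    RBAC --> PE

    PE --> SDP
    RA --> MSG
    AD --> FW
    ML --> IPS

    SDP --> API
    MSG --> MS
    FW --> DB
    IPS --> FS

    DLP --> SIEM
    ENC --> SOAR
    KMS --> UBA
    CASB --> TI

    style IAM fill:#e1f5fe
    style PE fill:#f3e5f5
    style SDP fill:#e8f5e8
    style DLP fill:#fff3e0
    style SIEM fill:#fce4ec
```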

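Core Principles of Zero Trust

  1. Never trust, always verify - verify every access request
  2. Least privilege - grant only the minimum permissions needed to complete a task
  3. Assume breach - assume the network has already been compromised and design defenses accordingly
  4. Verify explicitly - base access decisions on all available data points
  5. Continuous monitoring - monitor and analyze all activity in real time

Core Components of a Zero Trust Architecture

Zero Trust Architecture Analyzer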
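
The first two principles can be illustrated with a minimal sketch. It assumes a hypothetical in-memory grant table (`GRANTS`) and a helper named `is_allowed`; neither comes from a real library. Access is denied unless the caller is authenticated and an explicit grant covers the user, resource, and action, and the caller's network location plays no part in the decision.

```python
from typing import Dict, Set, Tuple

# Hypothetical grant table: (user, resource) -> set of permitted actions.
GRANTS: Dict[Tuple[str, str], Set[str]] = {
    ("alice", "dev-server"): {"read"},
}


def is_allowed(user: str, resource: str, action: str, authenticated: bool) -> bool:
    """Default deny: allow only an authenticated user holding an explicit grant."""
    if not authenticated:
        # Never trust, always verify: unauthenticated requests are rejected
        # regardless of where they come from.
        return False
    # Least privilege: anything not explicitly granted is denied.
    return action in GRANTS.get((user, resource), set())


if __name__ == "__main__":
    print(is_allowed("alice", "dev-server", "read", authenticated=True))
    print(is_allowed("alice", "dev-server", "write", authenticated=True))
    print(is_allowed("alice", "dev-server", "read", authenticated=False))
```

Because the lookup falls back to an empty set, adding a new user or resource grants nothing until a rule is written for it.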

```python
#!/usr/bin/env python3
"""
Zero Trust Architecture Analyzer
Evaluates and designs a zero trust security architecture
"""

import json
import logging
from typing import Dict, List, Any
from dataclasses import dataclass
from enum import Enum
from datetime import datetime, timedelta

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class TrustLevel(Enum):
    """Trust level enumeration"""
    UNTRUSTED = 0
    LOW = 1
    MEDIUM = 2
    HIGH = 3
    VERIFIED = 4

class RiskLevel(Enum):
    """Risk level enumeration"""
    LOW = 1
    MEDIUM = 2
    HIGH = 3
    CRITICAL = 4

@dataclass
class User:
    """User entity"""
    user_id: str
    username: str
    email: str
    roles: List[str]
    department: str
    trust_score: float
    last_login: datetime
    failed_attempts: int
    mfa_enabled: bool
    device_registered: bool

@dataclass
class Device:
    """Device entity"""
    device_id: str
    device_type: str
    os_version: str
    security_patch_level: str
    compliance_status: bool
    trust_score: float
    last_seen: datetime
    location: str
    owner: str

@dataclass
class AccessRequest:
    """Access request"""
    request_id: str
    user_id: str
    device_id: str
    resource: str
    action: str
    timestamp: datetime
    source_ip: str
    location: str
    user_agent: str

@dataclass
class PolicyRule:
    """Policy rule"""
    rule_id: str
    name: str
    description: str
    conditions: Dict[str, Any]
    actions: List[str]
    priority: int
    enabled: bool

class ZeroTrustArchitectureAnalyzer:
    """Zero trust architecture analyzer"""

    def __init__(self):
        self.users: Dict[str, User] = {}
        self.devices: Dict[str, Device] = {}
        self.policies: Dict[str, PolicyRule] = {}
        self.access_logs: List[AccessRequest] = []
        self.risk_scores: Dict[str, float] = {}

    def register_user(self, user: User) -> bool:
        """Register a user"""
        try:
            # Validate user information
            if not user.user_id or not user.email:
                logger.error("User ID and email must not be empty")
                return False

            # Compute the initial trust score
            trust_score = self._calculate_user_trust_score(user)
            user.trust_score = trust_score

            self.users[user.user_id] = user
            logger.info(f"User {user.username} registered, trust score: {trust_score}")
            return True

        except Exception as e:
            logger.error(f"User registration failed: {e}")
            return False

    def register_device(self, device: Device) -> bool:
        """Register a device"""
        try:
            # Validate device information
            if not device.device_id or not device.device_type:
                logger.error("Device ID and type must not be empty")
                return False

            # Compute the device trust score
            trust_score = self._calculate_device_trust_score(device)
            device.trust_score = trust_score

            self.devices[device.device_id] = device
            logger.info(f"Device {device.device_id} registered, trust score: {trust_score}")
            return True

        except Exception as e:
            logger.error(f"Device registration failed: {e}")
            return False

    def create_policy(self, policy: PolicyRule) -> bool:
        """Create a policy rule"""
        try:
            if not policy.rule_id or not policy.name:
                logger.error("Policy ID and name must not be empty")
                return False

            self.policies[policy.rule_id] = policy
            logger.info(f"Policy {policy.name} created")
            return True

        except Exception as e:
            logger.error(f"Policy creation failed: {e}")
            return False

    def evaluate_access_request(self, request: AccessRequest) -> Dict[str, Any]:
        """Evaluate an access request"""
        try:
            # Look up the user and device
            user = self.users.get(request.user_id)
            device = self.devices.get(request.device_id)

            if not user:
                return {
                    'allowed': False,
                    'reason': 'User not registered',
                    'risk_level': RiskLevel.CRITICAL.name,
                    'trust_score': 0
                }

            if not device:
                return {
                    'allowed': False,
                    'reason': 'Device not registered',
                    'risk_level': RiskLevel.HIGH.name,
                    'trust_score': 0
                }

            # Compute the combined risk score
            risk_score = self._calculate_risk_score(request, user, device)

            # Apply policy rules
            policy_result = self._apply_policies(request, user, device, risk_score)

            # Record the access request
            self.access_logs.append(request)

            # Update user and device trust scores
            self._update_trust_scores(request, user, device, policy_result['allowed'])

            return {
                'allowed': policy_result['allowed'],
                'reason': policy_result['reason'],
                'risk_level': self._get_risk_level(risk_score).name,
                'trust_score': (user.trust_score + device.trust_score) / 2,
                'required_actions': policy_result.get('required_actions', []),
                'session_timeout': policy_result.get('session_timeout', 3600)
            }

        except Exception as e:
            logger.error(f"Access request evaluation failed: {e}")
            return {
                'allowed': False,
                'reason': f'Evaluation failed: {e}',
                'risk_level': RiskLevel.CRITICAL.name,
                'trust_score': 0
            }

    def _calculate_user_trust_score(self, user: User) -> float:
        """Compute the user trust score"""
        score = 50.0  # Base score

        # Bonus for MFA being enabled
        if user.mfa_enabled:
            score += 20

        # Bonus for a registered device
        if user.device_registered:
            score += 15

        # Role weighting
        if 'admin' in user.roles:
            score += 10
        elif 'user' in user.roles:
            score += 5

        # Penalty for failed attempts
        score -= user.failed_attempts * 5

        # Time since last login
        if user.last_login:
            days_since_login = (datetime.now() - user.last_login).days
            if days_since_login > 30:
                score -= 10
            elif days_since_login > 7:
                score -= 5

        return max(0, min(100, score))

    def _calculate_device_trust_score(self, device: Device) -> float:
        """Compute the device trust score"""
        score = 50.0  # Base score

        # Compliance status
        if device.compliance_status:
            score += 25
        else:
            score -= 20

        # Device type
        if device.device_type in ['laptop', 'desktop']:
            score += 10
        elif device.device_type in ['mobile', 'tablet']:
            score += 5

        # Security patch level
        if device.security_patch_level == 'latest':
            score += 15
        elif device.security_patch_level == 'recent':
            score += 10
        elif device.security_patch_level == 'outdated':
            score -= 15

        # Recent activity
        if device.last_seen:
            hours_since_seen = (datetime.now() - device.last_seen).total_seconds() / 3600
            if hours_since_seen > 168:  # One week
                score -= 10
            elif hours_since_seen > 24:  # One day
                score -= 5

        return max(0, min(100, score))

    def _calculate_risk_score(self, request: AccessRequest, user: User, device: Device) -> float:
        """Compute the risk score of an access request"""
        risk_score = 0.0

        # User risk factors
        if user.failed_attempts > 3:
            risk_score += 30

        if not user.mfa_enabled:
            risk_score += 25

        # Device risk factors
        if not device.compliance_status:
            risk_score += 35

        if device.security_patch_level == 'outdated':
            risk_score += 20

        # Access pattern risk
        risk_score += self._analyze_access_pattern(request, user)

        # Geolocation risk
        risk_score += self._analyze_location_risk(request, user, device)

        # Time-of-access risk
        risk_score += self._analyze_time_risk(request)

        return min(100, risk_score)

    def _analyze_access_pattern(self, request: AccessRequest, user: User) -> float:
        """Analyze access pattern risk"""
        risk = 0.0

        # Fetch the user's access history
        user_logs = [log for log in self.access_logs if log.user_id == user.user_id]

        if len(user_logs) < 5:
            # New user or very little access history
            risk += 10
        else:
            # Analyze access frequency
            recent_logs = [log for log in user_logs
                          if (datetime.now() - log.timestamp).days <= 7]

            if len(recent_logs) > 50:
                # Abnormally frequent access
                risk += 15

            # Analyze the types of resources accessed
            accessed_resources = set(log.resource for log in recent_logs)
            if request.resource not in accessed_resources:
                # Accessing a new resource
                risk += 5

        return risk

    def _analyze_location_risk(self, request: AccessRequest, user: User, device: Device) -> float:
        """Analyze geolocation risk"""
        risk = 0.0

        # Simplified geolocation risk analysis.
        # A real implementation should use a more precise geolocation service.

        # Check for IP address changes within the last 24 hours.
        # timedelta has no `hours` attribute, so compare total_seconds().
        recent_logs = [log for log in self.access_logs
                      if log.user_id == user.user_id and
                      (datetime.now() - log.timestamp).total_seconds() <= 24 * 3600]

        if recent_logs:
            recent_ips = set(log.source_ip for log in recent_logs)
            if request.source_ip not in recent_ips:
                risk += 10

        # Check for device location changes
        if device.location != request.location:
            risk += 15

        return risk

    def _analyze_time_risk(self, request: AccessRequest) -> float:
        """Analyze time-of-access risk"""
        risk = 0.0

        # Check the access time
        hour = request.timestamp.hour

        # Access outside working hours
        if hour < 6 or hour > 22:
            risk += 10

        # Weekend access
        if request.timestamp.weekday() >= 5:
            risk += 5

        return risk

    def _apply_policies(self, request: AccessRequest, user: User, device: Device, risk_score: float) -> Dict[str, Any]:
        """Apply policy rules"""
        result = {
            'allowed': False,
            'reason': 'Denied by default',
            'required_actions': [],
            'session_timeout': 3600
        }

        # Sort policies by priority (highest first)
        sorted_policies = sorted(self.policies.values(), key=lambda p: p.priority, reverse=True)

        for policy in sorted_policies:
            if not policy.enabled:
                continue

            if self._match_policy_conditions(policy, request, user, device, risk_score):
                # Execute the policy actions
                for action in policy.actions:
                    if action == 'allow':
                        result['allowed'] = True
                        result['reason'] = f'Policy {policy.name} allows access'
                    elif action == 'deny':
                        result['allowed'] = False
                        result['reason'] = f'Policy {policy.name} denies access'
                        return result  # A deny action returns immediately
                    elif action == 'require_mfa':
                        result['required_actions'].append('mfa')
                    elif action == 'require_device_verification':
                        result['required_actions'].append('device_verification')
                    elif action.startswith('session_timeout:'):
                        timeout = int(action.split(':')[1])
                        result['session_timeout'] = timeout

                # Once a policy matches, stop processing further policies
                break

        return result

    def _match_policy_conditions(self, policy: PolicyRule, request: AccessRequest,
                                user: User, device: Device, risk_score: float) -> bool:
        """Match policy conditions"""
        conditions = policy.conditions

        # Check user conditions
        if 'user_roles' in conditions:
            required_roles = conditions['user_roles']
            if not any(role in user.roles for role in required_roles):
                return False

        if 'user_trust_score_min' in conditions:
            if user.trust_score < conditions['user_trust_score_min']:
                return False

        # Check device conditions
        if 'device_compliance' in conditions:
            if device.compliance_status != conditions['device_compliance']:
                return False

        if 'device_trust_score_min' in conditions:
            if device.trust_score < conditions['device_trust_score_min']:
                return False

        # Check risk conditions
        if 'max_risk_score' in conditions:
            if risk_score > conditions['max_risk_score']:
                return False

        # Check resource conditions
        if 'resources' in conditions:
            allowed_resources = conditions['resources']
            if request.resource not in allowed_resources:
                return False

        # Check time conditions
        if 'allowed_hours' in conditions:
            allowed_hours = conditions['allowed_hours']
            current_hour = request.timestamp.hour
            if current_hour not in allowed_hours:
                return False

        return True

    def _get_risk_level(self, risk_score: float) -> RiskLevel:
        """Map a risk score to a risk level"""
        if risk_score >= 75:
            return RiskLevel.CRITICAL
        elif risk_score >= 50:
            return RiskLevel.HIGH
        elif risk_score >= 25:
            return RiskLevel.MEDIUM
        else:
            return RiskLevel.LOW

    def _update_trust_scores(self, request: AccessRequest, user: User, device: Device, allowed: bool):
        """Update trust scores"""
        if allowed:
            # Successful access: raise trust scores slightly
            user.trust_score = min(100, user.trust_score + 1)
            device.trust_score = min(100, device.trust_score + 1)
            user.failed_attempts = 0
        else:
            # Access denied: lower trust scores
            user.trust_score = max(0, user.trust_score - 5)
            device.trust_score = max(0, device.trust_score - 3)
            user.failed_attempts += 1

    def generate_security_report(self) -> Dict[str, Any]:
        """Generate a security report"""
        total_requests = len(self.access_logs)
        if total_requests == 0:
            return {'message': 'No access records yet'}

        # Access statistics (simplified; a real system would also record outcomes)
        recent_logs = [log for log in self.access_logs
                      if (datetime.now() - log.timestamp).days <= 7]

        # User risk analysis
        high_risk_users = [user for user in self.users.values()
                          if user.trust_score < 30]

        # Device risk analysis
        non_compliant_devices = [device for device in self.devices.values()
                               if not device.compliance_status]

        # Access pattern analysis
        resource_access_count = {}
        for log in recent_logs:
            resource_access_count[log.resource] = resource_access_count.get(log.resource, 0) + 1

        most_accessed_resources = sorted(resource_access_count.items(),
                                       key=lambda x: x[1], reverse=True)[:10]

        return {
            'report_time': datetime.now().isoformat(),
            'total_users': len(self.users),
            'total_devices': len(self.devices),
            'total_policies': len(self.policies),
            'recent_access_requests': len(recent_logs),
            'high_risk_users': len(high_risk_users),
            'non_compliant_devices': len(non_compliant_devices),
            'most_accessed_resources': most_accessed_resources,
            'average_user_trust_score': sum(u.trust_score for u in self.users.values()) / len(self.users) if self.users else 0,
            'average_device_trust_score': sum(d.trust_score for d in self.devices.values()) / len(self.devices) if self.devices else 0
        }

def main():
    """Entry point: demonstrates the zero trust architecture analyzer"""
    analyzer = ZeroTrustArchitectureAnalyzer()

    # Register a user
    user1 = User(
        user_id="user001",
        username="alice",
        email="alice@company.com",
        roles=["user", "developer"],
        department="engineering",
        trust_score=0,
        last_login=datetime.now() - timedelta(days=1),
        failed_attempts=0,
        mfa_enabled=True,
        device_registered=True
    )
    analyzer.register_user(user1)

    # Register a device
    device1 = Device(
        device_id="device001",
        device_type="laptop",
        os_version="Windows 11",
        security_patch_level="latest",
        compliance_status=True,
        trust_score=0,
        last_seen=datetime.now(),
        location="Beijing",
        owner="alice"
    )
    analyzer.register_device(device1)

    # Create a policy
    policy1 = PolicyRule(
        rule_id="policy001",
        name="Developer access policy",
        description="Allow developers to access the development environment",
        conditions={
            "user_roles": ["developer"],
            "device_compliance": True,
            "max_risk_score": 30,
            "resources": ["dev-server", "test-db"],
            "allowed_hours": list(range(8, 18))  # Hours 8 through 17, i.e. 08:00-17:59
        },
        actions=["allow", "session_timeout:7200"],
        priority=10,
        enabled=True
    )
    analyzer.create_policy(policy1)

    # Simulate an access request
    request1 = AccessRequest(
        request_id="req001",
        user_id="user001",
        device_id="device001",
        resource="dev-server",
        action="read",
        timestamp=datetime.now(),
        source_ip="192.168.1.100",
        location="Beijing",
        user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64)"
    )

    # Evaluate the access request
    result = analyzer.evaluate_access_request(request1)
    print("Access request evaluation result:")
    print(json.dumps(result, indent=2, ensure_ascii=False))

    # Generate the security report
    report = analyzer.generate_security_report()
    print("\nSecurity report:")
    print(json.dumps(report, indent=2, ensure_ascii=False))

if __name__ == "__main__":
    main()
```
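
The analyzer's `_apply_policies` walks policies in descending priority and stops at the first match, so a deny rule only overrides an allow rule if it has the higher priority. The condensed, standalone sketch below illustrates that ordering effect. The rule dictionaries and the `first_match` helper are hypothetical simplifications, not part of the class above.

```python
from typing import Any, Dict, List


def first_match(rules: List[Dict[str, Any]], risk_score: float) -> str:
    """Return the action of the highest-priority rule that matches the risk score."""
    for rule in sorted(rules, key=lambda r: r["priority"], reverse=True):
        if risk_score >= rule["min_risk"]:
            return rule["action"]
    return "deny"  # Default deny when no rule matches


# Hypothetical rules: a broad allow rule and a deny rule for critical risk.
allow_rule = {"name": "allow-developers", "priority": 10, "min_risk": 0, "action": "allow"}
deny_rule = {"name": "block-critical-risk", "priority": 100, "min_risk": 75, "action": "deny"}

if __name__ == "__main__":
    print(first_match([allow_rule, deny_rule], 20))   # allow
    print(first_match([allow_rule, deny_rule], 90))   # deny

    # If the deny rule is given a lower priority, the broad allow rule wins.
    misordered = [allow_rule, {**deny_rule, "priority": 5}]
    print(first_match(misordered, 90))                # allow
```

With first-match semantics, it is safer to give deny rules the highest priorities, or to evaluate all deny rules before any allow rule.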

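Best Practices and Recommendations

Architecture Design Principles

  1. Least privilege

    • Deny all access by default
    • Grant the minimum permissions based on need
    • Review and adjust permissions regularly
  2. Continuous verification

    • Authenticate on every access
    • Monitor user and device behavior in real time
    • Adjust trust levels dynamically
  3. Defense in depth

    • Multiple layers of security controls
    • Redundant security mechanisms
    • Fail-safe design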
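
Continuous verification with dynamic trust adjustment can be sketched as a trust score that decays with the time elapsed since the last successful verification. The exponential half-life model, the function names, and the default values below are illustrative assumptions rather than part of any specific product.

```python
def effective_trust(base_score: float,
                    minutes_since_verified: float,
                    half_life_minutes: float = 60.0) -> float:
    """Halve the trust score for every half-life elapsed since verification."""
    return base_score * 0.5 ** (minutes_since_verified / half_life_minutes)


def needs_reverification(base_score: float,
                         minutes_since_verified: float,
                         threshold: float = 50.0) -> bool:
    """Ask for re-authentication once the decayed score drops below the threshold."""
    return effective_trust(base_score, minutes_since_verified) < threshold


if __name__ == "__main__":
    for minutes in (0, 30, 60, 120):
        score = effective_trust(80.0, minutes)
        print(minutes, round(score, 1), needs_reverification(80.0, minutes))
```

A session that starts at a score of 80 stays above the assumed threshold of 50 for a while and then requires re-verification, instead of being trusted indefinitely after a single login.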

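Implementation Strategy

  1. Phased rollout

    • Start with core assets
    • Gradually extend to all systems
    • Optimize and improve continuously
  2. Balance with user experience

    • Transparent security controls
    • Streamlined authentication flows
    • Intelligent risk assessment
  3. Compliance considerations

    • Meet industry standards
    • Data protection regulations
    • Audit requirements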
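
One way to balance security with user experience is step-up authentication: low-risk requests pass without friction, and additional checks are required only as risk grows. The sketch below is a minimal illustration. It reuses the 25/50/75 thresholds from the analyzer's `_get_risk_level`, while the `decide` function and its return values are hypothetical.

```python
def decide(risk_score: float) -> str:
    """Map a 0-100 risk score to an action, adding friction only as risk grows."""
    if risk_score >= 75:
        return "deny"
    if risk_score >= 50:
        return "require_mfa_and_device_verification"
    if risk_score >= 25:
        return "require_mfa"
    return "allow"


if __name__ == "__main__":
    for score in (10, 30, 60, 90):
        print(score, decide(score))
```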

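Technology Selection Recommendations

  1. Identity providers

    • Azure AD (now Microsoft Entra ID) / AWS IAM
    • Okta / Auth0
    • Self-built identity system
  2. Network security

    • Zscaler / Palo Alto Prisma
    • Cloudflare Access
    • Self-built SDP solution
  3. Data protection

    • HashiCorp Vault
    • AWS KMS / Azure Key Vault
    • Self-built key management system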

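Summary

Zero trust cloud security architecture represents the direction in which modern network security is heading. Through its core principle of "never trust, always verify," it gives cloud environments more secure, flexible, and scalable protection.

Core Value

  1. Improved security

    • Reduced attack surface
    • Restricted lateral movement
    • Better threat detection
  2. Greater flexibility

    • Support for remote work
    • A good fit for cloud-native architectures
    • Simpler network management
  3. Compliance assurance

    • Meets regulatory requirements
    • Provides audit trails
    • Protects sensitive data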

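Implementation Essentials

  1. Organizational readiness

    • Executive support and commitment
    • Cross-department collaboration
    • Adequate resource investment
  2. Technical readiness

    • Assessment of existing systems
    • Technology stack selection
    • Staff skills training
  3. Continuous improvement

    • Regular security assessments
    • Threat intelligence updates
    • Keeping up with evolving technology

Successfully implementing a zero trust architecture requires technology, processes, and people to work together, and it is a continuously evolving process. With sound planning, a phased rollout, and ongoing optimization, an organization can build a secure and reliable cloud security posture.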


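## Network Micro-Segmentation

Network micro-segmentation is a key part of a zero trust architecture. Dividing the network into smaller security zones restricts lateral movement and reduces the attack surface.

### Software-Defined Perimeter (SDP) Implementation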
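
The effect on lateral movement can be illustrated with a small model. The sketch below assumes a hypothetical segment-to-segment allow list (`ALLOWED_FLOWS`) and computes which segments an attacker could reach, hop by hop, from a compromised starting segment. The segment names and helper functions are illustrative only.

```python
from collections import deque
from typing import Dict, Set

# Hypothetical allow list: source segment -> segments it may connect to.
ALLOWED_FLOWS: Dict[str, Set[str]] = {
    "office": {"web"},
    "web": {"app"},
    "app": {"db"},
    "db": set(),
}


def flow_allowed(src: str, dst: str, flows: Dict[str, Set[str]] = ALLOWED_FLOWS) -> bool:
    """Default deny: a direct flow is allowed only if explicitly listed."""
    return dst in flows.get(src, set())


def reachable(start: str, flows: Dict[str, Set[str]] = ALLOWED_FLOWS) -> Set[str]:
    """Breadth-first search over allowed flows: segments reachable from `start`."""
    seen: Set[str] = set()
    queue = deque([start])
    while queue:
        segment = queue.popleft()
        for nxt in flows.get(segment, set()):
            if nxt != start and nxt not in seen:
                seen.add(nxt)
                queue.append(nxt)
    return seen


if __name__ == "__main__":
    print(flow_allowed("web", "db"))      # False: no direct web-to-db flow
    print(sorted(reachable("db")))        # []: a compromised db cannot move anywhere
    print(sorted(reachable("office")))    # ['app', 'db', 'web']
```

The reachable set is a rough measure of blast radius: the fewer flows each segment is allowed, the fewer segments a single compromise can spread to.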

```bash
#!/bin/bash
# Software-Defined Perimeter (SDP) deployment script
# Implements identity-based network access control

set -euo pipefail

# Configuration variables
SDP_NAMESPACE="zero-trust-sdp"
CONTROLLER_IMAGE="sdp-controller:latest"
GATEWAY_IMAGE="sdp-gateway:latest"
CLIENT_IMAGE="sdp-client:latest"

# Logging helper
log() {
    echo "[$(date +'%Y-%m-%d %H:%M:%S')] $1"
}

# Create the namespace
create_namespace() {
    log "Creating SDP namespace..."
    
    kubectl create namespace "${SDP_NAMESPACE}" --dry-run=client -o yaml | kubectl apply -f -
    
    # Add labels
    kubectl label namespace "${SDP_NAMESPACE}" \
        zero-trust=enabled \
        network-policy=strict \
        --overwrite
}

# Deploy the SDP controller
deploy_sdp_controller() {
    log "Deploying SDP controller..."
    
    cat <<EOF | kubectl apply -f -
apiVersion: v1
kind: ConfigMap
metadata:
  name: sdp-controller-config
  namespace: ${SDP_NAMESPACE}
data:
  config.yaml: |
    controller:
      listen_port: 8443
      tls:
        cert_file: /etc/certs/tls.crt
        key_file: /etc/certs/tls.key
      database:
        type: postgresql
        host: postgres-service
        port: 5432
        database: sdp
        username: sdp_user
        password_secret: postgres-secret
      authentication:
        jwt_secret: jwt-secret
        token_expiry: 3600
      authorization:
        policy_engine: opa
        policy_url: http://opa-service:8181/v1/data/sdp/allow
    
    logging:
      level: info
      format: json
    
    metrics:
      enabled: true
      port: 9090
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: sdp-controller
  namespace: ${SDP_NAMESPACE}
  labels:
    app: sdp-controller
    component: control-plane
spec:
  replicas: 2
  selector:
    matchLabels:
      app: sdp-controller
  template:
    metadata:
      labels:
        app: sdp-controller
        component: control-plane
    spec:
      serviceAccountName: sdp-controller
      securityContext:
        runAsNonRoot: true
        runAsUser: 1000
        fsGroup: 2000
      containers:
      - name: controller
        image: ${CONTROLLER_IMAGE}
        imagePullPolicy: Always
        ports:
        - containerPort: 8443
          name: https
        - containerPort: 9090
          name: metrics
        env:
        - name: CONFIG_FILE
          value: /etc/config/config.yaml
        - name: POD_NAME
          valueFrom:
            fieldRef:
              fieldPath: metadata.name
        - name: POD_NAMESPACE
          valueFrom:
            fieldRef:
              fieldPath: metadata.namespace
        volumeMounts:
        - name: config
          mountPath: /etc/config
        - name: certs
          mountPath: /etc/certs
          readOnly: true
        - name: jwt-secret
          mountPath: /etc/secrets/jwt
          readOnly: true
        livenessProbe:
          httpGet:
            path: /health
            port: 8443
            scheme: HTTPS
          initialDelaySeconds: 30
          periodSeconds: 10
        readinessProbe:
          httpGet:
            path: /ready
            port: 8443
            scheme: HTTPS
          initialDelaySeconds: 5
          periodSeconds: 5
        resources:
          requests:
            cpu: 100m
            memory: 128Mi
          limits:
            cpu: 500m
            memory: 512Mi
      volumes:
      - name: config
        configMap:
          name: sdp-controller-config
      - name: certs
        secret:
          secretName: sdp-controller-tls
      - name: jwt-secret
        secret:
          secretName: jwt-secret
---
apiVersion: v1
kind: Service
metadata:
  name: sdp-controller-service
  namespace: ${SDP_NAMESPACE}
  labels:
    app: sdp-controller
spec:
  type: ClusterIP
  ports:
  - port: 8443
    targetPort: 8443
    protocol: TCP
    name: https
  - port: 9090
    targetPort: 9090
    protocol: TCP
    name: metrics
  selector:
    app: sdp-controller
---
apiVersion: v1
kind: ServiceAccount
metadata:
  name: sdp-controller
  namespace: ${SDP_NAMESPACE}
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: sdp-controller
rules:
- apiGroups: [""]
  resources: ["pods", "services", "endpoints"]
  verbs: ["get", "list", "watch"]
- apiGroups: ["networking.k8s.io"]
  resources: ["networkpolicies"]
  verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
- apiGroups: ["extensions", "apps"]
  resources: ["deployments", "replicasets"]
  verbs: ["get", "list", "watch"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: sdp-controller
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: sdp-controller
subjects:
- kind: ServiceAccount
  name: sdp-controller
  namespace: ${SDP_NAMESPACE}
EOF
}

# Deploy the SDP gateway
deploy_sdp_gateway() {
    log "Deploying SDP gateway..."
    
    cat <<EOF | kubectl apply -f -
apiVersion: v1
kind: ConfigMap
metadata:
  name: sdp-gateway-config
  namespace: ${SDP_NAMESPACE}
data:
  config.yaml: |
    gateway:
      listen_port: 8080
      controller_url: https://sdp-controller-service:8443
      tls:
        cert_file: /etc/certs/tls.crt
        key_file: /etc/certs/tls.key
      
    spa:
      enabled: true
      timeout: 30
      max_attempts: 3
    
    tunneling:
      protocol: wireguard
      port_range: "51820-51920"
      encryption: chacha20poly1305
    
    logging:
      level: info
      format: json
    
    metrics:
      enabled: true
      port: 9091
---
apiVersion: apps/v1
kind: DaemonSet
metadata:
  name: sdp-gateway
  namespace: ${SDP_NAMESPACE}
  labels:
    app: sdp-gateway
    component: data-plane
spec:
  selector:
    matchLabels:
      app: sdp-gateway
  template:
    metadata:
      labels:
        app: sdp-gateway
        component: data-plane
    spec:
      serviceAccountName: sdp-gateway
      hostNetwork: true
      dnsPolicy: ClusterFirstWithHostNet
      securityContext:
        runAsNonRoot: false  # Root is required to manipulate host networking
      containers:
      - name: gateway
        image: ${GATEWAY_IMAGE}
        imagePullPolicy: Always
        securityContext:
          privileged: true
          capabilities:
            add:
            - NET_ADMIN
            - NET_RAW
        ports:
        - containerPort: 8080
          name: http
        - containerPort: 9091
          name: metrics
        env:
        - name: CONFIG_FILE
          value: /etc/config/config.yaml
        - name: NODE_NAME
          valueFrom:
            fieldRef:
              fieldPath: spec.nodeName
        - name: POD_IP
          valueFrom:
            fieldRef:
              fieldPath: status.podIP
        volumeMounts:
        - name: config
          mountPath: /etc/config
        - name: certs
          mountPath: /etc/certs
          readOnly: true
        - name: lib-modules
          mountPath: /lib/modules
          readOnly: true
        - name: dev-net-tun
          mountPath: /dev/net/tun
        livenessProbe:
          httpGet:
            path: /health
            port: 8080
          initialDelaySeconds: 30
          periodSeconds: 10
        readinessProbe:
          httpGet:
            path: /ready
            port: 8080
          initialDelaySeconds: 5
          periodSeconds: 5
        resources:
          requests:
            cpu: 100m
            memory: 128Mi
          limits:
            cpu: 1000m
            memory: 1Gi
      volumes:
      - name: config
        configMap:
          name: sdp-gateway-config
      - name: certs
        secret:
          secretName: sdp-gateway-tls
      - name: lib-modules
        hostPath:
          path: /lib/modules
      - name: dev-net-tun
        hostPath:
          path: /dev/net/tun
      tolerations:
      - operator: Exists
        effect: NoSchedule
---
apiVersion: v1
kind: ServiceAccount
metadata:
  name: sdp-gateway
  namespace: ${SDP_NAMESPACE}
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: sdp-gateway
rules:
- apiGroups: [""]
  resources: ["nodes", "pods"]
  verbs: ["get", "list", "watch"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: sdp-gateway
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: sdp-gateway
subjects:
- kind: ServiceAccount
  name: sdp-gateway
  namespace: ${SDP_NAMESPACE}
EOF
}

# Deploy network policies
deploy_network_policies() {
    log "Deploying network policies..."
    
    cat <<EOF | kubectl apply -f -
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: default-deny-all
  namespace: ${SDP_NAMESPACE}
spec:
  podSelector: {}
  policyTypes:
  - Ingress
  - Egress
---
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: allow-sdp-controller
  namespace: ${SDP_NAMESPACE}
spec:
  podSelector:
    matchLabels:
      app: sdp-controller
  policyTypes:
  - Ingress
  - Egress
  ingress:
  - from:
    - podSelector:
        matchLabels:
          app: sdp-gateway
    - podSelector:
        matchLabels:
          app: sdp-client
    ports:
    - protocol: TCP
      port: 8443
  egress:
  - to:
    - namespaceSelector:
        matchLabels:
          kubernetes.io/metadata.name: kube-system
    ports:
    - protocol: TCP
      port: 53
    - protocol: UDP
      port: 53
  - to:
    - podSelector:
        matchLabels:
          app: postgres
    ports:
    - protocol: TCP
      port: 5432
  - to:
    - podSelector:
        matchLabels:
          app: opa
    ports:
    - protocol: TCP
      port: 8181
---
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: allow-sdp-gateway
  namespace: ${SDP_NAMESPACE}
spec:
  podSelector:
    matchLabels:
      app: sdp-gateway
  policyTypes:
  - Ingress
  - Egress
  ingress:
  - from: []  # Allow connections from anywhere (SPA port knocking)
    ports:
    - protocol: TCP
      port: 8080
  - from:
    - podSelector:
        matchLabels:
          app: sdp-client
    ports:
    - protocol: UDP
      port: 51820
  egress:
  - to:
    - namespaceSelector:
        matchLabels:
          kubernetes.io/metadata.name: kube-system
    ports:
    - protocol: TCP
      port: 53
    - protocol: UDP
      port: 53
  - to:
    - podSelector:
        matchLabels:
          app: sdp-controller
    ports:
    - protocol: TCP
      port: 8443
  - to: []  # Allow access to protected resources
EOF
}

# Deploy the OPA policy engine
deploy_opa_policies() {
    log "Deploying OPA policy engine..."
    
    cat <<EOF | kubectl apply -f -
apiVersion: v1
kind: ConfigMap
metadata:
  name: opa-policies
  namespace: ${SDP_NAMESPACE}
data:
  sdp.rego: |
    package sdp
    
    import future.keywords.if
    import future.keywords.in
    
    # 默认拒绝
    default allow = false
    
    # 允许访问的条件
    allow if {
        # 检查用户身份
        valid_user
        
        # 检查设备信任
        trusted_device
        
        # 检查访问时间
        valid_time
        
        # 检查资源权限
        has_resource_permission
    }
    
    # 验证用户身份
    valid_user if {
        input.user.authenticated == true
        input.user.mfa_verified == true
        input.user.trust_score >= 70
    }
    
    # 验证设备信任
    trusted_device if {
        input.device.registered == true
        input.device.compliant == true
        input.device.trust_score >= 60
    }
    
    # 验证访问时间
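    valid_time if {
        # Working hours (UTC): time.parse_rfc3339_ns returns nanoseconds,
        # and time.clock turns that into [hour, minute, second]
        clock := time.clock(time.parse_rfc3339_ns(input.timestamp))
        hour := clock[0]
        hour >= 8
        hour <= 18
    }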
    
    valid_time if {
        # 管理员可以24小时访问
        "admin" in input.user.roles
    }
    
    # 验证资源权限
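    has_resource_permission if {
        # Match the user's roles against the permission table. Resource
        # entries such as "dev-*" are glob patterns, so compare with
        # glob.match instead of ==
        some role in input.user.roles
        some permission in data.permissions[role]
        glob.match(permission.resource, [], input.resource)
        input.action in permission.actions
    }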
    
    # 风险评估
    risk_score := score if {
        user_risk := calculate_user_risk
        device_risk := calculate_device_risk
        context_risk := calculate_context_risk
        score := (user_risk + device_risk + context_risk) / 3
    }
    
    calculate_user_risk := risk if {
        base_risk := 100 - input.user.trust_score

        # Each failed attempt adds risk
        failure_risk := input.user.failed_attempts * 10

        risk := base_risk + failure_risk + mfa_risk
    }

    calculate_device_risk := risk if {
        base_risk := 100 - input.device.trust_score
        risk := base_risk + compliance_risk + registration_risk
    }

    calculate_context_risk := risk if {
        risk := location_risk + time_risk + network_risk
    }

    # Rego has no inline if/else expression, so each penalty is a rule
    # that defaults to 0 and takes its value when the condition holds.

    # MFA not verified
    default mfa_risk = 0
    mfa_risk = 20 if { not input.user.mfa_verified }

    # Device not compliant
    default compliance_risk = 0
    compliance_risk = 30 if { not input.device.compliant }

    # Device not registered
    default registration_risk = 0
    registration_risk = 40 if { not input.device.registered }

    # Suspicious location
    default location_risk = 0
    location_risk = 20 if { input.location.suspicious }

    # Access outside the allowed time window
    default time_risk = 0
    time_risk = 15 if { not valid_time }

    # Public network
    default network_risk = 0
    network_risk = 10 if { input.network.public }
  
  permissions.json: |
    {
      "permissions": {
        "admin": [
          {
            "resource": "*",
            "actions": ["read", "write", "delete", "execute"]
          }
        ],
        "developer": [
          {
            "resource": "dev-*",
            "actions": ["read", "write", "execute"]
          },
          {
            "resource": "test-*",
            "actions": ["read", "write"]
          }
        ],
        "user": [
          {
            "resource": "app-*",
            "actions": ["read"]
          }
        ]
      }
    }
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: opa
  namespace: ${SDP_NAMESPACE}
  labels:
    app: opa
spec:
  replicas: 2
  selector:
    matchLabels:
      app: opa
  template:
    metadata:
      labels:
        app: opa
    spec:
      containers:
      - name: opa
        image: openpolicyagent/opa:latest-envoy
        args:
        - "run"
        - "--server"
        - "--ignore=.*"  # skip the hidden ..data symlink directories of the ConfigMap mount
        - "--config-file=/etc/opa/config.yaml"
        - "/etc/opa/policies"
        ports:
        - containerPort: 8181
          name: http
        volumeMounts:
        - name: policies
          mountPath: /etc/opa/policies
        - name: config
          mountPath: /etc/opa
        livenessProbe:
          httpGet:
            path: /health
            port: 8181
          initialDelaySeconds: 30
          periodSeconds: 10
        readinessProbe:
          httpGet:
            path: /health?bundle=true
            port: 8181
          initialDelaySeconds: 5
          periodSeconds: 5
        resources:
          requests:
            cpu: 100m
            memory: 128Mi
          limits:
            cpu: 500m
            memory: 512Mi
      volumes:
      - name: policies
        configMap:
          name: opa-policies
      - name: config
        configMap:
          name: opa-config
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: opa-config
  namespace: ${SDP_NAMESPACE}
data:
  config.yaml: |
    # Policies are loaded from the mounted /etc/opa/policies directory given
    # on the command line, so no bundle service is configured here.
    decision_logs:
      console: true
    
    status:
      console: true
---
apiVersion: v1
kind: Service
metadata:
  name: opa-service
  namespace: ${SDP_NAMESPACE}
  labels:
    app: opa
spec:
  type: ClusterIP
  ports:
  - port: 8181
    targetPort: 8181
    protocol: TCP
    name: http
  selector:
    app: opa
EOF
}

# 生成TLS证书
generate_certificates() {
    log "生成TLS证书..."
    
    # 创建CA私钥
    openssl genrsa -out ca.key 4096
    
    # 创建CA证书
    openssl req -new -x509 -key ca.key -sha256 -subj "/C=CN/ST=Beijing/L=Beijing/O=ZeroTrust/CN=ZeroTrust-CA" -days 3650 -out ca.crt
    
    # 为SDP控制器生成证书
    openssl genrsa -out controller.key 4096
    openssl req -new -key controller.key -out controller.csr -subj "/C=CN/ST=Beijing/L=Beijing/O=ZeroTrust/CN=sdp-controller-service"
    openssl x509 -req -in controller.csr -CA ca.crt -CAkey ca.key -CAcreateserial -out controller.crt -days 365 -sha256
    
    # 为SDP网关生成证书
    openssl genrsa -out gateway.key 4096
    openssl req -new -key gateway.key -out gateway.csr -subj "/C=CN/ST=Beijing/L=Beijing/O=ZeroTrust/CN=sdp-gateway"
    openssl x509 -req -in gateway.csr -CA ca.crt -CAkey ca.key -CAcreateserial -out gateway.crt -days 365 -sha256
    
    # 创建Kubernetes secrets
    kubectl create secret tls sdp-controller-tls \
        --cert=controller.crt \
        --key=controller.key \
        --namespace=${SDP_NAMESPACE}
    
    kubectl create secret tls sdp-gateway-tls \
        --cert=gateway.crt \
        --key=gateway.key \
        --namespace=${SDP_NAMESPACE}
    
    # Create the JWT signing secret (values quoted to avoid word splitting)
    JWT_SECRET=$(openssl rand -base64 32)
    kubectl create secret generic jwt-secret \
        --from-literal=secret="${JWT_SECRET}" \
        --namespace="${SDP_NAMESPACE}"
    
    # 清理临时文件
    rm -f ca.key ca.crt ca.srl controller.key controller.csr controller.crt gateway.key gateway.csr gateway.crt
}

# 部署PostgreSQL数据库
deploy_postgresql() {
    log "部署PostgreSQL数据库..."
    
    cat <<EOF | kubectl apply -f -
apiVersion: v1
kind: Secret
metadata:
  name: postgres-secret
  namespace: ${SDP_NAMESPACE}
type: Opaque
data:
  username: $(echo -n "sdp_user" | base64)
  password: $(echo -n "$(openssl rand -base64 16)" | base64)
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: postgres-pvc
  namespace: ${SDP_NAMESPACE}
spec:
  accessModes:
    - ReadWriteOnce
  resources:
    requests:
      storage: 10Gi
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: postgres
  namespace: ${SDP_NAMESPACE}
  labels:
    app: postgres
spec:
  replicas: 1
  selector:
    matchLabels:
      app: postgres
  template:
    metadata:
      labels:
        app: postgres
    spec:
      containers:
      - name: postgres
        image: postgres:13
        env:
        - name: POSTGRES_DB
          value: sdp
        - name: POSTGRES_USER
          valueFrom:
            secretKeyRef:
              name: postgres-secret
              key: username
        - name: POSTGRES_PASSWORD
          valueFrom:
            secretKeyRef:
              name: postgres-secret
              key: password
        ports:
        - containerPort: 5432
          name: postgres
        volumeMounts:
        - name: postgres-storage
          mountPath: /var/lib/postgresql/data
        resources:
          requests:
            cpu: 100m
            memory: 256Mi
          limits:
            cpu: 500m
            memory: 1Gi
      volumes:
      - name: postgres-storage
        persistentVolumeClaim:
          claimName: postgres-pvc
---
apiVersion: v1
kind: Service
metadata:
  name: postgres-service
  namespace: ${SDP_NAMESPACE}
  labels:
    app: postgres
spec:
  type: ClusterIP
  ports:
  - port: 5432
    targetPort: 5432
    protocol: TCP
    name: postgres
  selector:
    app: postgres
EOF
}

# 部署监控
deploy_monitoring() {
    log "部署监控组件..."
    
    cat <<EOF | kubectl apply -f -
apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
metadata:
  name: sdp-controller-monitor
  namespace: ${SDP_NAMESPACE}
  labels:
    app: sdp-controller
spec:
  selector:
    matchLabels:
      app: sdp-controller
  endpoints:
  - port: metrics
    interval: 30s
    path: /metrics
---
apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
metadata:
  name: sdp-gateway-monitor
  namespace: ${SDP_NAMESPACE}
  labels:
    app: sdp-gateway
spec:
  selector:
    matchLabels:
      app: sdp-gateway
  endpoints:
  - port: metrics
    interval: 30s
    path: /metrics
EOF
}

# 主函数
main() {
    log "开始部署软件定义边界(SDP)..."
    
    # 检查kubectl
    if ! command -v kubectl &> /dev/null; then
        log "错误: kubectl未安装"
        exit 1
    fi
    
    # 检查openssl
    if ! command -v openssl &> /dev/null; then
        log "错误: openssl未安装"
        exit 1
    fi
    
    # 执行部署步骤
    create_namespace
    generate_certificates
    deploy_postgresql
    deploy_opa_policies
    deploy_sdp_controller
    deploy_sdp_gateway
    deploy_network_policies
    deploy_monitoring
    
    log "SDP部署完成!"
    log "控制器服务: kubectl get svc sdp-controller-service -n ${SDP_NAMESPACE}"
    log "网关状态: kubectl get ds sdp-gateway -n ${SDP_NAMESPACE}"
    log "查看日志: kubectl logs -f deployment/sdp-controller -n ${SDP_NAMESPACE}"
}

# 清理函数
cleanup() {
    log "清理SDP部署..."
    kubectl delete namespace ${SDP_NAMESPACE} --ignore-not-found=true
    log "清理完成"
}

# 检查参数
case "${1:-deploy}" in
    deploy)
        main
        ;;
    cleanup)
        cleanup
        ;;
    *)
        echo "用法: $0 [deploy|cleanup]"
        exit 1
        ;;
esac
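
Once the `opa-service` deployed by the script above is running, a caller such as the SDP controller would request a decision through OPA's REST Data API (`POST /v1/data/<path>` with an `input` document). The sketch below only builds that request and interprets a response body; it does not send anything. It assumes the policy is loaded as package `sdp` and that the Service is reachable as `opa-service` on port 8181 from inside the namespace. The helper names `build_opa_input`, `build_opa_request`, and `is_allowed` are hypothetical.

```python
import json
import urllib.request
from datetime import datetime, timezone

# Assumption: in-cluster DNS name and port of the Service defined above.
OPA_URL = "http://opa-service:8181/v1/data/sdp/allow"


def build_opa_input(user, device, resource, action, now=None):
    """Assemble the input document that the sdp.rego rules read."""
    now = now or datetime.now(timezone.utc)
    return {
        "input": {
            "user": user,
            "device": device,
            "resource": resource,
            "action": action,
            # RFC 3339 timestamp, the format time.parse_rfc3339_ns expects
            "timestamp": now.strftime("%Y-%m-%dT%H:%M:%SZ"),
        }
    }


def build_opa_request(payload, url=OPA_URL):
    """Create (but do not send) the POST request for OPA's Data API."""
    return urllib.request.Request(
        url,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def is_allowed(response_body):
    """OPA omits "result" when the rule is undefined; treat that as deny."""
    return json.loads(response_body).get("result") is True
```

Treating a missing or non-boolean `result` as a denial keeps the caller consistent with the policy's own default-deny rule.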

Data Protection and Encryption

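Data protection is a core element of a zero trust architecture. It covers end-to-end data encryption, data classification, and data loss prevention (DLP).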
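
Data loss prevention usually starts by detecting sensitive values before they leave a trusted boundary. As a minimal sketch, separate from the encryption manager in this section, the following scans text for candidate payment card numbers and keeps only those that pass the Luhn checksum. The function names and the regular expression are illustrative assumptions, not a complete DLP rule set.

```python
import re

# Candidate card numbers: 13-19 digits, optionally separated by spaces or hyphens.
CARD_PATTERN = re.compile(r"\b\d(?:[ -]?\d){12,18}\b")


def luhn_valid(number: str) -> bool:
    """Return True if the digits in `number` pass the Luhn checksum."""
    digits = [int(ch) for ch in number if ch.isdigit()]
    if len(digits) < 13:
        return False
    total = 0
    # Walk from the rightmost digit, doubling every second one.
    for index, digit in enumerate(reversed(digits)):
        if index % 2 == 1:
            digit *= 2
            if digit > 9:
                digit -= 9
        total += digit
    return total % 10 == 0


def find_card_numbers(text: str) -> list:
    """Return the Luhn-valid card-like numbers found in text, digits only."""
    found = []
    for match in CARD_PATTERN.finditer(text):
        digits = re.sub(r"\D", "", match.group())
        if luhn_valid(digits):
            found.append(digits)
    return found
```

The checksum step filters out most random digit runs such as order IDs, which keeps false positives lower than a pattern match alone.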

Data Encryption Management System

#!/usr/bin/env python3
"""
数据加密管理系统
实现端到端数据加密、密钥管理和数据分类
"""

import os
import json
import logging
import hashlib
import hmac
import base64
import time
from typing import Dict, List, Any, Optional, Tuple
from dataclasses import dataclass, asdict
from enum import Enum
from cryptography.fernet import Fernet
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import rsa, padding
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
import secrets

# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class DataClassification(Enum):
    """数据分类级别"""
    PUBLIC = "public"
    INTERNAL = "internal"
    CONFIDENTIAL = "confidential"
    RESTRICTED = "restricted"
    TOP_SECRET = "top_secret"

class EncryptionAlgorithm(Enum):
    """加密算法"""
    AES_256_GCM = "aes_256_gcm"
    CHACHA20_POLY1305 = "chacha20_poly1305"
    RSA_4096 = "rsa_4096"
    FERNET = "fernet"

@dataclass
class DataPolicy:
    """数据策略"""
    classification: DataClassification
    encryption_required: bool
    encryption_algorithm: EncryptionAlgorithm
    key_rotation_days: int
    access_roles: List[str]
    retention_days: int
    audit_required: bool

@dataclass
class EncryptionKey:
    """加密密钥"""
    key_id: str
    algorithm: EncryptionAlgorithm
    key_data: bytes
    created_at: float
    expires_at: Optional[float]
    classification: DataClassification
    usage_count: int
    max_usage: Optional[int]

@dataclass
class EncryptedData:
    """加密数据"""
    data_id: str
    encrypted_content: bytes
    key_id: str
    algorithm: EncryptionAlgorithm
    iv: Optional[bytes]
    tag: Optional[bytes]
    classification: DataClassification
    metadata: Dict[str, Any]
    created_at: float

class DataEncryptionManager:
    """数据加密管理器"""
    
    def __init__(self):
        self.keys: Dict[str, EncryptionKey] = {}
        self.data_store: Dict[str, EncryptedData] = {}
        self.policies: Dict[DataClassification, DataPolicy] = {}
        self.audit_log: List[Dict[str, Any]] = []
        
        # 初始化默认策略
        self._initialize_default_policies()
    
    def _initialize_default_policies(self):
        """初始化默认数据策略"""
        self.policies = {
            DataClassification.PUBLIC: DataPolicy(
                classification=DataClassification.PUBLIC,
                encryption_required=False,
                encryption_algorithm=EncryptionAlgorithm.FERNET,
                key_rotation_days=365,
                access_roles=["*"],
                retention_days=2555,  # 7年
                audit_required=False
            ),
            DataClassification.INTERNAL: DataPolicy(
                classification=DataClassification.INTERNAL,
                encryption_required=True,
                encryption_algorithm=EncryptionAlgorithm.AES_256_GCM,
                key_rotation_days=180,
                access_roles=["employee"],
                retention_days=1825,  # 5年
                audit_required=True
            ),
            DataClassification.CONFIDENTIAL: DataPolicy(
                classification=DataClassification.CONFIDENTIAL,
                encryption_required=True,
                encryption_algorithm=EncryptionAlgorithm.AES_256_GCM,
                key_rotation_days=90,
                access_roles=["manager", "admin"],
                retention_days=1095,  # 3年
                audit_required=True
            ),
            DataClassification.RESTRICTED: DataPolicy(
                classification=DataClassification.RESTRICTED,
                encryption_required=True,
                encryption_algorithm=EncryptionAlgorithm.CHACHA20_POLY1305,
                key_rotation_days=30,
                access_roles=["admin"],
                retention_days=365,  # 1年
                audit_required=True
            ),
            DataClassification.TOP_SECRET: DataPolicy(
                classification=DataClassification.TOP_SECRET,
                encryption_required=True,
                encryption_algorithm=EncryptionAlgorithm.CHACHA20_POLY1305,
                key_rotation_days=7,
                access_roles=["security_admin"],
                retention_days=90,
                audit_required=True
            )
        }
    
    def generate_key(self, classification: DataClassification, 
                    algorithm: EncryptionAlgorithm = None) -> str:
        """生成加密密钥"""
        try:
            policy = self.policies.get(classification)
            if not policy:
                raise ValueError(f"未找到分类 {classification} 的策略")
            
            if algorithm is None:
                algorithm = policy.encryption_algorithm
            
            key_id = self._generate_key_id()
            
            # 根据算法生成密钥
            if algorithm == EncryptionAlgorithm.AES_256_GCM:
                key_data = os.urandom(32)  # 256位密钥
            elif algorithm == EncryptionAlgorithm.CHACHA20_POLY1305:
                key_data = os.urandom(32)  # 256位密钥
            elif algorithm == EncryptionAlgorithm.RSA_4096:
                private_key = rsa.generate_private_key(
                    public_exponent=65537,
                    key_size=4096
                )
                key_data = private_key.private_bytes(
                    encoding=serialization.Encoding.PEM,
                    format=serialization.PrivateFormat.PKCS8,
                    encryption_algorithm=serialization.NoEncryption()
                )
            elif algorithm == EncryptionAlgorithm.FERNET:
                key_data = Fernet.generate_key()
            else:
                raise ValueError(f"不支持的加密算法: {algorithm}")
            
            # 计算过期时间
            expires_at = time.time() + (policy.key_rotation_days * 24 * 3600)
            
            # 创建密钥对象
            encryption_key = EncryptionKey(
                key_id=key_id,
                algorithm=algorithm,
                key_data=key_data,
                created_at=time.time(),
                expires_at=expires_at,
                classification=classification,
                usage_count=0,
                max_usage=None
            )
            
            self.keys[key_id] = encryption_key
            
            # 记录审计日志
            self._log_audit("key_generated", {
                "key_id": key_id,
                "algorithm": algorithm.value,
                "classification": classification.value
            })
            
            logger.info(f"生成密钥 {key_id},算法: {algorithm.value}")
            return key_id
            
        except Exception as e:
            logger.error(f"生成密钥失败: {e}")
            raise
    
    def encrypt_data(self, data: bytes, classification: DataClassification, 
                    key_id: str = None, metadata: Dict[str, Any] = None) -> str:
        """加密数据"""
        try:
            policy = self.policies.get(classification)
            if not policy:
                raise ValueError(f"未找到分类 {classification} 的策略")
            
            # Only PUBLIC has encryption_required=False; warn that it is encrypted anyway
            if not policy.encryption_required:
                logger.warning(f"Classification {classification.value} does not require encryption; encrypting anyway")
            
            # 如果没有指定密钥,生成新密钥
            if key_id is None:
                key_id = self.generate_key(classification, policy.encryption_algorithm)
            
            encryption_key = self.keys.get(key_id)
            if not encryption_key:
                raise ValueError(f"密钥 {key_id} 不存在")
            
            # 检查密钥是否过期
            if encryption_key.expires_at and time.time() > encryption_key.expires_at:
                raise ValueError(f"密钥 {key_id} 已过期")
            
            # 检查密钥使用次数
            if encryption_key.max_usage and encryption_key.usage_count >= encryption_key.max_usage:
                raise ValueError(f"密钥 {key_id} 使用次数已达上限")
            
            data_id = self._generate_data_id()
            iv = None
            tag = None
            
            # 根据算法加密数据
            if encryption_key.algorithm == EncryptionAlgorithm.AES_256_GCM:
                iv = os.urandom(12)  # GCM推荐12字节IV
                cipher = Cipher(
                    algorithms.AES(encryption_key.key_data),
                    modes.GCM(iv)
                )
                encryptor = cipher.encryptor()
                encrypted_content = encryptor.update(data) + encryptor.finalize()
                tag = encryptor.tag
                
            elif encryption_key.algorithm == EncryptionAlgorithm.CHACHA20_POLY1305:
                # Use the AEAD construction; raw ChaCha20 cannot be combined with modes.GCM
                from cryptography.hazmat.primitives.ciphers.aead import ChaCha20Poly1305
                iv = os.urandom(12)  # ChaCha20-Poly1305 takes a 12-byte nonce
                sealed = ChaCha20Poly1305(encryption_key.key_data).encrypt(iv, data, None)
                # encrypt() returns the ciphertext with the 16-byte Poly1305 tag appended
                encrypted_content, tag = sealed[:-16], sealed[-16:]
                
            elif encryption_key.algorithm == EncryptionAlgorithm.FERNET:
                fernet = Fernet(encryption_key.key_data)
                encrypted_content = fernet.encrypt(data)
                
            elif encryption_key.algorithm == EncryptionAlgorithm.RSA_4096:
                private_key = serialization.load_pem_private_key(
                    encryption_key.key_data,
                    password=None
                )
                public_key = private_key.public_key()
                
                # RSA加密有长度限制,需要分块加密
                max_chunk_size = 446  # 4096位RSA密钥的最大明文长度
                encrypted_chunks = []
                
                for i in range(0, len(data), max_chunk_size):
                    chunk = data[i:i + max_chunk_size]
                    encrypted_chunk = public_key.encrypt(
                        chunk,
                        padding.OAEP(
                            mgf=padding.MGF1(algorithm=hashes.SHA256()),
                            algorithm=hashes.SHA256(),
                            label=None
                        )
                    )
                    encrypted_chunks.append(encrypted_chunk)
                
                encrypted_content = b''.join(encrypted_chunks)
            else:
                raise ValueError(f"不支持的加密算法: {encryption_key.algorithm}")
            
            # 创建加密数据对象
            encrypted_data = EncryptedData(
                data_id=data_id,
                encrypted_content=encrypted_content,
                key_id=key_id,
                algorithm=encryption_key.algorithm,
                iv=iv,
                tag=tag,
                classification=classification,
                metadata=metadata or {},
                created_at=time.time()
            )
            
            self.data_store[data_id] = encrypted_data
            
            # 更新密钥使用计数
            encryption_key.usage_count += 1
            
            # 记录审计日志
            if policy.audit_required:
                self._log_audit("data_encrypted", {
                    "data_id": data_id,
                    "key_id": key_id,
                    "classification": classification.value,
                    "data_size": len(data)
                })
            
            logger.info(f"数据加密完成,数据ID: {data_id}")
            return data_id
            
        except Exception as e:
            logger.error(f"数据加密失败: {e}")
            raise
    
    def decrypt_data(self, data_id: str, user_roles: List[str] = None) -> bytes:
        """解密数据"""
        try:
            encrypted_data = self.data_store.get(data_id)
            if not encrypted_data:
                raise ValueError(f"数据 {data_id} 不存在")
            
            # 检查访问权限
            policy = self.policies.get(encrypted_data.classification)
            if policy and user_roles:
                if not any(role in policy.access_roles or "*" in policy.access_roles 
                          for role in user_roles):
                    raise PermissionError(f"用户角色 {user_roles} 无权访问 {encrypted_data.classification.value} 级别数据")
            
            encryption_key = self.keys.get(encrypted_data.key_id)
            if not encryption_key:
                raise ValueError(f"密钥 {encrypted_data.key_id} 不存在")
            
            # 根据算法解密数据
            if encrypted_data.algorithm == EncryptionAlgorithm.AES_256_GCM:
                cipher = Cipher(
                    algorithms.AES(encryption_key.key_data),
                    modes.GCM(encrypted_data.iv, encrypted_data.tag)
                )
                decryptor = cipher.decryptor()
                decrypted_data = decryptor.update(encrypted_data.encrypted_content) + decryptor.finalize()
                
            elif encrypted_data.algorithm == EncryptionAlgorithm.CHACHA20_POLY1305:
                # AEAD decryption expects the ciphertext followed by the 16-byte tag
                from cryptography.hazmat.primitives.ciphers.aead import ChaCha20Poly1305
                decrypted_data = ChaCha20Poly1305(encryption_key.key_data).decrypt(
                    encrypted_data.iv,
                    encrypted_data.encrypted_content + encrypted_data.tag,
                    None
                )
                
            elif encrypted_data.algorithm == EncryptionAlgorithm.FERNET:
                fernet = Fernet(encryption_key.key_data)
                decrypted_data = fernet.decrypt(encrypted_data.encrypted_content)
                
            elif encrypted_data.algorithm == EncryptionAlgorithm.RSA_4096:
                private_key = serialization.load_pem_private_key(
                    encryption_key.key_data,
                    password=None
                )
                
                # RSA解密需要分块处理
                chunk_size = 512  # 4096位RSA密钥的密文块大小
                decrypted_chunks = []
                
                for i in range(0, len(encrypted_data.encrypted_content), chunk_size):
                    chunk = encrypted_data.encrypted_content[i:i + chunk_size]
                    decrypted_chunk = private_key.decrypt(
                        chunk,
                        padding.OAEP(
                            mgf=padding.MGF1(algorithm=hashes.SHA256()),
                            algorithm=hashes.SHA256(),
                            label=None
                        )
                    )
                    decrypted_chunks.append(decrypted_chunk)
                
                decrypted_data = b''.join(decrypted_chunks)
            else:
                raise ValueError(f"不支持的解密算法: {encrypted_data.algorithm}")
            
            # 记录审计日志
            if policy and policy.audit_required:
                self._log_audit("data_decrypted", {
                    "data_id": data_id,
                    "key_id": encrypted_data.key_id,
                    "classification": encrypted_data.classification.value,
                    "user_roles": user_roles
                })
            
            logger.info(f"数据解密完成,数据ID: {data_id}")
            return decrypted_data
            
        except Exception as e:
            logger.error(f"数据解密失败: {e}")
            raise
    
    def rotate_key(self, old_key_id: str) -> str:
        """轮换密钥"""
        try:
            old_key = self.keys.get(old_key_id)
            if not old_key:
                raise ValueError(f"密钥 {old_key_id} 不存在")
            
            # 生成新密钥
            new_key_id = self.generate_key(old_key.classification, old_key.algorithm)
            
            # 查找使用旧密钥的数据
            affected_data = [
                data for data in self.data_store.values()
                if data.key_id == old_key_id
            ]
            
            # 重新加密数据
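            for data in affected_data:
                # Decrypt with the old key (internal call, no role check)
                decrypted_content = self.decrypt_data(data.data_id)
                
                # Re-encrypt under the new key before removing the old record,
                # so a failure here cannot lose the data
                new_data_id = self.encrypt_data(
                    decrypted_content,
                    data.classification,
                    new_key_id,
                    data.metadata
                )
                del self.data_store[data.data_id]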
                
                logger.info(f"数据 {data.data_id} 重新加密为 {new_data_id}")
            
            # 标记旧密钥为已过期
            old_key.expires_at = time.time()
            
            # 记录审计日志
            self._log_audit("key_rotated", {
                "old_key_id": old_key_id,
                "new_key_id": new_key_id,
                "affected_data_count": len(affected_data)
            })
            
            logger.info(f"密钥轮换完成: {old_key_id} -> {new_key_id}")
            return new_key_id
            
        except Exception as e:
            logger.error(f"密钥轮换失败: {e}")
            raise
    
    def classify_data(self, data: bytes, content_type: str = None) -> DataClassification:
        """自动数据分类"""
        try:
            # 简化的数据分类逻辑,实际应用中可以使用机器学习模型
            data_str = data.decode('utf-8', errors='ignore').lower()
            
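            # Sensitive keywords, checked from the highest level down.
            # 'confidential' belongs only to the CONFIDENTIAL list; listing it here
            # too would classify every confidential document as TOP_SECRET.
            top_secret_keywords = ['top secret', 'classified', 'secret key', 'password']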
            restricted_keywords = ['restricted', 'internal only', 'private', 'ssn', 'credit card']
            confidential_keywords = ['confidential', 'proprietary', 'financial', 'personal']
            internal_keywords = ['internal', 'employee', 'staff', 'company']
            
            if any(keyword in data_str for keyword in top_secret_keywords):
                return DataClassification.TOP_SECRET
            elif any(keyword in data_str for keyword in restricted_keywords):
                return DataClassification.RESTRICTED
            elif any(keyword in data_str for keyword in confidential_keywords):
                return DataClassification.CONFIDENTIAL
            elif any(keyword in data_str for keyword in internal_keywords):
                return DataClassification.INTERNAL
            else:
                return DataClassification.PUBLIC
                
        except Exception as e:
            logger.warning(f"数据分类失败,使用默认分类: {e}")
            return DataClassification.INTERNAL
    
    def get_data_info(self, data_id: str) -> Dict[str, Any]:
        """获取数据信息"""
        encrypted_data = self.data_store.get(data_id)
        if not encrypted_data:
            return None
        
        return {
            "data_id": data_id,
            "classification": encrypted_data.classification.value,
            "algorithm": encrypted_data.algorithm.value,
            "key_id": encrypted_data.key_id,
            "created_at": encrypted_data.created_at,
            "metadata": encrypted_data.metadata,
            "size": len(encrypted_data.encrypted_content)
        }
    
    def get_key_info(self, key_id: str) -> Dict[str, Any]:
        """获取密钥信息"""
        key = self.keys.get(key_id)
        if not key:
            return None
        
        return {
            "key_id": key_id,
            "algorithm": key.algorithm.value,
            "classification": key.classification.value,
            "created_at": key.created_at,
            "expires_at": key.expires_at,
            "usage_count": key.usage_count,
            "max_usage": key.max_usage,
            "is_expired": key.expires_at and time.time() > key.expires_at
        }
    
    def cleanup_expired_keys(self) -> List[str]:
        """清理过期密钥"""
        current_time = time.time()
        expired_keys = []
        
        for key_id, key in list(self.keys.items()):
            if key.expires_at and current_time > key.expires_at:
                # 检查是否还有数据使用此密钥
                using_data = [
                    data for data in self.data_store.values()
                    if data.key_id == key_id
                ]
                
                if not using_data:
                    del self.keys[key_id]
                    expired_keys.append(key_id)
                    logger.info(f"清理过期密钥: {key_id}")
        
        return expired_keys
    
    def generate_audit_report(self) -> Dict[str, Any]:
        """生成审计报告"""
        current_time = time.time()
        
        # 统计数据
        total_keys = len(self.keys)
        expired_keys = sum(1 for key in self.keys.values() 
                          if key.expires_at and current_time > key.expires_at)
        
        total_data = len(self.data_store)
        data_by_classification = {}
        for data in self.data_store.values():
            classification = data.classification.value
            data_by_classification[classification] = data_by_classification.get(classification, 0) + 1
        
        # 最近的审计事件
        recent_events = [
            event for event in self.audit_log
            if current_time - event['timestamp'] <= 86400  # 最近24小时
        ]
        
        return {
            "report_time": current_time,
            "summary": {
                "total_keys": total_keys,
                "expired_keys": expired_keys,
                "total_encrypted_data": total_data,
                "data_by_classification": data_by_classification
            },
            "recent_events": len(recent_events),
            "audit_log_size": len(self.audit_log)
        }
    
    def _generate_key_id(self) -> str:
        """生成密钥ID"""
        return f"key_{int(time.time())}_{secrets.token_hex(8)}"
    
    def _generate_data_id(self) -> str:
        """生成数据ID"""
        return f"data_{int(time.time())}_{secrets.token_hex(8)}"
    
    def _log_audit(self, action: str, details: Dict[str, Any]):
        """记录审计日志"""
        audit_entry = {
            "timestamp": time.time(),
            "action": action,
            "details": details
        }
        self.audit_log.append(audit_entry)

def main():
    """主函数 - 演示数据加密管理系统"""
    dem = DataEncryptionManager()
    
    # 测试数据
    test_data = b"This is confidential financial data containing sensitive information."
    
    # 自动分类数据
    classification = dem.classify_data(test_data)
    print(f"数据分类: {classification.value}")
    
    # 加密数据
    data_id = dem.encrypt_data(test_data, classification, metadata={"source": "financial_system"})
    print(f"数据已加密,ID: {data_id}")
    
    # 获取数据信息
    data_info = dem.get_data_info(data_id)
    print(f"数据信息: {json.dumps(data_info, indent=2, default=str)}")
    
    # 解密数据
    decrypted_data = dem.decrypt_data(data_id, user_roles=["admin"])
    print(f"解密成功: {decrypted_data == test_data}")
    
    # 密钥轮换
    key_id = data_info["key_id"]
    new_key_id = dem.rotate_key(key_id)
    print(f"密钥轮换: {key_id} -> {new_key_id}")
    
    # 生成审计报告
    report = dem.generate_audit_report()
    print(f"审计报告: {json.dumps(report, indent=2, default=str)}")

if __name__ == "__main__":
    main()

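Identity and Access Management

Identity and access management (IAM) is a core component of a zero-trust architecture. It verifies user identity, manages access permissions, and monitors user behavior.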
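
To make the three IAM responsibilities concrete, here is a minimal sketch of a per-request access decision that denies by default. It is an illustration under stated assumptions, not part of the article's implementation: the `ROLE_PERMISSIONS` table, the permission strings, and the names `AccessRequest`, `decide`, and `check` are all hypothetical.

```python
from dataclasses import dataclass
from typing import Dict, List, Set, Tuple

# Hypothetical role-to-permission table; the names are illustrative only.
ROLE_PERMISSIONS: Dict[str, Set[str]] = {
    "admin": {"data:read", "data:write", "key:rotate"},
    "analyst": {"data:read"},
}


@dataclass
class AccessRequest:
    user_id: str
    roles: Set[str]
    permission: str
    mfa_complete: bool  # outcome of a fresh authentication check


@dataclass
class AccessDecision:
    allowed: bool
    reason: str


def decide(request: AccessRequest) -> AccessDecision:
    """Evaluate one request; anything not explicitly granted is denied."""
    # Step 1: verify identity on every request, wherever it comes from.
    if not request.mfa_complete:
        return AccessDecision(False, "identity not verified")
    # Step 2: collect the permissions granted by the user's roles.
    granted: Set[str] = set()
    for role in request.roles:
        granted |= ROLE_PERMISSIONS.get(role, set())
    if request.permission not in granted:
        return AccessDecision(False, "permission not granted by any role")
    return AccessDecision(True, "granted")


# Step 3: record every decision so user behavior can be reviewed later.
audit_trail: List[Tuple[str, str, bool]] = []


def check(request: AccessRequest) -> AccessDecision:
    decision = decide(request)
    audit_trail.append((request.user_id, request.permission, decision.allowed))
    return decision
```

An unknown role contributes no permissions, so a request carrying only unrecognized roles is rejected rather than silently allowed.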

Multi-Factor Authentication System
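
The script in this section delegates TOTP to the `pyotp` library. As background, the check it relies on can be sketched with the standard library alone, following RFC 4226 (HOTP) and RFC 6238 (TOTP). The function names `hotp`, `totp`, and `verify_totp` are hypothetical, and the sketch assumes HMAC-SHA1, a 30-second step, and a raw byte key.

```python
import hashlib
import hmac
import struct
import time
from typing import Optional


def hotp(key: bytes, counter: int, digits: int = 6) -> str:
    """HOTP (RFC 4226): HMAC-SHA1 over an 8-byte big-endian counter."""
    digest = hmac.new(key, struct.pack(">Q", counter), hashlib.sha1).digest()
    offset = digest[-1] & 0x0F  # dynamic truncation offset
    value = struct.unpack(">I", digest[offset:offset + 4])[0] & 0x7FFFFFFF
    return str(value % 10 ** digits).zfill(digits)


def totp(key: bytes, at: Optional[float] = None, step: int = 30,
         digits: int = 6) -> str:
    """TOTP (RFC 6238): HOTP with the counter derived from Unix time."""
    now = time.time() if at is None else at
    return hotp(key, int(now // step), digits)


def verify_totp(key: bytes, code: str, at: Optional[float] = None,
                step: int = 30, window: int = 1) -> bool:
    """Accept codes from the current step and `window` steps either side."""
    now = time.time() if at is None else at
    counter = int(now // step)
    for drift in range(-window, window + 1):
        if counter + drift < 0:
            continue  # no time step exists before the Unix epoch
        expected = hotp(key, counter + drift)
        # Constant-time comparison of the two candidate strings
        if hmac.compare_digest(expected.encode(), code.encode()):
            return True
    return False
```

With `window=1` the check tolerates one 30-second step of clock drift in either direction, which corresponds to the `valid_window=1` argument used in the script. A base32 secret such as the one returned by `pyotp.random_base32()` would first need decoding, for example with `base64.b32decode`, before being passed as `key`.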

#!/usr/bin/env python3
"""
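Multi-factor authentication system
Implements TOTP and SMS factors; the other types listed in AuthFactorType (email, biometric, hardware token, push) are declared but not implemented here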
"""

import pyotp
import qrcode
import io
import base64
import hashlib
import hmac
import time
import random
import string
from typing import Dict, List, Optional, Tuple
from dataclasses import dataclass
from enum import Enum
import logging

logger = logging.getLogger(__name__)

class AuthFactorType(Enum):
    """认证因子类型"""
    PASSWORD = "password"
    TOTP = "totp"
    SMS = "sms"
    EMAIL = "email"
    BIOMETRIC = "biometric"
    HARDWARE_TOKEN = "hardware_token"
    PUSH_NOTIFICATION = "push_notification"

@dataclass
class AuthFactor:
    """认证因子"""
    factor_id: str
    factor_type: AuthFactorType
    user_id: str
    secret: str
    backup_codes: List[str]
    enabled: bool
    created_at: float
    last_used: Optional[float] = None

@dataclass
class AuthSession:
    """认证会话"""
    session_id: str
    user_id: str
    completed_factors: List[AuthFactorType]
    required_factors: List[AuthFactorType]
    created_at: float
    expires_at: float
    ip_address: str
    user_agent: str

class MultiFactorAuthenticator:
    """多因子认证器"""
    
    def __init__(self):
        self.factors: Dict[str, AuthFactor] = {}
        self.sessions: Dict[str, AuthSession] = {}
        self.user_factors: Dict[str, List[str]] = {}  # user_id -> factor_ids
        
    def register_totp_factor(self, user_id: str, issuer: str = "ZeroTrust") -> Tuple[str, str]:
        """注册TOTP认证因子"""
        try:
            # 生成密钥
            secret = pyotp.random_base32()
            
            # 创建认证因子
            factor_id = self._generate_factor_id()
            backup_codes = self._generate_backup_codes()
            
            factor = AuthFactor(
                factor_id=factor_id,
                factor_type=AuthFactorType.TOTP,
                user_id=user_id,
                secret=secret,
                backup_codes=backup_codes,
                enabled=True,
                created_at=time.time()
            )
            
            self.factors[factor_id] = factor
            
            # 添加到用户因子列表
            if user_id not in self.user_factors:
                self.user_factors[user_id] = []
            self.user_factors[user_id].append(factor_id)
            
            # 生成QR码
            totp = pyotp.TOTP(secret)
            provisioning_uri = totp.provisioning_uri(
                name=user_id,
                issuer_name=issuer
            )
            
            qr = qrcode.QRCode(version=1, box_size=10, border=5)
            qr.add_data(provisioning_uri)
            qr.make(fit=True)
            
            img = qr.make_image(fill_color="black", back_color="white")
            img_buffer = io.BytesIO()
            img.save(img_buffer, format='PNG')
            img_buffer.seek(0)
            
            qr_code_base64 = base64.b64encode(img_buffer.getvalue()).decode()
            
            logger.info(f"为用户 {user_id} 注册TOTP认证因子: {factor_id}")
            
            return factor_id, qr_code_base64
            
        except Exception as e:
            logger.error(f"注册TOTP认证因子失败: {e}")
            raise
    
    def register_sms_factor(self, user_id: str, phone_number: str) -> str:
        """注册SMS认证因子"""
        try:
            factor_id = self._generate_factor_id()
            
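            # Demo only: the secret field holds a SHA-256 hash of the phone number.
            # A hash cannot be reversed to send a message, so a real system would
            # store the number itself in encrypted form.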
            secret = hashlib.sha256(phone_number.encode()).hexdigest()
            backup_codes = self._generate_backup_codes()
            
            factor = AuthFactor(
                factor_id=factor_id,
                factor_type=AuthFactorType.SMS,
                user_id=user_id,
                secret=secret,
                backup_codes=backup_codes,
                enabled=True,
                created_at=time.time()
            )
            
            self.factors[factor_id] = factor
            
            if user_id not in self.user_factors:
                self.user_factors[user_id] = []
            self.user_factors[user_id].append(factor_id)
            
            logger.info(f"为用户 {user_id} 注册SMS认证因子: {factor_id}")
            return factor_id
            
        except Exception as e:
            logger.error(f"注册SMS认证因子失败: {e}")
            raise
    
    def start_auth_session(self, user_id: str, required_factors: List[AuthFactorType], 
                          ip_address: str, user_agent: str, session_timeout: int = 300) -> str:
        """开始认证会话"""
        try:
            session_id = self._generate_session_id()
            
            session = AuthSession(
                session_id=session_id,
                user_id=user_id,
                completed_factors=[],
                required_factors=required_factors,
                created_at=time.time(),
                expires_at=time.time() + session_timeout,
                ip_address=ip_address,
                user_agent=user_agent
            )
            
            self.sessions[session_id] = session
            
            logger.info(f"为用户 {user_id} 开始认证会话: {session_id}")
            return session_id
            
        except Exception as e:
            logger.error(f"开始认证会话失败: {e}")
            raise
    
    def verify_totp_code(self, session_id: str, code: str) -> bool:
        """验证TOTP代码"""
        try:
            session = self.sessions.get(session_id)
            if not session:
                logger.error("认证会话不存在")
                return False
            
            if time.time() > session.expires_at:
                logger.error("认证会话已过期")
                return False
            
            # 查找用户的TOTP因子
            user_factor_ids = self.user_factors.get(session.user_id, [])
            totp_factor = None
            
            for factor_id in user_factor_ids:
                factor = self.factors.get(factor_id)
                if factor and factor.factor_type == AuthFactorType.TOTP and factor.enabled:
                    totp_factor = factor
                    break
            
            if not totp_factor:
                logger.error("用户未注册TOTP认证因子")
                return False
            
            # 验证TOTP代码
            totp = pyotp.TOTP(totp_factor.secret)
            if totp.verify(code, valid_window=1):  # 允许前后30秒的时间窗口
                # 更新会话状态
                if AuthFactorType.TOTP not in session.completed_factors:
                    session.completed_factors.append(AuthFactorType.TOTP)
                
                # 更新因子使用时间
                totp_factor.last_used = time.time()
                
                logger.info(f"用户 {session.user_id} TOTP验证成功")
                return True
            
            # 检查备用代码
            if code in totp_factor.backup_codes:
                totp_factor.backup_codes.remove(code)  # 备用代码只能使用一次
                
                if AuthFactorType.TOTP not in session.completed_factors:
                    session.completed_factors.append(AuthFactorType.TOTP)
                
                totp_factor.last_used = time.time()
                
                logger.info(f"用户 {session.user_id} 备用代码验证成功")
                return True
            
            logger.error("TOTP代码验证失败")
            return False
            
        except Exception as e:
            logger.error(f"验证TOTP代码失败: {e}")
            return False
    
    def send_sms_code(self, session_id: str) -> bool:
        """发送SMS验证码"""
        try:
            session = self.sessions.get(session_id)
            if not session:
                logger.error("认证会话不存在")
                return False
            
            if time.time() > session.expires_at:
                logger.error("认证会话已过期")
                return False
            
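            # Generate a 6-digit code from the OS CSPRNG; the default
            # random generator is predictable and unsuitable for secrets
            sms_code = ''.join(random.SystemRandom().choices(string.digits, k=6))
            
            # Store the code with a 5-minute expiry (a real deployment
            # should also limit the number of verification attempts)
            session_key = f"sms_code_{session_id}"
            self._store_temp_code(session_key, sms_code, 300)
            
            # A real deployment would call an SMS gateway here.
            # The code itself is deliberately kept out of the log.
            logger.info(f"SMS verification code sent to user {session.user_id}")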
            
            return True
            
        except Exception as e:
            logger.error(f"发送SMS验证码失败: {e}")
            return False
    
    def verify_sms_code(self, session_id: str, code: str) -> bool:
        """验证SMS代码"""
        try:
            session = self.sessions.get(session_id)
            if not session:
                logger.error("认证会话不存在")
                return False
            
            if time.time() > session.expires_at:
                logger.error("认证会话已过期")
                return False
            
            # 获取存储的验证码
            session_key = f"sms_code_{session_id}"
            stored_code = self._get_temp_code(session_key)
            
            if not stored_code:
                logger.error("SMS验证码不存在或已过期")
                return False
            
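            # Constant-time comparison avoids leaking how many digits matched
            if hmac.compare_digest(code.encode(), stored_code.encode()):
                # Verified: delete the code so it cannot be reused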
                self._delete_temp_code(session_key)
                
                # 更新会话状态
                if AuthFactorType.SMS not in session.completed_factors:
                    session.completed_factors.append(AuthFactorType.SMS)
                
                logger.info(f"用户 {session.user_id} SMS验证成功")
                return True
            
            logger.error("SMS验证码错误")
            return False
            
        except Exception as e:
            logger.error(f"验证SMS代码失败: {e}")
            return False
    
    def is_auth_complete(self, session_id: str) -> bool:
        """检查认证是否完成"""
        session = self.sessions.get(session_id)
        if not session:
            return False
        
        if time.time() > session.expires_at:
            return False
        
        # 检查是否完成了所有必需的认证因子
        for required_factor in session.required_factors:
            if required_factor not in session.completed_factors:
                return False
        
        return True
    
    def get_auth_status(self, session_id: str) -> Dict[str, object]:
        """Return the current status of an authentication session."""
        session = self.sessions.get(session_id)
        if not session:
            return {'error': '认证会话不存在'}
        
        if time.time() > session.expires_at:
            return {'error': '认证会话已过期'}
        
        remaining_factors = [
            factor for factor in session.required_factors 
            if factor not in session.completed_factors
        ]
        
        return {
            'session_id': session_id,
            'user_id': session.user_id,
            'completed_factors': [f.value for f in session.completed_factors],
            'remaining_factors': [f.value for f in remaining_factors],
            'is_complete': len(remaining_factors) == 0,
            'expires_at': session.expires_at
        }
    
    def _generate_factor_id(self) -> str:
        """Generate an authentication factor ID."""
        return f"factor_{int(time.time())}_{random.SystemRandom().getrandbits(64):016x}"
    
    def _generate_session_id(self) -> str:
        """Generate an unguessable session ID from the OS CSPRNG."""
        return f"session_{int(time.time())}_{random.SystemRandom().getrandbits(128):032x}"
    
    def _generate_backup_codes(self, count: int = 10) -> List[str]:
        """Generate single-use backup codes from the OS CSPRNG."""
        rng = random.SystemRandom()
        alphabet = string.ascii_uppercase + string.digits
        return [''.join(rng.choices(alphabet, k=8)) for _ in range(count)]
    
    def _store_temp_code(self, key: str, code: str, ttl: int):
        """存储临时验证码(简化实现,实际应使用Redis等)"""
        # 这里使用简单的内存存储,实际应用中应该使用Redis等持久化存储
        if not hasattr(self, '_temp_codes'):
            self._temp_codes = {}
        
        self._temp_codes[key] = {
            'code': code,
            'expires_at': time.time() + ttl
        }
    
    def _get_temp_code(self, key: str) -> Optional[str]:
        """获取临时验证码"""
        if not hasattr(self, '_temp_codes'):
            return None
        
        data = self._temp_codes.get(key)
        if not data:
            return None
        
        if time.time() > data['expires_at']:
            del self._temp_codes[key]
            return None
        
        return data['code']
    
    def _delete_temp_code(self, key: str):
        """删除临时验证码"""
        if hasattr(self, '_temp_codes') and key in self._temp_codes:
            del self._temp_codes[key]

def main():
    """主函数 - 演示多因子认证系统"""
    mfa = MultiFactorAuthenticator()
    
    user_id = "alice@company.com"
    
    # 注册TOTP认证因子
    print("注册TOTP认证因子...")
    factor_id, qr_code = mfa.register_totp_factor(user_id)
    print(f"TOTP因子ID: {factor_id}")
    print(f"QR码已生成 (Base64长度: {len(qr_code)})")
    
    # 注册SMS认证因子
    print("\n注册SMS认证因子...")
    sms_factor_id = mfa.register_sms_factor(user_id, "+86138****1234")
    print(f"SMS因子ID: {sms_factor_id}")
    
    # 开始认证会话
    print("\n开始认证会话...")
    session_id = mfa.start_auth_session(
        user_id=user_id,
        required_factors=[AuthFactorType.TOTP, AuthFactorType.SMS],
        ip_address="192.168.1.100",
        user_agent="Mozilla/5.0"
    )
    print(f"会话ID: {session_id}")
    
    # 检查认证状态
    status = mfa.get_auth_status(session_id)
    print(f"\n认证状态: {status}")
    
    # 模拟TOTP验证(实际应用中用户会从认证器应用获取代码)
    print("\n模拟TOTP验证...")
    factor = mfa.factors[factor_id]
    totp = pyotp.TOTP(factor.secret)
    current_code = totp.now()
    print(f"当前TOTP代码: {current_code}")
    
    totp_result = mfa.verify_totp_code(session_id, current_code)
    print(f"TOTP验证结果: {totp_result}")
    
    # 发送SMS验证码
    print("\n发送SMS验证码...")
    sms_sent = mfa.send_sms_code(session_id)
    print(f"SMS发送结果: {sms_sent}")
    
    # 模拟SMS验证(实际应用中用户会从短信获取代码)
    print("\n模拟SMS验证...")
    # 这里我们直接从临时存储中获取验证码进行演示
    temp_key = f"sms_code_{session_id}"
    sms_code = mfa._get_temp_code(temp_key)
    print(f"SMS验证码: {sms_code}")
    
    sms_result = mfa.verify_sms_code(session_id, sms_code)
    print(f"SMS验证结果: {sms_result}")
    
    # 检查最终认证状态
    final_status = mfa.get_auth_status(session_id)
    print(f"\n最终认证状态: {final_status}")
    
    is_complete = mfa.is_auth_complete(session_id)
    print(f"认证完成: {is_complete}")

if __name__ == "__main__":
    main()
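
The script's comments note that stored verification codes should have both an expiry time and a limit on use. The in-memory helper above enforces only the expiry, so the following is a minimal sketch of a store that also caps failed attempts. The class `OneTimeCodeStore`, its parameters, and the injectable `clock` are hypothetical names introduced for illustration; a shared cache would be needed once more than one process is involved.

```python
import hmac
import secrets
import time
from dataclasses import dataclass
from typing import Callable, Dict


@dataclass
class PendingCode:
    code: str
    expires_at: float
    attempts_left: int


class OneTimeCodeStore:
    """In-memory store for short-lived, single-use verification codes."""

    def __init__(self, ttl: int = 300, max_attempts: int = 3,
                 clock: Callable[[], float] = time.time):
        self._ttl = ttl
        self._max_attempts = max_attempts
        self._clock = clock  # injectable so expiry can be tested
        self._codes: Dict[str, PendingCode] = {}

    def issue(self, key: str) -> str:
        """Create a fresh 6-digit code, replacing any earlier one."""
        code = f"{secrets.randbelow(10 ** 6):06d}"
        self._codes[key] = PendingCode(
            code, self._clock() + self._ttl, self._max_attempts)
        return code

    def verify(self, key: str, candidate: str) -> bool:
        pending = self._codes.get(key)
        if pending is None:
            return False
        if self._clock() > pending.expires_at:
            del self._codes[key]  # expired
            return False
        if hmac.compare_digest(pending.code.encode(), candidate.encode()):
            del self._codes[key]  # a code can be used only once
            return True
        pending.attempts_left -= 1
        if pending.attempts_left <= 0:
            del self._codes[key]  # too many wrong guesses
        return False
```

Once the attempt budget is spent the code is discarded, so even the correct value is rejected afterwards and the user has to request a new one. Without such a cap, a 6-digit code could be guessed by brute force within its lifetime.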
