云安全架构设计:零信任模型的实施
引言
在传统的网络安全模型中,企业通常采用"城堡和护城河"的防护策略,即在网络边界建立强大的防护措施,而对内部网络相对信任。然而,随着云计算、移动办公、物联网等技术的快速发展,传统的边界防护模式已经无法满足现代企业的安全需求。零信任安全模型应运而生,它基于"永不信任,始终验证"的核心理念,为云环境提供了更加安全、灵活的防护方案。
目录
零信任安全模型概述
零信任安全模型是一种网络安全范式,它假设网络内外都存在威胁,因此不会自动信任任何用户、设备或应用程序。该模型要求对每个访问请求进行严格的身份验证和授权,无论请求来自网络内部还是外部。
零信任架构图
%% Zero-trust cloud security architecture, drawn top to bottom.
%% Seven layers are declared as nested subgraphs; the edges further down
%% connect individual nodes across those layers.
graph TB
subgraph "零信任云安全架构"
%% Layer 1: request originators.
subgraph "用户层"
U1[内部用户]
U2[外部用户]
U3[合作伙伴]
U4[移动设备]
end
%% Layer 2: identity and access management.
subgraph "身份与访问管理层"
IAM[身份认证中心]
MFA[多因子认证]
PAM[特权访问管理]
RBAC[基于角色的访问控制]
end
%% Layer 3: policy engine and its analysis inputs.
subgraph "策略引擎层"
PE[策略引擎]
RA[风险评估]
AD[异常检测]
ML[机器学习分析]
end
%% Layer 4: network enforcement points.
subgraph "网络安全层"
SDP[软件定义边界]
MSG[微分段网关]
FW[下一代防火墙]
IPS[入侵防护系统]
end
%% Layer 5: data protection services.
subgraph "数据保护层"
DLP[数据防泄漏]
ENC[端到端加密]
KMS[密钥管理服务]
CASB[云访问安全代理]
end
%% Layer 6: protected applications and data stores.
subgraph "应用与服务层"
API[API网关]
MS[微服务]
DB[(数据库)]
FS[文件存储]
end
%% Layer 7: monitoring and analytics.
subgraph "监控与分析层"
SIEM[安全信息与事件管理]
SOAR[安全编排与自动响应]
UBA[用户行为分析]
TI[威胁情报]
end
end
%% Every user-layer node authenticates through the identity centre.
U1 --> IAM
U2 --> IAM
U3 --> IAM
U4 --> IAM
%% All identity-layer nodes feed the policy engine.
IAM --> PE
MFA --> PE
PAM --> PE
RBAC --> PE
%% Policy-layer nodes map one-to-one onto network-layer nodes.
PE --> SDP
RA --> MSG
AD --> FW
ML --> IPS
%% Network-layer nodes map one-to-one onto application-layer nodes.
SDP --> API
MSG --> MS
FW --> DB
IPS --> FS
%% Data-protection nodes map one-to-one onto monitoring-layer nodes.
DLP --> SIEM
ENC --> SOAR
KMS --> UBA
CASB --> TI
%% Fill colours for one representative node per highlighted layer.
style IAM fill:#e1f5fe
style PE fill:#f3e5f5
style SDP fill:#e8f5e8
style DLP fill:#fff3e0
style SIEM fill:#fce4ec
零信任核心原则
- 永不信任,始终验证 - 对所有访问请求进行验证
- 最小权限原则 - 仅授予完成任务所需的最小权限
- 假定失陷(Assume Breach) - 假设网络已被入侵,并据此设计相应的防护措施
- 显式验证 - 基于所有可用数据点进行访问决策
- 持续监控 - 实时监控和分析所有活动
零信任架构核心组件
零信任架构分析器
#!/usr/bin/env python3
"""
零信任架构分析器
评估和设计零信任安全架构
"""
import json
import logging
from typing import Dict, List, Any, Optional
from dataclasses import dataclass, asdict
from enum import Enum
import hashlib
import time
from datetime import datetime, timedelta
# Configure root logging at INFO level and create this module's logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class TrustLevel(Enum):
    """Ordered trust levels, from UNTRUSTED (0) up to VERIFIED (4)."""

    UNTRUSTED = 0
    LOW = 1
    MEDIUM = 2
    HIGH = 3
    VERIFIED = 4
class RiskLevel(Enum):
    """Ordered risk levels, from LOW (1) up to CRITICAL (4)."""

    LOW = 1
    MEDIUM = 2
    HIGH = 3
    CRITICAL = 4
@dataclass
class User:
    """A user known to the analyzer.

    All fields are required and positional, in the order declared below.
    """

    user_id: str             # unique key used to index the user
    username: str
    email: str
    roles: List[str]         # role names matched against policy conditions
    department: str
    trust_score: float       # overwritten by the analyzer on registration
    last_login: datetime
    failed_attempts: int
    mfa_enabled: bool
    device_registered: bool
@dataclass
class Device:
    """A device known to the analyzer.

    All fields are required and positional, in the order declared below.
    """

    device_id: str             # unique key used to index the device
    device_type: str           # e.g. 'laptop', 'desktop', 'mobile', 'tablet'
    os_version: str
    security_patch_level: str  # scored values: 'latest', 'recent', 'outdated'
    compliance_status: bool
    trust_score: float         # overwritten by the analyzer on registration
    last_seen: datetime
    location: str              # compared with the request's location
    owner: str
@dataclass
class AccessRequest:
    """A single request by a user on a device to act on a resource.

    All fields are required and positional, in the order declared below.
    """

    request_id: str
    user_id: str        # looked up among the registered users
    device_id: str      # looked up among the registered devices
    resource: str
    action: str
    timestamp: datetime
    source_ip: str
    location: str
    user_agent: str
@dataclass
class PolicyRule:
    """An access policy: match conditions plus the actions to take on a match.

    All fields are required and positional, in the order declared below.
    """

    rule_id: str
    name: str
    description: str
    conditions: Dict[str, Any]  # condition name -> expected value/threshold
    actions: List[str]          # e.g. 'allow', 'deny', 'session_timeout:7200'
    priority: int               # higher values are evaluated first
    enabled: bool               # disabled rules are skipped
class ZeroTrustArchitectureAnalyzer:
    """In-memory zero-trust access evaluator.

    Holds registered users, devices and policy rules, scores each access
    request for risk, applies the matching policy and keeps a log of the
    requests that were evaluated.
    """

    def __init__(self):
        self.users: Dict[str, User] = {}
        self.devices: Dict[str, Device] = {}
        self.policies: Dict[str, PolicyRule] = {}
        self.access_logs: List[AccessRequest] = []
        self.risk_scores: Dict[str, float] = {}

    def register_user(self, user: User) -> bool:
        """Validate and store *user*, overwriting its trust score.

        Returns:
            True on success; False when the ID or e-mail is empty or any
            error occurs (the error is logged, not raised).
        """
        try:
            if not user.user_id or not user.email:
                logger.error("用户ID和邮箱不能为空")
                return False
            # The caller-supplied trust score is replaced by a computed one.
            trust_score = self._calculate_user_trust_score(user)
            user.trust_score = trust_score
            self.users[user.user_id] = user
            logger.info(f"用户 {user.username} 注册成功,信任分数: {trust_score}")
            return True
        except Exception as e:
            logger.error(f"用户注册失败: {e}")
            return False

    def register_device(self, device: Device) -> bool:
        """Validate and store *device*, overwriting its trust score.

        Returns:
            True on success; False when the ID or type is empty or any
            error occurs (the error is logged, not raised).
        """
        try:
            if not device.device_id or not device.device_type:
                logger.error("设备ID和类型不能为空")
                return False
            trust_score = self._calculate_device_trust_score(device)
            device.trust_score = trust_score
            self.devices[device.device_id] = device
            logger.info(f"设备 {device.device_id} 注册成功,信任分数: {trust_score}")
            return True
        except Exception as e:
            logger.error(f"设备注册失败: {e}")
            return False

    def create_policy(self, policy: PolicyRule) -> bool:
        """Store *policy* under its rule ID.

        Returns:
            True on success; False when the ID or name is empty or any
            error occurs (the error is logged, not raised).
        """
        try:
            if not policy.rule_id or not policy.name:
                logger.error("策略ID和名称不能为空")
                return False
            self.policies[policy.rule_id] = policy
            logger.info(f"策略 {policy.name} 创建成功")
            return True
        except Exception as e:
            logger.error(f"策略创建失败: {e}")
            return False

    def evaluate_access_request(self, request: AccessRequest) -> Dict[str, Any]:
        """Decide whether *request* is allowed.

        Returns:
            A dict with 'allowed', 'reason', 'risk_level' and 'trust_score'.
            Fully evaluated requests also carry 'required_actions' and
            'session_timeout'. Unknown users/devices and internal errors
            produce a denial instead of an exception.
        """
        try:
            user = self.users.get(request.user_id)
            device = self.devices.get(request.device_id)
            if not user:
                return {
                    'allowed': False,
                    'reason': '用户未注册',
                    'risk_level': RiskLevel.CRITICAL.name,
                    'trust_score': 0
                }
            if not device:
                return {
                    'allowed': False,
                    'reason': '设备未注册',
                    'risk_level': RiskLevel.HIGH.name,
                    'trust_score': 0
                }
            risk_score = self._calculate_risk_score(request, user, device)
            policy_result = self._apply_policies(request, user, device, risk_score)
            # The request is logged only after scoring, so it does not
            # count towards its own access-pattern analysis.
            self.access_logs.append(request)
            self._update_trust_scores(request, user, device, policy_result['allowed'])
            return {
                'allowed': policy_result['allowed'],
                'reason': policy_result['reason'],
                'risk_level': self._get_risk_level(risk_score).name,
                'trust_score': (user.trust_score + device.trust_score) / 2,
                'required_actions': policy_result.get('required_actions', []),
                'session_timeout': policy_result.get('session_timeout', 3600)
            }
        except Exception as e:
            # Fail closed: any evaluation error denies the request.
            logger.error(f"访问请求评估失败: {e}")
            return {
                'allowed': False,
                'reason': f'评估失败: {e}',
                'risk_level': RiskLevel.CRITICAL.name,
                'trust_score': 0
            }

    def _calculate_user_trust_score(self, user: User) -> float:
        """Return the user's trust score, clamped to the range 0-100."""
        score = 50.0  # baseline
        if user.mfa_enabled:
            score += 20
        if user.device_registered:
            score += 15
        # Only the first matching role bonus applies.
        if 'admin' in user.roles:
            score += 10
        elif 'user' in user.roles:
            score += 5
        score -= user.failed_attempts * 5
        # Stale accounts lose points.
        if user.last_login:
            days_since_login = (datetime.now() - user.last_login).days
            if days_since_login > 30:
                score -= 10
            elif days_since_login > 7:
                score -= 5
        return max(0, min(100, score))

    def _calculate_device_trust_score(self, device: Device) -> float:
        """Return the device's trust score, clamped to the range 0-100."""
        score = 50.0  # baseline
        if device.compliance_status:
            score += 25
        else:
            score -= 20
        if device.device_type in ['laptop', 'desktop']:
            score += 10
        elif device.device_type in ['mobile', 'tablet']:
            score += 5
        if device.security_patch_level == 'latest':
            score += 15
        elif device.security_patch_level == 'recent':
            score += 10
        elif device.security_patch_level == 'outdated':
            score -= 15
        if device.last_seen:
            hours_since_seen = (datetime.now() - device.last_seen).total_seconds() / 3600
            if hours_since_seen > 168:  # more than a week
                score -= 10
            elif hours_since_seen > 24:  # more than a day
                score -= 5
        return max(0, min(100, score))

    def _calculate_risk_score(self, request: AccessRequest, user: User, device: Device) -> float:
        """Return the request's combined risk score, capped at 100."""
        risk_score = 0.0
        # User factors.
        if user.failed_attempts > 3:
            risk_score += 30
        if not user.mfa_enabled:
            risk_score += 25
        # Device factors.
        if not device.compliance_status:
            risk_score += 35
        if device.security_patch_level == 'outdated':
            risk_score += 20
        # Contextual factors.
        risk_score += self._analyze_access_pattern(request, user)
        risk_score += self._analyze_location_risk(request, user, device)
        risk_score += self._analyze_time_risk(request)
        return min(100, risk_score)

    def _analyze_access_pattern(self, request: AccessRequest, user: User) -> float:
        """Return the risk contributed by the user's access history."""
        risk = 0.0
        user_logs = [log for log in self.access_logs if log.user_id == user.user_id]
        if len(user_logs) < 5:
            # New user, or too little history to judge.
            risk += 10
        else:
            recent_logs = [log for log in user_logs
                           if (datetime.now() - log.timestamp).days <= 7]
            if len(recent_logs) > 50:
                # Unusually frequent access.
                risk += 15
            accessed_resources = set(log.resource for log in recent_logs)
            if request.resource not in accessed_resources:
                # First access to this resource within the window.
                risk += 5
        return risk

    def _analyze_location_risk(self, request: AccessRequest, user: User, device: Device) -> float:
        """Return the risk contributed by IP and location changes.

        Adds 10 when the user has requests in the last 24 hours and none
        came from this source IP, and 15 when the request's location
        differs from the device's registered location.
        """
        risk = 0.0
        # timedelta has no `.hours` attribute, so the 24-hour window is
        # expressed as a cutoff timestamp instead.
        cutoff = datetime.now() - timedelta(hours=24)
        recent_logs = [log for log in self.access_logs
                       if log.user_id == user.user_id and log.timestamp >= cutoff]
        if recent_logs:
            recent_ips = set(log.source_ip for log in recent_logs)
            if request.source_ip not in recent_ips:
                risk += 10
        if device.location != request.location:
            risk += 15
        return risk

    def _analyze_time_risk(self, request: AccessRequest) -> float:
        """Return the risk contributed by the request's time of day and week."""
        risk = 0.0
        hour = request.timestamp.hour
        # Outside 06:00-22:59.
        if hour < 6 or hour > 22:
            risk += 10
        # Saturday or Sunday.
        if request.timestamp.weekday() >= 5:
            risk += 5
        return risk

    def _apply_policies(self, request: AccessRequest, user: User, device: Device, risk_score: float) -> Dict[str, Any]:
        """Apply the highest-priority enabled policy that matches.

        Only the first matching policy is applied. With no match the
        result is the default denial.
        """
        result = {
            'allowed': False,
            'reason': '默认拒绝',
            'required_actions': [],
            'session_timeout': 3600
        }
        sorted_policies = sorted(self.policies.values(), key=lambda p: p.priority, reverse=True)
        for policy in sorted_policies:
            if not policy.enabled:
                continue
            if self._match_policy_conditions(policy, request, user, device, risk_score):
                for action in policy.actions:
                    if action == 'allow':
                        result['allowed'] = True
                        result['reason'] = f'策略 {policy.name} 允许访问'
                    elif action == 'deny':
                        result['allowed'] = False
                        result['reason'] = f'策略 {policy.name} 拒绝访问'
                        return result  # a deny ends evaluation immediately
                    elif action == 'require_mfa':
                        result['required_actions'].append('mfa')
                    elif action == 'require_device_verification':
                        result['required_actions'].append('device_verification')
                    elif action.startswith('session_timeout:'):
                        # Split once so only the prefix is stripped.
                        timeout = int(action.split(':', 1)[1])
                        result['session_timeout'] = timeout
                # First match wins; lower-priority policies are ignored.
                break
        return result

    def _match_policy_conditions(self, policy: PolicyRule, request: AccessRequest,
                                 user: User, device: Device, risk_score: float) -> bool:
        """Return True when every condition present on *policy* holds.

        Conditions that are absent are not checked, so a policy with no
        conditions matches every request.
        """
        conditions = policy.conditions
        # User conditions.
        if 'user_roles' in conditions:
            required_roles = conditions['user_roles']
            if not any(role in user.roles for role in required_roles):
                return False
        if 'user_trust_score_min' in conditions:
            if user.trust_score < conditions['user_trust_score_min']:
                return False
        # Device conditions.
        if 'device_compliance' in conditions:
            if device.compliance_status != conditions['device_compliance']:
                return False
        if 'device_trust_score_min' in conditions:
            if device.trust_score < conditions['device_trust_score_min']:
                return False
        # Risk condition.
        if 'max_risk_score' in conditions:
            if risk_score > conditions['max_risk_score']:
                return False
        # Resource condition.
        if 'resources' in conditions:
            allowed_resources = conditions['resources']
            if request.resource not in allowed_resources:
                return False
        # Time condition, based on the request's own timestamp.
        if 'allowed_hours' in conditions:
            allowed_hours = conditions['allowed_hours']
            current_hour = request.timestamp.hour
            if current_hour not in allowed_hours:
                return False
        return True

    def _get_risk_level(self, risk_score: float) -> RiskLevel:
        """Map a numeric score to a RiskLevel (thresholds 75 / 50 / 25)."""
        if risk_score >= 75:
            return RiskLevel.CRITICAL
        elif risk_score >= 50:
            return RiskLevel.HIGH
        elif risk_score >= 25:
            return RiskLevel.MEDIUM
        else:
            return RiskLevel.LOW

    def _update_trust_scores(self, request: AccessRequest, user: User, device: Device, allowed: bool):
        """Adjust user and device trust after a decision, within 0-100."""
        if allowed:
            # Small reward; the failure counter is reset.
            user.trust_score = min(100, user.trust_score + 1)
            device.trust_score = min(100, device.trust_score + 1)
            user.failed_attempts = 0
        else:
            # Denials cost more than successes earn.
            user.trust_score = max(0, user.trust_score - 5)
            device.trust_score = max(0, device.trust_score - 3)
            user.failed_attempts += 1

    def generate_security_report(self) -> Dict[str, Any]:
        """Summarise users, devices, policies and the last 7 days of requests.

        Returns:
            A placeholder dict with only 'message' when nothing has been
            logged yet; otherwise the summary dict.
        """
        total_requests = len(self.access_logs)
        if total_requests == 0:
            return {'message': '暂无访问记录'}
        # Decisions are not stored with the log, so no success rate here.
        recent_logs = [log for log in self.access_logs
                       if (datetime.now() - log.timestamp).days <= 7]
        high_risk_users = [user for user in self.users.values()
                           if user.trust_score < 30]
        non_compliant_devices = [device for device in self.devices.values()
                                 if not device.compliance_status]
        resource_access_count = {}
        for log in recent_logs:
            resource_access_count[log.resource] = resource_access_count.get(log.resource, 0) + 1
        # Top ten resources by request count.
        most_accessed_resources = sorted(resource_access_count.items(),
                                         key=lambda x: x[1], reverse=True)[:10]
        return {
            'report_time': datetime.now().isoformat(),
            'total_users': len(self.users),
            'total_devices': len(self.devices),
            'total_policies': len(self.policies),
            'recent_access_requests': len(recent_logs),
            'high_risk_users': len(high_risk_users),
            'non_compliant_devices': len(non_compliant_devices),
            'most_accessed_resources': most_accessed_resources,
            'average_user_trust_score': sum(u.trust_score for u in self.users.values()) / len(self.users) if self.users else 0,
            'average_device_trust_score': sum(d.trust_score for d in self.devices.values()) / len(self.devices) if self.devices else 0
        }
def main():
    """Demonstrate the analyzer end to end.

    Registers one user, one device and one policy, evaluates a single
    access request and prints the decision and a security report as JSON.
    """
    analyzer = ZeroTrustArchitectureAnalyzer()

    # Register a user; trust_score=0 is a placeholder that registration
    # replaces with a computed value.
    user1 = User(
        user_id="user001",
        username="alice",
        email="alice@company.com",
        roles=["user", "developer"],
        department="engineering",
        trust_score=0,
        last_login=datetime.now() - timedelta(days=1),
        failed_attempts=0,
        mfa_enabled=True,
        device_registered=True
    )
    analyzer.register_user(user1)

    # Register a device.
    device1 = Device(
        device_id="device001",
        device_type="laptop",
        os_version="Windows 11",
        security_patch_level="latest",
        compliance_status=True,
        trust_score=0,
        last_seen=datetime.now(),
        location="Beijing",
        owner="alice"
    )
    analyzer.register_device(device1)

    # Create a policy that lets developers reach the dev environment.
    policy1 = PolicyRule(
        rule_id="policy001",
        name="开发者访问策略",
        description="允许开发者访问开发环境",
        conditions={
            "user_roles": ["developer"],
            "device_compliance": True,
            "max_risk_score": 30,
            "resources": ["dev-server", "test-db"],
            "allowed_hours": list(range(8, 18))  # hours 8 through 17
        },
        actions=["allow", "session_timeout:7200"],
        priority=10,
        enabled=True
    )
    analyzer.create_policy(policy1)

    # Simulate a request; it is stamped with the current time, so the
    # outcome depends on when the demo is run.
    request1 = AccessRequest(
        request_id="req001",
        user_id="user001",
        device_id="device001",
        resource="dev-server",
        action="read",
        timestamp=datetime.now(),
        source_ip="192.168.1.100",
        location="Beijing",
        user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64)"
    )

    result = analyzer.evaluate_access_request(request1)
    print("访问请求评估结果:")
    print(json.dumps(result, indent=2, ensure_ascii=False))

    report = analyzer.generate_security_report()
    print("\n安全报告:")
    print(json.dumps(report, indent=2, ensure_ascii=False))


if __name__ == "__main__":
    main()
最佳实践与建议
架构设计原则
1. 最小权限原则
   - 默认拒绝所有访问
   - 基于需要授予最小权限
   - 定期审查和调整权限
2. 持续验证
   - 每次访问都进行身份验证
   - 实时监控用户和设备行为
   - 动态调整信任级别
3. 深度防御
   - 多层安全控制
   - 冗余安全机制
   - 故障安全设计
实施策略
1. 分阶段实施
   - 从核心资产开始
   - 逐步扩展到所有系统
   - 持续优化和改进
2. 用户体验平衡
   - 透明的安全控制
   - 简化认证流程
   - 智能风险评估
3. 合规性考虑
   - 满足行业标准
   - 数据保护法规
   - 审计要求
技术选型建议
1. 身份提供商
   - Azure AD / AWS IAM
   - Okta / Auth0
   - 自建身份系统
2. 网络安全
   - Zscaler / Palo Alto Prisma
   - Cloudflare Access
   - 自建SDP解决方案
3. 数据保护
   - HashiCorp Vault
   - AWS KMS / Azure Key Vault
   - 自建密钥管理系统
总结
零信任云安全架构代表了现代网络安全的发展方向,通过"永不信任,始终验证"的核心理念,为云环境提供了更加安全、灵活和可扩展的安全保护。
核心价值
1. 安全性提升
   - 减少攻击面
   - 限制横向移动
   - 提高威胁检测能力
2. 灵活性增强
   - 支持远程办公
   - 适应云原生架构
   - 简化网络管理
3. 合规性保障
   - 满足监管要求
   - 提供审计追踪
   - 保护敏感数据
实施要点
1. 组织准备
   - 高层支持和承诺
   - 跨部门协作
   - 充分的资源投入
2. 技术准备
   - 现有系统评估
   - 技术栈选择
   - 人员技能培训
3. 持续改进
   - 定期安全评估
   - 威胁情报更新
   - 技术演进跟进
零信任架构的成功实施需要技术、流程和人员的协调配合,是一个持续演进的过程。通过合理的规划、分阶段的实施和持续的优化,组织可以构建起真正安全、可靠的云安全防护体系。
## 网络微分段
网络微分段是零信任架构的重要组成部分,通过将网络划分为更小的安全区域,限制横向移动,减少攻击面。
### 软件定义边界(SDP)实现
```bash
#!/bin/bash
#
# Software-Defined Perimeter (SDP) deployment script.
# Deploys identity-based network access control onto a Kubernetes cluster.
#
# The description is written as `#` comments: a Python-style triple-quoted
# block is not a comment in Bash and would be executed as a command.
set -euo pipefail

# Configuration. Each value may be overridden from the environment; the
# defaults are the values the script has always used.
SDP_NAMESPACE="${SDP_NAMESPACE:-zero-trust-sdp}"
CONTROLLER_IMAGE="${CONTROLLER_IMAGE:-sdp-controller:latest}"
GATEWAY_IMAGE="${GATEWAY_IMAGE:-sdp-gateway:latest}"
CLIENT_IMAGE="${CLIENT_IMAGE:-sdp-client:latest}"
# Print the first argument to stdout, prefixed with a local timestamp.
log() {
    local stamp
    stamp="$(date +'%Y-%m-%d %H:%M:%S')"
    echo "[${stamp}] $1"
}
# Create the SDP namespace if needed and (re)apply its marker labels.
create_namespace() {
    log "创建SDP命名空间..."
    # Render the Namespace client-side and apply it, so a re-run does not
    # fail when the namespace already exists.
    kubectl create namespace "${SDP_NAMESPACE}" --dry-run=client -o yaml \
        | kubectl apply -f -
    kubectl label namespace "${SDP_NAMESPACE}" --overwrite \
        zero-trust=enabled \
        network-policy=strict
}
# Deploy the SDP controller (control plane): its ConfigMap, a two-replica
# Deployment, a ClusterIP Service, a ServiceAccount and the cluster-wide
# RBAC objects it is bound to.
#
# The manifest is emitted with its YAML nesting intact; without it
# `kubectl apply` cannot parse the document. The heredoc delimiter is
# unquoted on purpose so that ${SDP_NAMESPACE} and ${CONTROLLER_IMAGE}
# are expanded by the shell.
deploy_sdp_controller() {
    log "部署SDP控制器..."
    cat <<EOF | kubectl apply -f -
apiVersion: v1
kind: ConfigMap
metadata:
  name: sdp-controller-config
  namespace: ${SDP_NAMESPACE}
data:
  config.yaml: |
    controller:
      listen_port: 8443
      tls:
        cert_file: /etc/certs/tls.crt
        key_file: /etc/certs/tls.key
    database:
      type: postgresql
      host: postgres-service
      port: 5432
      database: sdp
      username: sdp_user
      password_secret: postgres-secret
    authentication:
      jwt_secret: jwt-secret
      token_expiry: 3600
    authorization:
      policy_engine: opa
      policy_url: http://opa-service:8181/v1/data/sdp/allow
    logging:
      level: info
      format: json
    metrics:
      enabled: true
      port: 9090
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: sdp-controller
  namespace: ${SDP_NAMESPACE}
  labels:
    app: sdp-controller
    component: control-plane
spec:
  replicas: 2
  selector:
    matchLabels:
      app: sdp-controller
  template:
    metadata:
      labels:
        app: sdp-controller
        component: control-plane
    spec:
      serviceAccountName: sdp-controller
      securityContext:
        runAsNonRoot: true
        runAsUser: 1000
        fsGroup: 2000
      containers:
        - name: controller
          image: ${CONTROLLER_IMAGE}
          imagePullPolicy: Always
          ports:
            - containerPort: 8443
              name: https
            - containerPort: 9090
              name: metrics
          env:
            - name: CONFIG_FILE
              value: /etc/config/config.yaml
            - name: POD_NAME
              valueFrom:
                fieldRef:
                  fieldPath: metadata.name
            - name: POD_NAMESPACE
              valueFrom:
                fieldRef:
                  fieldPath: metadata.namespace
          volumeMounts:
            - name: config
              mountPath: /etc/config
            - name: certs
              mountPath: /etc/certs
              readOnly: true
            - name: jwt-secret
              mountPath: /etc/secrets/jwt
              readOnly: true
          livenessProbe:
            httpGet:
              path: /health
              port: 8443
              scheme: HTTPS
            initialDelaySeconds: 30
            periodSeconds: 10
          readinessProbe:
            httpGet:
              path: /ready
              port: 8443
              scheme: HTTPS
            initialDelaySeconds: 5
            periodSeconds: 5
          resources:
            requests:
              cpu: 100m
              memory: 128Mi
            limits:
              cpu: 500m
              memory: 512Mi
      volumes:
        - name: config
          configMap:
            name: sdp-controller-config
        - name: certs
          secret:
            secretName: sdp-controller-tls
        - name: jwt-secret
          secret:
            secretName: jwt-secret
---
apiVersion: v1
kind: Service
metadata:
  name: sdp-controller-service
  namespace: ${SDP_NAMESPACE}
  labels:
    app: sdp-controller
spec:
  type: ClusterIP
  ports:
    - port: 8443
      targetPort: 8443
      protocol: TCP
      name: https
    - port: 9090
      targetPort: 9090
      protocol: TCP
      name: metrics
  selector:
    app: sdp-controller
---
apiVersion: v1
kind: ServiceAccount
metadata:
  name: sdp-controller
  namespace: ${SDP_NAMESPACE}
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: sdp-controller
rules:
  - apiGroups: [""]
    resources: ["pods", "services", "endpoints"]
    verbs: ["get", "list", "watch"]
  - apiGroups: ["networking.k8s.io"]
    resources: ["networkpolicies"]
    verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
  - apiGroups: ["extensions", "apps"]
    resources: ["deployments", "replicasets"]
    verbs: ["get", "list", "watch"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: sdp-controller
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: sdp-controller
subjects:
  - kind: ServiceAccount
    name: sdp-controller
    namespace: ${SDP_NAMESPACE}
EOF
}
# Deploy the SDP gateway (data plane): its ConfigMap, a privileged
# host-network DaemonSet, a ServiceAccount and cluster-wide read-only RBAC.
#
# The manifest is emitted with its YAML nesting intact; without it
# `kubectl apply` cannot parse the document. The heredoc delimiter is
# unquoted on purpose so that ${SDP_NAMESPACE} and ${GATEWAY_IMAGE} are
# expanded by the shell.
deploy_sdp_gateway() {
    log "部署SDP网关..."
    cat <<EOF | kubectl apply -f -
apiVersion: v1
kind: ConfigMap
metadata:
  name: sdp-gateway-config
  namespace: ${SDP_NAMESPACE}
data:
  config.yaml: |
    gateway:
      listen_port: 8080
      controller_url: https://sdp-controller-service:8443
      tls:
        cert_file: /etc/certs/tls.crt
        key_file: /etc/certs/tls.key
    spa:
      enabled: true
      timeout: 30
      max_attempts: 3
    tunneling:
      protocol: wireguard
      port_range: "51820-51920"
      encryption: chacha20poly1305
    logging:
      level: info
      format: json
    metrics:
      enabled: true
      port: 9091
---
apiVersion: apps/v1
kind: DaemonSet
metadata:
  name: sdp-gateway
  namespace: ${SDP_NAMESPACE}
  labels:
    app: sdp-gateway
    component: data-plane
spec:
  selector:
    matchLabels:
      app: sdp-gateway
  template:
    metadata:
      labels:
        app: sdp-gateway
        component: data-plane
    spec:
      serviceAccountName: sdp-gateway
      hostNetwork: true
      dnsPolicy: ClusterFirstWithHostNet
      securityContext:
        runAsNonRoot: false # 需要root权限操作网络
      containers:
        - name: gateway
          image: ${GATEWAY_IMAGE}
          imagePullPolicy: Always
          securityContext:
            privileged: true
            capabilities:
              add:
                - NET_ADMIN
                - NET_RAW
          ports:
            - containerPort: 8080
              name: http
            - containerPort: 9091
              name: metrics
          env:
            - name: CONFIG_FILE
              value: /etc/config/config.yaml
            - name: NODE_NAME
              valueFrom:
                fieldRef:
                  fieldPath: spec.nodeName
            - name: POD_IP
              valueFrom:
                fieldRef:
                  fieldPath: status.podIP
          volumeMounts:
            - name: config
              mountPath: /etc/config
            - name: certs
              mountPath: /etc/certs
              readOnly: true
            - name: lib-modules
              mountPath: /lib/modules
              readOnly: true
            - name: dev-net-tun
              mountPath: /dev/net/tun
          livenessProbe:
            httpGet:
              path: /health
              port: 8080
            initialDelaySeconds: 30
            periodSeconds: 10
          readinessProbe:
            httpGet:
              path: /ready
              port: 8080
            initialDelaySeconds: 5
            periodSeconds: 5
          resources:
            requests:
              cpu: 100m
              memory: 128Mi
            limits:
              cpu: 1000m
              memory: 1Gi
      volumes:
        - name: config
          configMap:
            name: sdp-gateway-config
        - name: certs
          secret:
            secretName: sdp-gateway-tls
        - name: lib-modules
          hostPath:
            path: /lib/modules
        - name: dev-net-tun
          hostPath:
            path: /dev/net/tun
      tolerations:
        - operator: Exists
          effect: NoSchedule
---
apiVersion: v1
kind: ServiceAccount
metadata:
  name: sdp-gateway
  namespace: ${SDP_NAMESPACE}
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: sdp-gateway
rules:
  - apiGroups: [""]
    resources: ["nodes", "pods"]
    verbs: ["get", "list", "watch"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: sdp-gateway
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: sdp-gateway
subjects:
  - kind: ServiceAccount
    name: sdp-gateway
    namespace: ${SDP_NAMESPACE}
EOF
}
# Apply the namespace's NetworkPolicies: a default deny-all plus explicit
# allow rules for the controller and the gateway.
#
# Changes from the earlier manifest:
#   * DNS egress selects kube-system through the label
#     `kubernetes.io/metadata.name`, which Kubernetes sets on every
#     namespace. A plain `name` label is not set automatically, so the
#     old rule matched nothing unless someone had added it by hand.
#   * The WireGuard ingress rule covers 51820-51920 via `endPort`, to
#     match the `port_range` in the gateway's own configuration.
#   * YAML nesting is restored so the document parses.
#
# NOTE(review): the gateway DaemonSet runs with hostNetwork, and pod
# selectors generally do not match host-network pods. Confirm with the
# cluster's CNI whether the gateway-related rules take effect.
deploy_network_policies() {
    log "部署网络策略..."
    cat <<EOF | kubectl apply -f -
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: default-deny-all
  namespace: ${SDP_NAMESPACE}
spec:
  podSelector: {}
  policyTypes:
    - Ingress
    - Egress
---
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: allow-sdp-controller
  namespace: ${SDP_NAMESPACE}
spec:
  podSelector:
    matchLabels:
      app: sdp-controller
  policyTypes:
    - Ingress
    - Egress
  ingress:
    - from:
        - podSelector:
            matchLabels:
              app: sdp-gateway
        - podSelector:
            matchLabels:
              app: sdp-client
      ports:
        - protocol: TCP
          port: 8443
  egress:
    - to:
        - namespaceSelector:
            matchLabels:
              kubernetes.io/metadata.name: kube-system
      ports:
        - protocol: TCP
          port: 53
        - protocol: UDP
          port: 53
    - to:
        - podSelector:
            matchLabels:
              app: postgres
      ports:
        - protocol: TCP
          port: 5432
    - to:
        - podSelector:
            matchLabels:
              app: opa
      ports:
        - protocol: TCP
          port: 8181
---
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: allow-sdp-gateway
  namespace: ${SDP_NAMESPACE}
spec:
  podSelector:
    matchLabels:
      app: sdp-gateway
  policyTypes:
    - Ingress
    - Egress
  ingress:
    - from: [] # 允许来自任何地方的连接(SPA端口敲门)
      ports:
        - protocol: TCP
          port: 8080
    - from:
        - podSelector:
            matchLabels:
              app: sdp-client
      ports:
        - protocol: UDP
          port: 51820
          endPort: 51920
  egress:
    - to:
        - namespaceSelector:
            matchLabels:
              kubernetes.io/metadata.name: kube-system
      ports:
        - protocol: TCP
          port: 53
        - protocol: UDP
          port: 53
    - to:
        - podSelector:
            matchLabels:
              app: sdp-controller
      ports:
        - protocol: TCP
          port: 8443
    - to: [] # 允许访问受保护的资源
EOF
}
# Deploy the OPA policy engine: the policy/data ConfigMap, a two-replica
# Deployment, OPA's own configuration and a ClusterIP Service.
#
# Changes from the earlier manifest:
#   * Rego has no `x := a if cond else b` expression. Each such value is
#     now a small rule with a `default`, so the module compiles.
#   * `time.parse_rfc3339_ns` returns nanoseconds, not a list; the hour is
#     taken from `time.clock`.
#   * Resource patterns such as "dev-*" are matched with `glob.match`
#     (null delimiters, so "*" matches any string) instead of `==`.
#   * Data files in a loaded directory are rooted by directory path, not
#     file name, so permissions.json wraps its content in a "permissions"
#     key to make `data.permissions[role]` resolve.
#   * The OPA config no longer declares a bundle served by OPA itself.
#     That bundle could never activate, which kept the
#     `/health?bundle=true` readiness probe failing.
#   * Policies and config are mounted side by side under /etc/opa rather
#     than one volume nested inside the other.
#   * YAML nesting is restored so the document parses.
#
# The heredoc is unquoted so ${SDP_NAMESPACE} expands; the embedded Rego
# and JSON contain no `$`, backticks or backslashes.
deploy_opa_policies() {
    log "部署OPA策略引擎..."
    cat <<EOF | kubectl apply -f -
apiVersion: v1
kind: ConfigMap
metadata:
  name: opa-policies
  namespace: ${SDP_NAMESPACE}
data:
  sdp.rego: |
    package sdp

    import future.keywords.if
    import future.keywords.in

    # 默认拒绝
    default allow = false

    # 允许访问的条件
    allow if {
        # 检查用户身份
        valid_user
        # 检查设备信任
        trusted_device
        # 检查访问时间
        valid_time
        # 检查资源权限
        has_resource_permission
    }

    # 验证用户身份
    valid_user if {
        input.user.authenticated == true
        input.user.mfa_verified == true
        input.user.trust_score >= 70
    }

    # 验证设备信任
    trusted_device if {
        input.device.registered == true
        input.device.compliant == true
        input.device.trust_score >= 60
    }

    # 验证访问时间
    valid_time if {
        # 工作时间访问
        ns := time.parse_rfc3339_ns(input.timestamp)
        [hour, _, _] := time.clock(ns)
        hour >= 8
        hour <= 18
    }

    valid_time if {
        # 管理员可以24小时访问
        "admin" in input.user.roles
    }

    # 验证资源权限
    has_resource_permission if {
        # 检查用户角色和资源的匹配
        some role in input.user.roles
        some permission in data.permissions[role]
        glob.match(permission.resource, null, input.resource)
        input.action in permission.actions
    }

    # 风险评估
    risk_score := score if {
        user_risk := calculate_user_risk
        device_risk := calculate_device_risk
        context_risk := calculate_context_risk
        score := (user_risk + device_risk + context_risk) / 3
    }

    calculate_user_risk := risk if {
        base_risk := 100 - input.user.trust_score
        # 失败尝试增加风险
        failure_risk := input.user.failed_attempts * 10
        risk := base_risk + failure_risk + mfa_risk
    }

    # MFA未启用增加风险
    default mfa_risk := 20

    mfa_risk := 0 if input.user.mfa_verified == true

    calculate_device_risk := risk if {
        base_risk := 100 - input.device.trust_score
        risk := base_risk + compliance_risk + registration_risk
    }

    # 设备不合规增加风险
    default compliance_risk := 30

    compliance_risk := 0 if input.device.compliant == true

    # 设备未注册增加风险
    default registration_risk := 40

    registration_risk := 0 if input.device.registered == true

    calculate_context_risk := risk if {
        risk := location_risk + time_risk + network_risk
    }

    # 地理位置风险
    default location_risk := 0

    location_risk := 20 if input.location.suspicious == true

    # 时间风险
    default time_risk := 15

    time_risk := 0 if valid_time

    # 网络风险
    default network_risk := 0

    network_risk := 10 if input.network.public == true
  permissions.json: |
    {
      "permissions": {
        "admin": [
          {
            "resource": "*",
            "actions": ["read", "write", "delete", "execute"]
          }
        ],
        "developer": [
          {
            "resource": "dev-*",
            "actions": ["read", "write", "execute"]
          },
          {
            "resource": "test-*",
            "actions": ["read", "write"]
          }
        ],
        "user": [
          {
            "resource": "app-*",
            "actions": ["read"]
          }
        ]
      }
    }
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: opa
  namespace: ${SDP_NAMESPACE}
  labels:
    app: opa
spec:
  replicas: 2
  selector:
    matchLabels:
      app: opa
  template:
    metadata:
      labels:
        app: opa
    spec:
      containers:
        - name: opa
          image: openpolicyagent/opa:latest-envoy
          args:
            - "run"
            - "--server"
            - "--config-file=/etc/opa/config/config.yaml"
            - "/etc/opa/policies"
          ports:
            - containerPort: 8181
              name: http
          volumeMounts:
            - name: policies
              mountPath: /etc/opa/policies
            - name: config
              mountPath: /etc/opa/config
          livenessProbe:
            httpGet:
              path: /health
              port: 8181
            initialDelaySeconds: 30
            periodSeconds: 10
          readinessProbe:
            httpGet:
              path: /health?bundle=true
              port: 8181
            initialDelaySeconds: 5
            periodSeconds: 5
          resources:
            requests:
              cpu: 100m
              memory: 128Mi
            limits:
              cpu: 500m
              memory: 512Mi
      volumes:
        - name: policies
          configMap:
            name: opa-policies
        - name: config
          configMap:
            name: opa-config
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: opa-config
  namespace: ${SDP_NAMESPACE}
data:
  config.yaml: |
    decision_logs:
      console: true
    status:
      console: true
---
apiVersion: v1
kind: Service
metadata:
  name: opa-service
  namespace: ${SDP_NAMESPACE}
  labels:
    app: opa
spec:
  type: ClusterIP
  ports:
    - port: 8181
      targetPort: 8181
      protocol: TCP
      name: http
  selector:
    app: opa
EOF
}
# Generate a throw-away CA, issue TLS certificates for the controller and
# the gateway, and store them (plus a JWT signing secret) as Kubernetes
# Secrets in ${SDP_NAMESPACE}.
#
# Changes from the earlier version:
#   * The body runs in a subshell inside a private temporary directory
#     that an EXIT trap removes, so private keys never land in the
#     caller's working directory and are cleaned up even on failure.
#   * Certificates carry a subjectAltName; clients that ignore the CN
#     would otherwise reject them.
#   * TLS secrets are rendered client-side and applied, so re-running the
#     script does not abort with "already exists" under `set -e`.
#   * The JWT secret is created only when missing, so a re-deploy does not
#     silently rotate the signing key.
generate_certificates() (
    log "生成TLS证书..."

    workdir="$(mktemp -d)"
    trap 'rm -rf "${workdir}"' EXIT
    cd "${workdir}"
    umask 077  # key files readable by the current user only

    # Self-signed CA, used only to sign the two leaf certificates below.
    openssl genrsa -out ca.key 4096
    openssl req -new -x509 -key ca.key -sha256 \
        -subj "/C=CN/ST=Beijing/L=Beijing/O=ZeroTrust/CN=ZeroTrust-CA" \
        -days 3650 -out ca.crt

    # Issue <name>.key/<name>.crt for common name $2 with SAN list $3.
    _issue_cert() {
        local name="$1" cn="$2" san="$3"
        openssl genrsa -out "${name}.key" 4096
        openssl req -new -key "${name}.key" -out "${name}.csr" \
            -subj "/C=CN/ST=Beijing/L=Beijing/O=ZeroTrust/CN=${cn}"
        printf 'subjectAltName=%s\n' "${san}" > "${name}.ext"
        openssl x509 -req -in "${name}.csr" -CA ca.crt -CAkey ca.key \
            -CAcreateserial -out "${name}.crt" -days 365 -sha256 \
            -extfile "${name}.ext"
    }

    _issue_cert controller sdp-controller-service \
        "DNS:sdp-controller-service,DNS:sdp-controller-service.${SDP_NAMESPACE}.svc,DNS:sdp-controller-service.${SDP_NAMESPACE}.svc.cluster.local"
    _issue_cert gateway sdp-gateway "DNS:sdp-gateway"

    # Create or update the TLS secrets.
    kubectl create secret tls sdp-controller-tls \
        --cert=controller.crt \
        --key=controller.key \
        --namespace="${SDP_NAMESPACE}" \
        --dry-run=client -o yaml | kubectl apply -f -
    kubectl create secret tls sdp-gateway-tls \
        --cert=gateway.crt \
        --key=gateway.key \
        --namespace="${SDP_NAMESPACE}" \
        --dry-run=client -o yaml | kubectl apply -f -

    # Create the JWT signing secret once and keep it stable afterwards.
    if ! kubectl get secret jwt-secret --namespace="${SDP_NAMESPACE}" > /dev/null 2>&1; then
        kubectl create secret generic jwt-secret \
            --from-literal=secret="$(openssl rand -base64 32)" \
            --namespace="${SDP_NAMESPACE}"
    fi
)
# Deploy a single-replica PostgreSQL backed by a 10Gi PVC, with its
# credentials Secret and a ClusterIP Service.
#
# Changes from the earlier version:
#   * The credentials Secret is created only when it does not exist.
#     Applying a freshly generated password on every run replaced the
#     Secret while the data directory on the PVC stayed initialised with
#     the first password.
#   * The Deployment uses the Recreate strategy, so the old pod releases
#     the ReadWriteOnce volume before its replacement starts.
#   * YAML nesting is restored so the document parses.
deploy_postgresql() {
    log "部署PostgreSQL数据库..."

    if ! kubectl get secret postgres-secret --namespace="${SDP_NAMESPACE}" > /dev/null 2>&1; then
        kubectl create secret generic postgres-secret \
            --namespace="${SDP_NAMESPACE}" \
            --from-literal=username=sdp_user \
            --from-literal=password="$(openssl rand -base64 16)"
    fi

    cat <<EOF | kubectl apply -f -
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: postgres-pvc
  namespace: ${SDP_NAMESPACE}
spec:
  accessModes:
    - ReadWriteOnce
  resources:
    requests:
      storage: 10Gi
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: postgres
  namespace: ${SDP_NAMESPACE}
  labels:
    app: postgres
spec:
  replicas: 1
  strategy:
    type: Recreate
  selector:
    matchLabels:
      app: postgres
  template:
    metadata:
      labels:
        app: postgres
    spec:
      containers:
        - name: postgres
          image: postgres:13
          env:
            - name: POSTGRES_DB
              value: sdp
            - name: POSTGRES_USER
              valueFrom:
                secretKeyRef:
                  name: postgres-secret
                  key: username
            - name: POSTGRES_PASSWORD
              valueFrom:
                secretKeyRef:
                  name: postgres-secret
                  key: password
          ports:
            - containerPort: 5432
              name: postgres
          volumeMounts:
            - name: postgres-storage
              mountPath: /var/lib/postgresql/data
          resources:
            requests:
              cpu: 100m
              memory: 256Mi
            limits:
              cpu: 500m
              memory: 1Gi
      volumes:
        - name: postgres-storage
          persistentVolumeClaim:
            claimName: postgres-pvc
---
apiVersion: v1
kind: Service
metadata:
  name: postgres-service
  namespace: ${SDP_NAMESPACE}
  labels:
    app: postgres
spec:
  type: ClusterIP
  ports:
    - port: 5432
      targetPort: 5432
      protocol: TCP
      name: postgres
  selector:
    app: postgres
EOF
}
# Register Prometheus Operator scrape targets for the controller and the
# gateway metrics ports.
#
# Changes from the earlier version:
#   * ServiceMonitor is a Prometheus Operator custom resource in API group
#     monitoring.coreos.com/v1; it does not exist in the core `v1` group.
#   * The gateway runs as a DaemonSet without a Service, so it is scraped
#     through a PodMonitor (a ServiceMonitor selects Services only).
#   * When the operator's CRDs are not installed the step is skipped with
#     a log line instead of aborting the whole deployment under `set -e`.
#   * YAML nesting is restored so the document parses.
deploy_monitoring() {
    log "部署监控组件..."

    local crd
    for crd in servicemonitors.monitoring.coreos.com podmonitors.monitoring.coreos.com; do
        if ! kubectl get crd "${crd}" > /dev/null 2>&1; then
            log "跳过监控组件: 未检测到 Prometheus Operator CRD ${crd}"
            return 0
        fi
    done

    cat <<EOF | kubectl apply -f -
apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
metadata:
  name: sdp-controller-monitor
  namespace: ${SDP_NAMESPACE}
  labels:
    app: sdp-controller
spec:
  selector:
    matchLabels:
      app: sdp-controller
  endpoints:
    - port: metrics
      interval: 30s
      path: /metrics
---
apiVersion: monitoring.coreos.com/v1
kind: PodMonitor
metadata:
  name: sdp-gateway-monitor
  namespace: ${SDP_NAMESPACE}
  labels:
    app: sdp-gateway
spec:
  selector:
    matchLabels:
      app: sdp-gateway
  podMetricsEndpoints:
    - port: metrics
      interval: 30s
      path: /metrics
EOF
}
# Entry point for `deploy`: verify the required CLI tools are present,
# run every deployment step in order, then print follow-up commands.
main() {
    log "开始部署软件定义边界(SDP)..."

    # Both tools are needed; stop at the first one that is missing.
    local tool
    for tool in kubectl openssl; do
        if ! command -v "${tool}" &> /dev/null; then
            log "错误: ${tool}未安装"
            exit 1
        fi
    done

    # Steps run in dependency order: namespace and secrets first, then
    # the backing services, then the SDP components and their policies.
    local step
    for step in \
        create_namespace \
        generate_certificates \
        deploy_postgresql \
        deploy_opa_policies \
        deploy_sdp_controller \
        deploy_sdp_gateway \
        deploy_network_policies \
        deploy_monitoring; do
        "${step}"
    done

    log "SDP部署完成!"
    log "控制器服务: kubectl get svc sdp-controller-service -n ${SDP_NAMESPACE}"
    log "网关状态: kubectl get ds sdp-gateway -n ${SDP_NAMESPACE}"
    log "查看日志: kubectl logs -f deployment/sdp-controller -n ${SDP_NAMESPACE}"
}
# Remove everything the deploy step created.
#
# Deleting the namespace removes the namespaced objects, but ClusterRole
# and ClusterRoleBinding are cluster-scoped and would be left behind, so
# the ones created for the controller and gateway are deleted explicitly.
cleanup() {
    log "清理SDP部署..."
    kubectl delete namespace "${SDP_NAMESPACE}" --ignore-not-found=true
    kubectl delete clusterrolebinding sdp-controller sdp-gateway --ignore-not-found=true
    kubectl delete clusterrole sdp-controller sdp-gateway --ignore-not-found=true
    log "清理完成"
}
# Dispatch on the first argument; with no argument the script deploys.
command_name="${1:-deploy}"
if [[ "${command_name}" == "deploy" ]]; then
    main
elif [[ "${command_name}" == "cleanup" ]]; then
    cleanup
else
    # Any other word is a usage error.
    echo "用法: $0 [deploy|cleanup]"
    exit 1
fi
```

## 数据保护与加密
数据保护是零信任架构的核心要素,需要实现端到端的数据加密、数据分类和数据防泄漏。
数据加密管理系统
#!/usr/bin/env python3
"""
数据加密管理系统
实现端到端数据加密、密钥管理和数据分类
"""
import os
import json
import logging
import hashlib
import hmac
import base64
import time
from typing import Dict, List, Any, Optional, Tuple
from dataclasses import dataclass, asdict
from enum import Enum
from cryptography.fernet import Fernet
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import rsa, padding
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
import secrets
# Configure root logging at INFO level and create this module's logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class DataClassification(Enum):
    """Data sensitivity classes, listed from least to most sensitive."""

    PUBLIC = "public"
    INTERNAL = "internal"
    CONFIDENTIAL = "confidential"
    RESTRICTED = "restricted"
    TOP_SECRET = "top_secret"
class EncryptionAlgorithm(Enum):
    """Identifiers of the encryption algorithms a policy or key may name."""

    AES_256_GCM = "aes_256_gcm"
    CHACHA20_POLY1305 = "chacha20_poly1305"
    RSA_4096 = "rsa_4096"
    FERNET = "fernet"
@dataclass
class DataPolicy:
    """Handling rules for one data classification.

    All fields are required and positional, in the order declared below.
    """

    classification: DataClassification
    encryption_required: bool
    encryption_algorithm: EncryptionAlgorithm
    key_rotation_days: int
    access_roles: List[str]  # the built-in public policy uses ["*"]
    retention_days: int
    audit_required: bool
@dataclass
class EncryptionKey:
    """An encryption key together with its lifecycle metadata.

    All fields are required and positional, in the order declared below.
    """

    key_id: str
    algorithm: EncryptionAlgorithm
    key_data: bytes
    created_at: float
    expires_at: Optional[float]  # None is permitted by the annotation
    classification: DataClassification
    usage_count: int
    max_usage: Optional[int]     # None is permitted by the annotation
@dataclass
class EncryptedData:
    """An encrypted payload and what is needed to locate its key.

    All fields are required and positional, in the order declared below.
    """

    data_id: str
    encrypted_content: bytes
    key_id: str              # ID of the key used for this payload
    algorithm: EncryptionAlgorithm
    iv: Optional[bytes]      # None is permitted by the annotation
    tag: Optional[bytes]     # None is permitted by the annotation
    classification: DataClassification
    metadata: Dict[str, Any]
    created_at: float
class DataEncryptionManager:
    """In-memory manager for keys, encrypted records, policies and an audit trail.

    Keys live in ``self.keys``, ciphertext records in ``self.data_store``,
    one policy per classification in ``self.policies`` and audit events in
    ``self.audit_log``. Nothing is persisted.
    """

    # AES-GCM and ChaCha20-Poly1305 both use 96-bit nonces and 128-bit tags.
    _NONCE_SIZE = 12
    _TAG_SIZE = 16
    # RSA-4096 with OAEP/SHA-256: every ciphertext block is 512 bytes and can
    # carry at most 512 - 2 * 32 - 2 = 446 plaintext bytes.
    _RSA_BLOCK_SIZE = 512
    _RSA_MAX_PLAINTEXT = 446
    # Keyword rules in priority order, keyed by DataClassification value.
    # 'confidential' deliberately appears only in the confidential rule; it
    # used to be listed under top_secret as well, which shadowed this rule.
    _KEYWORD_RULES = (
        ("top_secret", ("top secret", "classified", "secret key", "password")),
        ("restricted", ("restricted", "internal only", "private", "ssn", "credit card")),
        ("confidential", ("confidential", "proprietary", "financial", "personal")),
        ("internal", ("internal", "employee", "staff", "company")),
    )
    _DEFAULT_LABEL = "public"

    def __init__(self):
        """Create empty stores and install the default policies."""
        self.keys: Dict[str, EncryptionKey] = {}
        self.data_store: Dict[str, EncryptedData] = {}
        self.policies: Dict[DataClassification, DataPolicy] = {}
        self.audit_log: List[Dict[str, Any]] = []
        self._initialize_default_policies()

    def _initialize_default_policies(self):
        """Install the built-in policy for every classification level."""
        defaults = (
            # level, encrypt?, algorithm, key rotation (days), roles,
            # retention (days), audit?
            (DataClassification.PUBLIC, False, EncryptionAlgorithm.FERNET,
             365, ["*"], 2555, False),                      # keep 7 years
            (DataClassification.INTERNAL, True, EncryptionAlgorithm.AES_256_GCM,
             180, ["employee"], 1825, True),                # keep 5 years
            (DataClassification.CONFIDENTIAL, True, EncryptionAlgorithm.AES_256_GCM,
             90, ["manager", "admin"], 1095, True),         # keep 3 years
            (DataClassification.RESTRICTED, True, EncryptionAlgorithm.CHACHA20_POLY1305,
             30, ["admin"], 365, True),                     # keep 1 year
            (DataClassification.TOP_SECRET, True, EncryptionAlgorithm.CHACHA20_POLY1305,
             7, ["security_admin"], 90, True),
        )
        self.policies = {
            level: DataPolicy(
                classification=level,
                encryption_required=required,
                encryption_algorithm=algorithm,
                key_rotation_days=rotation_days,
                access_roles=roles,
                retention_days=retention_days,
                audit_required=audited,
            )
            for (level, required, algorithm, rotation_days,
                 roles, retention_days, audited) in defaults
        }

    def generate_key(self, classification: DataClassification,
                     algorithm: Optional[EncryptionAlgorithm] = None) -> str:
        """Generate and store a new key; return its key ID.

        Args:
            classification: Level whose policy determines the key lifetime
                and, when ``algorithm`` is None, the algorithm.
            algorithm: Explicit algorithm overriding the policy default.

        Raises:
            ValueError: No policy exists for ``classification`` or the
                algorithm is not supported.
        """
        try:
            policy = self.policies.get(classification)
            if not policy:
                raise ValueError(f"未找到分类 {classification} 的策略")
            if algorithm is None:
                algorithm = policy.encryption_algorithm

            key_data = self._new_key_material(algorithm)
            key_id = self._generate_key_id()
            now = time.time()
            self.keys[key_id] = EncryptionKey(
                key_id=key_id,
                algorithm=algorithm,
                key_data=key_data,
                created_at=now,
                # The key expires when its rotation period ends.
                expires_at=now + policy.key_rotation_days * 24 * 3600,
                classification=classification,
                usage_count=0,
                max_usage=None,
            )

            self._log_audit("key_generated", {
                "key_id": key_id,
                "algorithm": algorithm.value,
                "classification": classification.value
            })
            logger.info(f"生成密钥 {key_id},算法: {algorithm.value}")
            return key_id
        except Exception as e:
            logger.error(f"生成密钥失败: {e}")
            raise

    def encrypt_data(self, data: bytes, classification: DataClassification,
                     key_id: str = None, metadata: Dict[str, Any] = None) -> str:
        """Encrypt ``data``, store the record and return its data ID.

        Args:
            data: Plaintext bytes.
            classification: Classification of the plaintext; selects the policy.
            key_id: Existing key to use. When None, a fresh key is generated
                with the policy's algorithm.
            metadata: Optional descriptive metadata stored with the record
                (a shallow copy is kept).

        Raises:
            ValueError: Missing policy or key, expired key, exhausted key, or
                unsupported algorithm.
        """
        try:
            policy = self.policies.get(classification)
            if not policy:
                raise ValueError(f"未找到分类 {classification} 的策略")
            if not policy.encryption_required and classification != DataClassification.PUBLIC:
                logger.warning(f"分类 {classification} 不要求加密,但仍然执行加密")

            if key_id is None:
                key_id = self.generate_key(classification, policy.encryption_algorithm)
            encryption_key = self.keys.get(key_id)
            if not encryption_key:
                raise ValueError(f"密钥 {key_id} 不存在")
            if encryption_key.expires_at and time.time() > encryption_key.expires_at:
                raise ValueError(f"密钥 {key_id} 已过期")
            if encryption_key.max_usage and encryption_key.usage_count >= encryption_key.max_usage:
                raise ValueError(f"密钥 {key_id} 使用次数已达上限")

            data_id = self._generate_data_id()
            encrypted_content, iv, tag = self._encrypt_payload(encryption_key, data)
            self.data_store[data_id] = EncryptedData(
                data_id=data_id,
                encrypted_content=encrypted_content,
                key_id=key_id,
                algorithm=encryption_key.algorithm,
                iv=iv,
                tag=tag,
                classification=classification,
                # Copy so later caller-side mutation cannot alter the record.
                metadata=dict(metadata) if metadata else {},
                created_at=time.time(),
            )
            encryption_key.usage_count += 1

            if policy.audit_required:
                self._log_audit("data_encrypted", {
                    "data_id": data_id,
                    "key_id": key_id,
                    "classification": classification.value,
                    "data_size": len(data)
                })
            logger.info(f"数据加密完成,数据ID: {data_id}")
            return data_id
        except Exception as e:
            logger.error(f"数据加密失败: {e}")
            raise

    def decrypt_data(self, data_id: str, user_roles: List[str] = None) -> bytes:
        """Decrypt a stored record and return the plaintext.

        Args:
            data_id: Identifier returned by ``encrypt_data``.
            user_roles: Roles of the requesting user. ``None`` means no caller
                identity was supplied and, as before, skips the role check.
                Any list, including an empty one, is checked against the
                policy's ``access_roles``.

        Raises:
            ValueError: Unknown record or key, or unsupported algorithm.
            PermissionError: None of ``user_roles`` is allowed by the policy.
        """
        try:
            encrypted_data = self.data_store.get(data_id)
            if not encrypted_data:
                raise ValueError(f"数据 {data_id} 不存在")

            policy = self.policies.get(encrypted_data.classification)
            # `is not None` rather than truthiness: an empty role list used to
            # bypass the check entirely and was granted access.
            if policy and user_roles is not None:
                permitted = "*" in policy.access_roles or any(
                    role in policy.access_roles for role in user_roles
                )
                if not permitted:
                    raise PermissionError(f"用户角色 {user_roles} 无权访问 {encrypted_data.classification.value} 级别数据")

            encryption_key = self.keys.get(encrypted_data.key_id)
            if not encryption_key:
                raise ValueError(f"密钥 {encrypted_data.key_id} 不存在")

            decrypted_data = self._decrypt_payload(encrypted_data, encryption_key)

            if policy and policy.audit_required:
                self._log_audit("data_decrypted", {
                    "data_id": data_id,
                    "key_id": encrypted_data.key_id,
                    "classification": encrypted_data.classification.value,
                    "user_roles": user_roles
                })
            logger.info(f"数据解密完成,数据ID: {data_id}")
            return decrypted_data
        except Exception as e:
            logger.error(f"数据解密失败: {e}")
            raise

    def rotate_key(self, old_key_id: str) -> str:
        """Replace a key and re-encrypt every record that used it.

        Records are re-encrypted in place, so their data IDs, metadata and
        creation times stay valid. A record is only overwritten after its new
        ciphertext has been produced, so a failure cannot lose data. The old
        key is marked expired once all records have been migrated.

        Returns:
            The ID of the newly generated key.

        Raises:
            ValueError: ``old_key_id`` is unknown.
        """
        try:
            old_key = self.keys.get(old_key_id)
            if not old_key:
                raise ValueError(f"密钥 {old_key_id} 不存在")

            new_key_id = self.generate_key(old_key.classification, old_key.algorithm)
            new_key = self.keys[new_key_id]

            affected_data = [
                record for record in self.data_store.values()
                if record.key_id == old_key_id
            ]
            for record in affected_data:
                plaintext = self._decrypt_payload(record, old_key)
                content, iv, tag = self._encrypt_payload(new_key, plaintext)
                record.encrypted_content = content
                record.iv = iv
                record.tag = tag
                record.key_id = new_key_id
                record.algorithm = new_key.algorithm
                new_key.usage_count += 1
                logger.info(f"数据 {record.data_id} 已使用新密钥 {new_key_id} 重新加密")

            old_key.expires_at = time.time()

            self._log_audit("key_rotated", {
                "old_key_id": old_key_id,
                "new_key_id": new_key_id,
                "affected_data_count": len(affected_data),
                "affected_data_ids": [record.data_id for record in affected_data]
            })
            logger.info(f"密钥轮换完成: {old_key_id} -> {new_key_id}")
            return new_key_id
        except Exception as e:
            logger.error(f"密钥轮换失败: {e}")
            raise

    def classify_data(self, data: bytes, content_type: str = None) -> DataClassification:
        """Classify ``data`` by case-insensitive keyword matching.

        The most sensitive matching rule wins; data matching no rule is
        PUBLIC. ``content_type`` is accepted but currently unused. If
        classification itself fails, INTERNAL is returned.
        """
        try:
            text = data.decode('utf-8', errors='ignore')
            return DataClassification(self._match_classification_label(text))
        except Exception as e:
            logger.warning(f"数据分类失败,使用默认分类: {e}")
            return DataClassification.INTERNAL

    @classmethod
    def _match_classification_label(cls, text: str) -> str:
        """Return the classification value of the first rule matching ``text``."""
        lowered = text.lower()
        for label, keywords in cls._KEYWORD_RULES:
            if any(keyword in lowered for keyword in keywords):
                return label
        return cls._DEFAULT_LABEL

    def get_data_info(self, data_id: str) -> Optional[Dict[str, Any]]:
        """Return descriptive info for a record, or None if it does not exist."""
        encrypted_data = self.data_store.get(data_id)
        if not encrypted_data:
            return None
        return {
            "data_id": data_id,
            "classification": encrypted_data.classification.value,
            "algorithm": encrypted_data.algorithm.value,
            "key_id": encrypted_data.key_id,
            "created_at": encrypted_data.created_at,
            "metadata": encrypted_data.metadata,
            "size": len(encrypted_data.encrypted_content)
        }

    def get_key_info(self, key_id: str) -> Optional[Dict[str, Any]]:
        """Return descriptive info for a key (never the key material), or None."""
        key = self.keys.get(key_id)
        if not key:
            return None
        return {
            "key_id": key_id,
            "algorithm": key.algorithm.value,
            "classification": key.classification.value,
            "created_at": key.created_at,
            "expires_at": key.expires_at,
            "usage_count": key.usage_count,
            "max_usage": key.max_usage,
            # bool(): the bare `and` expression yielded None for keys without
            # an expiry instead of False.
            "is_expired": bool(key.expires_at and time.time() > key.expires_at)
        }

    def cleanup_expired_keys(self) -> List[str]:
        """Delete expired keys that no stored record references; return their IDs."""
        current_time = time.time()
        # Collect referenced key IDs once instead of rescanning per key.
        keys_in_use = {record.key_id for record in self.data_store.values()}
        expired_keys = []
        for key_id, key in list(self.keys.items()):
            if not (key.expires_at and current_time > key.expires_at):
                continue
            if key_id in keys_in_use:
                continue
            del self.keys[key_id]
            expired_keys.append(key_id)
            logger.info(f"清理过期密钥: {key_id}")
        return expired_keys

    def generate_audit_report(self) -> Dict[str, Any]:
        """Summarize keys, stored records and audit events of the last 24 hours."""
        current_time = time.time()
        expired_keys = sum(
            1 for key in self.keys.values()
            if key.expires_at and current_time > key.expires_at
        )
        data_by_classification: Dict[str, int] = {}
        for record in self.data_store.values():
            label = record.classification.value
            data_by_classification[label] = data_by_classification.get(label, 0) + 1
        recent_events = sum(
            1 for event in self.audit_log
            if current_time - event['timestamp'] <= 86400  # last 24 hours
        )
        return {
            "report_time": current_time,
            "summary": {
                "total_keys": len(self.keys),
                "expired_keys": expired_keys,
                "total_encrypted_data": len(self.data_store),
                "data_by_classification": data_by_classification
            },
            "recent_events": recent_events,
            "audit_log_size": len(self.audit_log)
        }

    @staticmethod
    def _new_key_material(algorithm: EncryptionAlgorithm) -> bytes:
        """Return fresh key material for ``algorithm``; ValueError if unsupported."""
        if algorithm in (EncryptionAlgorithm.AES_256_GCM,
                         EncryptionAlgorithm.CHACHA20_POLY1305):
            return os.urandom(32)  # 256-bit symmetric key
        if algorithm == EncryptionAlgorithm.RSA_4096:
            private_key = rsa.generate_private_key(public_exponent=65537, key_size=4096)
            return private_key.private_bytes(
                encoding=serialization.Encoding.PEM,
                format=serialization.PrivateFormat.PKCS8,
                encryption_algorithm=serialization.NoEncryption()
            )
        if algorithm == EncryptionAlgorithm.FERNET:
            return Fernet.generate_key()
        raise ValueError(f"不支持的加密算法: {algorithm}")

    @staticmethod
    def _oaep() -> padding.OAEP:
        """Return the OAEP/SHA-256 padding used for all RSA operations."""
        return padding.OAEP(
            mgf=padding.MGF1(algorithm=hashes.SHA256()),
            algorithm=hashes.SHA256(),
            label=None
        )

    def _encrypt_payload(self, encryption_key: EncryptionKey,
                         data: bytes) -> Tuple[bytes, Optional[bytes], Optional[bytes]]:
        """Encrypt ``data`` with ``encryption_key``; return (ciphertext, iv, tag).

        ``iv`` and ``tag`` are None for Fernet and RSA, which do not expose them.
        """
        algorithm = encryption_key.algorithm
        if algorithm == EncryptionAlgorithm.AES_256_GCM:
            iv = os.urandom(self._NONCE_SIZE)
            encryptor = Cipher(algorithms.AES(encryption_key.key_data), modes.GCM(iv)).encryptor()
            ciphertext = encryptor.update(data) + encryptor.finalize()
            return ciphertext, iv, encryptor.tag
        if algorithm == EncryptionAlgorithm.CHACHA20_POLY1305:
            # Local import: the AEAD class is not among the file-level imports.
            # The raw ChaCha20 cipher cannot be combined with modes.GCM.
            from cryptography.hazmat.primitives.ciphers.aead import ChaCha20Poly1305
            nonce = os.urandom(self._NONCE_SIZE)
            sealed = ChaCha20Poly1305(encryption_key.key_data).encrypt(nonce, data, None)
            # The AEAD appends the tag; split it off to fit the record layout.
            return sealed[:-self._TAG_SIZE], nonce, sealed[-self._TAG_SIZE:]
        if algorithm == EncryptionAlgorithm.FERNET:
            return Fernet(encryption_key.key_data).encrypt(data), None, None
        if algorithm == EncryptionAlgorithm.RSA_4096:
            public_key = serialization.load_pem_private_key(
                encryption_key.key_data, password=None
            ).public_key()
            step = self._RSA_MAX_PLAINTEXT
            blocks = [
                public_key.encrypt(data[offset:offset + step], self._oaep())
                for offset in range(0, len(data), step)
            ]
            return b''.join(blocks), None, None
        raise ValueError(f"不支持的加密算法: {algorithm}")

    def _decrypt_payload(self, encrypted_data: EncryptedData,
                         encryption_key: EncryptionKey) -> bytes:
        """Decrypt one record with ``encryption_key`` and return the plaintext."""
        algorithm = encrypted_data.algorithm
        content = encrypted_data.encrypted_content
        if algorithm == EncryptionAlgorithm.AES_256_GCM:
            decryptor = Cipher(
                algorithms.AES(encryption_key.key_data),
                modes.GCM(encrypted_data.iv, encrypted_data.tag)
            ).decryptor()
            return decryptor.update(content) + decryptor.finalize()
        if algorithm == EncryptionAlgorithm.CHACHA20_POLY1305:
            from cryptography.hazmat.primitives.ciphers.aead import ChaCha20Poly1305
            return ChaCha20Poly1305(encryption_key.key_data).decrypt(
                encrypted_data.iv, content + encrypted_data.tag, None
            )
        if algorithm == EncryptionAlgorithm.FERNET:
            return Fernet(encryption_key.key_data).decrypt(content)
        if algorithm == EncryptionAlgorithm.RSA_4096:
            private_key = serialization.load_pem_private_key(
                encryption_key.key_data, password=None
            )
            step = self._RSA_BLOCK_SIZE
            blocks = [
                private_key.decrypt(content[offset:offset + step], self._oaep())
                for offset in range(0, len(content), step)
            ]
            return b''.join(blocks)
        raise ValueError(f"不支持的解密算法: {algorithm}")

    def _generate_key_id(self) -> str:
        """Return a new key ID: timestamp plus 8 random bytes in hex."""
        return f"key_{int(time.time())}_{secrets.token_hex(8)}"

    def _generate_data_id(self) -> str:
        """Return a new data ID: timestamp plus 8 random bytes in hex."""
        return f"data_{int(time.time())}_{secrets.token_hex(8)}"

    def _log_audit(self, action: str, details: Dict[str, Any]):
        """Append a timestamped event to the in-memory audit log."""
        self.audit_log.append({
            "timestamp": time.time(),
            "action": action,
            "details": details
        })
def main():
    """Demo of the encryption manager: classify, encrypt, inspect, decrypt,
    rotate the key and print an audit report."""
    manager = DataEncryptionManager()

    # Sample payload for the walkthrough
    test_data = b"This is confidential financial data containing sensitive information."

    # Let the manager pick the classification level
    classification = manager.classify_data(test_data)
    print(f"数据分类: {classification.value}")

    # Encrypt under the policy of that level
    data_id = manager.encrypt_data(
        test_data,
        classification,
        metadata={"source": "financial_system"},
    )
    print(f"数据已加密,ID: {data_id}")

    # Inspect the stored record
    data_info = manager.get_data_info(data_id)
    print(f"数据信息: {json.dumps(data_info, indent=2, default=str)}")

    # Decrypt as an admin and compare with the original payload
    decrypted_data = manager.decrypt_data(data_id, user_roles=["admin"])
    print(f"解密成功: {decrypted_data == test_data}")

    # Rotate the key that protected the record
    key_id = data_info["key_id"]
    new_key_id = manager.rotate_key(key_id)
    print(f"密钥轮换: {key_id} -> {new_key_id}")

    # Summarize what happened
    report = manager.generate_audit_report()
    print(f"审计报告: {json.dumps(report, indent=2, default=str)}")


if __name__ == "__main__":
    main()
身份与访问管理
身份与访问管理(IAM)是零信任架构的核心组件,负责验证用户身份、管理访问权限和监控用户行为。
多因子认证系统
#!/usr/bin/env python3
"""
多因子认证系统
实现基于TOTP、SMS、生物识别等多种认证方式
"""
import pyotp
import qrcode
import io
import base64
import hashlib
import hmac
import time
import random
import string
from typing import Dict, List, Optional, Tuple
from dataclasses import dataclass
from enum import Enum
import logging
logger = logging.getLogger(__name__)
class AuthFactorType(Enum):
    """Kinds of authentication factor a user can register or be asked for.

    Each member's value is the lowercase identifier reported through
    ``.value`` in authentication status dictionaries.
    """

    PASSWORD = "password"
    TOTP = "totp"
    SMS = "sms"
    EMAIL = "email"
    BIOMETRIC = "biometric"
    HARDWARE_TOKEN = "hardware_token"
    PUSH_NOTIFICATION = "push_notification"
@dataclass
class AuthFactor:
    """One authentication factor registered for a user."""

    # Unique identifier under which the factor is stored.
    factor_id: str
    # Kind of factor (TOTP, SMS, ...).
    factor_type: AuthFactorType
    # Owner of the factor.
    user_id: str
    # Factor secret: the base32 TOTP seed, or a SHA-256 hex digest of the
    # phone number for SMS factors.
    secret: str
    # Single-use recovery codes; a code is removed once it has been used.
    backup_codes: List[str]
    # Disabled factors are ignored during verification.
    enabled: bool
    # Registration time as a ``time.time()`` timestamp.
    created_at: float
    # Time of the last successful verification; None until first use.
    last_used: Optional[float] = None
@dataclass
class AuthSession:
    """State of one in-progress multi-factor authentication attempt."""

    # Unique identifier under which the session is stored.
    session_id: str
    # User being authenticated.
    user_id: str
    # Factors already verified in this session.
    completed_factors: List[AuthFactorType]
    # Factors that must all be verified for the session to be complete.
    required_factors: List[AuthFactorType]
    # Start time as a ``time.time()`` timestamp.
    created_at: float
    # Timestamp after which the session is treated as expired.
    expires_at: float
    # Client IP address and user agent recorded when the session started.
    ip_address: str
    user_agent: str
class MultiFactorAuthenticator:
    """In-memory multi-factor authenticator supporting TOTP and SMS factors.

    Registered factors, authentication sessions and pending one-time codes
    are all held in dictionaries on the instance; nothing is persisted.
    """

    # OS-backed CSPRNG. The module-level `random` functions are predictable
    # and must not produce OTPs, backup codes or session identifiers.
    _rng = random.SystemRandom()
    _BACKUP_CODE_ALPHABET = string.ascii_uppercase + string.digits

    def __init__(self):
        """Create empty factor, session and pending-code stores."""
        self.factors: Dict[str, AuthFactor] = {}
        self.sessions: Dict[str, AuthSession] = {}
        self.user_factors: Dict[str, List[str]] = {}  # user_id -> factor_ids
        self._temp_codes: Dict[str, Dict[str, object]] = {}

    def register_totp_factor(self, user_id: str, issuer: str = "ZeroTrust") -> Tuple[str, str]:
        """Register a TOTP factor for ``user_id``.

        Returns:
            ``(factor_id, qr_png_base64)`` where the second item is a
            base64-encoded PNG QR code of the provisioning URI.
        """
        try:
            secret = pyotp.random_base32()
            factor_id = self._add_factor(user_id, AuthFactorType.TOTP, secret)

            # Render the provisioning URI as a QR code for authenticator apps.
            provisioning_uri = pyotp.TOTP(secret).provisioning_uri(
                name=user_id,
                issuer_name=issuer
            )
            qr = qrcode.QRCode(version=1, box_size=10, border=5)
            qr.add_data(provisioning_uri)
            qr.make(fit=True)
            img_buffer = io.BytesIO()
            qr.make_image(fill_color="black", back_color="white").save(img_buffer, format='PNG')
            qr_code_base64 = base64.b64encode(img_buffer.getvalue()).decode()

            logger.info(f"为用户 {user_id} 注册TOTP认证因子: {factor_id}")
            return factor_id, qr_code_base64
        except Exception as e:
            logger.error(f"注册TOTP认证因子失败: {e}")
            raise

    def register_sms_factor(self, user_id: str, phone_number: str) -> str:
        """Register an SMS factor for ``user_id`` and return its factor ID.

        NOTE(review): only a SHA-256 digest of the phone number is kept, so
        the number cannot be recovered for actual delivery; a real deployment
        needs to store it encrypted instead.
        """
        try:
            secret = hashlib.sha256(phone_number.encode()).hexdigest()
            factor_id = self._add_factor(user_id, AuthFactorType.SMS, secret)
            logger.info(f"为用户 {user_id} 注册SMS认证因子: {factor_id}")
            return factor_id
        except Exception as e:
            logger.error(f"注册SMS认证因子失败: {e}")
            raise

    def start_auth_session(self, user_id: str, required_factors: List[AuthFactorType],
                           ip_address: str, user_agent: str, session_timeout: int = 300) -> str:
        """Open an authentication session and return its session ID.

        Args:
            user_id: User being authenticated.
            required_factors: Factor types that must all be verified.
            ip_address: Client address recorded on the session.
            user_agent: Client user agent recorded on the session.
            session_timeout: Session lifetime in seconds.
        """
        try:
            session_id = self._generate_session_id()
            now = time.time()
            self.sessions[session_id] = AuthSession(
                session_id=session_id,
                user_id=user_id,
                completed_factors=[],
                # Copy so the caller cannot change the requirements afterwards.
                required_factors=list(required_factors),
                created_at=now,
                expires_at=now + session_timeout,
                ip_address=ip_address,
                user_agent=user_agent
            )
            logger.info(f"为用户 {user_id} 开始认证会话: {session_id}")
            return session_id
        except Exception as e:
            logger.error(f"开始认证会话失败: {e}")
            raise

    def verify_totp_code(self, session_id: str, code: str) -> bool:
        """Verify a TOTP code, or a single-use backup code, for a session.

        Returns True and marks TOTP as completed on success; returns False
        on any failure, including internal errors.
        """
        try:
            session, error = self._lookup_session(session_id)
            if session is None:
                logger.error(error)
                return False

            totp_factor = self._find_factor(session.user_id, AuthFactorType.TOTP)
            if totp_factor is None:
                logger.error("用户未注册TOTP认证因子")
                return False

            # valid_window=1 also accepts the adjacent time steps to tolerate
            # clock drift between server and authenticator.
            if pyotp.TOTP(totp_factor.secret).verify(code, valid_window=1):
                self._mark_completed(session, AuthFactorType.TOTP)
                totp_factor.last_used = time.time()
                logger.info(f"用户 {session.user_id} TOTP验证成功")
                return True

            matched = next(
                (candidate for candidate in totp_factor.backup_codes
                 if self._codes_match(candidate, code)),
                None
            )
            if matched is not None:
                totp_factor.backup_codes.remove(matched)  # backup codes are single-use
                self._mark_completed(session, AuthFactorType.TOTP)
                totp_factor.last_used = time.time()
                logger.info(f"用户 {session.user_id} 备用代码验证成功")
                return True

            logger.error("TOTP代码验证失败")
            return False
        except Exception as e:
            logger.error(f"验证TOTP代码失败: {e}")
            return False

    def send_sms_code(self, session_id: str) -> bool:
        """Generate and "send" a 6-digit SMS code for a session.

        Returns False when the session is missing or expired, or when the
        user has no enabled SMS factor.
        """
        try:
            session, error = self._lookup_session(session_id)
            if session is None:
                logger.error(error)
                return False

            # Without this check any session could complete the SMS factor
            # for a user who never registered one.
            if self._find_factor(session.user_id, AuthFactorType.SMS) is None:
                logger.error("用户未注册SMS认证因子")
                return False

            sms_code = ''.join(self._rng.choices(string.digits, k=6))
            self._store_temp_code(f"sms_code_{session_id}", sms_code, 300)  # valid for 5 minutes

            # NOTE(review): demo stand-in for an SMS gateway call. A real
            # deployment must deliver the code out of band and never log it.
            logger.info(f"向用户 {session.user_id} 发送SMS验证码: {sms_code}")
            return True
        except Exception as e:
            logger.error(f"发送SMS验证码失败: {e}")
            return False

    def verify_sms_code(self, session_id: str, code: str) -> bool:
        """Verify the SMS code previously issued for a session.

        On success the stored code is consumed and SMS is marked completed.
        """
        try:
            session, error = self._lookup_session(session_id)
            if session is None:
                logger.error(error)
                return False

            session_key = f"sms_code_{session_id}"
            stored_code = self._get_temp_code(session_key)
            if not stored_code:
                logger.error("SMS验证码不存在或已过期")
                return False

            if self._codes_match(stored_code, code):
                self._delete_temp_code(session_key)
                self._mark_completed(session, AuthFactorType.SMS)
                logger.info(f"用户 {session.user_id} SMS验证成功")
                return True

            logger.error("SMS验证码错误")
            return False
        except Exception as e:
            logger.error(f"验证SMS代码失败: {e}")
            return False

    def is_auth_complete(self, session_id: str) -> bool:
        """Return True when the session is live and every required factor is done."""
        session, _ = self._lookup_session(session_id)
        if session is None:
            return False
        return all(
            factor in session.completed_factors
            for factor in session.required_factors
        )

    def get_auth_status(self, session_id: str) -> Dict[str, object]:
        """Return a status dictionary for a session.

        For a missing or expired session the result is ``{'error': message}``.
        """
        session, error = self._lookup_session(session_id)
        if session is None:
            return {'error': error}

        remaining_factors = [
            factor for factor in session.required_factors
            if factor not in session.completed_factors
        ]
        return {
            'session_id': session_id,
            'user_id': session.user_id,
            'completed_factors': [f.value for f in session.completed_factors],
            'remaining_factors': [f.value for f in remaining_factors],
            'is_complete': len(remaining_factors) == 0,
            'expires_at': session.expires_at
        }

    def _lookup_session(self, session_id: str) -> Tuple[Optional[AuthSession], Optional[str]]:
        """Return ``(session, None)`` for a live session, else ``(None, reason)``."""
        session = self.sessions.get(session_id)
        if not session:
            return None, "认证会话不存在"
        if time.time() > session.expires_at:
            return None, "认证会话已过期"
        return session, None

    def _find_factor(self, user_id: str, factor_type: AuthFactorType) -> Optional[AuthFactor]:
        """Return the user's first enabled factor of ``factor_type``, or None."""
        for factor_id in self.user_factors.get(user_id, []):
            factor = self.factors.get(factor_id)
            if factor and factor.factor_type == factor_type and factor.enabled:
                return factor
        return None

    def _add_factor(self, user_id: str, factor_type: AuthFactorType, secret: str) -> str:
        """Create, store and index a new enabled factor; return its ID."""
        factor_id = self._generate_factor_id()
        self.factors[factor_id] = AuthFactor(
            factor_id=factor_id,
            factor_type=factor_type,
            user_id=user_id,
            secret=secret,
            backup_codes=self._generate_backup_codes(),
            enabled=True,
            created_at=time.time()
        )
        self.user_factors.setdefault(user_id, []).append(factor_id)
        return factor_id

    @staticmethod
    def _mark_completed(session: AuthSession, factor_type: AuthFactorType) -> None:
        """Record ``factor_type`` as completed on the session (idempotent)."""
        if factor_type not in session.completed_factors:
            session.completed_factors.append(factor_type)

    @staticmethod
    def _codes_match(expected: str, supplied: str) -> bool:
        """Compare two codes in constant time.

        Both sides are encoded first because ``hmac.compare_digest`` rejects
        non-ASCII ``str`` arguments.
        """
        return hmac.compare_digest(expected.encode(), supplied.encode())

    def _generate_factor_id(self) -> str:
        """Return a new factor ID: timestamp plus 64 random bits in hex."""
        return f"factor_{int(time.time())}_{self._rng.getrandbits(64):016x}"

    def _generate_session_id(self) -> str:
        """Return a new, unguessable session ID: timestamp plus 128 random bits."""
        return f"session_{int(time.time())}_{self._rng.getrandbits(128):032x}"

    def _generate_backup_codes(self, count: int = 10) -> List[str]:
        """Return ``count`` random 8-character backup codes (A-Z, 0-9)."""
        return [
            ''.join(self._rng.choices(self._BACKUP_CODE_ALPHABET, k=8))
            for _ in range(count)
        ]

    def _store_temp_code(self, key: str, code: str, ttl: int):
        """Remember ``code`` under ``key`` for ``ttl`` seconds.

        Simplified in-memory store; a shared store such as Redis would be
        needed across processes.
        """
        self._temp_codes[key] = {
            'code': code,
            'expires_at': time.time() + ttl
        }

    def _get_temp_code(self, key: str) -> Optional[str]:
        """Return the live code stored under ``key``, purging it if expired."""
        entry = self._temp_codes.get(key)
        if not entry:
            return None
        if time.time() > entry['expires_at']:
            del self._temp_codes[key]
            return None
        return entry['code']

    def _delete_temp_code(self, key: str):
        """Forget the code stored under ``key``, if any."""
        self._temp_codes.pop(key, None)
def main():
    """Demo of the MFA flow: register TOTP and SMS factors, open a session,
    verify both factors and print the resulting status."""
    mfa = MultiFactorAuthenticator()
    user_id = "alice@company.com"

    # Register a TOTP factor
    print("注册TOTP认证因子...")
    factor_id, qr_code = mfa.register_totp_factor(user_id)
    print(f"TOTP因子ID: {factor_id}")
    print(f"QR码已生成 (Base64长度: {len(qr_code)})")

    # Register an SMS factor
    print("\n注册SMS认证因子...")
    sms_factor_id = mfa.register_sms_factor(user_id, "+86138****1234")
    print(f"SMS因子ID: {sms_factor_id}")

    # Open a session that requires both factors
    print("\n开始认证会话...")
    session_id = mfa.start_auth_session(
        user_id=user_id,
        required_factors=[AuthFactorType.TOTP, AuthFactorType.SMS],
        ip_address="192.168.1.100",
        user_agent="Mozilla/5.0",
    )
    print(f"会话ID: {session_id}")

    # Status before any factor has been verified
    status = mfa.get_auth_status(session_id)
    print(f"\n认证状态: {status}")

    # Simulate the TOTP step; a real user reads the code from an authenticator app
    print("\n模拟TOTP验证...")
    current_code = pyotp.TOTP(mfa.factors[factor_id].secret).now()
    print(f"当前TOTP代码: {current_code}")
    totp_result = mfa.verify_totp_code(session_id, current_code)
    print(f"TOTP验证结果: {totp_result}")

    # Issue the SMS code
    print("\n发送SMS验证码...")
    sms_sent = mfa.send_sms_code(session_id)
    print(f"SMS发送结果: {sms_sent}")

    # Simulate the SMS step; for the demo the code is read straight from the
    # temporary store instead of from a text message
    print("\n模拟SMS验证...")
    sms_code = mfa._get_temp_code(f"sms_code_{session_id}")
    print(f"SMS验证码: {sms_code}")
    sms_result = mfa.verify_sms_code(session_id, sms_code)
    print(f"SMS验证结果: {sms_result}")

    # Final status after both factors
    final_status = mfa.get_auth_status(session_id)
    print(f"\n最终认证状态: {final_status}")
    is_complete = mfa.is_auth_complete(session_id)
    print(f"认证完成: {is_complete}")


if __name__ == "__main__":
    main()