跳转到主要内容

云原生应用的容器化架构策略:构建现代化、可扩展的应用平台

博主
31 分钟
6557 字
--

AI 导读

深刻理解和准确把握"云原生应用的容器化架构策略:构建现代化、可扩展的应用平台"这一重要概念的核心要义,本文从理论基础、实践应用和发展前景等多个维度进行了系统性阐述,为读者提供了全面而深入的分析视角。

内容由AI智能生成

目录

  1. 云原生容器化概述
  2. 容器设计模式与最佳实践
  3. Kubernetes编排平台架构
  4. 容器网络与存储架构
  5. 服务网格与流量管理
  6. CI/CD流水线设计
  7. 监控与可观测性
  8. 安全与合规
  9. 总结

云原生容器化概述

云原生容器化是现代应用架构的核心,它通过容器技术实现应用的标准化、可移植性和可扩展性。

容器化架构全景图

graph TB
    subgraph "开发层"
        A[应用代码] --> B[Dockerfile]
        B --> C[容器镜像]
    end
    
    subgraph "编排层"
        D[Kubernetes集群]
        E[Pod管理]
        F[Service发现]
        G[配置管理]
    end
    
    subgraph "平台层"
        H[容器运行时]
        I[网络插件]
        J[存储插件]
        K[监控系统]
    end
    
    subgraph "基础设施层"
        L[计算节点]
        M[网络设备]
        N[存储系统]
        O[安全组件]
    end
    
    C --> D
    D --> E
    D --> F
    D --> G
    E --> H
    F --> I
    G --> J
    K --> H
    H --> L
    I --> M
    J --> N
    K --> O

容器化架构分析器

#!/usr/bin/env python3
"""
云原生容器化架构分析器
分析应用的容器化适配性和架构建议
"""

import os
import json
import yaml
import docker
import kubernetes
from typing import Dict, List, Any, Optional
from dataclasses import dataclass, asdict
from datetime import datetime
import subprocess
import tempfile
from pathlib import Path

@dataclass
class ContainerizationAssessment:
    """容器化评估结果"""
    application_name: str
    containerization_score: float
    recommendations: List[str]
    architecture_patterns: List[str]
    resource_requirements: Dict[str, Any]
    security_considerations: List[str]
    migration_complexity: str
    estimated_effort: str

@dataclass
class ApplicationProfile:
    """应用画像"""
    name: str
    language: str
    framework: str
    dependencies: List[str]
    database_type: str
    external_services: List[str]
    file_system_usage: Dict[str, Any]
    network_requirements: Dict[str, Any]
    performance_requirements: Dict[str, Any]

class ContainerArchitectureAnalyzer:
    """容器架构分析器"""
    
    def __init__(self):
        self.docker_client = docker.from_env()
        self.supported_languages = {
            'python': {'base_images': ['python:3.9-slim', 'python:3.11-alpine'], 'patterns': ['multi-stage', 'distroless']},
            'java': {'base_images': ['openjdk:11-jre-slim', 'eclipse-temurin:17-jre'], 'patterns': ['multi-stage', 'jib']},
            'nodejs': {'base_images': ['node:18-alpine', 'node:20-slim'], 'patterns': ['multi-stage', 'npm-ci']},
            'go': {'base_images': ['golang:1.21-alpine', 'scratch'], 'patterns': ['multi-stage', 'static-binary']},
            'dotnet': {'base_images': ['mcr.microsoft.com/dotnet/aspnet:7.0', 'mcr.microsoft.com/dotnet/runtime:7.0'], 'patterns': ['multi-stage', 'self-contained']}
        }
    
    def analyze_application(self, app_path: str) -> ApplicationProfile:
        """分析应用特征"""
        profile = ApplicationProfile(
            name=os.path.basename(app_path),
            language=self._detect_language(app_path),
            framework=self._detect_framework(app_path),
            dependencies=self._analyze_dependencies(app_path),
            database_type=self._detect_database_usage(app_path),
            external_services=self._detect_external_services(app_path),
            file_system_usage=self._analyze_file_system_usage(app_path),
            network_requirements=self._analyze_network_requirements(app_path),
            performance_requirements=self._analyze_performance_requirements(app_path)
        )
        
        return profile
    
    def _detect_language(self, app_path: str) -> str:
        """检测编程语言"""
        language_indicators = {
            'python': ['requirements.txt', 'setup.py', 'pyproject.toml', '*.py'],
            'java': ['pom.xml', 'build.gradle', '*.java'],
            'nodejs': ['package.json', '*.js', '*.ts'],
            'go': ['go.mod', 'go.sum', '*.go'],
            'dotnet': ['*.csproj', '*.sln', '*.cs']
        }
        
        for language, indicators in language_indicators.items():
            for indicator in indicators:
                if indicator.startswith('*'):
                    # 文件扩展名检查
                    ext = indicator[1:]
                    for root, dirs, files in os.walk(app_path):
                        if any(f.endswith(ext) for f in files):
                            return language
                else:
                    # 特定文件检查
                    if os.path.exists(os.path.join(app_path, indicator)):
                        return language
        
        return 'unknown'
    
    def _detect_framework(self, app_path: str) -> str:
        """检测应用框架"""
        framework_patterns = {
            'django': ['manage.py', 'settings.py'],
            'flask': ['app.py', 'application.py'],
            'fastapi': ['main.py'],
            'spring': ['pom.xml', 'application.properties'],
            'express': ['package.json'],
            'gin': ['go.mod'],
            'aspnet': ['*.csproj']
        }
        
        for framework, patterns in framework_patterns.items():
            for pattern in patterns:
                if pattern.startswith('*'):
                    ext = pattern[1:]
                    for root, dirs, files in os.walk(app_path):
                        if any(f.endswith(ext) for f in files):
                            return framework
                else:
                    if os.path.exists(os.path.join(app_path, pattern)):
                        return framework
        
        return 'unknown'
    
    def _analyze_dependencies(self, app_path: str) -> List[str]:
        """分析依赖项"""
        dependencies = []
        
        # Python依赖
        req_file = os.path.join(app_path, 'requirements.txt')
        if os.path.exists(req_file):
            with open(req_file, 'r') as f:
                dependencies.extend([line.strip() for line in f if line.strip() and not line.startswith('#')])
        
        # Node.js依赖
        package_file = os.path.join(app_path, 'package.json')
        if os.path.exists(package_file):
            with open(package_file, 'r') as f:
                package_data = json.load(f)
                if 'dependencies' in package_data:
                    dependencies.extend(package_data['dependencies'].keys())
        
        # Java依赖 (Maven)
        pom_file = os.path.join(app_path, 'pom.xml')
        if os.path.exists(pom_file):
            # 简化的XML解析
            with open(pom_file, 'r') as f:
                content = f.read()
                # 这里应该使用XML解析器,简化处理
                dependencies.append('maven-dependencies')
        
        return dependencies
    
    def _detect_database_usage(self, app_path: str) -> str:
        """检测数据库使用"""
        db_indicators = {
            'postgresql': ['psycopg2', 'postgresql', 'postgres'],
            'mysql': ['mysql', 'pymysql', 'mysql-connector'],
            'mongodb': ['pymongo', 'mongodb', 'mongoose'],
            'redis': ['redis', 'redis-py'],
            'sqlite': ['sqlite3', 'sqlite']
        }
        
        dependencies = self._analyze_dependencies(app_path)
        dep_str = ' '.join(dependencies).lower()
        
        for db_type, indicators in db_indicators.items():
            if any(indicator in dep_str for indicator in indicators):
                return db_type
        
        return 'none'
    
    def _detect_external_services(self, app_path: str) -> List[str]:
        """检测外部服务依赖"""
        services = []
        
        # 检查配置文件中的外部服务
        config_files = ['config.yaml', 'config.json', '.env', 'docker-compose.yml']
        
        for config_file in config_files:
            config_path = os.path.join(app_path, config_file)
            if os.path.exists(config_path):
                with open(config_path, 'r') as f:
                    content = f.read().lower()
                    
                    # 检查常见服务
                    service_patterns = ['redis', 'elasticsearch', 'kafka', 'rabbitmq', 'memcached']
                    for pattern in service_patterns:
                        if pattern in content:
                            services.append(pattern)
        
        return list(set(services))
    
    def _analyze_file_system_usage(self, app_path: str) -> Dict[str, Any]:
        """分析文件系统使用"""
        usage = {
            'read_only': True,
            'temp_files': False,
            'log_files': False,
            'upload_directory': False,
            'config_files': []
        }
        
        # 检查是否有写入操作
        for root, dirs, files in os.walk(app_path):
            for file in files:
                if file.endswith(('.log', '.tmp')):
                    usage['read_only'] = False
                    usage['temp_files'] = True
                
                if 'upload' in file.lower() or 'temp' in file.lower():
                    usage['upload_directory'] = True
                    usage['read_only'] = False
                
                if file in ['config.yaml', 'config.json', '.env']:
                    usage['config_files'].append(file)
        
        return usage
    
    def _analyze_network_requirements(self, app_path: str) -> Dict[str, Any]:
        """分析网络需求"""
        requirements = {
            'ports': [],
            'protocols': ['HTTP'],
            'load_balancing': False,
            'ssl_termination': False
        }
        
        # 检查端口配置
        config_files = ['config.yaml', 'config.json', '.env', 'docker-compose.yml']
        
        for config_file in config_files:
            config_path = os.path.join(app_path, config_file)
            if os.path.exists(config_path):
                with open(config_path, 'r') as f:
                    content = f.read()
                    
                    # 简单的端口检测
                    import re
                    port_pattern = r'port[:\s]*(\d+)'
                    ports = re.findall(port_pattern, content, re.IGNORECASE)
                    requirements['ports'].extend([int(p) for p in ports])
                    
                    # 检查HTTPS/SSL
                    if 'https' in content.lower() or 'ssl' in content.lower():
                        requirements['ssl_termination'] = True
                        requirements['protocols'].append('HTTPS')
        
        return requirements
    
    def _analyze_performance_requirements(self, app_path: str) -> Dict[str, Any]:
        """分析性能需求"""
        requirements = {
            'cpu_intensive': False,
            'memory_intensive': False,
            'io_intensive': False,
            'estimated_resources': {
                'cpu': '100m',
                'memory': '128Mi'
            }
        }
        
        # 基于语言和框架估算资源需求
        language = self._detect_language(app_path)
        framework = self._detect_framework(app_path)
        
        if language == 'java':
            requirements['estimated_resources'] = {'cpu': '500m', 'memory': '512Mi'}
            requirements['memory_intensive'] = True
        elif language == 'python':
            requirements['estimated_resources'] = {'cpu': '200m', 'memory': '256Mi'}
        elif language == 'nodejs':
            requirements['estimated_resources'] = {'cpu': '100m', 'memory': '128Mi'}
        elif language == 'go':
            requirements['estimated_resources'] = {'cpu': '100m', 'memory': '64Mi'}
        
        return requirements
    
    def assess_containerization(self, profile: ApplicationProfile) -> ContainerizationAssessment:
        """评估容器化适配性"""
        score = 0.0
        recommendations = []
        patterns = []
        security_considerations = []
        
        # 语言支持评分
        if profile.language in self.supported_languages:
            score += 20
            patterns.extend(self.supported_languages[profile.language]['patterns'])
        else:
            score += 5
            recommendations.append(f"需要为{profile.language}语言创建自定义容器化方案")
        
        # 依赖管理评分
        if profile.dependencies:
            score += 15
            recommendations.append("使用多阶段构建优化镜像大小")
        else:
            score += 10
        
        # 文件系统使用评分
        if profile.file_system_usage['read_only']:
            score += 20
            patterns.append('immutable-container')
        else:
            score += 10
            recommendations.append("考虑使用持久卷存储可变数据")
            security_considerations.append("确保容器文件系统权限配置正确")
        
        # 外部服务依赖评分
        if profile.external_services:
            score += 10
            recommendations.append("使用服务发现机制管理外部服务依赖")
            patterns.append('sidecar-pattern')
        else:
            score += 15
        
        # 网络需求评分
        if profile.network_requirements['ssl_termination']:
            score += 10
            security_considerations.append("在Ingress层处理SSL终止")
        
        # 性能需求评分
        if not profile.performance_requirements['cpu_intensive']:
            score += 10
        if not profile.performance_requirements['memory_intensive']:
            score += 10
        
        # 确定迁移复杂度
        if score >= 80:
            complexity = "低"
            effort = "1-2周"
        elif score >= 60:
            complexity = "中等"
            effort = "2-4周"
        else:
            complexity = "高"
            effort = "4-8周"
        
        return ContainerizationAssessment(
            application_name=profile.name,
            containerization_score=score,
            recommendations=recommendations,
            architecture_patterns=patterns,
            resource_requirements=profile.performance_requirements['estimated_resources'],
            security_considerations=security_considerations,
            migration_complexity=complexity,
            estimated_effort=effort
        )
    
    def generate_dockerfile(self, profile: ApplicationProfile) -> str:
        """生成Dockerfile"""
        if profile.language not in self.supported_languages:
            return "# 不支持的语言类型"
        
        lang_config = self.supported_languages[profile.language]
        base_image = lang_config['base_images'][0]
        
        dockerfile_content = f"""# 多阶段构建Dockerfile for {profile.name}
# 构建阶段
FROM {base_image} AS builder

WORKDIR /app

# 复制依赖文件
"""
        
        if profile.language == 'python':
            dockerfile_content += """COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# 复制应用代码
COPY . .

# 运行阶段
FROM python:3.9-slim AS runtime

WORKDIR /app

# 创建非root用户
RUN groupadd -r appuser && useradd -r -g appuser appuser

# 复制依赖和应用
COPY --from=builder /usr/local/lib/python3.9/site-packages /usr/local/lib/python3.9/site-packages
COPY --from=builder /app .

# 设置权限
RUN chown -R appuser:appuser /app
USER appuser

# 健康检查
HEALTHCHECK --interval=30s --timeout=3s --start-period=5s --retries=3 \\
    CMD curl -f http://localhost:8000/health || exit 1

# 暴露端口
EXPOSE 8000

# 启动命令
CMD ["python", "app.py"]
"""
        
        elif profile.language == 'java':
            dockerfile_content += """COPY pom.xml .
RUN mvn dependency:go-offline

# 复制源代码并构建
COPY src ./src
RUN mvn clean package -DskipTests

# 运行阶段
FROM eclipse-temurin:17-jre AS runtime

WORKDIR /app

# 创建非root用户
RUN groupadd -r appuser && useradd -r -g appuser appuser

# 复制JAR文件
COPY --from=builder /app/target/*.jar app.jar

# 设置权限
RUN chown appuser:appuser app.jar
USER appuser

# 健康检查
HEALTHCHECK --interval=30s --timeout=3s --start-period=5s --retries=3 \\
    CMD curl -f http://localhost:8080/actuator/health || exit 1

# 暴露端口
EXPOSE 8080

# 启动命令
CMD ["java", "-jar", "app.jar"]
"""
        
        elif profile.language == 'nodejs':
            dockerfile_content += """COPY package*.json ./
RUN npm ci --only=production

# 复制应用代码
COPY . .

# 运行阶段
FROM node:18-alpine AS runtime

WORKDIR /app

# 创建非root用户
RUN addgroup -g 1001 -S nodejs && adduser -S nodejs -u 1001

# 复制依赖和应用
COPY --from=builder /app/node_modules ./node_modules
COPY --from=builder /app .

# 设置权限
RUN chown -R nodejs:nodejs /app
USER nodejs

# 健康检查
HEALTHCHECK --interval=30s --timeout=3s --start-period=5s --retries=3 \\
    CMD curl -f http://localhost:3000/health || exit 1

# 暴露端口
EXPOSE 3000

# 启动命令
CMD ["node", "index.js"]
"""
        
        return dockerfile_content
    
    def generate_kubernetes_manifests(self, profile: ApplicationProfile, assessment: ContainerizationAssessment) -> Dict[str, str]:
        """生成Kubernetes清单文件"""
        manifests = {}
        
        # Deployment
        deployment = {
            'apiVersion': 'apps/v1',
            'kind': 'Deployment',
            'metadata': {
                'name': profile.name,
                'labels': {
                    'app': profile.name,
                    'version': 'v1'
                }
            },
            'spec': {
                'replicas': 3,
                'selector': {
                    'matchLabels': {
                        'app': profile.name
                    }
                },
                'template': {
                    'metadata': {
                        'labels': {
                            'app': profile.name,
                            'version': 'v1'
                        }
                    },
                    'spec': {
                        'containers': [{
                            'name': profile.name,
                            'image': f'{profile.name}:latest',
                            'ports': [{
                                'containerPort': profile.network_requirements.get('ports', [8080])[0] if profile.network_requirements.get('ports') else 8080
                            }],
                            'resources': {
                                'requests': assessment.resource_requirements,
                                'limits': {
                                    'cpu': '1000m',
                                    'memory': '1Gi'
                                }
                            },
                            'livenessProbe': {
                                'httpGet': {
                                    'path': '/health',
                                    'port': profile.network_requirements.get('ports', [8080])[0] if profile.network_requirements.get('ports') else 8080
                                },
                                'initialDelaySeconds': 30,
                                'periodSeconds': 10
                            },
                            'readinessProbe': {
                                'httpGet': {
                                    'path': '/ready',
                                    'port': profile.network_requirements.get('ports', [8080])[0] if profile.network_requirements.get('ports') else 8080
                                },
                                'initialDelaySeconds': 5,
                                'periodSeconds': 5
                            }
                        }],
                        'securityContext': {
                            'runAsNonRoot': True,
                            'runAsUser': 1001,
                            'fsGroup': 1001
                        }
                    }
                }
            }
        }
        
        manifests['deployment.yaml'] = yaml.dump(deployment, default_flow_style=False)
        
        # Service
        service = {
            'apiVersion': 'v1',
            'kind': 'Service',
            'metadata': {
                'name': profile.name,
                'labels': {
                    'app': profile.name
                }
            },
            'spec': {
                'selector': {
                    'app': profile.name
                },
                'ports': [{
                    'port': 80,
                    'targetPort': profile.network_requirements.get('ports', [8080])[0] if profile.network_requirements.get('ports') else 8080,
                    'protocol': 'TCP'
                }],
                'type': 'ClusterIP'
            }
        }
        
        manifests['service.yaml'] = yaml.dump(service, default_flow_style=False)
        
        # ConfigMap (如果有配置文件)
        if profile.file_system_usage['config_files']:
            configmap = {
                'apiVersion': 'v1',
                'kind': 'ConfigMap',
                'metadata': {
                    'name': f'{profile.name}-config'
                },
                'data': {
                    'app.properties': '# 应用配置\nserver.port=8080\n'
                }
            }
            
            manifests['configmap.yaml'] = yaml.dump(configmap, default_flow_style=False)
        
        # Ingress (如果需要外部访问)
        if profile.network_requirements.get('ssl_termination'):
            ingress = {
                'apiVersion': 'networking.k8s.io/v1',
                'kind': 'Ingress',
                'metadata': {
                    'name': profile.name,
                    'annotations': {
                        'nginx.ingress.kubernetes.io/rewrite-target': '/',
                        'cert-manager.io/cluster-issuer': 'letsencrypt-prod'
                    }
                },
                'spec': {
                    'tls': [{
                        'hosts': [f'{profile.name}.example.com'],
                        'secretName': f'{profile.name}-tls'
                    }],
                    'rules': [{
                        'host': f'{profile.name}.example.com',
                        'http': {
                            'paths': [{
                                'path': '/',
                                'pathType': 'Prefix',
                                'backend': {
                                    'service': {
                                        'name': profile.name,
                                        'port': {
                                            'number': 80
                                        }
                                    }
                                }
                            }]
                        }
                    }]
                }
            }
            
            manifests['ingress.yaml'] = yaml.dump(ingress, default_flow_style=False)
        
        return manifests

# 使用示例
def main():
    """主函数 - 容器化架构分析示例"""
    analyzer = ContainerArchitectureAnalyzer()
    
    # 分析示例应用
    app_path = "/path/to/your/application"
    
    print("=== 云原生容器化架构分析 ===")
    
    # 分析应用
    profile = analyzer.analyze_application(app_path)
    print(f"\n应用画像:")
    print(f"  名称: {profile.name}")
    print(f"  语言: {profile.language}")
    print(f"  框架: {profile.framework}")
    print(f"  数据库: {profile.database_type}")
    print(f"  外部服务: {', '.join(profile.external_services)}")
    
    # 评估容器化适配性
    assessment = analyzer.assess_containerization(profile)
    print(f"\n容器化评估:")
    print(f"  适配性评分: {assessment.containerization_score}/100")
    print(f"  迁移复杂度: {assessment.migration_complexity}")
    print(f"  预估工作量: {assessment.estimated_effort}")
    
    print(f"\n架构建议:")
    for rec in assessment.recommendations:
        print(f"  - {rec}")
    
    print(f"\n设计模式:")
    for pattern in assessment.architecture_patterns:
        print(f"  - {pattern}")
    
    print(f"\n安全考虑:")
    for sec in assessment.security_considerations:
        print(f"  - {sec}")
    
    # 生成Dockerfile
    dockerfile = analyzer.generate_dockerfile(profile)
    print(f"\n生成的Dockerfile:")
    print(dockerfile)
    
    # 生成Kubernetes清单
    manifests = analyzer.generate_kubernetes_manifests(profile, assessment)
    print(f"\n生成的Kubernetes清单文件:")
    for filename, content in manifests.items():
        print(f"\n--- {filename} ---")
        print(content)

if __name__ == "__main__":
    main()

Kubernetes编排平台架构

Kubernetes作为容器编排平台的核心,提供了完整的容器生命周期管理能力。

Kubernetes集群架构部署

#!/bin/bash
# Kubernetes集群自动化部署脚本

set -e

# 配置变量
CLUSTER_NAME="production-k8s"
K8S_VERSION="1.28.0"
MASTER_NODES=3
WORKER_NODES=5
NETWORK_PLUGIN="calico"

echo "=== Kubernetes集群部署开始 ==="

# 1. 准备节点环境
prepare_nodes() {
    echo "准备节点环境..."
    
    # 禁用swap
    sudo swapoff -a
    sudo sed -i '/ swap / s/^\(.*\)$/#\1/g' /etc/fstab
    
    # 加载内核模块
    cat <<EOF | sudo tee /etc/modules-load.d/k8s.conf
br_netfilter
overlay
EOF
    
    sudo modprobe br_netfilter
    sudo modprobe overlay
    
    # 设置内核参数
    cat <<EOF | sudo tee /etc/sysctl.d/k8s.conf
net.bridge.bridge-nf-call-iptables  = 1
net.bridge.bridge-nf-call-ip6tables = 1
net.ipv4.ip_forward                 = 1
EOF
    
    sudo sysctl --system
    
    # 安装容器运行时 (containerd)
    sudo apt-get update
    sudo apt-get install -y containerd
    
    # 配置containerd
    sudo mkdir -p /etc/containerd
    containerd config default | sudo tee /etc/containerd/config.toml
    
    # 启用SystemdCgroup
    sudo sed -i 's/SystemdCgroup = false/SystemdCgroup = true/' /etc/containerd/config.toml
    
    sudo systemctl restart containerd
    sudo systemctl enable containerd
}

# 2. 安装kubeadm、kubelet、kubectl
install_kubernetes_tools() {
    echo "安装Kubernetes工具..."
    
    sudo apt-get update
    sudo apt-get install -y apt-transport-https ca-certificates curl
    
    # 添加Kubernetes APT仓库
    curl -fsSL https://packages.cloud.google.com/apt/doc/apt-key.gpg | sudo gpg --dearmor -o /etc/apt/keyrings/kubernetes-archive-keyring.gpg
    echo "deb [signed-by=/etc/apt/keyrings/kubernetes-archive-keyring.gpg] https://apt.kubernetes.io/ kubernetes-xenial main" | sudo tee /etc/apt/sources.list.d/kubernetes.list
    
    sudo apt-get update
    sudo apt-get install -y kubelet=$K8S_VERSION-00 kubeadm=$K8S_VERSION-00 kubectl=$K8S_VERSION-00
    sudo apt-mark hold kubelet kubeadm kubectl
}

# 3. 初始化控制平面
init_control_plane() {
    echo "初始化控制平面..."
    
    # 创建kubeadm配置文件
    cat <<EOF > kubeadm-config.yaml
apiVersion: kubeadm.k8s.io/v1beta3
kind: InitConfiguration
localAPIEndpoint:
  advertiseAddress: $(hostname -I | awk '{print $1}')
  bindPort: 6443
nodeRegistration:
  criSocket: unix:///var/run/containerd/containerd.sock
---
apiVersion: kubeadm.k8s.io/v1beta3
kind: ClusterConfiguration
clusterName: $CLUSTER_NAME
kubernetesVersion: v$K8S_VERSION
controlPlaneEndpoint: "$(hostname -I | awk '{print $1}'):6443"
networking:
  serviceSubnet: "10.96.0.0/12"
  podSubnet: "192.168.0.0/16"
  dnsDomain: "cluster.local"
apiServer:
  extraArgs:
    audit-log-maxage: "30"
    audit-log-maxbackup: "3"
    audit-log-maxsize: "100"
    audit-log-path: "/var/log/audit.log"
    enable-admission-plugins: "NamespaceLifecycle,LimitRanger,ServiceAccount,DefaultStorageClass,DefaultTolerationSeconds,MutatingAdmissionWebhook,ValidatingAdmissionWebhook,ResourceQuota,NodeRestriction"
controllerManager:
  extraArgs:
    bind-address: "0.0.0.0"
scheduler:
  extraArgs:
    bind-address: "0.0.0.0"
etcd:
  local:
    dataDir: "/var/lib/etcd"
---
apiVersion: kubelet.config.k8s.io/v1beta1
kind: KubeletConfiguration
cgroupDriver: systemd
EOF
    
    # 初始化集群
    sudo kubeadm init --config=kubeadm-config.yaml --upload-certs
    
    # 配置kubectl
    mkdir -p $HOME/.kube
    sudo cp -i /etc/kubernetes/admin.conf $HOME/.kube/config
    sudo chown $(id -u):$(id -g) $HOME/.kube/config
}

# 4. 安装网络插件
install_network_plugin() {
    echo "安装网络插件: $NETWORK_PLUGIN"
    
    if [ "$NETWORK_PLUGIN" = "calico" ]; then
        # 安装Calico
        kubectl create -f https://raw.githubusercontent.com/projectcalico/calico/v3.26.0/manifests/tigera-operator.yaml
        
        # 创建Calico配置
        cat <<EOF | kubectl apply -f -
apiVersion: operator.tigera.io/v1
kind: Installation
metadata:
  name: default
spec:
  calicoNetwork:
    ipPools:
    - blockSize: 26
      cidr: 192.168.0.0/16
      encapsulation: VXLANCrossSubnet
      natOutgoing: Enabled
      nodeSelector: all()
---
apiVersion: operator.tigera.io/v1
kind: APIServer
metadata:
  name: default
spec: {}
EOF
    elif [ "$NETWORK_PLUGIN" = "flannel" ]; then
        # 安装Flannel
        kubectl apply -f https://raw.githubusercontent.com/flannel-io/flannel/master/Documentation/kube-flannel.yml
    fi
}

# 5. 部署存储类
deploy_storage_class() {
    echo "部署存储类..."
    
    # 部署local-path-provisioner
    kubectl apply -f https://raw.githubusercontent.com/rancher/local-path-provisioner/v0.0.24/deploy/local-path-storage.yaml
    
    # 设置为默认存储类
    kubectl patch storageclass local-path -p '{"metadata": {"annotations":{"storageclass.kubernetes.io/is-default-class":"true"}}}'
}

# 6. 部署Ingress控制器
deploy_ingress_controller() {
    echo "部署Ingress控制器..."
    
    # 部署NGINX Ingress Controller
    kubectl apply -f https://raw.githubusercontent.com/kubernetes/ingress-nginx/controller-v1.8.1/deploy/static/provider/cloud/deploy.yaml
    
    # 等待Ingress Controller就绪
    kubectl wait --namespace ingress-nginx \
        --for=condition=ready pod \
        --selector=app.kubernetes.io/component=controller \
        --timeout=300s
}

# 7. 部署监控组件
deploy_monitoring() {
    echo "部署监控组件..."
    
    # 创建monitoring命名空间
    kubectl create namespace monitoring
    
    # 部署Prometheus Operator
    kubectl apply -f https://raw.githubusercontent.com/prometheus-operator/prometheus-operator/v0.68.0/bundle.yaml
    
    # 部署kube-state-metrics
    kubectl apply -f https://raw.githubusercontent.com/kubernetes/kube-state-metrics/v2.10.0/examples/standard/
    
    # 部署node-exporter
    cat <<EOF | kubectl apply -f -
apiVersion: apps/v1
kind: DaemonSet
metadata:
  name: node-exporter
  namespace: monitoring
  labels:
    app: node-exporter
spec:
  selector:
    matchLabels:
      app: node-exporter
  template:
    metadata:
      labels:
        app: node-exporter
    spec:
      hostNetwork: true
      hostPID: true
      containers:
      - name: node-exporter
        image: prom/node-exporter:v1.6.1
        args:
        - --path.procfs=/host/proc
        - --path.sysfs=/host/sys
        - --path.rootfs=/host/root
        - --collector.filesystem.mount-points-exclude=^/(sys|proc|dev|host|etc)($$|/)
        ports:
        - containerPort: 9100
          hostPort: 9100
          name: metrics
        volumeMounts:
        - name: proc
          mountPath: /host/proc
          readOnly: true
        - name: sys
          mountPath: /host/sys
          readOnly: true
        - name: root
          mountPath: /host/root
          readOnly: true
      volumes:
      - name: proc
        hostPath:
          path: /proc
      - name: sys
        hostPath:
          path: /sys
      - name: root
        hostPath:
          path: /
EOF
}

# 8. 配置RBAC和安全策略
configure_security() {
    echo "配置安全策略..."
    
    # 创建Pod安全策略
    cat <<EOF | kubectl apply -f -
apiVersion: policy/v1beta1
kind: PodSecurityPolicy
metadata:
  name: restricted
spec:
  privileged: false
  allowPrivilegeEscalation: false
  requiredDropCapabilities:
    - ALL
  volumes:
    - 'configMap'
    - 'emptyDir'
    - 'projected'
    - 'secret'
    - 'downwardAPI'
    - 'persistentVolumeClaim'
  runAsUser:
    rule: 'MustRunAsNonRoot'
  seLinux:
    rule: 'RunAsAny'
  fsGroup:
    rule: 'RunAsAny'
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: restricted-psp-user
rules:
- apiGroups: ['policy']
  resources: ['podsecuritypolicies']
  verbs: ['use']
  resourceNames:
  - restricted
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: restricted-psp-all-serviceaccounts
subjects:
- kind: Group
  name: system:serviceaccounts
  apiGroup: rbac.authorization.k8s.io
roleRef:
  kind: ClusterRole
  name: restricted-psp-user
  apiGroup: rbac.authorization.k8s.io
EOF
    
    # 配置网络策略
    cat <<EOF | kubectl apply -f -
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: default-deny-all
  namespace: default
spec:
  podSelector: {}
  policyTypes:
  - Ingress
  - Egress
---
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: allow-dns
  namespace: default
spec:
  podSelector: {}
  policyTypes:
  - Egress
  egress:
  - to: []
    ports:
    - protocol: UDP
      port: 53
    - protocol: TCP
      port: 53
EOF
}

# 主执行流程
main() {
    prepare_nodes
    install_kubernetes_tools
    init_control_plane
    install_network_plugin
    deploy_storage_class
    deploy_ingress_controller
    deploy_monitoring
    configure_security
    
    echo "=== Kubernetes集群部署完成 ==="
    echo "集群信息:"
    kubectl cluster-info
    echo ""
    echo "节点状态:"
    kubectl get nodes -o wide
    echo ""
    echo "系统Pod状态:"
    kubectl get pods -A
}

# 执行部署
main

Kubernetes资源管理器

#!/usr/bin/env python3
"""
Kubernetes资源管理器
提供集群资源的自动化管理和优化
"""

import yaml
import json
import time
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional
from dataclasses import dataclass, asdict
from kubernetes import client, config, watch
from kubernetes.client.rest import ApiException
import logging

# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@dataclass
class ResourceQuota:
    """资源配额"""
    cpu_request: str
    cpu_limit: str
    memory_request: str
    memory_limit: str
    storage_request: str
    pod_count: int

@dataclass
class NamespaceConfig:
    """命名空间配置"""
    name: str
    labels: Dict[str, str]
    resource_quota: ResourceQuota
    network_policies: List[str]
    security_context: Dict[str, Any]

class KubernetesResourceManager:
    """Kubernetes资源管理器"""
    
    def __init__(self, kubeconfig_path: Optional[str] = None):
        if kubeconfig_path:
            config.load_kube_config(config_file=kubeconfig_path)
        else:
            try:
                config.load_incluster_config()
            except:
                config.load_kube_config()
        
        self.v1 = client.CoreV1Api()
        self.apps_v1 = client.AppsV1Api()
        self.networking_v1 = client.NetworkingV1Api()
        self.rbac_v1 = client.RbacAuthorizationV1Api()
        self.metrics_v1beta1 = client.CustomObjectsApi()
    
    def create_namespace_with_policies(self, namespace_config: NamespaceConfig) -> bool:
        """创建带有策略的命名空间"""
        try:
            # 创建命名空间
            namespace = client.V1Namespace(
                metadata=client.V1ObjectMeta(
                    name=namespace_config.name,
                    labels=namespace_config.labels
                )
            )
            self.v1.create_namespace(namespace)
            logger.info(f"命名空间 {namespace_config.name} 创建成功")
            
            # 创建资源配额
            self._create_resource_quota(namespace_config)
            
            # 创建网络策略
            self._create_network_policies(namespace_config)
            
            # 创建RBAC策略
            self._create_rbac_policies(namespace_config)
            
            return True
            
        except ApiException as e:
            logger.error(f"创建命名空间失败: {e}")
            return False
    
    def _create_resource_quota(self, namespace_config: NamespaceConfig):
        """创建资源配额"""
        quota = client.V1ResourceQuota(
            metadata=client.V1ObjectMeta(
                name=f"{namespace_config.name}-quota",
                namespace=namespace_config.name
            ),
            spec=client.V1ResourceQuotaSpec(
                hard={
                    "requests.cpu": namespace_config.resource_quota.cpu_request,
                    "requests.memory": namespace_config.resource_quota.memory_request,
                    "requests.storage": namespace_config.resource_quota.storage_request,
                    "limits.cpu": namespace_config.resource_quota.cpu_limit,
                    "limits.memory": namespace_config.resource_quota.memory_limit,
                    "pods": str(namespace_config.resource_quota.pod_count),
                    "persistentvolumeclaims": "10",
                    "services": "20",
                    "secrets": "10",
                    "configmaps": "10"
                }
            )
        )
        
        self.v1.create_namespaced_resource_quota(
            namespace=namespace_config.name,
            body=quota
        )
        logger.info(f"资源配额创建成功: {namespace_config.name}")
    
    def _create_network_policies(self, namespace_config: NamespaceConfig):
        """创建网络策略"""
        # 默认拒绝所有流量
        deny_all_policy = client.V1NetworkPolicy(
            metadata=client.V1ObjectMeta(
                name="deny-all",
                namespace=namespace_config.name
            ),
            spec=client.V1NetworkPolicySpec(
                pod_selector=client.V1LabelSelector(),
                policy_types=["Ingress", "Egress"]
            )
        )
        
        self.networking_v1.create_namespaced_network_policy(
            namespace=namespace_config.name,
            body=deny_all_policy
        )
        
        # 允许DNS查询
        allow_dns_policy = client.V1NetworkPolicy(
            metadata=client.V1ObjectMeta(
                name="allow-dns",
                namespace=namespace_config.name
            ),
            spec=client.V1NetworkPolicySpec(
                pod_selector=client.V1LabelSelector(),
                policy_types=["Egress"],
                egress=[
                    client.V1NetworkPolicyEgressRule(
                        ports=[
                            client.V1NetworkPolicyPort(protocol="UDP", port=53),
                            client.V1NetworkPolicyPort(protocol="TCP", port=53)
                        ]
                    )
                ]
            )
        )
        
        self.networking_v1.create_namespaced_network_policy(
            namespace=namespace_config.name,
            body=allow_dns_policy
        )
        
        logger.info(f"网络策略创建成功: {namespace_config.name}")
    
    def _create_rbac_policies(self, namespace_config: NamespaceConfig):
        """创建RBAC策略"""
        # 创建ServiceAccount
        service_account = client.V1ServiceAccount(
            metadata=client.V1ObjectMeta(
                name=f"{namespace_config.name}-sa",
                namespace=namespace_config.name
            )
        )
        
        self.v1.create_namespaced_service_account(
            namespace=namespace_config.name,
            body=service_account
        )
        
        # 创建Role
        role = client.V1Role(
            metadata=client.V1ObjectMeta(
                name=f"{namespace_config.name}-role",
                namespace=namespace_config.name
            ),
            rules=[
                client.V1PolicyRule(
                    api_groups=[""],
                    resources=["pods", "services", "configmaps", "secrets"],
                    verbs=["get", "list", "watch", "create", "update", "patch", "delete"]
                ),
                client.V1PolicyRule(
                    api_groups=["apps"],
                    resources=["deployments", "replicasets"],
                    verbs=["get", "list", "watch", "create", "update", "patch", "delete"]
                )
            ]
        )
        
        self.rbac_v1.create_namespaced_role(
            namespace=namespace_config.name,
            body=role
        )
        
        # 创建RoleBinding
        role_binding = client.V1RoleBinding(
            metadata=client.V1ObjectMeta(
                name=f"{namespace_config.name}-binding",
                namespace=namespace_config.name
            ),
            subjects=[
                client.V1Subject(
                    kind="ServiceAccount",
                    name=f"{namespace_config.name}-sa",
                    namespace=namespace_config.name
                )
            ],
            role_ref=client.V1RoleRef(
                kind="Role",
                name=f"{namespace_config.name}-role",
                api_group="rbac.authorization.k8s.io"
            )
        )
        
        self.rbac_v1.create_namespaced_role_binding(
            namespace=namespace_config.name,
            body=role_binding
        )
        
        logger.info(f"RBAC策略创建成功: {namespace_config.name}")
    
    def deploy_application(self, app_name: str, namespace: str, image: str, 
                          replicas: int = 3, resources: Optional[Dict] = None) -> bool:
        """部署应用"""
        try:
            # 默认资源配置
            if not resources:
                resources = {
                    "requests": {"cpu": "100m", "memory": "128Mi"},
                    "limits": {"cpu": "500m", "memory": "512Mi"}
                }
            
            # 创建Deployment
            deployment = client.V1Deployment(
                metadata=client.V1ObjectMeta(
                    name=app_name,
                    namespace=namespace,
                    labels={"app": app_name}
                ),
                spec=client.V1DeploymentSpec(
                    replicas=replicas,
                    selector=client.V1LabelSelector(
                        match_labels={"app": app_name}
                    ),
                    template=client.V1PodTemplateSpec(
                        metadata=client.V1ObjectMeta(
                            labels={"app": app_name}
                        ),
                        spec=client.V1PodSpec(
                            containers=[
                                client.V1Container(
                                    name=app_name,
                                    image=image,
                                    ports=[client.V1ContainerPort(container_port=8080)],
                                    resources=client.V1ResourceRequirements(
                                        requests=resources["requests"],
                                        limits=resources["limits"]
                                    ),
                                    liveness_probe=client.V1Probe(
                                        http_get=client.V1HTTPGetAction(
                                            path="/health",
                                            port=8080
                                        ),
                                        initial_delay_seconds=30,
                                        period_seconds=10
                                    ),
                                    readiness_probe=client.V1Probe(
                                        http_get=client.V1HTTPGetAction(
                                            path="/ready",
                                            port=8080
                                        ),
                                        initial_delay_seconds=5,
                                        period_seconds=5
                                    ),
                                    security_context=client.V1SecurityContext(
                                        run_as_non_root=True,
                                        run_as_user=1001,
                                        allow_privilege_escalation=False,
                                        read_only_root_filesystem=True,
                                        capabilities=client.V1Capabilities(drop=["ALL"])
                                    )
                                )
                            ],
                            security_context=client.V1PodSecurityContext(
                                run_as_non_root=True,
                                run_as_user=1001,
                                fs_group=1001
                            ),
                            service_account_name=f"{namespace}-sa"
                        )
                    )
                )
            )
            
            self.apps_v1.create_namespaced_deployment(
                namespace=namespace,
                body=deployment
            )
            
            # 创建Service
            service = client.V1Service(
                metadata=client.V1ObjectMeta(
                    name=app_name,
                    namespace=namespace,
                    labels={"app": app_name}
                ),
                spec=client.V1ServiceSpec(
                    selector={"app": app_name},
                    ports=[
                        client.V1ServicePort(
                            port=80,
                            target_port=8080,
                            protocol="TCP"
                        )
                    ],
                    type="ClusterIP"
                )
            )
            
            self.v1.create_namespaced_service(
                namespace=namespace,
                body=service
            )
            
            logger.info(f"应用 {app_name} 部署成功")
            return True
            
        except ApiException as e:
            logger.error(f"应用部署失败: {e}")
            return False
    
    def setup_horizontal_pod_autoscaler(self, app_name: str, namespace: str, 
                                       min_replicas: int = 2, max_replicas: int = 10,
                                       target_cpu_percent: int = 70) -> bool:
        """设置水平Pod自动扩缩容"""
        try:
            hpa = {
                "apiVersion": "autoscaling/v2",
                "kind": "HorizontalPodAutoscaler",
                "metadata": {
                    "name": f"{app_name}-hpa",
                    "namespace": namespace
                },
                "spec": {
                    "scaleTargetRef": {
                        "apiVersion": "apps/v1",
                        "kind": "Deployment",
                        "name": app_name
                    },
                    "minReplicas": min_replicas,
                    "maxReplicas": max_replicas,
                    "metrics": [
                        {
                            "type": "Resource",
                            "resource": {
                                "name": "cpu",
                                "target": {
                                    "type": "Utilization",
                                    "averageUtilization": target_cpu_percent
                                }
                            }
                        },
                        {
                            "type": "Resource",
                            "resource": {
                                "name": "memory",
                                "target": {
                                    "type": "Utilization",
                                    "averageUtilization": 80
                                }
                            }
                        }
                    ],
                    "behavior": {
                        "scaleUp": {
                            "stabilizationWindowSeconds": 60,
                            "policies": [
                                {
                                    "type": "Percent",
                                    "value": 100,
                                    "periodSeconds": 15
                                }
                            ]
                        },
                        "scaleDown": {
                            "stabilizationWindowSeconds": 300,
                            "policies": [
                                {
                                    "type": "Percent",
                                    "value": 10,
                                    "periodSeconds": 60
                                }
                            ]
                        }
                    }
                }
            }
            
            self.metrics_v1beta1.create_namespaced_custom_object(
                group="autoscaling",
                version="v2",
                namespace=namespace,
                plural="horizontalpodautoscalers",
                body=hpa
            )
            
            logger.info(f"HPA {app_name}-hpa 创建成功")
            return True
            
        except ApiException as e:
            logger.error(f"HPA创建失败: {e}")
            return False
    
    def monitor_cluster_resources(self) -> Dict[str, Any]:
        """监控集群资源使用情况"""
        try:
            # 获取节点信息
            nodes = self.v1.list_node()
            node_info = []
            
            for node in nodes.items:
                node_metrics = self._get_node_metrics(node.metadata.name)
                node_info.append({
                    "name": node.metadata.name,
                    "status": node.status.conditions[-1].type if node.status.conditions else "Unknown",
                    "cpu_capacity": node.status.capacity.get("cpu", "0"),
                    "memory_capacity": node.status.capacity.get("memory", "0"),
                    "cpu_usage": node_metrics.get("cpu", "0"),
                    "memory_usage": node_metrics.get("memory", "0"),
                    "pod_count": len([pod for pod in self.v1.list_pod_for_all_namespaces().items 
                                    if pod.spec.node_name == node.metadata.name])
                })
            
            # 获取命名空间资源使用情况
            namespaces = self.v1.list_namespace()
            namespace_info = []
            
            for ns in namespaces.items:
                pods = self.v1.list_namespaced_pod(ns.metadata.name)
                namespace_info.append({
                    "name": ns.metadata.name,
                    "pod_count": len(pods.items),
                    "status": ns.status.phase,
                    "age": (datetime.now() - ns.metadata.creation_timestamp.replace(tzinfo=None)).days
                })
            
            return {
                "timestamp": datetime.now().isoformat(),
                "nodes": node_info,
                "namespaces": namespace_info,
                "cluster_summary": {
                    "total_nodes": len(node_info),
                    "total_namespaces": len(namespace_info),
                    "total_pods": sum([ns["pod_count"] for ns in namespace_info])
                }
            }
            
        except ApiException as e:
            logger.error(f"监控集群资源失败: {e}")
            return {}
    
    def _get_node_metrics(self, node_name: str) -> Dict[str, str]:
        """获取节点指标"""
        try:
            # 这里应该调用metrics-server API
            # 简化处理,返回模拟数据
            return {
                "cpu": "50m",
                "memory": "1Gi"
            }
        except:
            return {"cpu": "0", "memory": "0"}

# 使用示例
def main():
    """主函数 - Kubernetes资源管理示例"""
    manager = KubernetesResourceManager()
    
    print("=== Kubernetes资源管理示例 ===")
    
    # 创建开发环境命名空间
    dev_config = NamespaceConfig(
        name="development",
        labels={"environment": "dev", "team": "backend"},
        resource_quota=ResourceQuota(
            cpu_request="2",
            cpu_limit="4",
            memory_request="4Gi",
            memory_limit="8Gi",
            storage_request="20Gi",
            pod_count=50
        ),
        network_policies=["deny-all", "allow-dns"],
        security_context={"runAsNonRoot": True}
    )
    
    # 创建命名空间
    if manager.create_namespace_with_policies(dev_config):
        print("开发环境命名空间创建成功")
        
        # 部署示例应用
        if manager.deploy_application(
            app_name="web-app",
            namespace="development",
            image="nginx:alpine",
            replicas=3
        ):
            print("应用部署成功")
            
            # 设置自动扩缩容
            if manager.setup_horizontal_pod_autoscaler(
                app_name="web-app",
                namespace="development"
            ):
                print("HPA设置成功")
    
    # 监控集群资源
    resources = manager.monitor_cluster_resources()
    if resources:
        print("\n集群资源监控:")
        print(json.dumps(resources, indent=2, ensure_ascii=False))

if __name__ == "__main__":
    main()

容器网络与存储架构

容器网络和存储是云原生应用的基础设施核心组件。

容器网络架构配置

#!/bin/bash
# 容器网络架构配置脚本

set -e

echo "=== 容器网络架构配置 ==="

# 1. 配置Calico网络策略
configure_calico_network() {
    echo "配置Calico网络策略..."
    
    # 创建全局网络策略
    cat <<EOF | kubectl apply -f -
apiVersion: projectcalico.org/v3
kind: GlobalNetworkPolicy
metadata:
  name: default-deny
spec:
  order: 1000
  selector: all()
  types:
  - Ingress
  - Egress
---
apiVersion: projectcalico.org/v3
kind: GlobalNetworkPolicy
metadata:
  name: allow-cluster-internal
spec:
  order: 500
  selector: all()
  types:
  - Ingress
  - Egress
  ingress:
  - action: Allow
    source:
      nets:
      - 10.0.0.0/8
      - 172.16.0.0/12
      - 192.168.0.0/16
  egress:
  - action: Allow
    destination:
      nets:
      - 10.0.0.0/8
      - 172.16.0.0/12
      - 192.168.0.0/16
---
apiVersion: projectcalico.org/v3
kind: GlobalNetworkPolicy
metadata:
  name: allow-dns
spec:
  order: 100
  selector: all()
  types:
  - Egress
  egress:
  - action: Allow
    protocol: UDP
    destination:
      ports:
      - 53
  - action: Allow
    protocol: TCP
    destination:
      ports:
      - 53
EOF
    
    # 配置IP池
    cat <<EOF | kubectl apply -f -
apiVersion: projectcalico.org/v3
kind: IPPool
metadata:
  name: default-ipv4-ippool
spec:
  cidr: 192.168.0.0/16
  ipipMode: CrossSubnet
  vxlanMode: Never
  natOutgoing: true
  nodeSelector: all()
---
apiVersion: projectcalico.org/v3
kind: IPPool
metadata:
  name: vxlan-ipv4-ippool
spec:
  cidr: 172.16.0.0/16
  ipipMode: Never
  vxlanMode: Always
  natOutgoing: true
  nodeSelector: environment == "production"
EOF
}

# 2. 配置多租户网络隔离
configure_multi_tenant_network() {
    echo "配置多租户网络隔离..."
    
    # 为不同环境创建网络策略
    for env in development staging production; do
        cat <<EOF | kubectl apply -f -
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: ${env}-isolation
  namespace: ${env}
spec:
  podSelector: {}
  policyTypes:
  - Ingress
  - Egress
  ingress:
  - from:
    - namespaceSelector:
        matchLabels:
          environment: ${env}
    - namespaceSelector:
        matchLabels:
          name: kube-system
    - namespaceSelector:
        matchLabels:
          name: monitoring
  egress:
  - to:
    - namespaceSelector:
        matchLabels:
          environment: ${env}
    - namespaceSelector:
        matchLabels:
          name: kube-system
  - to: []
    ports:
    - protocol: UDP
      port: 53
    - protocol: TCP
      port: 53
  - to: []
    ports:
    - protocol: TCP
      port: 443
    - protocol: TCP
      port: 80
EOF
    done
}

# 3. 配置服务网格网络
configure_service_mesh_network() {
    echo "配置服务网格网络..."
    
    # 安装Istio
    curl -L https://istio.io/downloadIstio | sh -
    cd istio-*
    export PATH=$PWD/bin:$PATH
    
    # 安装Istio控制平面
    istioctl install --set values.defaultRevision=default -y
    
    # 启用自动注入
    kubectl label namespace default istio-injection=enabled
    kubectl label namespace development istio-injection=enabled
    kubectl label namespace staging istio-injection=enabled
    kubectl label namespace production istio-injection=enabled
    
    # 配置Gateway
    cat <<EOF | kubectl apply -f -
apiVersion: networking.istio.io/v1beta1
kind: Gateway
metadata:
  name: main-gateway
  namespace: istio-system
spec:
  selector:
    istio: ingressgateway
  servers:
  - port:
      number: 80
      name: http
      protocol: HTTP
    hosts:
    - "*.example.com"
    tls:
      httpsRedirect: true
  - port:
      number: 443
      name: https
      protocol: HTTPS
    tls:
      mode: SIMPLE
      credentialName: example-com-tls
    hosts:
    - "*.example.com"
EOF
    
    # 配置流量策略
    cat <<EOF | kubectl apply -f -
apiVersion: networking.istio.io/v1beta1
kind: DestinationRule
metadata:
  name: circuit-breaker
  namespace: production
spec:
  host: web-app.production.svc.cluster.local
  trafficPolicy:
    connectionPool:
      tcp:
        maxConnections: 100
      http:
        http1MaxPendingRequests: 50
        maxRequestsPerConnection: 10
    circuitBreaker:
      consecutiveGatewayErrors: 5
      interval: 30s
      baseEjectionTime: 30s
      maxEjectionPercent: 50
    outlierDetection:
      consecutive5xxErrors: 3
      interval: 30s
      baseEjectionTime: 30s
      maxEjectionPercent: 50
EOF
}

# 4. 配置负载均衡
configure_load_balancing() {
    echo "配置负载均衡..."
    
    # 部署MetalLB
    kubectl apply -f https://raw.githubusercontent.com/metallb/metallb/v0.13.7/config/manifests/metallb-native.yaml
    
    # 等待MetalLB就绪
    kubectl wait --namespace metallb-system \
        --for=condition=ready pod \
        --selector=app=metallb \
        --timeout=90s
    
    # 配置IP地址池
    cat <<EOF | kubectl apply -f -
apiVersion: metallb.io/v1beta1
kind: IPAddressPool
metadata:
  name: production-pool
  namespace: metallb-system
spec:
  addresses:
  - 192.168.1.100-192.168.1.200
---
apiVersion: metallb.io/v1beta1
kind: L2Advertisement
metadata:
  name: production-l2
  namespace: metallb-system
spec:
  ipAddressPools:
  - production-pool
EOF
}

# 主执行流程
main() {
    configure_calico_network
    configure_multi_tenant_network
    configure_service_mesh_network
    configure_load_balancing
    
    echo "=== 容器网络架构配置完成 ==="
}

# 执行配置
main

容器存储架构管理

#!/usr/bin/env python3
"""
容器存储架构管理器
提供持久化存储、备份恢复和存储优化功能
"""

import os
import json
import yaml
import time
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional
from dataclasses import dataclass, asdict
from kubernetes import client, config
from kubernetes.client.rest import ApiException
import logging

# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@dataclass
class StorageConfig:
    """存储配置"""
    name: str
    storage_class: str
    size: str
    access_modes: List[str]
    mount_path: str
    backup_enabled: bool = True
    encryption_enabled: bool = False

@dataclass
class BackupPolicy:
    """备份策略"""
    name: str
    schedule: str  # Cron格式
    retention_days: int
    storage_location: str
    compression_enabled: bool = True

class ContainerStorageManager:
    """容器存储管理器"""
    
    def __init__(self, kubeconfig_path: Optional[str] = None):
        if kubeconfig_path:
            config.load_kube_config(config_file=kubeconfig_path)
        else:
            try:
                config.load_incluster_config()
            except:
                config.load_kube_config()
        
        self.v1 = client.CoreV1Api()
        self.storage_v1 = client.StorageV1Api()
        self.apps_v1 = client.AppsV1Api()
        self.batch_v1 = client.BatchV1Api()
        self.batch_v1beta1 = client.BatchV1beta1Api()
    
    def create_storage_classes(self) -> bool:
        """创建存储类"""
        try:
            # 高性能SSD存储类
            ssd_storage_class = client.V1StorageClass(
                metadata=client.V1ObjectMeta(
                    name="fast-ssd",
                    annotations={
                        "storageclass.kubernetes.io/is-default-class": "false"
                    }
                ),
                provisioner="kubernetes.io/aws-ebs",
                parameters={
                    "type": "gp3",
                    "iops": "3000",
                    "throughput": "125",
                    "encrypted": "true"
                },
                volume_binding_mode="WaitForFirstConsumer",
                allow_volume_expansion=True,
                reclaim_policy="Delete"
            )
            
            self.storage_v1.create_storage_class(ssd_storage_class)
            
            # 标准存储类
            standard_storage_class = client.V1StorageClass(
                metadata=client.V1ObjectMeta(
                    name="standard",
                    annotations={
                        "storageclass.kubernetes.io/is-default-class": "true"
                    }
                ),
                provisioner="kubernetes.io/aws-ebs",
                parameters={
                    "type": "gp2",
                    "encrypted": "true"
                },
                volume_binding_mode="WaitForFirstConsumer",
                allow_volume_expansion=True,
                reclaim_policy="Delete"
            )
            
            self.storage_v1.create_storage_class(standard_storage_class)
            
            # 归档存储类
            archive_storage_class = client.V1StorageClass(
                metadata=client.V1ObjectMeta(
                    name="archive",
                    annotations={
                        "storageclass.kubernetes.io/is-default-class": "false"
                    }
                ),
                provisioner="kubernetes.io/aws-ebs",
                parameters={
                    "type": "sc1",
                    "encrypted": "true"
                },
                volume_binding_mode="WaitForFirstConsumer",
                allow_volume_expansion=True,
                reclaim_policy="Retain"
            )
            
            self.storage_v1.create_storage_class(archive_storage_class)
            
            logger.info("存储类创建成功")
            return True
            
        except ApiException as e:
            logger.error(f"存储类创建失败: {e}")
            return False
    
    def create_persistent_volume_claim(self, storage_config: StorageConfig, 
                                     namespace: str) -> bool:
        """创建持久化卷声明"""
        try:
            pvc = client.V1PersistentVolumeClaim(
                metadata=client.V1ObjectMeta(
                    name=storage_config.name,
                    namespace=namespace,
                    labels={
                        "app": storage_config.name,
                        "backup-enabled": str(storage_config.backup_enabled).lower()
                    }
                ),
                spec=client.V1PersistentVolumeClaimSpec(
                    access_modes=storage_config.access_modes,
                    resources=client.V1ResourceRequirements(
                        requests={"storage": storage_config.size}
                    ),
                    storage_class_name=storage_config.storage_class
                )
            )
            
            self.v1.create_namespaced_persistent_volume_claim(
                namespace=namespace,
                body=pvc
            )
            
            logger.info(f"PVC {storage_config.name} 创建成功")
            return True
            
        except ApiException as e:
            logger.error(f"PVC创建失败: {e}")
            return False
    
    def deploy_stateful_application(self, app_name: str, namespace: str, 
                                  image: str, storage_configs: List[StorageConfig],
                                  replicas: int = 3) -> bool:
        """部署有状态应用"""
        try:
            # 创建PVC模板
            volume_claim_templates = []
            volume_mounts = []
            
            for storage_config in storage_configs:
                # PVC模板
                volume_claim_templates.append(
                    client.V1PersistentVolumeClaim(
                        metadata=client.V1ObjectMeta(
                            name=storage_config.name,
                            labels={
                                "app": app_name,
                                "backup-enabled": str(storage_config.backup_enabled).lower()
                            }
                        ),
                        spec=client.V1PersistentVolumeClaimSpec(
                            access_modes=storage_config.access_modes,
                            resources=client.V1ResourceRequirements(
                                requests={"storage": storage_config.size}
                            ),
                            storage_class_name=storage_config.storage_class
                        )
                    )
                )
                
                # 卷挂载
                volume_mounts.append(
                    client.V1VolumeMount(
                        name=storage_config.name,
                        mount_path=storage_config.mount_path
                    )
                )
            
            # 创建StatefulSet
            stateful_set = client.V1StatefulSet(
                metadata=client.V1ObjectMeta(
                    name=app_name,
                    namespace=namespace,
                    labels={"app": app_name}
                ),
                spec=client.V1StatefulSetSpec(
                    service_name=app_name,
                    replicas=replicas,
                    selector=client.V1LabelSelector(
                        match_labels={"app": app_name}
                    ),
                    template=client.V1PodTemplateSpec(
                        metadata=client.V1ObjectMeta(
                            labels={"app": app_name}
                        ),
                        spec=client.V1PodSpec(
                            containers=[
                                client.V1Container(
                                    name=app_name,
                                    image=image,
                                    ports=[client.V1ContainerPort(container_port=8080)],
                                    volume_mounts=volume_mounts,
                                    resources=client.V1ResourceRequirements(
                                        requests={"cpu": "500m", "memory": "1Gi"},
                                        limits={"cpu": "1", "memory": "2Gi"}
                                    ),
                                    liveness_probe=client.V1Probe(
                                        http_get=client.V1HTTPGetAction(
                                            path="/health",
                                            port=8080
                                        ),
                                        initial_delay_seconds=30,
                                        period_seconds=10
                                    ),
                                    readiness_probe=client.V1Probe(
                                        http_get=client.V1HTTPGetAction(
                                            path="/ready",
                                            port=8080
                                        ),
                                        initial_delay_seconds=5,
                                        period_seconds=5
                                    )
                                )
                            ]
                        )
                    ),
                    volume_claim_templates=volume_claim_templates,
                    pod_management_policy="Parallel",
                    update_strategy=client.V1StatefulSetUpdateStrategy(
                        type="RollingUpdate",
                        rolling_update=client.V1RollingUpdateStatefulSetStrategy(
                            partition=0
                        )
                    )
                )
            )
            
            self.apps_v1.create_namespaced_stateful_set(
                namespace=namespace,
                body=stateful_set
            )
            
            # 创建Headless Service
            service = client.V1Service(
                metadata=client.V1ObjectMeta(
                    name=app_name,
                    namespace=namespace,
                    labels={"app": app_name}
                ),
                spec=client.V1ServiceSpec(
                    cluster_ip="None",
                    selector={"app": app_name},
                    ports=[
                        client.V1ServicePort(
                            port=8080,
                            target_port=8080,
                            protocol="TCP"
                        )
                    ]
                )
            )
            
            self.v1.create_namespaced_service(
                namespace=namespace,
                body=service
            )
            
            logger.info(f"有状态应用 {app_name} 部署成功")
            return True
            
        except ApiException as e:
            logger.error(f"有状态应用部署失败: {e}")
            return False
    
    def setup_backup_system(self, backup_policy: BackupPolicy, 
                           namespace: str) -> bool:
        """设置备份系统"""
        try:
            # 创建备份脚本ConfigMap
            backup_script = f"""#!/bin/bash
set -e

BACKUP_NAME="backup-$(date +%Y%m%d-%H%M%S)"
BACKUP_PATH="{backup_policy.storage_location}/$BACKUP_NAME"

echo "开始备份: $BACKUP_NAME"

# 创建备份目录
mkdir -p $BACKUP_PATH

# 备份PVC数据
for pvc in $(kubectl get pvc -n {namespace} -l backup-enabled=true -o jsonpath='{{.items[*].metadata.name}}'); do
    echo "备份PVC: $pvc"
    
    # 创建临时Pod进行备份
    kubectl run backup-pod-$pvc \\
        --image=alpine:latest \\
        --restart=Never \\
        --rm -i \\
        --overrides='{{
            "spec": {{
                "containers": [{{
                    "name": "backup",
                    "image": "alpine:latest",
                    "command": ["tar", "czf", "/backup/$pvc.tar.gz", "-C", "/data", "."],
                    "volumeMounts": [
                        {{"name": "data", "mountPath": "/data"}},
                        {{"name": "backup", "mountPath": "/backup"}}
                    ]
                }}],
                "volumes": [
                    {{"name": "data", "persistentVolumeClaim": {{"claimName": "$pvc"}}}},
                    {{"name": "backup", "hostPath": {{"path": "$BACKUP_PATH"}}}}
                ]
            }}
        }}' \\
        --timeout=300s
done

# 清理过期备份
find {backup_policy.storage_location} -name "backup-*" -type d -mtime +{backup_policy.retention_days} -exec rm -rf {{}} \\;

echo "备份完成: $BACKUP_NAME"
"""
            
            config_map = client.V1ConfigMap(
                metadata=client.V1ObjectMeta(
                    name=f"{backup_policy.name}-script",
                    namespace=namespace
                ),
                data={"backup.sh": backup_script}
            )
            
            self.v1.create_namespaced_config_map(
                namespace=namespace,
                body=config_map
            )
            
            # 创建CronJob
            cron_job = client.V1CronJob(
                metadata=client.V1ObjectMeta(
                    name=backup_policy.name,
                    namespace=namespace
                ),
                spec=client.V1CronJobSpec(
                    schedule=backup_policy.schedule,
                    job_template=client.V1JobTemplateSpec(
                        spec=client.V1JobSpec(
                            template=client.V1PodTemplateSpec(
                                spec=client.V1PodSpec(
                                    containers=[
                                        client.V1Container(
                                            name="backup",
                                            image="bitnami/kubectl:latest",
                                            command=["/bin/bash", "/scripts/backup.sh"],
                                            volume_mounts=[
                                                client.V1VolumeMount(
                                                    name="backup-script",
                                                    mount_path="/scripts"
                                                ),
                                                client.V1VolumeMount(
                                                    name="backup-storage",
                                                    mount_path=backup_policy.storage_location
                                                )
                                            ]
                                        )
                                    ],
                                    volumes=[
                                        client.V1Volume(
                                            name="backup-script",
                                            config_map=client.V1ConfigMapVolumeSource(
                                                name=f"{backup_policy.name}-script",
                                                default_mode=0o755
                                            )
                                        ),
                                        client.V1Volume(
                                            name="backup-storage",
                                            host_path=client.V1HostPathVolumeSource(
                                                path=backup_policy.storage_location
                                            )
                                        )
                                    ],
                                    restart_policy="OnFailure"
                                )
                            )
                        )
                    ),
                    successful_jobs_history_limit=3,
                    failed_jobs_history_limit=1
                )
            )
            
            self.batch_v1beta1.create_namespaced_cron_job(
                namespace=namespace,
                body=cron_job
            )
            
            logger.info(f"备份系统 {backup_policy.name} 设置成功")
            return True
            
        except ApiException as e:
            logger.error(f"备份系统设置失败: {e}")
            return False
    
    def monitor_storage_usage(self) -> Dict[str, Any]:
        """监控存储使用情况"""
        try:
            # 获取所有PVC
            pvcs = self.v1.list_persistent_volume_claim_for_all_namespaces()
            pvc_info = []
            
            for pvc in pvcs.items:
                pvc_info.append({
                    "name": pvc.metadata.name,
                    "namespace": pvc.metadata.namespace,
                    "storage_class": pvc.spec.storage_class_name,
                    "size": pvc.spec.resources.requests.get("storage", "Unknown"),
                    "status": pvc.status.phase,
                    "access_modes": pvc.spec.access_modes,
                    "volume_name": pvc.spec.volume_name,
                    "age": (datetime.now() - pvc.metadata.creation_timestamp.replace(tzinfo=None)).days
                })
            
            # 获取所有PV
            pvs = self.v1.list_persistent_volume()
            pv_info = []
            
            for pv in pvs.items:
                pv_info.append({
                    "name": pv.metadata.name,
                    "capacity": pv.spec.capacity.get("storage", "Unknown"),
                    "access_modes": pv.spec.access_modes,
                    "reclaim_policy": pv.spec.persistent_volume_reclaim_policy,
                    "status": pv.status.phase,
                    "storage_class": pv.spec.storage_class_name,
                    "claim": f"{pv.spec.claim_ref.namespace}/{pv.spec.claim_ref.name}" if pv.spec.claim_ref else None
                })
            
            # 获取存储类
            storage_classes = self.storage_v1.list_storage_class()
            sc_info = []
            
            for sc in storage_classes.items:
                sc_info.append({
                    "name": sc.metadata.name,
                    "provisioner": sc.provisioner,
                    "reclaim_policy": sc.reclaim_policy,
                    "volume_binding_mode": sc.volume_binding_mode,
                    "allow_volume_expansion": sc.allow_volume_expansion,
                    "is_default": sc.metadata.annotations.get(
                        "storageclass.kubernetes.io/is-default-class", "false") == "true"
                })
            
            return {
                "timestamp": datetime.now().isoformat(),
                "persistent_volume_claims": pvc_info,
                "persistent_volumes": pv_info,
                "storage_classes": sc_info,
                "summary": {
                    "total_pvcs": len(pvc_info),
                    "total_pvs": len(pv_info),
                    "total_storage_classes": len(sc_info),
                    "bound_pvcs": len([pvc for pvc in pvc_info if pvc["status"] == "Bound"]),
                    "available_pvs": len([pv for pv in pv_info if pv["status"] == "Available"])
                }
            }
            
        except ApiException as e:
            logger.error(f"监控存储使用情况失败: {e}")
            return {}

# 使用示例
def main():
    """主函数 - 容器存储管理示例"""
    manager = ContainerStorageManager()
    
    print("=== 容器存储管理示例 ===")
    
    # 创建存储类
    if manager.create_storage_classes():
        print("存储类创建成功")
    
    # 配置存储
    storage_configs = [
        StorageConfig(
            name="app-data",
            storage_class="fast-ssd",
            size="10Gi",
            access_modes=["ReadWriteOnce"],
            mount_path="/app/data",
            backup_enabled=True
        ),
        StorageConfig(
            name="app-logs",
            storage_class="standard",
            size="5Gi",
            access_modes=["ReadWriteOnce"],
            mount_path="/app/logs",
            backup_enabled=False
        )
    ]
    
    # 部署有状态应用
    if manager.deploy_stateful_application(
        app_name="database",
        namespace="production",
        image="postgres:13",
        storage_configs=storage_configs,
        replicas=3
    ):
        print("有状态应用部署成功")
    
    # 设置备份策略
    backup_policy = BackupPolicy(
        name="daily-backup",
        schedule="0 2 * * *",  # 每天凌晨2点
        retention_days=30,
        storage_location="/backup",
        compression_enabled=True
    )
    
    if manager.setup_backup_system(backup_policy, "production"):
        print("备份系统设置成功")
    
    # 监控存储使用情况
    storage_info = manager.monitor_storage_usage()
    if storage_info:
        print("\n存储使用情况:")
        print(json.dumps(storage_info, indent=2, ensure_ascii=False))

if __name__ == "__main__":
    main()

监控与可观测性

云原生容器化应用需要全面的监控和可观测性解决方案。

容器监控系统

#!/usr/bin/env python3
"""
容器监控系统
提供全面的容器和应用监控能力
"""

import json
import time
import requests
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional
from dataclasses import dataclass, asdict
from prometheus_client import CollectorRegistry, Gauge, Counter, Histogram, generate_latest
import logging

# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@dataclass
class MetricConfig:
    """指标配置"""
    name: str
    metric_type: str  # gauge, counter, histogram
    description: str
    labels: List[str]
    thresholds: Dict[str, float]

@dataclass
class AlertRule:
    """告警规则"""
    name: str
    expression: str
    duration: str
    severity: str
    description: str
    runbook_url: Optional[str] = None

class ContainerMonitoringSystem:
    """容器监控系统"""
    
    def __init__(self):
        self.registry = CollectorRegistry()
        self.metrics = {}
        self.alert_rules = []
        self.prometheus_url = "http://prometheus:9090"
        self.grafana_url = "http://grafana:3000"
        self.alertmanager_url = "http://alertmanager:9093"
        
        # 初始化基础指标
        self._initialize_metrics()
    
    def _initialize_metrics(self):
        """初始化基础指标"""
        # 容器资源使用指标
        self.metrics['container_cpu_usage'] = Gauge(
            'container_cpu_usage_percent',
            'Container CPU usage percentage',
            ['container_name', 'namespace', 'pod_name'],
            registry=self.registry
        )
        
        self.metrics['container_memory_usage'] = Gauge(
            'container_memory_usage_bytes',
            'Container memory usage in bytes',
            ['container_name', 'namespace', 'pod_name'],
            registry=self.registry
        )
        
        self.metrics['container_network_rx'] = Counter(
            'container_network_receive_bytes_total',
            'Container network receive bytes',
            ['container_name', 'namespace', 'pod_name'],
            registry=self.registry
        )
        
        self.metrics['container_network_tx'] = Counter(
            'container_network_transmit_bytes_total',
            'Container network transmit bytes',
            ['container_name', 'namespace', 'pod_name'],
            registry=self.registry
        )
        
        # 应用性能指标
        self.metrics['http_requests_total'] = Counter(
            'http_requests_total',
            'Total HTTP requests',
            ['method', 'endpoint', 'status_code'],
            registry=self.registry
        )
        
        self.metrics['http_request_duration'] = Histogram(
            'http_request_duration_seconds',
            'HTTP request duration',
            ['method', 'endpoint'],
            registry=self.registry
        )
        
        # 业务指标
        self.metrics['active_users'] = Gauge(
            'active_users_total',
            'Number of active users',
            ['service'],
            registry=self.registry
        )
        
        self.metrics['database_connections'] = Gauge(
            'database_connections_active',
            'Active database connections',
            ['database', 'service'],
            registry=self.registry
        )
    
    def collect_container_metrics(self, container_stats: Dict[str, Any]):
        """收集容器指标"""
        try:
            container_name = container_stats.get('name', 'unknown')
            namespace = container_stats.get('namespace', 'default')
            pod_name = container_stats.get('pod_name', 'unknown')
            
            # CPU使用率
            cpu_usage = container_stats.get('cpu_usage_percent', 0)
            self.metrics['container_cpu_usage'].labels(
                container_name=container_name,
                namespace=namespace,
                pod_name=pod_name
            ).set(cpu_usage)
            
            # 内存使用量
            memory_usage = container_stats.get('memory_usage_bytes', 0)
            self.metrics['container_memory_usage'].labels(
                container_name=container_name,
                namespace=namespace,
                pod_name=pod_name
            ).set(memory_usage)
            
            # 网络流量
            network_rx = container_stats.get('network_rx_bytes', 0)
            network_tx = container_stats.get('network_tx_bytes', 0)
            
            self.metrics['container_network_rx'].labels(
                container_name=container_name,
                namespace=namespace,
                pod_name=pod_name
            )._value._value = network_rx
            
            self.metrics['container_network_tx'].labels(
                container_name=container_name,
                namespace=namespace,
                pod_name=pod_name
            )._value._value = network_tx
            
            logger.info(f"容器指标收集完成: {container_name}")
            
        except Exception as e:
            logger.error(f"收集容器指标失败: {e}")
    
    def record_http_request(self, method: str, endpoint: str, 
                           status_code: int, duration: float):
        """记录HTTP请求指标"""
        try:
            # 请求计数
            self.metrics['http_requests_total'].labels(
                method=method,
                endpoint=endpoint,
                status_code=str(status_code)
            ).inc()
            
            # 请求耗时
            self.metrics['http_request_duration'].labels(
                method=method,
                endpoint=endpoint
            ).observe(duration)
            
        except Exception as e:
            logger.error(f"记录HTTP请求指标失败: {e}")
    
    def update_business_metrics(self, metrics_data: Dict[str, Any]):
        """更新业务指标"""
        try:
            # 活跃用户数
            if 'active_users' in metrics_data:
                for service, count in metrics_data['active_users'].items():
                    self.metrics['active_users'].labels(service=service).set(count)
            
            # 数据库连接数
            if 'database_connections' in metrics_data:
                for db_info in metrics_data['database_connections']:
                    self.metrics['database_connections'].labels(
                        database=db_info['database'],
                        service=db_info['service']
                    ).set(db_info['connections'])
            
        except Exception as e:
            logger.error(f"更新业务指标失败: {e}")
    
    def setup_alert_rules(self):
        """设置告警规则"""
        self.alert_rules = [
            AlertRule(
                name="HighCPUUsage",
                expression="container_cpu_usage_percent > 80",
                duration="5m",
                severity="warning",
                description="Container CPU usage is above 80%"
            ),
            AlertRule(
                name="HighMemoryUsage",
                expression="container_memory_usage_bytes / container_memory_limit_bytes > 0.9",
                duration="5m",
                severity="critical",
                description="Container memory usage is above 90%"
            ),
            AlertRule(
                name="HighErrorRate",
                expression="rate(http_requests_total{status_code=~'5..'}[5m]) / rate(http_requests_total[5m]) > 0.1",
                duration="2m",
                severity="critical",
                description="HTTP error rate is above 10%"
            ),
            AlertRule(
                name="HighResponseTime",
                expression="histogram_quantile(0.95, rate(http_request_duration_seconds_bucket[5m])) > 1",
                duration="5m",
                severity="warning",
                description="95th percentile response time is above 1 second"
            ),
            AlertRule(
                name="PodCrashLooping",
                expression="rate(kube_pod_container_status_restarts_total[15m]) > 0",
                duration="5m",
                severity="critical",
                description="Pod is crash looping"
            )
        ]
        
        # 生成Prometheus告警规则配置
        alert_config = {
            "groups": [
                {
                    "name": "container.rules",
                    "rules": [
                        {
                            "alert": rule.name,
                            "expr": rule.expression,
                            "for": rule.duration,
                            "labels": {"severity": rule.severity},
                            "annotations": {
                                "description": rule.description,
                                "runbook_url": rule.runbook_url or ""
                            }
                        }
                        for rule in self.alert_rules
                    ]
                }
            ]
        }
        
        return alert_config
    
    def create_grafana_dashboard(self) -> Dict[str, Any]:
        """创建Grafana仪表板"""
        dashboard = {
            "dashboard": {
                "id": None,
                "title": "Container Monitoring Dashboard",
                "tags": ["containers", "monitoring"],
                "timezone": "browser",
                "panels": [
                    {
                        "id": 1,
                        "title": "Container CPU Usage",
                        "type": "graph",
                        "targets": [
                            {
                                "expr": "container_cpu_usage_percent",
                                "legendFormat": "{{container_name}} - {{namespace}}"
                            }
                        ],
                        "yAxes": [
                            {"label": "Percentage", "max": 100, "min": 0}
                        ],
                        "gridPos": {"h": 8, "w": 12, "x": 0, "y": 0}
                    },
                    {
                        "id": 2,
                        "title": "Container Memory Usage",
                        "type": "graph",
                        "targets": [
                            {
                                "expr": "container_memory_usage_bytes / 1024 / 1024",
                                "legendFormat": "{{container_name}} - {{namespace}}"
                            }
                        ],
                        "yAxes": [
                            {"label": "MB", "min": 0}
                        ],
                        "gridPos": {"h": 8, "w": 12, "x": 12, "y": 0}
                    },
                    {
                        "id": 3,
                        "title": "HTTP Request Rate",
                        "type": "graph",
                        "targets": [
                            {
                                "expr": "rate(http_requests_total[5m])",
                                "legendFormat": "{{method}} {{endpoint}}"
                            }
                        ],
                        "yAxes": [
                            {"label": "Requests/sec", "min": 0}
                        ],
                        "gridPos": {"h": 8, "w": 12, "x": 0, "y": 8}
                    },
                    {
                        "id": 4,
                        "title": "HTTP Response Time",
                        "type": "graph",
                        "targets": [
                            {
                                "expr": "histogram_quantile(0.95, rate(http_request_duration_seconds_bucket[5m]))",
                                "legendFormat": "95th percentile"
                            },
                            {
                                "expr": "histogram_quantile(0.50, rate(http_request_duration_seconds_bucket[5m]))",
                                "legendFormat": "50th percentile"
                            }
                        ],
                        "yAxes": [
                            {"label": "Seconds", "min": 0}
                        ],
                        "gridPos": {"h": 8, "w": 12, "x": 12, "y": 8}
                    },
                    {
                        "id": 5,
                        "title": "Active Users",
                        "type": "singlestat",
                        "targets": [
                            {
                                "expr": "sum(active_users_total)",
                                "legendFormat": "Total Active Users"
                            }
                        ],
                        "gridPos": {"h": 4, "w": 6, "x": 0, "y": 16}
                    },
                    {
                        "id": 6,
                        "title": "Database Connections",
                        "type": "graph",
                        "targets": [
                            {
                                "expr": "database_connections_active",
                                "legendFormat": "{{database}} - {{service}}"
                            }
                        ],
                        "yAxes": [
                            {"label": "Connections", "min": 0}
                        ],
                        "gridPos": {"h": 8, "w": 18, "x": 6, "y": 16}
                    }
                ],
                "time": {"from": "now-1h", "to": "now"},
                "refresh": "30s"
            }
        }
        
        return dashboard
    
    def send_alert(self, alert_name: str, message: str, severity: str):
        """发送告警"""
        try:
            alert_data = {
                "receiver": "webhook",
                "status": "firing",
                "alerts": [
                    {
                        "status": "firing",
                        "labels": {
                            "alertname": alert_name,
                            "severity": severity,
                            "instance": "container-monitoring"
                        },
                        "annotations": {
                            "description": message,
                            "timestamp": datetime.now().isoformat()
                        },
                        "startsAt": datetime.now().isoformat(),
                        "generatorURL": f"{self.prometheus_url}/alerts"
                    }
                ]
            }
            
            # 发送到Alertmanager
            response = requests.post(
                f"{self.alertmanager_url}/api/v1/alerts",
                json=alert_data,
                timeout=10
            )
            
            if response.status_code == 200:
                logger.info(f"告警发送成功: {alert_name}")
            else:
                logger.error(f"告警发送失败: {response.status_code}")
                
        except Exception as e:
            logger.error(f"发送告警失败: {e}")
    
    def get_metrics_export(self) -> str:
        """导出Prometheus格式指标"""
        return generate_latest(self.registry).decode('utf-8')
    
    def health_check(self) -> Dict[str, Any]:
        """健康检查"""
        try:
            # 检查Prometheus连接
            prometheus_healthy = False
            try:
                response = requests.get(f"{self.prometheus_url}/-/healthy", timeout=5)
                prometheus_healthy = response.status_code == 200
            except:
                pass
            
            # 检查Grafana连接
            grafana_healthy = False
            try:
                response = requests.get(f"{self.grafana_url}/api/health", timeout=5)
                grafana_healthy = response.status_code == 200
            except:
                pass
            
            # 检查Alertmanager连接
            alertmanager_healthy = False
            try:
                response = requests.get(f"{self.alertmanager_url}/-/healthy", timeout=5)
                alertmanager_healthy = response.status_code == 200
            except:
                pass
            
            return {
                "timestamp": datetime.now().isoformat(),
                "status": "healthy" if all([prometheus_healthy, grafana_healthy, alertmanager_healthy]) else "unhealthy",
                "components": {
                    "prometheus": "healthy" if prometheus_healthy else "unhealthy",
                    "grafana": "healthy" if grafana_healthy else "unhealthy",
                    "alertmanager": "healthy" if alertmanager_healthy else "unhealthy"
                },
                "metrics_count": len(self.metrics),
                "alert_rules_count": len(self.alert_rules)
            }
            
        except Exception as e:
            logger.error(f"健康检查失败: {e}")
            return {
                "timestamp": datetime.now().isoformat(),
                "status": "error",
                "error": str(e)
            }

# 使用示例
def main():
    """主函数 - 容器监控系统示例"""
    monitoring = ContainerMonitoringSystem()
    
    print("=== 容器监控系统示例 ===")
    
    # 设置告警规则
    alert_config = monitoring.setup_alert_rules()
    print("告警规则配置:")
    print(json.dumps(alert_config, indent=2, ensure_ascii=False))
    
    # 模拟收集容器指标
    container_stats = {
        "name": "web-app",
        "namespace": "production",
        "pod_name": "web-app-7d4b8c9f8-abc123",
        "cpu_usage_percent": 75.5,
        "memory_usage_bytes": 1073741824,  # 1GB
        "network_rx_bytes": 1048576,  # 1MB
        "network_tx_bytes": 2097152   # 2MB
    }
    
    monitoring.collect_container_metrics(container_stats)
    
    # 模拟HTTP请求记录
    monitoring.record_http_request("GET", "/api/users", 200, 0.15)
    monitoring.record_http_request("POST", "/api/orders", 201, 0.25)
    monitoring.record_http_request("GET", "/api/products", 500, 1.5)
    
    # 更新业务指标
    business_metrics = {
        "active_users": {
            "web-service": 1250,
            "api-service": 850
        },
        "database_connections": [
            {"database": "postgres", "service": "web-service", "connections": 25},
            {"database": "redis", "service": "cache-service", "connections": 10}
        ]
    }
    
    monitoring.update_business_metrics(business_metrics)
    
    # 创建Grafana仪表板
    dashboard = monitoring.create_grafana_dashboard()
    print("\nGrafana仪表板配置:")
    print(json.dumps(dashboard, indent=2, ensure_ascii=False))
    
    # 健康检查
    health = monitoring.health_check()
    print("\n监控系统健康状态:")
    print(json.dumps(health, indent=2, ensure_ascii=False))
    
    # 导出指标
    print("\n导出的Prometheus指标:")
    print(monitoring.get_metrics_export())

if __name__ == "__main__":
    main()

总结

云原生应用的容器化架构策略是现代应用开发和部署的核心。通过本文的深入分析,我们可以得出以下关键结论:

核心架构要素

  1. 容器化设计模式

    • Sidecar模式:提供辅助功能如日志收集、监控代理
    • Ambassador模式:简化外部服务访问
    • Adapter模式:标准化数据格式和接口
    • Init Container模式:确保应用启动前的依赖就绪
  2. Kubernetes编排平台

    • 集群架构:高可用控制平面和工作节点
    • 资源管理:命名空间隔离、资源配额、RBAC安全
    • 应用部署:Deployment、StatefulSet、Service等资源
    • 自动扩缩容:HPA基于CPU/内存使用率动态调整
  3. 网络与存储架构

    • 网络策略:Calico全局策略、多租户隔离、服务网格
    • 存储管理:多层存储类、持久化卷、备份恢复
    • 负载均衡:MetalLB、Ingress控制器、流量分发
  4. 监控与可观测性

    • 指标收集:Prometheus格式指标、容器资源监控
    • 可视化:Grafana仪表板、实时监控图表
    • 告警系统:基于阈值的智能告警、多渠道通知

最佳实践原则

  1. 设计原则

    • 单一职责:每个容器专注单一功能
    • 无状态设计:应用状态外部化存储
    • 配置分离:使用ConfigMap和Secret管理配置
    • 健康检查:实现liveness和readiness探针
  2. 运维管理

    • 基础设施即代码:使用Terraform、Helm等工具
    • CI/CD集成:自动化构建、测试、部署流程
    • 安全加固:最小权限原则、镜像安全扫描
    • 成本优化:资源右调、自动扩缩容、存储分层
  3. 团队协作

    • DevOps文化:开发运维一体化
    • 标准化流程:统一的容器构建和部署规范
    • 知识共享:文档化最佳实践和故障处理手册
    • 持续改进:定期回顾和优化架构设计

通过采用这些容器化架构策略,组织能够构建出高度可扩展、可靠且易于维护的云原生应用平台,为数字化转型提供坚实的技术基础。


发布时间:2024年1月15日
标签:云原生, 容器化, Kubernetes, Docker, 微服务架构

容器设计模式与最佳实践

容器设计模式是构建高效、安全、可维护容器应用的关键。

多阶段构建模式

# 多阶段构建示例 - Python应用
# 第一阶段:构建环境
FROM python:3.11-slim AS builder

# 安装构建依赖
RUN apt-get update && apt-get install -y \
    gcc \
    g++ \
    && rm -rf /var/lib/apt/lists/*

WORKDIR /app

# 复制依赖文件
COPY requirements.txt .

# 安装Python依赖
RUN pip install --user --no-cache-dir -r requirements.txt

# 复制源代码
COPY . .

# 运行测试
RUN python -m pytest tests/

# 第二阶段:运行环境
FROM python:3.11-slim AS runtime

# 创建非特权用户
RUN groupadd -r appuser && useradd -r -g appuser appuser

# 创建应用目录
WORKDIR /app

# 从构建阶段复制依赖
COPY --from=builder /root/.local /home/appuser/.local

# 复制应用代码
COPY --from=builder /app .

# 设置PATH
ENV PATH=/home/appuser/.local/bin:$PATH

# 设置权限
RUN chown -R appuser:appuser /app
USER appuser

# 健康检查
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
    CMD curl -f http://localhost:8000/health || exit 1

# 暴露端口
EXPOSE 8000

# 启动命令
CMD ["python", "-m", "uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]

容器设计模式实现

#!/usr/bin/env python3
"""
容器设计模式实现
包括Sidecar、Ambassador、Adapter等模式
"""

import yaml
import json
from typing import Dict, List, Any
from dataclasses import dataclass

@dataclass
class ContainerSpec:
    """容器规格"""
    name: str
    image: str
    ports: List[int]
    env_vars: Dict[str, str]
    volume_mounts: List[Dict[str, str]]
    resources: Dict[str, Any]

class ContainerPatternGenerator:
    """容器模式生成器"""
    
    def __init__(self):
        self.patterns = {}
    
    def generate_sidecar_pattern(self, main_container: ContainerSpec, sidecar_type: str) -> Dict[str, Any]:
        """生成Sidecar模式"""
        if sidecar_type == "logging":
            sidecar = ContainerSpec(
                name="log-collector",
                image="fluent/fluent-bit:latest",
                ports=[],
                env_vars={
                    "FLUENT_CONF": "fluent-bit.conf",
                    "FLUENT_OPT": ""
                },
                volume_mounts=[
                    {"name": "log-volume", "mountPath": "/var/log"},
                    {"name": "fluent-bit-config", "mountPath": "/fluent-bit/etc"}
                ],
                resources={
                    "requests": {"cpu": "50m", "memory": "64Mi"},
                    "limits": {"cpu": "100m", "memory": "128Mi"}
                }
            )
        elif sidecar_type == "monitoring":
            sidecar = ContainerSpec(
                name="metrics-exporter",
                image="prom/node-exporter:latest",
                ports=[9100],
                env_vars={},
                volume_mounts=[
                    {"name": "proc", "mountPath": "/host/proc", "readOnly": True},
                    {"name": "sys", "mountPath": "/host/sys", "readOnly": True}
                ],
                resources={
                    "requests": {"cpu": "50m", "memory": "32Mi"},
                    "limits": {"cpu": "100m", "memory": "64Mi"}
                }
            )
        elif sidecar_type == "proxy":
            sidecar = ContainerSpec(
                name="envoy-proxy",
                image="envoyproxy/envoy:v1.24-latest",
                ports=[8080, 9901],
                env_vars={
                    "ENVOY_UID": "0"
                },
                volume_mounts=[
                    {"name": "envoy-config", "mountPath": "/etc/envoy"}
                ],
                resources={
                    "requests": {"cpu": "100m", "memory": "128Mi"},
                    "limits": {"cpu": "200m", "memory": "256Mi"}
                }
            )
        else:
            raise ValueError(f"Unsupported sidecar type: {sidecar_type}")
        
        return self._create_pod_spec([main_container, sidecar], sidecar_type)
    
    def generate_ambassador_pattern(self, main_container: ContainerSpec, external_service: str) -> Dict[str, Any]:
        """生成Ambassador模式"""
        ambassador = ContainerSpec(
            name="ambassador",
            image="nginx:alpine",
            ports=[8080],
            env_vars={
                "EXTERNAL_SERVICE": external_service
            },
            volume_mounts=[
                {"name": "nginx-config", "mountPath": "/etc/nginx/conf.d"}
            ],
            resources={
                "requests": {"cpu": "50m", "memory": "32Mi"},
                "limits": {"cpu": "100m", "memory": "64Mi"}
            }
        )
        
        return self._create_pod_spec([main_container, ambassador], "ambassador")
    
    def generate_adapter_pattern(self, main_container: ContainerSpec, adapter_type: str) -> Dict[str, Any]:
        """生成Adapter模式"""
        if adapter_type == "metrics":
            adapter = ContainerSpec(
                name="metrics-adapter",
                image="prom/prometheus:latest",
                ports=[9090],
                env_vars={},
                volume_mounts=[
                    {"name": "prometheus-config", "mountPath": "/etc/prometheus"},
                    {"name": "prometheus-data", "mountPath": "/prometheus"}
                ],
                resources={
                    "requests": {"cpu": "100m", "memory": "256Mi"},
                    "limits": {"cpu": "500m", "memory": "1Gi"}
                }
            )
        elif adapter_type == "logging":
            adapter = ContainerSpec(
                name="log-adapter",
                image="grafana/loki:latest",
                ports=[3100],
                env_vars={},
                volume_mounts=[
                    {"name": "loki-config", "mountPath": "/etc/loki"},
                    {"name": "loki-data", "mountPath": "/loki"}
                ],
                resources={
                    "requests": {"cpu": "100m", "memory": "128Mi"},
                    "limits": {"cpu": "200m", "memory": "512Mi"}
                }
            )
        else:
            raise ValueError(f"Unsupported adapter type: {adapter_type}")
        
        return self._create_pod_spec([main_container, adapter], "adapter")
    
    def _create_pod_spec(self, containers: List[ContainerSpec], pattern_type: str) -> Dict[str, Any]:
        """创建Pod规格"""
        pod_spec = {
            "apiVersion": "v1",
            "kind": "Pod",
            "metadata": {
                "name": f"app-{pattern_type}",
                "labels": {
                    "app": "sample-app",
                    "pattern": pattern_type
                },
                "annotations": {
                    "pattern.kubernetes.io/type": pattern_type,
                    "pattern.kubernetes.io/description": f"Container pattern: {pattern_type}"
                }
            },
            "spec": {
                "containers": [],
                "volumes": [],
                "securityContext": {
                    "runAsNonRoot": True,
                    "runAsUser": 1001,
                    "fsGroup": 1001
                },
                "restartPolicy": "Always"
            }
        }
        
        # 添加容器
        for container in containers:
            container_spec = {
                "name": container.name,
                "image": container.image,
                "resources": {
                    "requests": container.resources.get("requests", {}),
                    "limits": container.resources.get("limits", {})
                },
                "securityContext": {
                    "allowPrivilegeEscalation": False,
                    "readOnlyRootFilesystem": True,
                    "capabilities": {
                        "drop": ["ALL"]
                    }
                }
            }
            
            # 添加端口
            if container.ports:
                container_spec["ports"] = [
                    {"containerPort": port} for port in container.ports
                ]
            
            # 添加环境变量
            if container.env_vars:
                container_spec["env"] = [
                    {"name": k, "value": v} for k, v in container.env_vars.items()
                ]
            
            # 添加卷挂载
            if container.volume_mounts:
                container_spec["volumeMounts"] = container.volume_mounts
            
            pod_spec["spec"]["containers"].append(container_spec)
        
        # 添加卷定义
        volumes = set()
        for container in containers:
            for mount in container.volume_mounts:
                volumes.add(mount["name"])
        
        for volume_name in volumes:
            if "config" in volume_name:
                pod_spec["spec"]["volumes"].append({
                    "name": volume_name,
                    "configMap": {
                        "name": volume_name
                    }
                })
            elif "data" in volume_name:
                pod_spec["spec"]["volumes"].append({
                    "name": volume_name,
                    "persistentVolumeClaim": {
                        "claimName": volume_name
                    }
                })
            else:
                pod_spec["spec"]["volumes"].append({
                    "name": volume_name,
                    "emptyDir": {}
                })
        
        return pod_spec
    
    def generate_init_container_pattern(self, main_container: ContainerSpec, init_tasks: List[str]) -> Dict[str, Any]:
        """生成Init Container模式"""
        init_containers = []
        
        for i, task in enumerate(init_tasks):
            if task == "database_migration":
                init_container = {
                    "name": f"init-db-migration-{i}",
                    "image": "migrate/migrate:latest",
                    "command": ["migrate"],
                    "args": ["-path", "/migrations", "-database", "$(DATABASE_URL)", "up"],
                    "env": [
                        {"name": "DATABASE_URL", "valueFrom": {"secretKeyRef": {"name": "db-secret", "key": "url"}}}
                    ],
                    "volumeMounts": [
                        {"name": "migrations", "mountPath": "/migrations"}
                    ]
                }
            elif task == "config_download":
                init_container = {
                    "name": f"init-config-download-{i}",
                    "image": "curlimages/curl:latest",
                    "command": ["sh", "-c"],
                    "args": ["curl -o /shared/config.json $CONFIG_URL"],
                    "env": [
                        {"name": "CONFIG_URL", "value": "https://config-server/app-config.json"}
                    ],
                    "volumeMounts": [
                        {"name": "shared-config", "mountPath": "/shared"}
                    ]
                }
            elif task == "dependency_check":
                init_container = {
                    "name": f"init-dependency-check-{i}",
                    "image": "busybox:latest",
                    "command": ["sh", "-c"],
                    "args": ["until nc -z $SERVICE_HOST $SERVICE_PORT; do echo waiting for service; sleep 2; done;"],
                    "env": [
                        {"name": "SERVICE_HOST", "value": "database-service"},
                        {"name": "SERVICE_PORT", "value": "5432"}
                    ]
                }
            else:
                continue
            
            init_containers.append(init_container)
        
        pod_spec = self._create_pod_spec([main_container], "init-container")
        pod_spec["spec"]["initContainers"] = init_containers
        
        return pod_spec

# 使用示例
def main():
    """主函数 - 容器模式生成示例"""
    generator = ContainerPatternGenerator()
    
    # 定义主容器
    main_app = ContainerSpec(
        name="web-app",
        image="myapp:latest",
        ports=[8080],
        env_vars={
            "APP_ENV": "production",
            "LOG_LEVEL": "info"
        },
        volume_mounts=[
            {"name": "app-logs", "mountPath": "/var/log/app"}
        ],
        resources={
            "requests": {"cpu": "200m", "memory": "256Mi"},
            "limits": {"cpu": "500m", "memory": "512Mi"}
        }
    )
    
    print("=== 容器设计模式示例 ===")
    
    # 生成Sidecar模式
    sidecar_pod = generator.generate_sidecar_pattern(main_app, "logging")
    print("\n1. Sidecar模式 (日志收集):")
    print(yaml.dump(sidecar_pod, default_flow_style=False))
    
    # 生成Ambassador模式
    ambassador_pod = generator.generate_ambassador_pattern(main_app, "external-api.example.com")
    print("\n2. Ambassador模式:")
    print(yaml.dump(ambassador_pod, default_flow_style=False))
    
    # 生成Adapter模式
    adapter_pod = generator.generate_adapter_pattern(main_app, "metrics")
    print("\n3. Adapter模式 (指标适配):")
    print(yaml.dump(adapter_pod, default_flow_style=False))
    
    # 生成Init Container模式
    init_pod = generator.generate_init_container_pattern(main_app, ["database_migration", "config_download", "dependency_check"])
    print("\n4. Init Container模式:")
    print(yaml.dump(init_pod, default_flow_style=False))

if __name__ == "__main__":
    main()

分享文章