
Database Backup and Recovery Strategies: A Complete Approach from Basic Backups to Disaster Recovery


Backup and recovery is one of the most critical aspects of database administration and bears directly on data safety and business continuity. This article walks through modern best practices for database backup and recovery, from basic backup strategies to full disaster recovery plans.

Backup Strategy Design and Planning

1. Backup Strategy Architecture

graph TB
    A[Backup Strategy Planning] --> B[Backup Type Selection]
    A --> C[Backup Frequency Design]
    A --> D[Storage Strategy Planning]

    B --> E[Full Backup]
    B --> F[Incremental Backup]
    B --> G[Differential Backup]
    B --> H[Log Backup]

    C --> I[Real-Time Backup]
    C --> J[Scheduled Backup]
    C --> K[Triggered Backup]

    D --> L[Local Storage]
    D --> M[Remote Storage]
    D --> N[Cloud Storage]
    D --> O[Multi-Site Backup]

    E --> P[Complete Data Copy]
    F --> Q[Changed Data]
    G --> R[Delta Since Last Full]
    H --> S[Transaction Logs]

    P --> T[Recovery Baseline]
    Q --> T
    R --> T
    S --> U[Point-in-Time Recovery]
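
In the diagram above, full backups (plus any incrementals or differentials applied on top) establish the recovery baseline, while transaction logs carry the restore forward to an arbitrary point in time. As a rough illustration of how these pieces combine during a restore, the sketch below picks the newest full backup taken before the target time and the log backups needed to cover it; the file names, timestamps, and dictionary layout are assumptions made for this example, not part of any tool described later.

from datetime import datetime

def build_restore_chain(full_backups, log_backups, target_time):
    """Pick the newest full backup taken at or before target_time, then every
    log backup after it up to and including the first one taken at or after
    target_time (that segment contains the target point). Illustrative only."""
    base = max((b for b in full_backups if b["taken_at"] <= target_time),
               key=lambda b: b["taken_at"])
    logs = []
    for entry in sorted(log_backups, key=lambda l: l["taken_at"]):
        if entry["taken_at"] <= base["taken_at"]:
            continue
        logs.append(entry)
        if entry["taken_at"] >= target_time:
            break
    return base, logs

full_backups = [
    {"file": "db_full_20240101.sql.gz", "taken_at": datetime(2024, 1, 1, 2, 0)},
    {"file": "db_full_20240102.sql.gz", "taken_at": datetime(2024, 1, 2, 2, 0)},
]
log_backups = [
    {"file": "binlog.000101", "taken_at": datetime(2024, 1, 2, 8, 0)},
    {"file": "binlog.000102", "taken_at": datetime(2024, 1, 2, 9, 0)},
]

base, logs = build_restore_chain(full_backups, log_backups, datetime(2024, 1, 2, 8, 30))
print(base["file"], [l["file"] for l in logs])
# db_full_20240102.sql.gz ['binlog.000101', 'binlog.000102']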

2. Backup Strategy Configuration

# config/backup_strategy.yaml
backup_strategy:
  # 全局配置
  global:
    retention_policy:
      daily_backups: 7      # 保留7天的日备份
      weekly_backups: 4     # 保留4周的周备份
      monthly_backups: 12   # 保留12个月的月备份
      yearly_backups: 3     # 保留3年的年备份
    
    compression:
      enabled: true
      algorithm: "gzip"     # gzip, lz4, zstd
      level: 6
    
    encryption:
      enabled: true
      algorithm: "AES-256"
      key_rotation_days: 90
    
    verification:
      enabled: true
      checksum_algorithm: "SHA-256"
      test_restore: true
      test_frequency: "weekly"
  
  # MySQL备份策略
  mysql:
    databases:
      - name: "production_db"
        backup_type: "full"
        schedule: "0 2 * * *"    # 每天凌晨2点
        storage_location: "/backup/mysql/full"
        
      - name: "production_db"
        backup_type: "incremental"
        schedule: "0 */6 * * *"  # 每6小时
        storage_location: "/backup/mysql/incremental"
        
      - name: "production_db"
        backup_type: "binlog"
        schedule: "*/10 * * * *" # 每10分钟
        storage_location: "/backup/mysql/binlog"
    
    options:
      single_transaction: true
      routines: true
      triggers: true
      events: true
      lock_tables: false
      master_data: 2
  
  # PostgreSQL备份策略
  postgresql:
    databases:
      - name: "production_db"
        backup_type: "full"
        schedule: "0 3 * * *"
        storage_location: "/backup/postgresql/full"
        
      - name: "production_db"
        backup_type: "wal"
        continuous: true
        storage_location: "/backup/postgresql/wal"
    
    options:
      format: "custom"
      compression_level: 6
      verbose: true
      no_owner: true
      no_privileges: false
  
  # MongoDB备份策略
  mongodb:
    databases:
      - name: "production_db"
        backup_type: "full"
        schedule: "0 4 * * *"
        storage_location: "/backup/mongodb/full"
        
      - name: "production_db"
        backup_type: "oplog"
        continuous: true
        storage_location: "/backup/mongodb/oplog"
    
    options:
      gzip: true
      repair: false
      oplog: true
      journal: true

# 存储配置
storage:
  local:
    path: "/backup"
    max_size: "1TB"
    cleanup_policy: "retention"
  
  remote:
    type: "nfs"
    server: "backup-server.company.com"
    path: "/shared/backups"
    mount_options: "rw,sync,hard,intr"
  
  cloud:
    provider: "aws_s3"
    bucket: "company-database-backups"
    region: "us-west-2"
    storage_class: "STANDARD_IA"
    lifecycle_policy:
      transition_to_glacier: 30
      delete_after: 2555  # 7 years

# 监控和告警
monitoring:
  alerts:
    backup_failure:
      enabled: true
      notification_channels: ["email", "slack"]
      escalation_time: 30  # minutes
    
    backup_size_anomaly:
      enabled: true
      threshold_percentage: 50
      notification_channels: ["email"]
    
    storage_space_low:
      enabled: true
      threshold_percentage: 85
      notification_channels: ["email", "slack"]
  
  metrics:
    backup_duration: true
    backup_size: true
    success_rate: true
    storage_utilization: true
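
The retention_policy block above follows a grandfather-father-son pattern: recent daily backups, plus longer-lived weekly, monthly, and yearly copies. A minimal sketch of how such a policy could be evaluated when deciding which existing backups to keep is shown below; the classification rules (Sunday as the weekly slot, the 1st of the month as the monthly slot, and so on) are assumptions for the example rather than part of the configuration format.

from datetime import date, timedelta

# Mirrors retention_policy in the YAML above (counts of backups to keep).
RETENTION = {"daily": 7, "weekly": 4, "monthly": 12, "yearly": 3}

def backups_to_keep(backup_dates, today):
    """Return the subset of backup dates a GFS-style policy would retain."""
    keep = set()
    for d in backup_dates:
        age_days = (today - d).days
        if age_days < RETENTION["daily"]:
            keep.add(d)                                       # recent dailies
        elif d.weekday() == 6 and age_days < RETENTION["weekly"] * 7:
            keep.add(d)                                       # Sunday weeklies
        elif d.day == 1 and age_days < RETENTION["monthly"] * 31:
            keep.add(d)                                       # first-of-month monthlies
        elif d.month == 1 and d.day == 1 and age_days < RETENTION["yearly"] * 366:
            keep.add(d)                                       # Jan 1st yearlies
    return keep

today = date(2024, 6, 1)
dates = [today - timedelta(days=n) for n in range(400)]
kept = backups_to_keep(dates, today)
print(f"{len(kept)} of {len(dates)} daily backups retained")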

MySQL Backup and Recovery Implementation

1. Automated Backup System

#!/bin/bash
# scripts/mysql_backup_system.sh

set -euo pipefail

# 配置文件路径
CONFIG_FILE="/etc/mysql-backup/config.conf"
LOG_FILE="/var/log/mysql-backup.log"

# 默认配置
MYSQL_HOST="${MYSQL_HOST:-localhost}"
MYSQL_PORT="${MYSQL_PORT:-3306}"
MYSQL_USER="${MYSQL_USER:-backup_user}"
MYSQL_PASSWORD="${MYSQL_PASSWORD:-}"
BACKUP_DIR="${BACKUP_DIR:-/backup/mysql}"
RETENTION_DAYS="${RETENTION_DAYS:-7}"
COMPRESSION="${COMPRESSION:-true}"
ENCRYPTION="${ENCRYPTION:-true}"
ENCRYPTION_KEY="${ENCRYPTION_KEY:-}"

# 日志函数
log() {
    echo "[$(date '+%Y-%m-%d %H:%M:%S')] $1" | tee -a "$LOG_FILE"
}

error() {
    echo "[$(date '+%Y-%m-%d %H:%M:%S')] ERROR: $1" | tee -a "$LOG_FILE" >&2
}

# 加载配置文件
load_config() {
    if [[ -f "$CONFIG_FILE" ]]; then
        source "$CONFIG_FILE"
        log "配置文件加载完成: $CONFIG_FILE"
    else
        log "使用默认配置"
    fi
}

# 检查依赖
check_dependencies() {
    local deps=("mysqldump" "mysql" "gzip" "openssl")
    
    for dep in "${deps[@]}"; do
        if ! command -v "$dep" &> /dev/null; then
            error "依赖项未找到: $dep"
            exit 1
        fi
    done
    
    log "依赖检查完成"
}

# 创建备份目录
create_backup_directories() {
    local dirs=(
        "$BACKUP_DIR/full"
        "$BACKUP_DIR/incremental"
        "$BACKUP_DIR/binlog"
        "$BACKUP_DIR/temp"
    )
    
    for dir in "${dirs[@]}"; do
        mkdir -p "$dir"
    done
    
    log "备份目录创建完成"
}

# 获取数据库列表
get_databases() {
    mysql -h"$MYSQL_HOST" -P"$MYSQL_PORT" -u"$MYSQL_USER" -p"$MYSQL_PASSWORD" \
        -e "SHOW DATABASES;" | grep -Ev '^(Database|information_schema|performance_schema|mysql|sys)$'
}

# 全量备份
full_backup() {
    local database="$1"
    local timestamp=$(date '+%Y%m%d_%H%M%S')
    local backup_file="$BACKUP_DIR/full/${database}_full_${timestamp}.sql"
    
    log "开始全量备份: $database"
    
    # 执行备份
    mysqldump \
        -h"$MYSQL_HOST" \
        -P"$MYSQL_PORT" \
        -u"$MYSQL_USER" \
        -p"$MYSQL_PASSWORD" \
        --single-transaction \
        --routines \
        --triggers \
        --events \
        --master-data=2 \
        --flush-logs \
        --hex-blob \
        --default-character-set=utf8mb4 \
        "$database" > "$backup_file"
    
    if [[ $? -eq 0 ]]; then
        log "全量备份完成: $backup_file"
        
        # 压缩备份文件(gzip 会将文件重命名为 .gz,后续步骤必须使用新路径)
        if [[ "$COMPRESSION" == "true" ]]; then
            compress_backup "$backup_file"
            backup_file="${backup_file}.gz"
        fi
        
        # 加密备份文件(加密后原文件被删除,后续步骤使用 .enc 路径)
        if [[ "$ENCRYPTION" == "true" ]]; then
            encrypt_backup "$backup_file"
            backup_file="${backup_file}.enc"
        fi
        
        # 验证备份
        verify_backup "$backup_file"
        
        # 生成备份元数据
        generate_backup_metadata "$backup_file" "full" "$database"
        
    else
        error "全量备份失败: $database"
        return 1
    fi
}

# 增量备份(基于binlog)
incremental_backup() {
    local database="$1"
    local timestamp=$(date '+%Y%m%d_%H%M%S')
    local backup_dir="$BACKUP_DIR/incremental/${database}_${timestamp}"
    
    log "开始增量备份: $database"
    
    mkdir -p "$backup_dir"
    
    # 获取当前binlog位置
    local binlog_info=$(mysql -h"$MYSQL_HOST" -P"$MYSQL_PORT" -u"$MYSQL_USER" -p"$MYSQL_PASSWORD" \
        -e "SHOW MASTER STATUS\G" | grep -E "(File|Position)")
    
    echo "$binlog_info" > "$backup_dir/binlog_position.txt"
    
    # 刷新binlog
    mysql -h"$MYSQL_HOST" -P"$MYSQL_PORT" -u"$MYSQL_USER" -p"$MYSQL_PASSWORD" \
        -e "FLUSH LOGS;"
    
    # 复制binlog文件
    local binlog_dir=$(mysql -h"$MYSQL_HOST" -P"$MYSQL_PORT" -u"$MYSQL_USER" -p"$MYSQL_PASSWORD" \
        -e "SHOW VARIABLES LIKE 'log_bin_basename';" | awk 'NR==2 {print $2}' | xargs dirname)
    
    # 获取最新的binlog文件
    local latest_binlog=$(mysql -h"$MYSQL_HOST" -P"$MYSQL_PORT" -u"$MYSQL_USER" -p"$MYSQL_PASSWORD" \
        -e "SHOW BINARY LOGS;" | tail -n 1 | awk '{print $1}')
    
    if [[ -n "$latest_binlog" ]]; then
        cp "$binlog_dir/$latest_binlog" "$backup_dir/"
        log "增量备份完成: $backup_dir"
        
        # 生成备份元数据
        generate_backup_metadata "$backup_dir" "incremental" "$database"
    else
        error "增量备份失败: 无法获取binlog文件"
        return 1
    fi
}

# 压缩备份文件
compress_backup() {
    local backup_file="$1"
    
    log "压缩备份文件: $backup_file"
    
    gzip "$backup_file"
    
    if [[ $? -eq 0 ]]; then
        log "压缩完成: ${backup_file}.gz"
    else
        error "压缩失败: $backup_file"
        return 1
    fi
}

# 加密备份文件
encrypt_backup() {
    local backup_file="$1"
    
    if [[ -z "$ENCRYPTION_KEY" ]]; then
        error "加密密钥未设置"
        return 1
    fi
    
    log "加密备份文件: $backup_file"
    
    # 加密后删除明文文件(文件是否已压缩不影响加密方式)
    if openssl enc -aes-256-cbc -salt -in "$backup_file" -out "${backup_file}.enc" -k "$ENCRYPTION_KEY"; then
        rm "$backup_file"
        log "加密完成: ${backup_file}.enc"
    else
        error "加密失败: $backup_file"
        return 1
    fi
}

# 验证备份文件
verify_backup() {
    local backup_file="$1"
    
    log "验证备份文件: $backup_file"
    
    # 检查文件是否存在且不为空
    if [[ ! -f "$backup_file" ]] || [[ ! -s "$backup_file" ]]; then
        error "备份文件无效: $backup_file"
        return 1
    fi
    
    # 生成校验和
    local checksum=$(sha256sum "$backup_file" | awk '{print $1}')
    echo "$checksum" > "${backup_file}.sha256"
    
    log "备份验证完成,校验和: $checksum"
}

# 生成备份元数据
generate_backup_metadata() {
    local backup_file="$1"
    local backup_type="$2"
    local database="$3"
    local timestamp=$(date '+%Y-%m-%d %H:%M:%S')
    
    local metadata_file="${backup_file}.metadata"
    
    cat > "$metadata_file" << EOF
{
    "backup_type": "$backup_type",
    "database": "$database",
    "timestamp": "$timestamp",
    "backup_file": "$backup_file",
    "file_size": $(stat -c%s "$backup_file" 2>/dev/null || echo "0"),
    "mysql_version": "$(mysql -h"$MYSQL_HOST" -P"$MYSQL_PORT" -u"$MYSQL_USER" -p"$MYSQL_PASSWORD" -e "SELECT VERSION();" | tail -n 1)",
    "compression": "$COMPRESSION",
    "encryption": "$ENCRYPTION",
    "checksum": "$(cat "${backup_file}.sha256" 2>/dev/null || echo "N/A")"
}
EOF
    
    log "备份元数据生成完成: $metadata_file"
}

# 清理过期备份
cleanup_old_backups() {
    log "开始清理过期备份"
    
    # 清理全量备份
    find "$BACKUP_DIR/full" -name "*.sql*" -mtime +$RETENTION_DAYS -delete
    find "$BACKUP_DIR/full" -name "*.metadata" -mtime +$RETENTION_DAYS -delete
    find "$BACKUP_DIR/full" -name "*.sha256" -mtime +$RETENTION_DAYS -delete
    
    # 清理增量备份
    find "$BACKUP_DIR/incremental" -type d -mtime +$RETENTION_DAYS -exec rm -rf {} +
    
    # 清理binlog备份
    find "$BACKUP_DIR/binlog" -name "*.log*" -mtime +$RETENTION_DAYS -delete
    
    log "过期备份清理完成"
}

# 备份状态报告
generate_backup_report() {
    local report_file="$BACKUP_DIR/backup_report_$(date '+%Y%m%d').txt"
    
    cat > "$report_file" << EOF
MySQL备份状态报告
生成时间: $(date '+%Y-%m-%d %H:%M:%S')

备份目录统计:
$(du -sh "$BACKUP_DIR"/* 2>/dev/null || echo "无备份数据")

最近的备份文件:
全量备份:
$(find "$BACKUP_DIR/full" -name "*.sql*" -mtime -1 -exec ls -lh {} \; 2>/dev/null | head -5)

增量备份:
$(find "$BACKUP_DIR/incremental" -type d -mtime -1 -exec ls -lhd {} \; 2>/dev/null | head -5)

存储空间使用:
$(df -h "$BACKUP_DIR" | tail -n 1)

备份验证状态:
$(find "$BACKUP_DIR" -name "*.sha256" -mtime -1 | wc -l) 个文件已验证

EOF
    
    log "备份报告生成完成: $report_file"
}

# 发送告警通知
send_alert() {
    local message="$1"
    local severity="${2:-INFO}"
    
    # 发送邮件告警(如果配置了邮件)
    if [[ -n "${ALERT_EMAIL:-}" ]]; then
        echo "$message" | mail -s "MySQL备份告警 - $severity" "$ALERT_EMAIL"
    fi
    
    # 发送Slack通知(如果配置了Webhook)
    if [[ -n "${SLACK_WEBHOOK:-}" ]]; then
        curl -X POST -H 'Content-type: application/json' \
            --data "{\"text\":\"MySQL备份告警 - $severity: $message\"}" \
            "$SLACK_WEBHOOK"
    fi
    
    log "告警通知已发送: $message"
}

# 主备份函数
main_backup() {
    local backup_type="${1:-full}"
    local database="${2:-all}"
    
    log "开始MySQL备份任务 - 类型: $backup_type, 数据库: $database"
    
    # 检查MySQL连接
    if ! mysql -h"$MYSQL_HOST" -P"$MYSQL_PORT" -u"$MYSQL_USER" -p"$MYSQL_PASSWORD" -e "SELECT 1;" &>/dev/null; then
        error "无法连接到MySQL服务器"
        send_alert "MySQL备份失败: 无法连接到数据库服务器" "CRITICAL"
        exit 1
    fi
    
    # 获取数据库列表
    local databases
    if [[ "$database" == "all" ]]; then
        databases=$(get_databases)
    else
        databases="$database"
    fi
    
    # 执行备份
    local backup_count=0
    local failed_count=0
    
    for db in $databases; do
        case "$backup_type" in
            "full")
                if full_backup "$db"; then
                    backup_count=$((backup_count + 1))  # 不使用 ((var++)),避免在 set -e 下因结果为0而退出
                else
                    failed_count=$((failed_count + 1))
                fi
                ;;
            "incremental")
                if incremental_backup "$db"; then
                    backup_count=$((backup_count + 1))
                else
                    failed_count=$((failed_count + 1))
                fi
                ;;
            *)
                error "不支持的备份类型: $backup_type"
                exit 1
                ;;
        esac
    done
    
    # 清理过期备份
    cleanup_old_backups
    
    # 生成备份报告
    generate_backup_report
    
    # 发送完成通知
    if [[ $failed_count -eq 0 ]]; then
        log "备份任务完成 - 成功: $backup_count, 失败: $failed_count"
        send_alert "MySQL备份任务完成 - 成功备份 $backup_count 个数据库" "INFO"
    else
        error "备份任务完成但有失败 - 成功: $backup_count, 失败: $failed_count"
        send_alert "MySQL备份任务部分失败 - 成功: $backup_count, 失败: $failed_count" "WARNING"
    fi
}

# 恢复函数
restore_database() {
    local backup_file="$1"
    local target_database="$2"
    local target_host="${3:-$MYSQL_HOST}"
    local target_port="${4:-$MYSQL_PORT}"
    
    log "开始恢复数据库: $target_database"
    
    # 检查备份文件
    if [[ ! -f "$backup_file" ]]; then
        error "备份文件不存在: $backup_file"
        return 1
    fi
    
    # 解密备份文件(如果需要)
    local restore_file="$backup_file"
    if [[ "$backup_file" == *.enc ]]; then
        restore_file="${backup_file%.enc}"
        log "解密备份文件: $backup_file"
        openssl enc -aes-256-cbc -d -in "$backup_file" -out "$restore_file" -k "$ENCRYPTION_KEY"
    fi
    
    # 解压备份文件(如果需要)
    if [[ "$restore_file" == *.gz ]]; then
        log "解压备份文件: $restore_file"
        gunzip "$restore_file"
        restore_file="${restore_file%.gz}"
    fi
    
    # 创建目标数据库
    mysql -h"$target_host" -P"$target_port" -u"$MYSQL_USER" -p"$MYSQL_PASSWORD" \
        -e "CREATE DATABASE IF NOT EXISTS \`$target_database\`;"
    
    # 恢复数据
    log "恢复数据到数据库: $target_database"
    mysql -h"$target_host" -P"$target_port" -u"$MYSQL_USER" -p"$MYSQL_PASSWORD" \
        "$target_database" < "$restore_file"
    
    if [[ $? -eq 0 ]]; then
        log "数据库恢复完成: $target_database"
        
        # 验证恢复结果
        local table_count=$(mysql -h"$target_host" -P"$target_port" -u"$MYSQL_USER" -p"$MYSQL_PASSWORD" \
            -e "SELECT COUNT(*) FROM information_schema.tables WHERE table_schema='$target_database';" | tail -n 1)
        
        log "恢复验证 - 表数量: $table_count"
        
    else
        error "数据库恢复失败: $target_database"
        return 1
    fi
    
    # 清理临时文件
    if [[ "$restore_file" != "$backup_file" ]]; then
        rm -f "$restore_file"
    fi
}

# 时间点恢复
point_in_time_recovery() {
    local database="$1"
    local target_time="$2"
    local backup_file="$3"
    
    log "开始时间点恢复: $database 至 $target_time"
    
    # 恢复全量备份
    restore_database "$backup_file" "${database}_pitr_temp"
    
    # 应用binlog到指定时间点
    local binlog_dir="$BACKUP_DIR/binlog"
    local binlog_files=$(find "$binlog_dir" -name "*.log" -newer "$backup_file" | sort)
    
    for binlog_file in $binlog_files; do
        log "应用binlog: $binlog_file"
        
        mysqlbinlog --stop-datetime="$target_time" "$binlog_file" | \
        mysql -h"$MYSQL_HOST" -P"$MYSQL_PORT" -u"$MYSQL_USER" -p"$MYSQL_PASSWORD" \
            "${database}_pitr_temp"
    done
    
    log "时间点恢复完成: ${database}_pitr_temp"
}

# 主函数
main() {
    case "${1:-backup}" in
        "backup")
            load_config
            check_dependencies
            create_backup_directories
            main_backup "${2:-full}" "${3:-all}"
            ;;
        "restore")
            if [[ $# -lt 3 ]]; then
                echo "用法: $0 restore <backup_file> <target_database> [target_host] [target_port]"
                exit 1
            fi
            load_config
            restore_database "$2" "$3" "${4:-}" "${5:-}"
            ;;
        "pitr")
            if [[ $# -lt 4 ]]; then
                echo "用法: $0 pitr <database> <target_time> <backup_file>"
                exit 1
            fi
            load_config
            point_in_time_recovery "$2" "$3" "$4"
            ;;
        "cleanup")
            load_config
            cleanup_old_backups
            ;;
        "report")
            load_config
            generate_backup_report
            ;;
        *)
            echo "用法: $0 {backup|restore|pitr|cleanup|report}"
            echo "  backup [full|incremental] [database|all] - 执行备份"
            echo "  restore <backup_file> <target_database>   - 恢复数据库"
            echo "  pitr <database> <time> <backup_file>      - 时间点恢复"
            echo "  cleanup                                   - 清理过期备份"
            echo "  report                                    - 生成备份报告"
            exit 1
            ;;
    esac
}

main "$@"
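
To run this script on the schedules defined in the strategy file earlier (full backups at 02:00, incremental backups every six hours), the jobs can be registered in cron along the following lines. The install path, the backup system user, and the weekly report time are assumptions for the example.

# /etc/cron.d/mysql-backup (hypothetical paths and user)
0 2 * * *    backup  /usr/local/bin/mysql_backup_system.sh backup full all
0 */6 * * *  backup  /usr/local/bin/mysql_backup_system.sh backup incremental all
30 5 * * 0   backup  /usr/local/bin/mysql_backup_system.sh report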

2. MySQL Recovery Validation Script

#!/usr/bin/env python3
# scripts/mysql_recovery_validator.py

import mysql.connector
import argparse
import json
import logging
import os
import time
from datetime import datetime
from typing import Dict, List, Any

class MySQLRecoveryValidator:
    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.logger = self._setup_logging()
        
        # 数据库连接配置
        self.source_conn = None
        self.target_conn = None
        
    def _setup_logging(self) -> logging.Logger:
        """设置日志记录"""
        logger = logging.getLogger('MySQLRecoveryValidator')
        logger.setLevel(logging.INFO)
        
        handler = logging.FileHandler('/var/log/mysql_recovery_validation.log')
        formatter = logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        handler.setFormatter(formatter)
        logger.addHandler(handler)
        
        # 控制台输出
        console_handler = logging.StreamHandler()
        console_handler.setFormatter(formatter)
        logger.addHandler(console_handler)
        
        return logger
    
    def connect_databases(self):
        """连接源数据库和目标数据库"""
        try:
            # 连接源数据库
            self.source_conn = mysql.connector.connect(
                **self.config['source_database']
            )
            
            # 连接目标数据库
            self.target_conn = mysql.connector.connect(
                **self.config['target_database']
            )
            
            self.logger.info("数据库连接建立成功")
            
        except Exception as e:
            self.logger.error(f"数据库连接失败: {e}")
            raise
    
    def validate_schema_consistency(self, database_name: str) -> Dict[str, Any]:
        """验证模式一致性"""
        self.logger.info(f"开始验证模式一致性: {database_name}")
        
        validation_result = {
            'database': database_name,
            'schema_consistent': True,
            'differences': [],
            'table_count_match': True,
            'index_count_match': True,
            'constraint_count_match': True
        }
        
        try:
            source_cursor = self.source_conn.cursor(dictionary=True)
            target_cursor = self.target_conn.cursor(dictionary=True)
            
            # 验证表结构
            table_diff = self._compare_tables(source_cursor, target_cursor, database_name)
            if table_diff:
                validation_result['schema_consistent'] = False
                validation_result['differences'].extend(table_diff)
            
            # 验证索引
            index_diff = self._compare_indexes(source_cursor, target_cursor, database_name)
            if index_diff:
                validation_result['schema_consistent'] = False
                validation_result['differences'].extend(index_diff)
            
            # 验证约束
            constraint_diff = self._compare_constraints(source_cursor, target_cursor, database_name)
            if constraint_diff:
                validation_result['schema_consistent'] = False
                validation_result['differences'].extend(constraint_diff)
            
            source_cursor.close()
            target_cursor.close()
            
        except Exception as e:
            self.logger.error(f"模式验证失败: {e}")
            validation_result['schema_consistent'] = False
            validation_result['error'] = str(e)
        
        return validation_result
    
    def _compare_tables(self, source_cursor, target_cursor, database_name: str) -> List[str]:
        """比较表结构"""
        differences = []
        
        # 获取源数据库表信息
        source_cursor.execute(f"""
            SELECT table_name, table_type, engine, table_rows, data_length, index_length
            FROM information_schema.tables
            WHERE table_schema = '{database_name}'
            ORDER BY table_name
        """)
        source_tables = {row['table_name']: row for row in source_cursor.fetchall()}
        
        # 获取目标数据库表信息
        target_cursor.execute(f"""
            SELECT table_name, table_type, engine, table_rows, data_length, index_length
            FROM information_schema.tables
            WHERE table_schema = '{database_name}'
            ORDER BY table_name
        """)
        target_tables = {row['table_name']: row for row in target_cursor.fetchall()}
        
        # 比较表
        source_table_names = set(source_tables.keys())
        target_table_names = set(target_tables.keys())
        
        # 检查缺失的表
        missing_tables = source_table_names - target_table_names
        if missing_tables:
            differences.append(f"目标数据库缺失表: {', '.join(missing_tables)}")
        
        # 检查多余的表
        extra_tables = target_table_names - source_table_names
        if extra_tables:
            differences.append(f"目标数据库多余表: {', '.join(extra_tables)}")
        
        # 比较共同表的结构
        common_tables = source_table_names & target_table_names
        for table_name in common_tables:
            source_table = source_tables[table_name]
            target_table = target_tables[table_name]
            
            if source_table['engine'] != target_table['engine']:
                differences.append(f"表 {table_name} 存储引擎不匹配: {source_table['engine']} vs {target_table['engine']}")
            
            # 比较列结构
            column_diff = self._compare_columns(source_cursor, target_cursor, database_name, table_name)
            differences.extend(column_diff)
        
        return differences
    
    def _compare_columns(self, source_cursor, target_cursor, database_name: str, table_name: str) -> List[str]:
        """比较列结构"""
        differences = []
        
        # 获取源表列信息
        source_cursor.execute(f"""
            SELECT column_name, data_type, is_nullable, column_default, extra
            FROM information_schema.columns
            WHERE table_schema = '{database_name}' AND table_name = '{table_name}'
            ORDER BY ordinal_position
        """)
        source_columns = {row['column_name']: row for row in source_cursor.fetchall()}
        
        # 获取目标表列信息
        target_cursor.execute(f"""
            SELECT column_name, data_type, is_nullable, column_default, extra
            FROM information_schema.columns
            WHERE table_schema = '{database_name}' AND table_name = '{table_name}'
            ORDER BY ordinal_position
        """)
        target_columns = {row['column_name']: row for row in target_cursor.fetchall()}
        
        # 比较列
        source_column_names = set(source_columns.keys())
        target_column_names = set(target_columns.keys())
        
        # 检查缺失的列
        missing_columns = source_column_names - target_column_names
        if missing_columns:
            differences.append(f"表 {table_name} 缺失列: {', '.join(missing_columns)}")
        
        # 检查多余的列
        extra_columns = target_column_names - source_column_names
        if extra_columns:
            differences.append(f"表 {table_name} 多余列: {', '.join(extra_columns)}")
        
        # 比较共同列的属性
        common_columns = source_column_names & target_column_names
        for column_name in common_columns:
            source_col = source_columns[column_name]
            target_col = target_columns[column_name]
            
            if source_col['data_type'] != target_col['data_type']:
                differences.append(f"表 {table_name} 列 {column_name} 数据类型不匹配: {source_col['data_type']} vs {target_col['data_type']}")
            
            if source_col['is_nullable'] != target_col['is_nullable']:
                differences.append(f"表 {table_name} 列 {column_name} 可空属性不匹配")
        
        return differences
    
    def _compare_indexes(self, source_cursor, target_cursor, database_name: str) -> List[str]:
        """比较索引"""
        differences = []
        
        # 获取源数据库索引信息
        source_cursor.execute(f"""
            SELECT table_name, index_name, column_name, seq_in_index, non_unique
            FROM information_schema.statistics
            WHERE table_schema = '{database_name}'
            ORDER BY table_name, index_name, seq_in_index
        """)
        source_indexes = {}
        for row in source_cursor.fetchall():
            key = f"{row['table_name']}.{row['index_name']}"
            if key not in source_indexes:
                source_indexes[key] = []
            source_indexes[key].append(row)
        
        # 获取目标数据库索引信息
        target_cursor.execute(f"""
            SELECT table_name, index_name, column_name, seq_in_index, non_unique
            FROM information_schema.statistics
            WHERE table_schema = '{database_name}'
            ORDER BY table_name, index_name, seq_in_index
        """)
        target_indexes = {}
        for row in target_cursor.fetchall():
            key = f"{row['table_name']}.{row['index_name']}"
            if key not in target_indexes:
                target_indexes[key] = []
            target_indexes[key].append(row)
        
        # 比较索引
        source_index_names = set(source_indexes.keys())
        target_index_names = set(target_indexes.keys())
        
        missing_indexes = source_index_names - target_index_names
        if missing_indexes:
            differences.append(f"缺失索引: {', '.join(missing_indexes)}")
        
        extra_indexes = target_index_names - source_index_names
        if extra_indexes:
            differences.append(f"多余索引: {', '.join(extra_indexes)}")
        
        return differences
    
    def _compare_constraints(self, source_cursor, target_cursor, database_name: str) -> List[str]:
        """比较约束"""
        differences = []
        
        # 获取外键约束
        source_cursor.execute(f"""
            SELECT constraint_name, table_name, column_name, referenced_table_name, referenced_column_name
            FROM information_schema.key_column_usage
            WHERE table_schema = '{database_name}' AND referenced_table_name IS NOT NULL
            ORDER BY constraint_name
        """)
        source_constraints = set()
        for row in source_cursor.fetchall():
            constraint_key = f"{row['table_name']}.{row['constraint_name']}"
            source_constraints.add(constraint_key)
        
        target_cursor.execute(f"""
            SELECT constraint_name, table_name, column_name, referenced_table_name, referenced_column_name
            FROM information_schema.key_column_usage
            WHERE table_schema = '{database_name}' AND referenced_table_name IS NOT NULL
            ORDER BY constraint_name
        """)
        target_constraints = set()
        for row in target_cursor.fetchall():
            constraint_key = f"{row['table_name']}.{row['constraint_name']}"
            target_constraints.add(constraint_key)
        
        # 比较约束
        missing_constraints = source_constraints - target_constraints
        if missing_constraints:
            differences.append(f"缺失约束: {', '.join(missing_constraints)}")
        
        extra_constraints = target_constraints - source_constraints
        if extra_constraints:
            differences.append(f"多余约束: {', '.join(extra_constraints)}")
        
        return differences
    
    def validate_data_consistency(self, database_name: str, sample_tables: List[str] = None) -> Dict[str, Any]:
        """验证数据一致性"""
        self.logger.info(f"开始验证数据一致性: {database_name}")
        
        validation_result = {
            'database': database_name,
            'data_consistent': True,
            'table_results': {},
            'total_tables_checked': 0,
            'inconsistent_tables': []
        }
        
        try:
            source_cursor = self.source_conn.cursor(dictionary=True)
            target_cursor = self.target_conn.cursor(dictionary=True)
            
            # 获取要检查的表列表
            if sample_tables:
                tables_to_check = sample_tables
            else:
                source_cursor.execute(f"""
                    SELECT table_name FROM information_schema.tables
                    WHERE table_schema = '{database_name}' AND table_type = 'BASE TABLE'
                    ORDER BY table_name
                """)
                tables_to_check = [row['table_name'] for row in source_cursor.fetchall()]
            
            validation_result['total_tables_checked'] = len(tables_to_check)
            
            # 检查每个表的数据一致性
            for table_name in tables_to_check:
                table_result = self._validate_table_data(
                    source_cursor, target_cursor, database_name, table_name
                )
                validation_result['table_results'][table_name] = table_result
                
                if not table_result['consistent']:
                    validation_result['data_consistent'] = False
                    validation_result['inconsistent_tables'].append(table_name)
            
            source_cursor.close()
            target_cursor.close()
            
        except Exception as e:
            self.logger.error(f"数据一致性验证失败: {e}")
            validation_result['data_consistent'] = False
            validation_result['error'] = str(e)
        
        return validation_result
    
    def _validate_table_data(self, source_cursor, target_cursor, database_name: str, table_name: str) -> Dict[str, Any]:
        """验证单个表的数据一致性"""
        table_result = {
            'table_name': table_name,
            'consistent': True,
            'row_count_match': True,
            'checksum_match': True,
            'source_row_count': 0,
            'target_row_count': 0,
            'source_checksum': '',
            'target_checksum': ''
        }
        
        try:
            # 比较行数
            source_cursor.execute(f"SELECT COUNT(*) as count FROM `{database_name}`.`{table_name}`")
            source_count = source_cursor.fetchone()['count']
            table_result['source_row_count'] = source_count
            
            target_cursor.execute(f"SELECT COUNT(*) as count FROM `{database_name}`.`{table_name}`")
            target_count = target_cursor.fetchone()['count']
            table_result['target_row_count'] = target_count
            
            if source_count != target_count:
                table_result['consistent'] = False
                table_result['row_count_match'] = False
                self.logger.warning(f"表 {table_name} 行数不匹配: {source_count} vs {target_count}")
            
            # 比较数据校验和(仅对小表进行)
            if source_count <= 100000:  # 只对10万行以下的表进行校验和比较
                source_checksum = self._calculate_table_checksum(source_cursor, database_name, table_name)
                target_checksum = self._calculate_table_checksum(target_cursor, database_name, table_name)
                
                table_result['source_checksum'] = source_checksum
                table_result['target_checksum'] = target_checksum
                
                if source_checksum != target_checksum:
                    table_result['consistent'] = False
                    table_result['checksum_match'] = False
                    self.logger.warning(f"表 {table_name} 校验和不匹配")
            
        except Exception as e:
            self.logger.error(f"表 {table_name} 数据验证失败: {e}")
            table_result['consistent'] = False
            table_result['error'] = str(e)
        
        return table_result
    
    def _calculate_table_checksum(self, cursor, database_name: str, table_name: str) -> str:
        """计算表数据校验和"""
        try:
            # 获取表的所有列
            cursor.execute(f"""
                SELECT column_name FROM information_schema.columns
                WHERE table_schema = '{database_name}' AND table_name = '{table_name}'
                ORDER BY ordinal_position
            """)
            columns = [row['column_name'] for row in cursor.fetchall()]
            
            if not columns:
                return ""
            
            # 构建查询语句
            column_list = ', '.join([f'COALESCE(`{col}`, "NULL")' for col in columns])
            query = f"""
                SELECT MD5(GROUP_CONCAT(
                    MD5(CONCAT({column_list}))
                    ORDER BY {columns[0]}
                )) as checksum
                FROM `{database_name}`.`{table_name}`
            """
            
            # GROUP_CONCAT 结果默认上限为 1024 字节,先调大以避免校验和被截断
            cursor.execute("SET SESSION group_concat_max_len = 67108864")
            cursor.execute(query)
            result = cursor.fetchone()
            return result['checksum'] if result else ""
            
        except Exception as e:
            self.logger.error(f"计算表 {table_name} 校验和失败: {e}")
            return ""
    
    def validate_performance_metrics(self, database_name: str) -> Dict[str, Any]:
        """验证性能指标"""
        self.logger.info(f"开始验证性能指标: {database_name}")
        
        validation_result = {
            'database': database_name,
            'performance_acceptable': True,
            'query_performance': {},
            'connection_test': {},
            'load_test': {}
        }
        
        try:
            # 测试查询性能
            query_result = self._test_query_performance(database_name)
            validation_result['query_performance'] = query_result
            
            # 测试连接性能
            connection_result = self._test_connection_performance()
            validation_result['connection_test'] = connection_result
            
            # 简单负载测试
            load_result = self._test_simple_load(database_name)
            validation_result['load_test'] = load_result
            
            # 判断整体性能是否可接受
            if (query_result.get('average_response_time', 0) > 1000 or  # 1秒
                connection_result.get('average_connection_time', 0) > 500 or  # 0.5秒
                load_result.get('success_rate', 0) < 0.95):  # 95%成功率
                validation_result['performance_acceptable'] = False
            
        except Exception as e:
            self.logger.error(f"性能验证失败: {e}")
            validation_result['performance_acceptable'] = False
            validation_result['error'] = str(e)
        
        return validation_result
    
    def _test_query_performance(self, database_name: str) -> Dict[str, Any]:
        """测试查询性能"""
        result = {
            'queries_tested': 0,
            'total_time': 0,
            'average_response_time': 0,
            'max_response_time': 0,
            'min_response_time': float('inf')
        }
        
        try:
            cursor = self.target_conn.cursor()
            
            # 测试查询列表
            test_queries = [
                f"SELECT COUNT(*) FROM information_schema.tables WHERE table_schema = '{database_name}'",
                f"SELECT table_name, table_rows FROM information_schema.tables WHERE table_schema = '{database_name}' LIMIT 10",
                "SELECT NOW()",
                "SELECT VERSION()",
                "SHOW STATUS LIKE 'Threads_connected'"
            ]
            
            response_times = []
            
            for query in test_queries:
                start_time = time.time()
                cursor.execute(query)
                cursor.fetchall()
                end_time = time.time()
                
                response_time = (end_time - start_time) * 1000  # 转换为毫秒
                response_times.append(response_time)
                result['queries_tested'] += 1
            
            if response_times:
                result['total_time'] = sum(response_times)
                result['average_response_time'] = sum(response_times) / len(response_times)
                result['max_response_time'] = max(response_times)
                result['min_response_time'] = min(response_times)
            
            cursor.close()
            
        except Exception as e:
            self.logger.error(f"查询性能测试失败: {e}")
            result['error'] = str(e)
        
        return result
    
    def _test_connection_performance(self) -> Dict[str, Any]:
        """测试连接性能"""
        result = {
            'connections_tested': 0,
            'successful_connections': 0,
            'average_connection_time': 0,
            'max_connection_time': 0,
            'min_connection_time': float('inf')
        }
        
        try:
            connection_times = []
            
            # 测试10次连接
            for i in range(10):
                start_time = time.time()
                
                try:
                    test_conn = mysql.connector.connect(
                        **self.config['target_database']
                    )
                    test_conn.close()
                    
                    end_time = time.time()
                    connection_time = (end_time - start_time) * 1000  # 转换为毫秒
                    connection_times.append(connection_time)
                    result['successful_connections'] += 1
                    
                except Exception:
                    pass
                
                result['connections_tested'] += 1
            
            if connection_times:
                result['average_connection_time'] = sum(connection_times) / len(connection_times)
                result['max_connection_time'] = max(connection_times)
                result['min_connection_time'] = min(connection_times)
            
        except Exception as e:
            self.logger.error(f"连接性能测试失败: {e}")
            result['error'] = str(e)
        
        return result
    
    def _test_simple_load(self, database_name: str) -> Dict[str, Any]:
        """简单负载测试"""
        result = {
            'total_operations': 0,
            'successful_operations': 0,
            'failed_operations': 0,
            'success_rate': 0,
            'average_operation_time': 0
        }
        
        try:
            cursor = self.target_conn.cursor()
            operation_times = []
            
            # 执行50次简单查询
            for i in range(50):
                start_time = time.time()
                
                try:
                    cursor.execute(f"SELECT COUNT(*) FROM information_schema.tables WHERE table_schema = '{database_name}'")
                    cursor.fetchone()
                    
                    end_time = time.time()
                    operation_time = (end_time - start_time) * 1000
                    operation_times.append(operation_time)
                    result['successful_operations'] += 1
                    
                except Exception:
                    result['failed_operations'] += 1
                
                result['total_operations'] += 1
            
            if result['total_operations'] > 0:
                result['success_rate'] = result['successful_operations'] / result['total_operations']
            
            if operation_times:
                result['average_operation_time'] = sum(operation_times) / len(operation_times)
            
            cursor.close()
            
        except Exception as e:
            self.logger.error(f"负载测试失败: {e}")
            result['error'] = str(e)
        
        return result
    
    def generate_validation_report(self, validation_results: Dict[str, Any]) -> str:
        """生成验证报告"""
        report_file = f"/var/reports/mysql_recovery_validation_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
        os.makedirs(os.path.dirname(report_file), exist_ok=True)  # 确保报告目录存在
        
        # 添加报告元数据
        validation_results['report_metadata'] = {
            'generated_at': datetime.now().isoformat(),
            'validator_version': '1.0.0',
            'source_database': self.config['source_database']['host'],
            'target_database': self.config['target_database']['host']
        }
        
        # 写入JSON报告
        with open(report_file, 'w', encoding='utf-8') as f:
            json.dump(validation_results, f, indent=2, ensure_ascii=False)
        
        self.logger.info(f"验证报告生成完成: {report_file}")
        return report_file
    
    def run_full_validation(self, database_name: str, sample_tables: List[str] = None) -> Dict[str, Any]:
        """运行完整验证"""
        self.logger.info(f"开始完整验证: {database_name}")
        
        validation_results = {
            'database': database_name,
            'validation_start_time': datetime.now().isoformat(),
            'overall_status': 'PASS',
            'schema_validation': {},
            'data_validation': {},
            'performance_validation': {}
        }
        
        try:
            # 连接数据库
            self.connect_databases()
            
            # 模式验证
            schema_result = self.validate_schema_consistency(database_name)
            validation_results['schema_validation'] = schema_result
            
            if not schema_result['schema_consistent']:
                validation_results['overall_status'] = 'FAIL'
            
            # 数据验证
            data_result = self.validate_data_consistency(database_name, sample_tables)
            validation_results['data_validation'] = data_result
            
            if not data_result['data_consistent']:
                validation_results['overall_status'] = 'FAIL'
            
            # 性能验证
            performance_result = self.validate_performance_metrics(database_name)
            validation_results['performance_validation'] = performance_result
            
            if not performance_result['performance_acceptable']:
                if validation_results['overall_status'] == 'PASS':
                    validation_results['overall_status'] = 'WARNING'
            
            validation_results['validation_end_time'] = datetime.now().isoformat()
            
            # 生成报告
            report_file = self.generate_validation_report(validation_results)
            validation_results['report_file'] = report_file
            
        except Exception as e:
            self.logger.error(f"验证过程失败: {e}")
            validation_results['overall_status'] = 'ERROR'
            validation_results['error'] = str(e)
        
        finally:
            # 关闭数据库连接
            if self.source_conn:
                self.source_conn.close()
            if self.target_conn:
                self.target_conn.close()
        
        return validation_results

def main():
    parser = argparse.ArgumentParser(description='MySQL恢复验证工具')
    parser.add_argument('--config', required=True, help='配置文件路径')
    parser.add_argument('--database', required=True, help='要验证的数据库名称')
    parser.add_argument('--sample-tables', nargs='*', help='要验证的样本表列表')
    parser.add_argument('--schema-only', action='store_true', help='仅验证模式')
    parser.add_argument('--data-only', action='store_true', help='仅验证数据')
    parser.add_argument('--performance-only', action='store_true', help='仅验证性能')
    
    args = parser.parse_args()
    
    # 加载配置
    with open(args.config, 'r') as f:
        config = json.load(f)
    
    # 创建验证器
    validator = MySQLRecoveryValidator(config)
    
    try:
        if args.schema_only:
            validator.connect_databases()
            result = validator.validate_schema_consistency(args.database)
            print(json.dumps(result, indent=2))
        elif args.data_only:
            validator.connect_databases()
            result = validator.validate_data_consistency(args.database, args.sample_tables)
            print(json.dumps(result, indent=2))
        elif args.performance_only:
            validator.connect_databases()
            result = validator.validate_performance_metrics(args.database)
            print(json.dumps(result, indent=2))
        else:
            # 完整验证
            result = validator.run_full_validation(args.database, args.sample_tables)
            print(f"验证完成,状态: {result['overall_status']}")
            print(f"报告文件: {result.get('report_file', 'N/A')}")
    
    except Exception as e:
        print(f"验证失败: {e}")
        exit(1)

if __name__ == "__main__":
    main()
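
The validator reads a JSON configuration whose source_database and target_database sections are passed directly to mysql.connector.connect(), so they accept the usual connection keyword arguments. A minimal example config might look like the following; host names and credentials are placeholders:

{
    "source_database": {
        "host": "mysql-primary.internal",
        "port": 3306,
        "user": "validator",
        "password": "change-me"
    },
    "target_database": {
        "host": "mysql-restored.internal",
        "port": 3306,
        "user": "validator",
        "password": "change-me"
    }
}

It can then be invoked as, for example, python3 mysql_recovery_validator.py --config validator.json --database production_db --schema-only.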

PostgreSQL Backup and Recovery Implementation

1. PostgreSQL Backup System

#!/bin/bash
# scripts/postgresql_backup_system.sh

set -euo pipefail

# 配置文件路径
CONFIG_FILE="/etc/postgresql-backup/config.conf"
LOG_FILE="/var/log/postgresql-backup.log"

# 默认配置
PGHOST="${PGHOST:-localhost}"
PGPORT="${PGPORT:-5432}"
PGUSER="${PGUSER:-backup_user}"
PGPASSWORD="${PGPASSWORD:-}"
BACKUP_DIR="${BACKUP_DIR:-/backup/postgresql}"
WAL_ARCHIVE_DIR="${WAL_ARCHIVE_DIR:-/backup/postgresql/wal}"
RETENTION_DAYS="${RETENTION_DAYS:-7}"
COMPRESSION="${COMPRESSION:-true}"
ENCRYPTION="${ENCRYPTION:-true}"
ENCRYPTION_KEY="${ENCRYPTION_KEY:-}"

# 导出PostgreSQL环境变量
export PGHOST PGPORT PGUSER PGPASSWORD

# 日志函数
log() {
    echo "[$(date '+%Y-%m-%d %H:%M:%S')] $1" | tee -a "$LOG_FILE"
}

error() {
    echo "[$(date '+%Y-%m-%d %H:%M:%S')] ERROR: $1" | tee -a "$LOG_FILE" >&2
}

# 加载配置文件
load_config() {
    if [[ -f "$CONFIG_FILE" ]]; then
        source "$CONFIG_FILE"
        log "配置文件加载完成: $CONFIG_FILE"
    else
        log "使用默认配置"
    fi
}

# 检查依赖
check_dependencies() {
    local deps=("pg_dump" "pg_dumpall" "pg_basebackup" "psql" "gzip" "openssl")
    
    for dep in "${deps[@]}"; do
        if ! command -v "$dep" &> /dev/null; then
            error "依赖项未找到: $dep"
            exit 1
        fi
    done
    
    log "依赖检查完成"
}

# 创建备份目录
create_backup_directories() {
    local dirs=(
        "$BACKUP_DIR/full"
        "$BACKUP_DIR/incremental"
        "$BACKUP_DIR/wal"
        "$BACKUP_DIR/basebackup"
        "$BACKUP_DIR/temp"
    )
    
    for dir in "${dirs[@]}"; do
        mkdir -p "$dir"
    done
    
    log "备份目录创建完成"
}

# 获取数据库列表
get_databases() {
    psql -t -c "SELECT datname FROM pg_database WHERE datistemplate = false AND datname != 'postgres';" | grep -v '^$'
}

# 逻辑备份(pg_dump)
logical_backup() {
    local database="$1"
    local timestamp=$(date '+%Y%m%d_%H%M%S')
    local backup_file="$BACKUP_DIR/full/${database}_logical_${timestamp}.sql"
    
    log "开始逻辑备份: $database"
    
    # 执行备份
    pg_dump \
        --verbose \
        --format=custom \
        --compress=6 \
        --no-owner \
        --no-privileges \
        --create \
        --clean \
        --if-exists \
        --file="$backup_file" \
        "$database"
    
    if [[ $? -eq 0 ]]; then
        log "逻辑备份完成: $backup_file"
        
        # 压缩备份文件(custom 格式已内置压缩,此处 gzip 为可选的二次压缩;gzip 会将文件重命名为 .gz)
        if [[ "$COMPRESSION" == "true" ]]; then
            compress_backup "$backup_file"
            backup_file="${backup_file}.gz"
        fi
        
        # 加密备份文件(加密后原文件被删除,后续步骤使用 .enc 路径)
        if [[ "$ENCRYPTION" == "true" ]]; then
            encrypt_backup "$backup_file"
            backup_file="${backup_file}.enc"
        fi
        
        # 验证备份
        verify_backup "$backup_file"
        
        # 生成备份元数据
        generate_backup_metadata "$backup_file" "logical" "$database"
        
    else
        error "逻辑备份失败: $database"
        return 1
    fi
}

# 物理备份(pg_basebackup)
physical_backup() {
    local timestamp=$(date '+%Y%m%d_%H%M%S')
    local backup_dir="$BACKUP_DIR/basebackup/basebackup_${timestamp}"
    
    log "开始物理备份"
    
    mkdir -p "$backup_dir"
    
    # 执行基础备份
    pg_basebackup \
        --verbose \
        --progress \
        --format=tar \
        --gzip \
        --compress=6 \
        --checkpoint=fast \
        --wal-method=stream \
        --pgdata="$backup_dir"
    
    if [[ $? -eq 0 ]]; then
        log "物理备份完成: $backup_dir"
        
        # 加密备份文件
        if [[ "$ENCRYPTION" == "true" ]]; then
            for tar_file in "$backup_dir"/*.tar.gz; do
                if [[ -f "$tar_file" ]]; then
                    encrypt_backup "$tar_file"
                fi
            done
        fi
        
        # 生成备份元数据
        generate_backup_metadata "$backup_dir" "physical" "cluster"
        
    else
        error "物理备份失败"
        return 1
    fi
}

# WAL归档备份
wal_archive_backup() {
    local wal_source_dir="$1"
    local timestamp=$(date '+%Y%m%d_%H%M%S')
    local archive_dir="$WAL_ARCHIVE_DIR/${timestamp}"
    
    log "开始WAL归档备份"
    
    mkdir -p "$archive_dir"
    
    # 复制WAL文件
    if [[ -d "$wal_source_dir" ]]; then
        rsync -av --progress "$wal_source_dir"/ "$archive_dir"/
        
        if [[ $? -eq 0 ]]; then
            log "WAL归档完成: $archive_dir"
            
            # 压缩WAL文件
            if [[ "$COMPRESSION" == "true" ]]; then
                find "$archive_dir" -type f ! -name "*.gz" -exec gzip {} \;   # WAL 段文件没有 .wal 扩展名,按普通文件压缩
            fi
            
            # 生成备份元数据
            generate_backup_metadata "$archive_dir" "wal" "cluster"
        else
            error "WAL归档失败"
            return 1
        fi
    else
        error "WAL源目录不存在: $wal_source_dir"
        return 1
    fi
}

# 压缩备份文件
compress_backup() {
    local backup_file="$1"
    
    log "压缩备份文件: $backup_file"
    
    gzip "$backup_file"
    
    if [[ $? -eq 0 ]]; then
        log "压缩完成: ${backup_file}.gz"
    else
        error "压缩失败: $backup_file"
        return 1
    fi
}

# 加密备份文件
encrypt_backup() {
    local backup_file="$1"
    
    if [[ -z "$ENCRYPTION_KEY" ]]; then
        error "加密密钥未设置"
        return 1
    fi
    
    log "加密备份文件: $backup_file"
    
    openssl enc -aes-256-cbc -salt -in "$backup_file" -out "${backup_file}.enc" -k "$ENCRYPTION_KEY"
    
    if [[ $? -eq 0 ]]; then
        rm "$backup_file"
        log "加密完成: ${backup_file}.enc"
    else
        error "加密失败: $backup_file"
        return 1
    fi
}

# 验证备份文件
verify_backup() {
    local backup_file="$1"
    
    log "验证备份文件: $backup_file"
    
    # 检查文件是否存在且不为空
    if [[ ! -f "$backup_file" ]] || [[ ! -s "$backup_file" ]]; then
        error "备份文件无效: $backup_file"
        return 1
    fi
    
    # 生成校验和
    local checksum=$(sha256sum "$backup_file" | awk '{print $1}')
    echo "$checksum" > "${backup_file}.sha256"
    
    log "备份验证完成,校验和: $checksum"
}

# 生成备份元数据
generate_backup_metadata() {
    local backup_file="$1"
    local backup_type="$2"
    local database="$3"
    local timestamp=$(date '+%Y-%m-%d %H:%M:%S')
    
    local metadata_file="${backup_file}.metadata"
    
    cat > "$metadata_file" << EOF
{
    "backup_type": "$backup_type",
    "database": "$database",
    "timestamp": "$timestamp",
    "backup_file": "$backup_file",
    "file_size": $(stat -c%s "$backup_file" 2>/dev/null || echo "0"),
    "postgresql_version": "$(psql -t -c "SELECT version();" | head -n 1 | xargs)",
    "compression": "$COMPRESSION",
    "encryption": "$ENCRYPTION",
    "checksum": "$(cat "${backup_file}.sha256" 2>/dev/null || echo "N/A")"
}
EOF
    
    log "备份元数据生成完成: $metadata_file"
}

# 恢复数据库
restore_database() {
    local backup_file="$1"
    local target_database="$2"
    local target_host="${3:-$PGHOST}"
    local target_port="${4:-$PGPORT}"
    
    log "开始恢复数据库: $target_database"
    
    # 检查备份文件
    if [[ ! -f "$backup_file" ]]; then
        error "备份文件不存在: $backup_file"
        return 1
    fi
    
    # 解密备份文件(如果需要)
    local restore_file="$backup_file"
    if [[ "$backup_file" == *.enc ]]; then
        restore_file="${backup_file%.enc}"
        log "解密备份文件: $backup_file"
        openssl enc -aes-256-cbc -d -in "$backup_file" -out "$restore_file" -k "$ENCRYPTION_KEY"
    fi
    
    # 解压备份文件(如果需要)
    if [[ "$restore_file" == *.gz ]]; then
        log "解压备份文件: $restore_file"
        gunzip "$restore_file"
        restore_file="${restore_file%.gz}"
    fi
    
    # 创建目标数据库
    PGHOST="$target_host" PGPORT="$target_port" createdb "$target_database" 2>/dev/null || true
    
    # 恢复数据(不使用 --create:直接恢复到上面已创建的目标库;--create 会按转储中的原库名重建数据库)
    log "恢复数据到数据库: $target_database"
    PGHOST="$target_host" PGPORT="$target_port" pg_restore \
        --verbose \
        --clean \
        --if-exists \
        --dbname="$target_database" \
        "$restore_file"
    
    if [[ $? -eq 0 ]]; then
        log "数据库恢复完成: $target_database"
        
        # 验证恢复结果
        local table_count=$(PGHOST="$target_host" PGPORT="$target_port" psql -t -d "$target_database" \
            -c "SELECT count(*) FROM information_schema.tables WHERE table_schema = 'public';" | xargs)
        
        log "恢复验证 - 表数量: $table_count"
        
    else
        error "数据库恢复失败: $target_database"
        return 1
    fi
    
    # 清理临时文件
    if [[ "$restore_file" != "$backup_file" ]]; then
        rm -f "$restore_file"
    fi
}

# 时间点恢复
point_in_time_recovery() {
    local target_time="$1"
    local base_backup_dir="$2"
    local wal_archive_dir="$3"
    local recovery_dir="$4"
    
    log "开始时间点恢复到: $target_time"
    
    # 创建恢复目录
    mkdir -p "$recovery_dir"
    
    # 解压基础备份
    if [[ -f "$base_backup_dir/base.tar.gz" ]]; then
        tar -xzf "$base_backup_dir/base.tar.gz" -C "$recovery_dir"
    else
        error "基础备份文件不存在"
        return 1
    fi
    
    # 创建recovery.conf文件(适用于 PostgreSQL 11 及更早版本;PostgreSQL 12+ 应将这些参数写入 postgresql.conf 并创建 recovery.signal)
    cat > "$recovery_dir/recovery.conf" << EOF
restore_command = 'cp $wal_archive_dir/%f %p'
recovery_target_time = '$target_time'
recovery_target_timeline = 'latest'
EOF
    
    log "时间点恢复配置完成: $recovery_dir"
    log "请手动启动PostgreSQL实例进行恢复"
}

# 清理过期备份
cleanup_old_backups() {
    log "开始清理过期备份"
    
    # 清理逻辑备份
    find "$BACKUP_DIR/full" -name "*.sql*" -mtime +$RETENTION_DAYS -delete
    find "$BACKUP_DIR/full" -name "*.metadata" -mtime +$RETENTION_DAYS -delete
    find "$BACKUP_DIR/full" -name "*.sha256" -mtime +$RETENTION_DAYS -delete
    
    # 清理物理备份
    find "$BACKUP_DIR/basebackup" -type d -mtime +$RETENTION_DAYS -exec rm -rf {} +
    
    # 清理WAL归档
    find "$WAL_ARCHIVE_DIR" -type f -mtime +$RETENTION_DAYS -delete
    
    log "过期备份清理完成"
}

# 主函数
main() {
    case "${1:-backup}" in
        "logical")
            load_config
            check_dependencies
            create_backup_directories
            if [[ $# -lt 2 ]]; then
                echo "用法: $0 logical <database>"
                exit 1
            fi
            logical_backup "$2"
            ;;
        "physical")
            load_config
            check_dependencies
            create_backup_directories
            physical_backup
            ;;
        "wal")
            load_config
            check_dependencies
            create_backup_directories
            if [[ $# -lt 2 ]]; then
                echo "用法: $0 wal <wal_source_dir>"
                exit 1
            fi
            wal_archive_backup "$2"
            ;;
        "restore")
            if [[ $# -lt 3 ]]; then
                echo "用法: $0 restore <backup_file> <target_database> [target_host] [target_port]"
                exit 1
            fi
            load_config
            restore_database "$2" "$3" "${4:-}" "${5:-}"
            ;;
        "pitr")
            if [[ $# -lt 5 ]]; then
                echo "用法: $0 pitr <target_time> <base_backup_dir> <wal_archive_dir> <recovery_dir>"
                exit 1
            fi
            load_config
            point_in_time_recovery "$2" "$3" "$4" "$5"
            ;;
        "cleanup")
            load_config
            cleanup_old_backups
            ;;
        *)
            echo "用法: $0 {logical|physical|wal|restore|pitr|cleanup}"
            echo "  logical <database>                           - 逻辑备份"
            echo "  physical                                     - 物理备份"
            echo "  wal <wal_source_dir>                        - WAL归档"
            echo "  restore <backup_file> <target_database>     - 恢复数据库"
            echo "  pitr <time> <base_backup> <wal_dir> <recovery_dir> - 时间点恢复"
            echo "  cleanup                                      - 清理过期备份"
            exit 1
            ;;
    esac
}

main "$@"
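
Note that the wal_archive_backup function above only copies segments the server has already archived; continuous archiving itself must be enabled on the PostgreSQL side. Assuming the archive directory used in this script, the relevant postgresql.conf settings would look roughly like this (a sketch, to be adapted to the actual paths and PostgreSQL version):

# postgresql.conf
wal_level = replica
archive_mode = on
archive_command = 'test ! -f /backup/postgresql/wal/%f && cp %p /backup/postgresql/wal/%f'
archive_timeout = 300    # force a segment switch at least every 5 minutes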

Cross-Platform Backup Management System

1. Unified Backup Manager

#!/usr/bin/env python3
# scripts/unified_backup_manager.py

import os
import json
import yaml
import logging
import subprocess
import threading
import time
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional
from dataclasses import dataclass
from enum import Enum
import schedule
import boto3
from azure.storage.blob import BlobServiceClient
from google.cloud import storage as gcs

class DatabaseType(Enum):
    MYSQL = "mysql"
    POSTGRESQL = "postgresql"
    MONGODB = "mongodb"
    REDIS = "redis"
    ELASTICSEARCH = "elasticsearch"

class BackupType(Enum):
    FULL = "full"
    INCREMENTAL = "incremental"
    DIFFERENTIAL = "differential"
    LOG = "log"

class BackupStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"

@dataclass
class BackupJob:
    id: str
    database_type: DatabaseType
    backup_type: BackupType
    database_name: str
    schedule: str
    retention_days: int
    compression: bool
    encryption: bool
    storage_locations: List[str]
    status: BackupStatus = BackupStatus.PENDING
    created_at: Optional[datetime] = None
    started_at: Optional[datetime] = None
    completed_at: Optional[datetime] = None
    error_message: Optional[str] = None
    backup_size: int = 0
    backup_files: Optional[List[str]] = None

    def __post_init__(self):
        if self.created_at is None:
            self.created_at = datetime.now()
        if self.backup_files is None:
            self.backup_files = []

class UnifiedBackupManager:
    def __init__(self, config_file: str):
        self.config = self._load_config(config_file)
        self.logger = self._setup_logging()
        self.jobs: Dict[str, BackupJob] = {}
        self.running_jobs: Dict[str, threading.Thread] = {}
        
        # 初始化云存储客户端
        self._init_cloud_storage()
        
        # 启动调度器
        self.scheduler_thread = None
        self._stop_scheduler = False  # 注意:不能与 stop_scheduler() 方法同名,否则属性会遮蔽方法
    
    def _load_config(self, config_file: str) -> Dict[str, Any]:
        """加载配置文件"""
        with open(config_file, 'r', encoding='utf-8') as f:
            if config_file.endswith('.yaml') or config_file.endswith('.yml'):
                return yaml.safe_load(f)
            else:
                return json.load(f)
    
    def _setup_logging(self) -> logging.Logger:
        """设置日志记录"""
        logger = logging.getLogger('UnifiedBackupManager')
        logger.setLevel(logging.INFO)
        
        # 文件处理器
        file_handler = logging.FileHandler(
            self.config.get('log_file', '/var/log/unified_backup.log')
        )
        file_formatter = logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        file_handler.setFormatter(file_formatter)
        logger.addHandler(file_handler)
        
        # 控制台处理器
        console_handler = logging.StreamHandler()
        console_handler.setFormatter(file_formatter)
        logger.addHandler(console_handler)
        
        return logger
    
    def _init_cloud_storage(self):
        """初始化云存储客户端"""
        self.cloud_clients = {}
        
        # AWS S3
        if 'aws' in self.config.get('cloud_storage', {}):
            aws_config = self.config['cloud_storage']['aws']
            self.cloud_clients['s3'] = boto3.client(
                's3',
                aws_access_key_id=aws_config.get('access_key_id'),
                aws_secret_access_key=aws_config.get('secret_access_key'),
                region_name=aws_config.get('region', 'us-west-2')
            )
        
        # Azure Blob Storage
        if 'azure' in self.config.get('cloud_storage', {}):
            azure_config = self.config['cloud_storage']['azure']
            self.cloud_clients['azure'] = BlobServiceClient(
                account_url=f"https://{azure_config['account_name']}.blob.core.windows.net",
                credential=azure_config['account_key']
            )
        
        # Google Cloud Storage
        if 'gcp' in self.config.get('cloud_storage', {}):
            gcp_config = self.config['cloud_storage']['gcp']
            os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = gcp_config['credentials_file']
            self.cloud_clients['gcs'] = gcs.Client()
    
    def create_backup_job(self, job_config: Dict[str, Any]) -> str:
        """创建备份任务"""
        job_id = f"{job_config['database_type']}_{job_config['database_name']}_{int(time.time())}"
        
        job = BackupJob(
            id=job_id,
            database_type=DatabaseType(job_config['database_type']),
            backup_type=BackupType(job_config['backup_type']),
            database_name=job_config['database_name'],
            schedule=job_config['schedule'],
            retention_days=job_config.get('retention_days', 7),
            compression=job_config.get('compression', True),
            encryption=job_config.get('encryption', True),
            storage_locations=job_config.get('storage_locations', ['local'])
        )
        
        self.jobs[job_id] = job
        self.logger.info(f"备份任务创建成功: {job_id}")
        
        # 注册调度任务
        self._schedule_job(job)
        
        return job_id
    
    def _schedule_job(self, job: BackupJob):
        """注册调度任务"""
        if job.schedule == 'manual':
            return
        
        # 解析调度表达式并注册
        if job.schedule.startswith('cron:'):
            # 简化的cron支持
            cron_expr = job.schedule[5:]
            # 这里可以实现更复杂的cron解析
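            # 假设性提示:如需完整的 cron 语义,可引入第三方库(例如 croniter)
            # 解析 cron_expr 并自行计算下一次触发时间;此处简化为固定在每天 02:00 执行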
            schedule.every().day.at("02:00").do(self._execute_backup_job, job.id)
        elif job.schedule.startswith('interval:'):
            # 间隔调度
            interval = job.schedule[9:]
            if interval.endswith('h'):
                hours = int(interval[:-1])
                schedule.every(hours).hours.do(self._execute_backup_job, job.id)
            elif interval.endswith('d'):
                days = int(interval[:-1])
                schedule.every(days).days.do(self._execute_backup_job, job.id)
    
    def execute_backup_job(self, job_id: str) -> bool:
        """执行备份任务"""
        if job_id not in self.jobs:
            self.logger.error(f"备份任务不存在: {job_id}")
            return False
        
        job = self.jobs[job_id]
        
        if job.status == BackupStatus.RUNNING:
            self.logger.warning(f"备份任务正在运行: {job_id}")
            return False
        
        # 在新线程中执行备份
        backup_thread = threading.Thread(
            target=self._execute_backup_job,
            args=(job_id,)
        )
        backup_thread.start()
        self.running_jobs[job_id] = backup_thread
        
        return True
    
    def _execute_backup_job(self, job_id: str):
        """执行备份任务的内部方法"""
        job = self.jobs[job_id]
        job.status = BackupStatus.RUNNING
        job.started_at = datetime.now()
        
        self.logger.info(f"开始执行备份任务: {job_id}")
        
        try:
            # 根据数据库类型执行相应的备份
            if job.database_type == DatabaseType.MYSQL:
                backup_files = self._backup_mysql(job)
            elif job.database_type == DatabaseType.POSTGRESQL:
                backup_files = self._backup_postgresql(job)
            elif job.database_type == DatabaseType.MONGODB:
                backup_files = self._backup_mongodb(job)
            elif job.database_type == DatabaseType.REDIS:
                backup_files = self._backup_redis(job)
            elif job.database_type == DatabaseType.ELASTICSEARCH:
                backup_files = self._backup_elasticsearch(job)
            else:
                raise ValueError(f"不支持的数据库类型: {job.database_type}")
            
            job.backup_files = backup_files
            job.backup_size = sum(os.path.getsize(f) for f in backup_files if os.path.exists(f))
            
            # 上传到存储位置
            self._upload_backups(job)
            
            # 清理过期备份
            self._cleanup_old_backups(job)
            
            job.status = BackupStatus.COMPLETED
            job.completed_at = datetime.now()
            
            self.logger.info(f"备份任务完成: {job_id}")
            
        except Exception as e:
            job.status = BackupStatus.FAILED
            job.error_message = str(e)
            job.completed_at = datetime.now()
            
            self.logger.error(f"备份任务失败: {job_id}, 错误: {e}")
        
        finally:
            # 清理运行中的任务记录
            if job_id in self.running_jobs:
                del self.running_jobs[job_id]
    
    def _backup_mysql(self, job: BackupJob) -> List[str]:
        """MySQL备份"""
        db_config = self.config['databases']['mysql']
        backup_dir = os.path.join(self.config['backup_base_dir'], 'mysql')
        os.makedirs(backup_dir, exist_ok=True)
        
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        backup_file = os.path.join(backup_dir, f"{job.database_name}_{timestamp}.sql")
        
        # 构建mysqldump命令
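        # 示意提醒:--password 以明文出现在命令行会暴露在进程列表中,
        # 生产环境建议改用 --defaults-extra-file 或环境变量等方式传递凭据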
        cmd = [
            'mysqldump',
            f"--host={db_config['host']}",
            f"--port={db_config['port']}",
            f"--user={db_config['user']}",
            f"--password={db_config['password']}",
            '--single-transaction',
            '--routines',
            '--triggers',
            '--events',
            job.database_name
        ]
        
        # 执行备份
        with open(backup_file, 'w') as f:
            result = subprocess.run(cmd, stdout=f, stderr=subprocess.PIPE, text=True)
        
        if result.returncode != 0:
            raise Exception(f"MySQL备份失败: {result.stderr}")
        
        backup_files = [backup_file]
        
        # 压缩
        if job.compression:
            compressed_file = f"{backup_file}.gz"
            subprocess.run(['gzip', backup_file], check=True)
            backup_files = [compressed_file]
        
        # 加密
        if job.encryption:
            encrypted_files = []
            for file in backup_files:
                encrypted_file = f"{file}.enc"
                self._encrypt_file(file, encrypted_file)
                os.remove(file)
                encrypted_files.append(encrypted_file)
            backup_files = encrypted_files
        
        return backup_files
    
    def _backup_postgresql(self, job: BackupJob) -> List[str]:
        """PostgreSQL备份"""
        db_config = self.config['databases']['postgresql']
        backup_dir = os.path.join(self.config['backup_base_dir'], 'postgresql')
        os.makedirs(backup_dir, exist_ok=True)
        
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        backup_file = os.path.join(backup_dir, f"{job.database_name}_{timestamp}.dump")
        
        # 设置环境变量
        env = os.environ.copy()
        env.update({
            'PGHOST': db_config['host'],
            'PGPORT': str(db_config['port']),
            'PGUSER': db_config['user'],
            'PGPASSWORD': db_config['password']
        })
        
        # 构建pg_dump命令
        cmd = [
            'pg_dump',
            '--verbose',
            '--format=custom',
            '--compress=6',
            '--no-owner',
            '--no-privileges',
            '--file', backup_file,
            job.database_name
        ]
        
        # 执行备份
        result = subprocess.run(cmd, env=env, stderr=subprocess.PIPE, text=True)
        
        if result.returncode != 0:
            raise Exception(f"PostgreSQL备份失败: {result.stderr}")
        
        backup_files = [backup_file]
        
        # 加密
        if job.encryption:
            encrypted_file = f"{backup_file}.enc"
            self._encrypt_file(backup_file, encrypted_file)
            os.remove(backup_file)
            backup_files = [encrypted_file]
        
        return backup_files
    
    def _backup_mongodb(self, job: BackupJob) -> List[str]:
        """MongoDB备份"""
        db_config = self.config['databases']['mongodb']
        backup_dir = os.path.join(self.config['backup_base_dir'], 'mongodb')
        os.makedirs(backup_dir, exist_ok=True)
        
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        backup_path = os.path.join(backup_dir, f"{job.database_name}_{timestamp}")
        
        # 构建mongodump命令
        cmd = [
            'mongodump',
            '--host', f"{db_config['host']}:{db_config['port']}",
            '--db', job.database_name,
            '--out', backup_path
        ]
        
        if 'username' in db_config:
            cmd.extend(['--username', db_config['username']])
            cmd.extend(['--password', db_config['password']])
        
        # 执行备份
        result = subprocess.run(cmd, stderr=subprocess.PIPE, text=True)
        
        if result.returncode != 0:
            raise Exception(f"MongoDB备份失败: {result.stderr}")
        
        # 压缩备份目录
        archive_file = f"{backup_path}.tar.gz"
        subprocess.run(['tar', '-czf', archive_file, '-C', backup_dir, os.path.basename(backup_path)], check=True)
        
        # 删除原始目录
        subprocess.run(['rm', '-rf', backup_path], check=True)
        
        backup_files = [archive_file]
        
        # 加密
        if job.encryption:
            encrypted_file = f"{archive_file}.enc"
            self._encrypt_file(archive_file, encrypted_file)
            os.remove(archive_file)
            backup_files = [encrypted_file]
        
        return backup_files
    
    def _backup_redis(self, job: BackupJob) -> List[str]:
        """Redis备份"""
        db_config = self.config['databases']['redis']
        backup_dir = os.path.join(self.config['backup_base_dir'], 'redis')
        os.makedirs(backup_dir, exist_ok=True)
        
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        backup_file = os.path.join(backup_dir, f"redis_{timestamp}.rdb")
        
        # 使用redis-cli执行BGSAVE
        # 注意:redis-cli 的连接选项(-h/-p/-a)必须放在命令之前,
        # 否则会被当作 BGSAVE 命令的参数,导致调用失败
        cmd = [
            'redis-cli',
            '-h', db_config['host'],
            '-p', str(db_config['port'])
        ]
        
        if 'password' in db_config:
            cmd.extend(['-a', db_config['password']])
        
        cmd.append('BGSAVE')
        
        # 执行后台保存
        result = subprocess.run(cmd, capture_output=True, text=True)
        
        if result.returncode != 0:
            raise Exception(f"Redis BGSAVE失败: {result.stderr}")
        
        # 等待RDB落盘(此处固定等待仅为简化处理;
        # 更稳妥的做法是轮询 LASTSAVE,确认其返回时间戳晚于本次 BGSAVE 后再复制文件)
        time.sleep(5)
        
        # 复制RDB文件
        rdb_source = db_config.get('rdb_file', '/var/lib/redis/dump.rdb')
        subprocess.run(['cp', rdb_source, backup_file], check=True)
        
        backup_files = [backup_file]
        
        # 压缩和加密
        if job.compression:
            compressed_file = f"{backup_file}.gz"
            subprocess.run(['gzip', backup_file], check=True)
            backup_files = [compressed_file]
        
        if job.encryption:
            encrypted_files = []
            for file in backup_files:
                encrypted_file = f"{file}.enc"
                self._encrypt_file(file, encrypted_file)
                os.remove(file)
                encrypted_files.append(encrypted_file)
            backup_files = encrypted_files
        
        return backup_files
    
    def _backup_elasticsearch(self, job: BackupJob) -> List[str]:
        """Elasticsearch备份"""
        # 这里实现Elasticsearch快照备份
        # 由于篇幅限制,这里只是一个简化的实现
        backup_dir = os.path.join(self.config['backup_base_dir'], 'elasticsearch')
        os.makedirs(backup_dir, exist_ok=True)
        
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        snapshot_name = f"{job.database_name}_{timestamp}"
        
        # 这里应该调用Elasticsearch API创建快照
        # 简化实现,返回空列表
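        # 假设性提示:完整实现通常先在集群中注册快照仓库,然后调用快照 API,例如:
        #   PUT http://<es_host>:9200/_snapshot/<repository>/<snapshot_name>?wait_for_completion=true
        # 可通过 urllib/requests 发起该请求,并把快照名称记录到任务元数据中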
        return []
    
    def _encrypt_file(self, input_file: str, output_file: str):
        """加密文件"""
        encryption_key = self.config.get('encryption_key', 'default_key')
        
        cmd = [
            'openssl', 'enc', '-aes-256-cbc', '-salt',
            '-in', input_file,
            '-out', output_file,
            '-k', encryption_key
        ]
        
        result = subprocess.run(cmd, capture_output=True, text=True)
        
        if result.returncode != 0:
            raise Exception(f"文件加密失败: {result.stderr}")
    
    def _upload_backups(self, job: BackupJob):
        """上传备份文件到存储位置"""
        for location in job.storage_locations:
            if location == 'local':
                continue  # 本地存储无需上传
            elif location.startswith('s3://'):
                self._upload_to_s3(job, location)
            elif location.startswith('azure://'):
                self._upload_to_azure(job, location)
            elif location.startswith('gcs://'):
                self._upload_to_gcs(job, location)
    
    def _upload_to_s3(self, job: BackupJob, s3_url: str):
        """上传到AWS S3"""
        if 's3' not in self.cloud_clients:
            self.logger.warning("S3客户端未配置")
            return
        
        bucket_name = s3_url.replace('s3://', '').split('/')[0]
        s3_prefix = '/'.join(s3_url.replace('s3://', '').split('/')[1:])
        
        s3_client = self.cloud_clients['s3']
        
        for backup_file in job.backup_files:
            file_name = os.path.basename(backup_file)
            s3_key = f"{s3_prefix}/{job.database_type.value}/{file_name}"
            
            try:
                s3_client.upload_file(backup_file, bucket_name, s3_key)
                self.logger.info(f"文件上传到S3成功: {s3_key}")
            except Exception as e:
                self.logger.error(f"S3上传失败: {e}")
    
    def _upload_to_azure(self, job: BackupJob, azure_url: str):
        """上传到Azure Blob Storage"""
        # 实现Azure上传逻辑
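        # 假设性提示:可参照 _upload_to_s3 的结构,解析 azure://<container>/<prefix>,
        # 再通过 self.cloud_clients['azure'].get_blob_client(container=..., blob=...)
        # 的 upload_blob() 逐个上传 job.backup_files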
        pass
    
    def _upload_to_gcs(self, job: BackupJob, gcs_url: str):
        """上传到Google Cloud Storage"""
        # 实现GCS上传逻辑
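        # 假设性提示:可解析 gcs://<bucket>/<prefix>,再通过
        # self.cloud_clients['gcs'].bucket(<bucket>).blob(<对象名>).upload_from_filename(<本地文件>) 上传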
        pass
    
    def _cleanup_old_backups(self, job: BackupJob):
        """清理过期备份"""
        cutoff_date = datetime.now() - timedelta(days=job.retention_days)
        
        # 清理本地备份
        backup_dir = os.path.join(
            self.config['backup_base_dir'],
            job.database_type.value
        )
        
        if os.path.exists(backup_dir):
            for file in os.listdir(backup_dir):
                file_path = os.path.join(backup_dir, file)
                if os.path.isfile(file_path):
                    file_mtime = datetime.fromtimestamp(os.path.getmtime(file_path))
                    if file_mtime < cutoff_date:
                        os.remove(file_path)
                        self.logger.info(f"删除过期备份: {file_path}")
    
    def get_job_status(self, job_id: str) -> Optional[BackupJob]:
        """获取任务状态"""
        return self.jobs.get(job_id)
    
    def list_jobs(self) -> List[BackupJob]:
        """列出所有任务"""
        return list(self.jobs.values())
    
    def cancel_job(self, job_id: str) -> bool:
        """取消任务"""
        if job_id not in self.jobs:
            return False
        
        job = self.jobs[job_id]
        
        if job.status == BackupStatus.RUNNING:
            # 尝试停止运行中的线程
            if job_id in self.running_jobs:
                # 注意:Python线程无法强制停止,这里只是标记
                job.status = BackupStatus.CANCELLED
                return True
        
        return False
    
    def start_scheduler(self):
        """启动调度器"""
        def run_scheduler():
            while not self._stop_scheduler:
                schedule.run_pending()
                time.sleep(60)  # 每分钟检查一次
        
        self.scheduler_thread = threading.Thread(target=run_scheduler)
        self.scheduler_thread.start()
        self.logger.info("备份调度器已启动")
    
    def stop_scheduler(self):
        """停止调度器"""
        self._stop_scheduler = True
        if self.scheduler_thread:
            self.scheduler_thread.join()
        self.logger.info("备份调度器已停止")

def main():
    import argparse
    
    parser = argparse.ArgumentParser(description='统一备份管理器')
    parser.add_argument('--config', required=True, help='配置文件路径')
    parser.add_argument('--action', choices=['start', 'create', 'execute', 'status', 'list'], 
                       default='start', help='执行的操作')
    parser.add_argument('--job-config', help='任务配置文件(用于create操作)')
    parser.add_argument('--job-id', help='任务ID(用于execute和status操作)')
    
    args = parser.parse_args()
    
    # 创建备份管理器
    manager = UnifiedBackupManager(args.config)
    
    if args.action == 'start':
        # 启动调度器
        manager.start_scheduler()
        try:
            while True:
                time.sleep(1)
        except KeyboardInterrupt:
            manager.stop_scheduler()
    
    elif args.action == 'create':
        if not args.job_config:
            print("错误:需要提供任务配置文件")
            exit(1)
        
        with open(args.job_config, 'r') as f:
            job_config = json.load(f)
        
        job_id = manager.create_backup_job(job_config)
        print(f"任务创建成功,ID: {job_id}")
    
    elif args.action == 'execute':
        if not args.job_id:
            print("错误:需要提供任务ID")
            exit(1)
        
        success = manager.execute_backup_job(args.job_id)
        if success:
            print(f"任务 {args.job_id} 开始执行")
        else:
            print(f"任务 {args.job_id} 执行失败")
    
    elif args.action == 'status':
        if not args.job_id:
            print("错误:需要提供任务ID")
            exit(1)
        
        job = manager.get_job_status(args.job_id)
        if job:
            print(f"任务状态: {job.status.value}")
            print(f"创建时间: {job.created_at}")
            if job.started_at:
                print(f"开始时间: {job.started_at}")
            if job.completed_at:
                print(f"完成时间: {job.completed_at}")
            if job.error_message:
                print(f"错误信息: {job.error_message}")
        else:
            print(f"任务 {args.job_id} 不存在")
    
    elif args.action == 'list':
        jobs = manager.list_jobs()
        print(f"共有 {len(jobs)} 个任务:")
        for job in jobs:
            print(f"  {job.id}: {job.database_type.value}/{job.database_name} - {job.status.value}")

if __name__ == "__main__":
    main()
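
下面给出一个在同一进程内创建并执行任务的最小使用示例。任务配置的字段与上文 create_backup_job 读取的键一一对应;配置文件路径、取值均为示意,且由于任务信息只保存在内存中,跨进程调用 execute/status 需要额外的任务持久化,这里不展开。

# scripts/backup_manager_example.py(示意用法)
import time
from unified_backup_manager import UnifiedBackupManager

manager = UnifiedBackupManager('backup_config.yaml')   # 假设的管理器配置文件

job_id = manager.create_backup_job({
    'database_type': 'mysql',
    'backup_type': 'full',
    'database_name': 'production_db',
    'schedule': 'manual',                # 只手动触发,不注册到调度器
    'retention_days': 7,
    'compression': True,
    'encryption': True,
    'storage_locations': ['local', 's3://company-database-backups/mysql'],
})

manager.execute_backup_job(job_id)       # 在后台线程中执行备份

# 轮询任务状态直到结束
while True:
    job = manager.get_job_status(job_id)
    if job.status.value in ('completed', 'failed'):
        print(f"任务结束: {job.status.value} {job.error_message or ''}")
        break
    time.sleep(10)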

总结

本文深入探讨了数据库备份与恢复的完整解决方案,涵盖了以下核心内容:

核心要点

  1. 备份策略设计

    • 多层次备份架构(全量、增量、差异、日志)
    • 灵活的调度策略和保留策略
    • 压缩和加密支持
  2. 多数据库支持

    • MySQL的逻辑和物理备份
    • PostgreSQL的pg_dump和基础备份
    • MongoDB的mongodump和oplog
    • Redis的RDB和AOF备份
  3. 高级功能

    • 时间点恢复(PITR)
    • 跨平台备份管理
    • 云存储集成
    • 自动化验证和监控
  4. 最佳实践

    • 定期备份验证
    • 灾难恢复演练
    • 监控和告警机制
    • 安全性和合规性考虑

通过本文提供的脚本和工具,可以构建一个完整的企业级数据库备份与恢复系统,确保数据的安全性和业务的连续性。
