数据库备份与恢复策略:从基础备份到灾难恢复的完整方案
数据库备份与恢复是数据库管理中最关键的环节之一,直接关系到数据的安全性和业务的连续性。本文将深入探讨现代数据库备份与恢复的最佳实践,从基础的备份策略到复杂的灾难恢复方案。
备份策略设计与规划
1. 备份策略架构
graph TB
A[备份策略规划] --> B[备份类型选择]
A --> C[备份频率设计]
A --> D[存储策略规划]
B --> E[全量备份]
B --> F[增量备份]
B --> G[差异备份]
B --> H[日志备份]
C --> I[实时备份]
C --> J[定时备份]
C --> K[触发备份]
D --> L[本地存储]
D --> M[远程存储]
D --> N[云存储]
D --> O[多地备份]
E --> P[完整数据副本]
F --> Q[变更数据]
G --> R[差异数据]
H --> S[事务日志]
P --> T[恢复基准点]
Q --> T
R --> T
S --> U[时间点恢复]
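上图中的"恢复基准点"和"时间点恢复"对应一条典型的恢复链:用最近一次全量备份确定基准,再按顺序回放其后的事务日志,直到目标时刻。以 MySQL 为例,下面是一个极简的示意(文件名、时间均为假设值,仅用于说明恢复链的组成):

```bash
#!/bin/bash
# 恢复链示意(文件名、时间均为示例;假设连接凭据已配置在 ~/.my.cnf 中)
set -euo pipefail

# 1) 恢复最近一次全量备份,得到"恢复基准点"
mysql production_db < production_db_full_20240101_020000.sql

# 2) 按顺序回放基准点之后的 binlog,直到目标时刻,完成"时间点恢复"
mysqlbinlog --stop-datetime="2024-01-01 10:30:00" \
    mysql-bin.000101 mysql-bin.000102 | mysql production_db
```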
2. 备份策略配置
# config/backup_strategy.yaml
backup_strategy:
# 全局配置
global:
retention_policy:
daily_backups: 7 # 保留7天的日备份
weekly_backups: 4 # 保留4周的周备份
monthly_backups: 12 # 保留12个月的月备份
yearly_backups: 3 # 保留3年的年备份
compression:
enabled: true
algorithm: "gzip" # gzip, lz4, zstd
level: 6
encryption:
enabled: true
algorithm: "AES-256"
key_rotation_days: 90
verification:
enabled: true
checksum_algorithm: "SHA-256"
test_restore: true
test_frequency: "weekly"
# MySQL备份策略
mysql:
databases:
- name: "production_db"
backup_type: "full"
schedule: "0 2 * * *" # 每天凌晨2点
storage_location: "/backup/mysql/full"
- name: "production_db"
backup_type: "incremental"
schedule: "0 */6 * * *" # 每6小时
storage_location: "/backup/mysql/incremental"
- name: "production_db"
backup_type: "binlog"
schedule: "*/10 * * * *" # 每10分钟
storage_location: "/backup/mysql/binlog"
options:
single_transaction: true
routines: true
triggers: true
events: true
lock_tables: false
master_data: 2
# PostgreSQL备份策略
postgresql:
databases:
- name: "production_db"
backup_type: "full"
schedule: "0 3 * * *"
storage_location: "/backup/postgresql/full"
- name: "production_db"
backup_type: "wal"
continuous: true
storage_location: "/backup/postgresql/wal"
options:
format: "custom"
compression_level: 6
verbose: true
no_owner: true
no_privileges: false
# MongoDB备份策略
mongodb:
databases:
- name: "production_db"
backup_type: "full"
schedule: "0 4 * * *"
storage_location: "/backup/mongodb/full"
- name: "production_db"
backup_type: "oplog"
continuous: true
storage_location: "/backup/mongodb/oplog"
options:
gzip: true
repair: false
oplog: true
journal: true
# 存储配置
storage:
local:
path: "/backup"
max_size: "1TB"
cleanup_policy: "retention"
remote:
type: "nfs"
server: "backup-server.company.com"
path: "/shared/backups"
mount_options: "rw,sync,hard,intr"
cloud:
provider: "aws_s3"
bucket: "company-database-backups"
region: "us-west-2"
storage_class: "STANDARD_IA"
lifecycle_policy:
transition_to_glacier: 30
delete_after: 2555 # 7 years
# 监控和告警
monitoring:
alerts:
backup_failure:
enabled: true
notification_channels: ["email", "slack"]
escalation_time: 30 # minutes
backup_size_anomaly:
enabled: true
threshold_percentage: 50
notification_channels: ["email"]
storage_space_low:
enabled: true
threshold_percentage: 85
notification_channels: ["email", "slack"]
metrics:
backup_duration: true
backup_size: true
success_rate: true
storage_utilization: true
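上面的策略文件本身只是声明,还需要由脚本或调度系统解析执行。下面是一个最小示意:假设系统已安装 yq,读取保留策略中的 daily_backups,并据此清理策略文件里 storage_location 指向的全量备份目录(路径与工具均为示例):

```bash
#!/bin/bash
# 按策略文件中的保留天数清理过期全量备份(示意,假设已安装 yq)
set -euo pipefail

CONFIG="config/backup_strategy.yaml"
DAILY_KEEP=$(yq '.backup_strategy.global.retention_policy.daily_backups' "$CONFIG")

# 删除超过保留天数的 MySQL 全量备份文件
find /backup/mysql/full -type f -mtime +"$DAILY_KEEP" -print -delete
```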
MySQL备份与恢复实现
1. 自动化备份系统
#!/bin/bash
# scripts/mysql_backup_system.sh
set -euo pipefail
# 配置文件路径
CONFIG_FILE="/etc/mysql-backup/config.conf"
LOG_FILE="/var/log/mysql-backup.log"
# 默认配置
MYSQL_HOST="${MYSQL_HOST:-localhost}"
MYSQL_PORT="${MYSQL_PORT:-3306}"
MYSQL_USER="${MYSQL_USER:-backup_user}"
MYSQL_PASSWORD="${MYSQL_PASSWORD:-}"
BACKUP_DIR="${BACKUP_DIR:-/backup/mysql}"
RETENTION_DAYS="${RETENTION_DAYS:-7}"
COMPRESSION="${COMPRESSION:-true}"
ENCRYPTION="${ENCRYPTION:-true}"
ENCRYPTION_KEY="${ENCRYPTION_KEY:-}"
# 日志函数
log() {
echo "[$(date '+%Y-%m-%d %H:%M:%S')] $1" | tee -a "$LOG_FILE"
}
error() {
echo "[$(date '+%Y-%m-%d %H:%M:%S')] ERROR: $1" | tee -a "$LOG_FILE" >&2
}
# 加载配置文件
load_config() {
if [[ -f "$CONFIG_FILE" ]]; then
source "$CONFIG_FILE"
log "配置文件加载完成: $CONFIG_FILE"
else
log "使用默认配置"
fi
}
# 检查依赖
check_dependencies() {
local deps=("mysqldump" "mysql" "gzip" "openssl")
for dep in "${deps[@]}"; do
if ! command -v "$dep" &> /dev/null; then
error "依赖项未找到: $dep"
exit 1
fi
done
log "依赖检查完成"
}
# 创建备份目录
create_backup_directories() {
local dirs=(
"$BACKUP_DIR/full"
"$BACKUP_DIR/incremental"
"$BACKUP_DIR/binlog"
"$BACKUP_DIR/temp"
)
for dir in "${dirs[@]}"; do
mkdir -p "$dir"
done
log "备份目录创建完成"
}
# 获取数据库列表
get_databases() {
mysql -h"$MYSQL_HOST" -P"$MYSQL_PORT" -u"$MYSQL_USER" -p"$MYSQL_PASSWORD" \
-e "SHOW DATABASES;" | grep -Ev '^(Database|information_schema|performance_schema|mysql|sys)$'
}
# 全量备份
full_backup() {
local database="$1"
local timestamp=$(date '+%Y%m%d_%H%M%S')
local backup_file="$BACKUP_DIR/full/${database}_full_${timestamp}.sql"
log "开始全量备份: $database"
# 执行备份
mysqldump \
-h"$MYSQL_HOST" \
-P"$MYSQL_PORT" \
-u"$MYSQL_USER" \
-p"$MYSQL_PASSWORD" \
--single-transaction \
--routines \
--triggers \
--events \
--master-data=2 \
--flush-logs \
--hex-blob \
--default-character-set=utf8mb4 \
"$database" > "$backup_file"
    if [[ $? -eq 0 ]]; then
        log "全量备份完成: $backup_file"
        # 压缩备份文件(gzip 会把文件重命名为 *.gz,后续步骤需使用新文件名)
        if [[ "$COMPRESSION" == "true" ]]; then
            compress_backup "$backup_file"
            backup_file="${backup_file}.gz"
        fi
        # 加密备份文件(加密后文件名追加 .enc)
        if [[ "$ENCRYPTION" == "true" ]]; then
            encrypt_backup "$backup_file"
            backup_file="${backup_file}.enc"
        fi
        # 验证备份
        verify_backup "$backup_file"
        # 生成备份元数据
        generate_backup_metadata "$backup_file" "full" "$database"
    else
        error "全量备份失败: $database"
        return 1
    fi
}
# 增量备份(基于binlog)
incremental_backup() {
local database="$1"
local timestamp=$(date '+%Y%m%d_%H%M%S')
local backup_dir="$BACKUP_DIR/incremental/${database}_${timestamp}"
log "开始增量备份: $database"
mkdir -p "$backup_dir"
# 获取当前binlog位置
local binlog_info=$(mysql -h"$MYSQL_HOST" -P"$MYSQL_PORT" -u"$MYSQL_USER" -p"$MYSQL_PASSWORD" \
-e "SHOW MASTER STATUS\G" | grep -E "(File|Position)")
echo "$binlog_info" > "$backup_dir/binlog_position.txt"
# 刷新binlog
mysql -h"$MYSQL_HOST" -P"$MYSQL_PORT" -u"$MYSQL_USER" -p"$MYSQL_PASSWORD" \
-e "FLUSH LOGS;"
# 复制binlog文件
local binlog_dir=$(mysql -h"$MYSQL_HOST" -P"$MYSQL_PORT" -u"$MYSQL_USER" -p"$MYSQL_PASSWORD" \
-e "SHOW VARIABLES LIKE 'log_bin_basename';" | awk 'NR==2 {print $2}' | xargs dirname)
# 获取最新的binlog文件
local latest_binlog=$(mysql -h"$MYSQL_HOST" -P"$MYSQL_PORT" -u"$MYSQL_USER" -p"$MYSQL_PASSWORD" \
-e "SHOW BINARY LOGS;" | tail -n 1 | awk '{print $1}')
if [[ -n "$latest_binlog" ]]; then
cp "$binlog_dir/$latest_binlog" "$backup_dir/"
log "增量备份完成: $backup_dir"
# 生成备份元数据
generate_backup_metadata "$backup_dir" "incremental" "$database"
else
error "增量备份失败: 无法获取binlog文件"
return 1
fi
}
# 压缩备份文件
compress_backup() {
local backup_file="$1"
log "压缩备份文件: $backup_file"
gzip "$backup_file"
if [[ $? -eq 0 ]]; then
log "压缩完成: ${backup_file}.gz"
else
error "压缩失败: $backup_file"
return 1
fi
}
# 加密备份文件
encrypt_backup() {
local backup_file="$1"
if [[ -z "$ENCRYPTION_KEY" ]]; then
error "加密密钥未设置"
return 1
fi
log "加密备份文件: $backup_file"
    # 无论文件是否已压缩,处理方式完全相同,直接加密后删除明文文件;
    # 用 if 包裹 openssl 以便正确捕获加密失败(否则 rm 会覆盖 $? 的值)
    if openssl enc -aes-256-cbc -salt -in "$backup_file" -out "${backup_file}.enc" -k "$ENCRYPTION_KEY"; then
        rm "$backup_file"
        log "加密完成: ${backup_file}.enc"
    else
        error "加密失败: $backup_file"
        return 1
    fi
}
# 验证备份文件
verify_backup() {
local backup_file="$1"
log "验证备份文件: $backup_file"
# 检查文件是否存在且不为空
if [[ ! -f "$backup_file" ]] || [[ ! -s "$backup_file" ]]; then
error "备份文件无效: $backup_file"
return 1
fi
# 生成校验和
local checksum=$(sha256sum "$backup_file" | awk '{print $1}')
echo "$checksum" > "${backup_file}.sha256"
log "备份验证完成,校验和: $checksum"
}
# 生成备份元数据
generate_backup_metadata() {
local backup_file="$1"
local backup_type="$2"
local database="$3"
local timestamp=$(date '+%Y-%m-%d %H:%M:%S')
local metadata_file="${backup_file}.metadata"
cat > "$metadata_file" << EOF
{
"backup_type": "$backup_type",
"database": "$database",
"timestamp": "$timestamp",
"backup_file": "$backup_file",
"file_size": $(stat -c%s "$backup_file" 2>/dev/null || echo "0"),
"mysql_version": "$(mysql -h"$MYSQL_HOST" -P"$MYSQL_PORT" -u"$MYSQL_USER" -p"$MYSQL_PASSWORD" -e "SELECT VERSION();" | tail -n 1)",
"compression": "$COMPRESSION",
"encryption": "$ENCRYPTION",
"checksum": "$(cat "${backup_file}.sha256" 2>/dev/null || echo "N/A")"
}
EOF
log "备份元数据生成完成: $metadata_file"
}
# 清理过期备份
cleanup_old_backups() {
log "开始清理过期备份"
# 清理全量备份
find "$BACKUP_DIR/full" -name "*.sql*" -mtime +$RETENTION_DAYS -delete
find "$BACKUP_DIR/full" -name "*.metadata" -mtime +$RETENTION_DAYS -delete
find "$BACKUP_DIR/full" -name "*.sha256" -mtime +$RETENTION_DAYS -delete
# 清理增量备份
find "$BACKUP_DIR/incremental" -type d -mtime +$RETENTION_DAYS -exec rm -rf {} +
# 清理binlog备份
find "$BACKUP_DIR/binlog" -name "*.log*" -mtime +$RETENTION_DAYS -delete
log "过期备份清理完成"
}
# 备份状态报告
generate_backup_report() {
local report_file="$BACKUP_DIR/backup_report_$(date '+%Y%m%d').txt"
cat > "$report_file" << EOF
MySQL备份状态报告
生成时间: $(date '+%Y-%m-%d %H:%M:%S')
备份目录统计:
$(du -sh "$BACKUP_DIR"/* 2>/dev/null || echo "无备份数据")
最近的备份文件:
全量备份:
$(find "$BACKUP_DIR/full" -name "*.sql*" -mtime -1 -exec ls -lh {} \; 2>/dev/null | head -5)
增量备份:
$(find "$BACKUP_DIR/incremental" -type d -mtime -1 -exec ls -lhd {} \; 2>/dev/null | head -5)
存储空间使用:
$(df -h "$BACKUP_DIR" | tail -n 1)
备份验证状态:
$(find "$BACKUP_DIR" -name "*.sha256" -mtime -1 | wc -l) 个文件已验证
EOF
log "备份报告生成完成: $report_file"
}
# 发送告警通知
send_alert() {
local message="$1"
local severity="${2:-INFO}"
# 发送邮件告警(如果配置了邮件)
if [[ -n "${ALERT_EMAIL:-}" ]]; then
echo "$message" | mail -s "MySQL备份告警 - $severity" "$ALERT_EMAIL"
fi
# 发送Slack通知(如果配置了Webhook)
if [[ -n "${SLACK_WEBHOOK:-}" ]]; then
curl -X POST -H 'Content-type: application/json' \
--data "{\"text\":\"MySQL备份告警 - $severity: $message\"}" \
"$SLACK_WEBHOOK"
fi
log "告警通知已发送: $message"
}
# 主备份函数
main_backup() {
local backup_type="${1:-full}"
local database="${2:-all}"
log "开始MySQL备份任务 - 类型: $backup_type, 数据库: $database"
# 检查MySQL连接
if ! mysql -h"$MYSQL_HOST" -P"$MYSQL_PORT" -u"$MYSQL_USER" -p"$MYSQL_PASSWORD" -e "SELECT 1;" &>/dev/null; then
error "无法连接到MySQL服务器"
send_alert "MySQL备份失败: 无法连接到数据库服务器" "CRITICAL"
exit 1
fi
# 获取数据库列表
local databases
if [[ "$database" == "all" ]]; then
databases=$(get_databases)
else
databases="$database"
fi
# 执行备份
local backup_count=0
local failed_count=0
    # 注意:在 set -e 下,((var++)) 在结果为 0 时返回非零状态会导致脚本退出,
    # 因此这里使用显式赋值进行计数
    for db in $databases; do
        case "$backup_type" in
            "full")
                if full_backup "$db"; then
                    backup_count=$((backup_count + 1))
                else
                    failed_count=$((failed_count + 1))
                fi
                ;;
            "incremental")
                if incremental_backup "$db"; then
                    backup_count=$((backup_count + 1))
                else
                    failed_count=$((failed_count + 1))
                fi
                ;;
            *)
                error "不支持的备份类型: $backup_type"
                exit 1
                ;;
        esac
    done
# 清理过期备份
cleanup_old_backups
# 生成备份报告
generate_backup_report
# 发送完成通知
if [[ $failed_count -eq 0 ]]; then
log "备份任务完成 - 成功: $backup_count, 失败: $failed_count"
send_alert "MySQL备份任务完成 - 成功备份 $backup_count 个数据库" "INFO"
else
error "备份任务完成但有失败 - 成功: $backup_count, 失败: $failed_count"
send_alert "MySQL备份任务部分失败 - 成功: $backup_count, 失败: $failed_count" "WARNING"
fi
}
# 恢复函数
restore_database() {
local backup_file="$1"
local target_database="$2"
local target_host="${3:-$MYSQL_HOST}"
local target_port="${4:-$MYSQL_PORT}"
log "开始恢复数据库: $target_database"
# 检查备份文件
if [[ ! -f "$backup_file" ]]; then
error "备份文件不存在: $backup_file"
return 1
fi
# 解密备份文件(如果需要)
local restore_file="$backup_file"
if [[ "$backup_file" == *.enc ]]; then
restore_file="${backup_file%.enc}"
log "解密备份文件: $backup_file"
openssl enc -aes-256-cbc -d -in "$backup_file" -out "$restore_file" -k "$ENCRYPTION_KEY"
fi
# 解压备份文件(如果需要)
if [[ "$restore_file" == *.gz ]]; then
log "解压备份文件: $restore_file"
gunzip "$restore_file"
restore_file="${restore_file%.gz}"
fi
# 创建目标数据库
mysql -h"$target_host" -P"$target_port" -u"$MYSQL_USER" -p"$MYSQL_PASSWORD" \
-e "CREATE DATABASE IF NOT EXISTS \`$target_database\`;"
# 恢复数据
log "恢复数据到数据库: $target_database"
mysql -h"$target_host" -P"$target_port" -u"$MYSQL_USER" -p"$MYSQL_PASSWORD" \
"$target_database" < "$restore_file"
if [[ $? -eq 0 ]]; then
log "数据库恢复完成: $target_database"
# 验证恢复结果
local table_count=$(mysql -h"$target_host" -P"$target_port" -u"$MYSQL_USER" -p"$MYSQL_PASSWORD" \
-e "SELECT COUNT(*) FROM information_schema.tables WHERE table_schema='$target_database';" | tail -n 1)
log "恢复验证 - 表数量: $table_count"
else
error "数据库恢复失败: $target_database"
return 1
fi
# 清理临时文件
if [[ "$restore_file" != "$backup_file" ]]; then
rm -f "$restore_file"
fi
}
# 时间点恢复
point_in_time_recovery() {
local database="$1"
local target_time="$2"
local backup_file="$3"
log "开始时间点恢复: $database 到 $target_time"
# 恢复全量备份
restore_database "$backup_file" "${database}_pitr_temp"
# 应用binlog到指定时间点
local binlog_dir="$BACKUP_DIR/binlog"
    # binlog 文件名通常形如 mysql-bin.000123,没有 .log 扩展名,这里按修改时间筛选目录下的所有文件
    local binlog_files=$(find "$binlog_dir" -type f -newer "$backup_file" | sort)
for binlog_file in $binlog_files; do
log "应用binlog: $binlog_file"
mysqlbinlog --stop-datetime="$target_time" "$binlog_file" | \
mysql -h"$MYSQL_HOST" -P"$MYSQL_PORT" -u"$MYSQL_USER" -p"$MYSQL_PASSWORD" \
"${database}_pitr_temp"
done
log "时间点恢复完成: ${database}_pitr_temp"
}
# 主函数
main() {
case "${1:-backup}" in
"backup")
load_config
check_dependencies
create_backup_directories
main_backup "${2:-full}" "${3:-all}"
;;
"restore")
if [[ $# -lt 3 ]]; then
echo "用法: $0 restore <backup_file> <target_database> [target_host] [target_port]"
exit 1
fi
load_config
restore_database "$2" "$3" "$4" "$5"
;;
"pitr")
if [[ $# -lt 4 ]]; then
echo "用法: $0 pitr <database> <target_time> <backup_file>"
exit 1
fi
load_config
point_in_time_recovery "$2" "$3" "$4"
;;
"cleanup")
load_config
cleanup_old_backups
;;
"report")
load_config
generate_backup_report
;;
*)
echo "用法: $0 {backup|restore|pitr|cleanup|report}"
echo " backup [full|incremental] [database|all] - 执行备份"
echo " restore <backup_file> <target_database> - 恢复数据库"
echo " pitr <database> <time> <backup_file> - 时间点恢复"
echo " cleanup - 清理过期备份"
echo " report - 生成备份报告"
exit 1
;;
esac
}
main "$@"
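脚本本身不负责调度,可以直接交给 cron,按前面策略文件中的频率触发。下面是一份示例 crontab(脚本安装路径与数据库名均为假设值):

```bash
# /etc/cron.d/mysql-backup(示例)
# 每天凌晨 2 点:production_db 全量备份
0 2 * * * root /usr/local/bin/mysql_backup_system.sh backup full production_db
# 每 6 小时:基于 binlog 的增量备份
0 */6 * * * root /usr/local/bin/mysql_backup_system.sh backup incremental production_db
# 每周日上午 9 点:生成备份状态报告
0 9 * * 0 root /usr/local/bin/mysql_backup_system.sh report
```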
2. MySQL恢复验证脚本
#!/usr/bin/env python3
# scripts/mysql_recovery_validator.py
import mysql.connector
import argparse
import json
import logging
import os
import time
from datetime import datetime
from typing import Dict, List, Any
class MySQLRecoveryValidator:
def __init__(self, config: Dict[str, Any]):
self.config = config
self.logger = self._setup_logging()
# 数据库连接配置
self.source_conn = None
self.target_conn = None
def _setup_logging(self) -> logging.Logger:
"""设置日志记录"""
logger = logging.getLogger('MySQLRecoveryValidator')
logger.setLevel(logging.INFO)
handler = logging.FileHandler('/var/log/mysql_recovery_validation.log')
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
handler.setFormatter(formatter)
logger.addHandler(handler)
# 控制台输出
console_handler = logging.StreamHandler()
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)
return logger
def connect_databases(self):
"""连接源数据库和目标数据库"""
try:
# 连接源数据库
self.source_conn = mysql.connector.connect(
**self.config['source_database']
)
# 连接目标数据库
self.target_conn = mysql.connector.connect(
**self.config['target_database']
)
self.logger.info("数据库连接建立成功")
except Exception as e:
self.logger.error(f"数据库连接失败: {e}")
raise
def validate_schema_consistency(self, database_name: str) -> Dict[str, Any]:
"""验证模式一致性"""
self.logger.info(f"开始验证模式一致性: {database_name}")
validation_result = {
'database': database_name,
'schema_consistent': True,
'differences': [],
'table_count_match': True,
'index_count_match': True,
'constraint_count_match': True
}
try:
source_cursor = self.source_conn.cursor(dictionary=True)
target_cursor = self.target_conn.cursor(dictionary=True)
# 验证表结构
table_diff = self._compare_tables(source_cursor, target_cursor, database_name)
if table_diff:
validation_result['schema_consistent'] = False
validation_result['differences'].extend(table_diff)
# 验证索引
index_diff = self._compare_indexes(source_cursor, target_cursor, database_name)
if index_diff:
validation_result['schema_consistent'] = False
validation_result['differences'].extend(index_diff)
# 验证约束
constraint_diff = self._compare_constraints(source_cursor, target_cursor, database_name)
if constraint_diff:
validation_result['schema_consistent'] = False
validation_result['differences'].extend(constraint_diff)
source_cursor.close()
target_cursor.close()
except Exception as e:
self.logger.error(f"模式验证失败: {e}")
validation_result['schema_consistent'] = False
validation_result['error'] = str(e)
return validation_result
def _compare_tables(self, source_cursor, target_cursor, database_name: str) -> List[str]:
"""比较表结构"""
differences = []
# 获取源数据库表信息
source_cursor.execute(f"""
SELECT table_name, table_type, engine, table_rows, data_length, index_length
FROM information_schema.tables
WHERE table_schema = '{database_name}'
ORDER BY table_name
""")
source_tables = {row['table_name']: row for row in source_cursor.fetchall()}
# 获取目标数据库表信息
target_cursor.execute(f"""
SELECT table_name, table_type, engine, table_rows, data_length, index_length
FROM information_schema.tables
WHERE table_schema = '{database_name}'
ORDER BY table_name
""")
target_tables = {row['table_name']: row for row in target_cursor.fetchall()}
# 比较表
source_table_names = set(source_tables.keys())
target_table_names = set(target_tables.keys())
# 检查缺失的表
missing_tables = source_table_names - target_table_names
if missing_tables:
differences.append(f"目标数据库缺失表: {', '.join(missing_tables)}")
# 检查多余的表
extra_tables = target_table_names - source_table_names
if extra_tables:
differences.append(f"目标数据库多余表: {', '.join(extra_tables)}")
# 比较共同表的结构
common_tables = source_table_names & target_table_names
for table_name in common_tables:
source_table = source_tables[table_name]
target_table = target_tables[table_name]
if source_table['engine'] != target_table['engine']:
differences.append(f"表 {table_name} 存储引擎不匹配: {source_table['engine']} vs {target_table['engine']}")
# 比较列结构
column_diff = self._compare_columns(source_cursor, target_cursor, database_name, table_name)
differences.extend(column_diff)
return differences
def _compare_columns(self, source_cursor, target_cursor, database_name: str, table_name: str) -> List[str]:
"""比较列结构"""
differences = []
# 获取源表列信息
source_cursor.execute(f"""
SELECT column_name, data_type, is_nullable, column_default, extra
FROM information_schema.columns
WHERE table_schema = '{database_name}' AND table_name = '{table_name}'
ORDER BY ordinal_position
""")
source_columns = {row['column_name']: row for row in source_cursor.fetchall()}
# 获取目标表列信息
target_cursor.execute(f"""
SELECT column_name, data_type, is_nullable, column_default, extra
FROM information_schema.columns
WHERE table_schema = '{database_name}' AND table_name = '{table_name}'
ORDER BY ordinal_position
""")
target_columns = {row['column_name']: row for row in target_cursor.fetchall()}
# 比较列
source_column_names = set(source_columns.keys())
target_column_names = set(target_columns.keys())
# 检查缺失的列
missing_columns = source_column_names - target_column_names
if missing_columns:
differences.append(f"表 {table_name} 缺失列: {', '.join(missing_columns)}")
# 检查多余的列
extra_columns = target_column_names - source_column_names
if extra_columns:
differences.append(f"表 {table_name} 多余列: {', '.join(extra_columns)}")
# 比较共同列的属性
common_columns = source_column_names & target_column_names
for column_name in common_columns:
source_col = source_columns[column_name]
target_col = target_columns[column_name]
if source_col['data_type'] != target_col['data_type']:
differences.append(f"表 {table_name} 列 {column_name} 数据类型不匹配: {source_col['data_type']} vs {target_col['data_type']}")
if source_col['is_nullable'] != target_col['is_nullable']:
differences.append(f"表 {table_name} 列 {column_name} 可空属性不匹配")
return differences
def _compare_indexes(self, source_cursor, target_cursor, database_name: str) -> List[str]:
"""比较索引"""
differences = []
# 获取源数据库索引信息
source_cursor.execute(f"""
SELECT table_name, index_name, column_name, seq_in_index, non_unique
FROM information_schema.statistics
WHERE table_schema = '{database_name}'
ORDER BY table_name, index_name, seq_in_index
""")
source_indexes = {}
for row in source_cursor.fetchall():
key = f"{row['table_name']}.{row['index_name']}"
if key not in source_indexes:
source_indexes[key] = []
source_indexes[key].append(row)
# 获取目标数据库索引信息
target_cursor.execute(f"""
SELECT table_name, index_name, column_name, seq_in_index, non_unique
FROM information_schema.statistics
WHERE table_schema = '{database_name}'
ORDER BY table_name, index_name, seq_in_index
""")
target_indexes = {}
for row in target_cursor.fetchall():
key = f"{row['table_name']}.{row['index_name']}"
if key not in target_indexes:
target_indexes[key] = []
target_indexes[key].append(row)
# 比较索引
source_index_names = set(source_indexes.keys())
target_index_names = set(target_indexes.keys())
missing_indexes = source_index_names - target_index_names
if missing_indexes:
differences.append(f"缺失索引: {', '.join(missing_indexes)}")
extra_indexes = target_index_names - source_index_names
if extra_indexes:
differences.append(f"多余索引: {', '.join(extra_indexes)}")
return differences
def _compare_constraints(self, source_cursor, target_cursor, database_name: str) -> List[str]:
"""比较约束"""
differences = []
# 获取外键约束
source_cursor.execute(f"""
SELECT constraint_name, table_name, column_name, referenced_table_name, referenced_column_name
FROM information_schema.key_column_usage
WHERE table_schema = '{database_name}' AND referenced_table_name IS NOT NULL
ORDER BY constraint_name
""")
source_constraints = set()
for row in source_cursor.fetchall():
constraint_key = f"{row['table_name']}.{row['constraint_name']}"
source_constraints.add(constraint_key)
target_cursor.execute(f"""
SELECT constraint_name, table_name, column_name, referenced_table_name, referenced_column_name
FROM information_schema.key_column_usage
WHERE table_schema = '{database_name}' AND referenced_table_name IS NOT NULL
ORDER BY constraint_name
""")
target_constraints = set()
for row in target_cursor.fetchall():
constraint_key = f"{row['table_name']}.{row['constraint_name']}"
target_constraints.add(constraint_key)
# 比较约束
missing_constraints = source_constraints - target_constraints
if missing_constraints:
differences.append(f"缺失约束: {', '.join(missing_constraints)}")
extra_constraints = target_constraints - source_constraints
if extra_constraints:
differences.append(f"多余约束: {', '.join(extra_constraints)}")
return differences
def validate_data_consistency(self, database_name: str, sample_tables: List[str] = None) -> Dict[str, Any]:
"""验证数据一致性"""
self.logger.info(f"开始验证数据一致性: {database_name}")
validation_result = {
'database': database_name,
'data_consistent': True,
'table_results': {},
'total_tables_checked': 0,
'inconsistent_tables': []
}
try:
source_cursor = self.source_conn.cursor(dictionary=True)
target_cursor = self.target_conn.cursor(dictionary=True)
# 获取要检查的表列表
if sample_tables:
tables_to_check = sample_tables
else:
source_cursor.execute(f"""
SELECT table_name FROM information_schema.tables
WHERE table_schema = '{database_name}' AND table_type = 'BASE TABLE'
ORDER BY table_name
""")
tables_to_check = [row['table_name'] for row in source_cursor.fetchall()]
validation_result['total_tables_checked'] = len(tables_to_check)
# 检查每个表的数据一致性
for table_name in tables_to_check:
table_result = self._validate_table_data(
source_cursor, target_cursor, database_name, table_name
)
validation_result['table_results'][table_name] = table_result
if not table_result['consistent']:
validation_result['data_consistent'] = False
validation_result['inconsistent_tables'].append(table_name)
source_cursor.close()
target_cursor.close()
except Exception as e:
self.logger.error(f"数据一致性验证失败: {e}")
validation_result['data_consistent'] = False
validation_result['error'] = str(e)
return validation_result
def _validate_table_data(self, source_cursor, target_cursor, database_name: str, table_name: str) -> Dict[str, Any]:
"""验证单个表的数据一致性"""
table_result = {
'table_name': table_name,
'consistent': True,
'row_count_match': True,
'checksum_match': True,
'source_row_count': 0,
'target_row_count': 0,
'source_checksum': '',
'target_checksum': ''
}
try:
# 比较行数
source_cursor.execute(f"SELECT COUNT(*) as count FROM `{database_name}`.`{table_name}`")
source_count = source_cursor.fetchone()['count']
table_result['source_row_count'] = source_count
target_cursor.execute(f"SELECT COUNT(*) as count FROM `{database_name}`.`{table_name}`")
target_count = target_cursor.fetchone()['count']
table_result['target_row_count'] = target_count
if source_count != target_count:
table_result['consistent'] = False
table_result['row_count_match'] = False
self.logger.warning(f"表 {table_name} 行数不匹配: {source_count} vs {target_count}")
# 比较数据校验和(仅对小表进行)
if source_count <= 100000: # 只对10万行以下的表进行校验和比较
source_checksum = self._calculate_table_checksum(source_cursor, database_name, table_name)
target_checksum = self._calculate_table_checksum(target_cursor, database_name, table_name)
table_result['source_checksum'] = source_checksum
table_result['target_checksum'] = target_checksum
if source_checksum != target_checksum:
table_result['consistent'] = False
table_result['checksum_match'] = False
self.logger.warning(f"表 {table_name} 校验和不匹配")
except Exception as e:
self.logger.error(f"表 {table_name} 数据验证失败: {e}")
table_result['consistent'] = False
table_result['error'] = str(e)
return table_result
def _calculate_table_checksum(self, cursor, database_name: str, table_name: str) -> str:
"""计算表数据校验和"""
try:
# 获取表的所有列
cursor.execute(f"""
SELECT column_name FROM information_schema.columns
WHERE table_schema = '{database_name}' AND table_name = '{table_name}'
ORDER BY ordinal_position
""")
columns = [row['column_name'] for row in cursor.fetchall()]
if not columns:
return ""
# 构建查询语句
column_list = ', '.join([f'COALESCE(`{col}`, "NULL")' for col in columns])
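            # 注意:GROUP_CONCAT 的结果受 group_concat_max_len(默认 1024 字节)限制,
            # 行数较多时校验和可能被静默截断;可先执行 SET SESSION group_concat_max_len 调大该值再比较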
query = f"""
SELECT MD5(GROUP_CONCAT(
MD5(CONCAT({column_list}))
ORDER BY {columns[0]}
)) as checksum
FROM `{database_name}`.`{table_name}`
"""
cursor.execute(query)
result = cursor.fetchone()
return result['checksum'] if result else ""
except Exception as e:
self.logger.error(f"计算表 {table_name} 校验和失败: {e}")
return ""
def validate_performance_metrics(self, database_name: str) -> Dict[str, Any]:
"""验证性能指标"""
self.logger.info(f"开始验证性能指标: {database_name}")
validation_result = {
'database': database_name,
'performance_acceptable': True,
'query_performance': {},
'connection_test': {},
'load_test': {}
}
try:
# 测试查询性能
query_result = self._test_query_performance(database_name)
validation_result['query_performance'] = query_result
# 测试连接性能
connection_result = self._test_connection_performance()
validation_result['connection_test'] = connection_result
# 简单负载测试
load_result = self._test_simple_load(database_name)
validation_result['load_test'] = load_result
# 判断整体性能是否可接受
if (query_result.get('average_response_time', 0) > 1000 or # 1秒
connection_result.get('average_connection_time', 0) > 500 or # 0.5秒
load_result.get('success_rate', 0) < 0.95): # 95%成功率
validation_result['performance_acceptable'] = False
except Exception as e:
self.logger.error(f"性能验证失败: {e}")
validation_result['performance_acceptable'] = False
validation_result['error'] = str(e)
return validation_result
def _test_query_performance(self, database_name: str) -> Dict[str, Any]:
"""测试查询性能"""
result = {
'queries_tested': 0,
'total_time': 0,
'average_response_time': 0,
'max_response_time': 0,
'min_response_time': float('inf')
}
try:
cursor = self.target_conn.cursor()
# 测试查询列表
test_queries = [
f"SELECT COUNT(*) FROM information_schema.tables WHERE table_schema = '{database_name}'",
f"SELECT table_name, table_rows FROM information_schema.tables WHERE table_schema = '{database_name}' LIMIT 10",
"SELECT NOW()",
"SELECT VERSION()",
"SHOW STATUS LIKE 'Threads_connected'"
]
response_times = []
for query in test_queries:
start_time = time.time()
cursor.execute(query)
cursor.fetchall()
end_time = time.time()
response_time = (end_time - start_time) * 1000 # 转换为毫秒
response_times.append(response_time)
result['queries_tested'] += 1
if response_times:
result['total_time'] = sum(response_times)
result['average_response_time'] = sum(response_times) / len(response_times)
result['max_response_time'] = max(response_times)
result['min_response_time'] = min(response_times)
cursor.close()
except Exception as e:
self.logger.error(f"查询性能测试失败: {e}")
result['error'] = str(e)
return result
def _test_connection_performance(self) -> Dict[str, Any]:
"""测试连接性能"""
result = {
'connections_tested': 0,
'successful_connections': 0,
'average_connection_time': 0,
'max_connection_time': 0,
'min_connection_time': float('inf')
}
try:
connection_times = []
# 测试10次连接
for i in range(10):
start_time = time.time()
try:
test_conn = mysql.connector.connect(
**self.config['target_database']
)
test_conn.close()
end_time = time.time()
connection_time = (end_time - start_time) * 1000 # 转换为毫秒
connection_times.append(connection_time)
result['successful_connections'] += 1
except Exception:
pass
result['connections_tested'] += 1
if connection_times:
result['average_connection_time'] = sum(connection_times) / len(connection_times)
result['max_connection_time'] = max(connection_times)
result['min_connection_time'] = min(connection_times)
except Exception as e:
self.logger.error(f"连接性能测试失败: {e}")
result['error'] = str(e)
return result
def _test_simple_load(self, database_name: str) -> Dict[str, Any]:
"""简单负载测试"""
result = {
'total_operations': 0,
'successful_operations': 0,
'failed_operations': 0,
'success_rate': 0,
'average_operation_time': 0
}
try:
cursor = self.target_conn.cursor()
operation_times = []
# 执行50次简单查询
for i in range(50):
start_time = time.time()
try:
cursor.execute(f"SELECT COUNT(*) FROM information_schema.tables WHERE table_schema = '{database_name}'")
cursor.fetchone()
end_time = time.time()
operation_time = (end_time - start_time) * 1000
operation_times.append(operation_time)
result['successful_operations'] += 1
except Exception:
result['failed_operations'] += 1
result['total_operations'] += 1
if result['total_operations'] > 0:
result['success_rate'] = result['successful_operations'] / result['total_operations']
if operation_times:
result['average_operation_time'] = sum(operation_times) / len(operation_times)
cursor.close()
except Exception as e:
self.logger.error(f"负载测试失败: {e}")
result['error'] = str(e)
return result
def generate_validation_report(self, validation_results: Dict[str, Any]) -> str:
"""生成验证报告"""
        report_dir = "/var/reports"
        os.makedirs(report_dir, exist_ok=True)  # 确保报告目录存在,避免首次运行时写入失败
        report_file = os.path.join(report_dir, f"mysql_recovery_validation_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json")
# 添加报告元数据
validation_results['report_metadata'] = {
'generated_at': datetime.now().isoformat(),
'validator_version': '1.0.0',
'source_database': self.config['source_database']['host'],
'target_database': self.config['target_database']['host']
}
# 写入JSON报告
with open(report_file, 'w', encoding='utf-8') as f:
json.dump(validation_results, f, indent=2, ensure_ascii=False)
self.logger.info(f"验证报告生成完成: {report_file}")
return report_file
def run_full_validation(self, database_name: str, sample_tables: List[str] = None) -> Dict[str, Any]:
"""运行完整验证"""
self.logger.info(f"开始完整验证: {database_name}")
validation_results = {
'database': database_name,
'validation_start_time': datetime.now().isoformat(),
'overall_status': 'PASS',
'schema_validation': {},
'data_validation': {},
'performance_validation': {}
}
try:
# 连接数据库
self.connect_databases()
# 模式验证
schema_result = self.validate_schema_consistency(database_name)
validation_results['schema_validation'] = schema_result
if not schema_result['schema_consistent']:
validation_results['overall_status'] = 'FAIL'
# 数据验证
data_result = self.validate_data_consistency(database_name, sample_tables)
validation_results['data_validation'] = data_result
if not data_result['data_consistent']:
validation_results['overall_status'] = 'FAIL'
# 性能验证
performance_result = self.validate_performance_metrics(database_name)
validation_results['performance_validation'] = performance_result
if not performance_result['performance_acceptable']:
if validation_results['overall_status'] == 'PASS':
validation_results['overall_status'] = 'WARNING'
validation_results['validation_end_time'] = datetime.now().isoformat()
# 生成报告
report_file = self.generate_validation_report(validation_results)
validation_results['report_file'] = report_file
except Exception as e:
self.logger.error(f"验证过程失败: {e}")
validation_results['overall_status'] = 'ERROR'
validation_results['error'] = str(e)
finally:
# 关闭数据库连接
if self.source_conn:
self.source_conn.close()
if self.target_conn:
self.target_conn.close()
return validation_results
def main():
parser = argparse.ArgumentParser(description='MySQL恢复验证工具')
parser.add_argument('--config', required=True, help='配置文件路径')
parser.add_argument('--database', required=True, help='要验证的数据库名称')
parser.add_argument('--sample-tables', nargs='*', help='要验证的样本表列表')
parser.add_argument('--schema-only', action='store_true', help='仅验证模式')
parser.add_argument('--data-only', action='store_true', help='仅验证数据')
parser.add_argument('--performance-only', action='store_true', help='仅验证性能')
args = parser.parse_args()
# 加载配置
with open(args.config, 'r') as f:
config = json.load(f)
# 创建验证器
validator = MySQLRecoveryValidator(config)
try:
if args.schema_only:
validator.connect_databases()
result = validator.validate_schema_consistency(args.database)
print(json.dumps(result, indent=2))
elif args.data_only:
validator.connect_databases()
result = validator.validate_data_consistency(args.database, args.sample_tables)
print(json.dumps(result, indent=2))
elif args.performance_only:
validator.connect_databases()
result = validator.validate_performance_metrics(args.database)
print(json.dumps(result, indent=2))
else:
# 完整验证
result = validator.run_full_validation(args.database, args.sample_tables)
print(f"验证完成,状态: {result['overall_status']}")
print(f"报告文件: {result.get('report_file', 'N/A')}")
except Exception as e:
print(f"验证失败: {e}")
exit(1)
if __name__ == "__main__":
main()
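验证器需要一份 JSON 配置,其中 source_database 与 target_database 两节会被直接展开传给 mysql.connector.connect。下面是一个假设的配置样例及调用方式(主机、账号均为示例):

```bash
# 生成示例配置并运行完整验证
cat > /tmp/recovery_validation.json << 'EOF'
{
  "source_database": {"host": "db-primary.internal", "port": 3306,
                      "user": "validator", "password": "change-me"},
  "target_database": {"host": "db-restored.internal", "port": 3306,
                      "user": "validator", "password": "change-me"}
}
EOF

python3 mysql_recovery_validator.py --config /tmp/recovery_validation.json \
    --database production_db --sample-tables orders users
```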
PostgreSQL备份与恢复实现
1. PostgreSQL备份系统
#!/bin/bash
# scripts/postgresql_backup_system.sh
set -euo pipefail
# 配置文件路径
CONFIG_FILE="/etc/postgresql-backup/config.conf"
LOG_FILE="/var/log/postgresql-backup.log"
# 默认配置
PGHOST="${PGHOST:-localhost}"
PGPORT="${PGPORT:-5432}"
PGUSER="${PGUSER:-backup_user}"
PGPASSWORD="${PGPASSWORD:-}"
BACKUP_DIR="${BACKUP_DIR:-/backup/postgresql}"
WAL_ARCHIVE_DIR="${WAL_ARCHIVE_DIR:-/backup/postgresql/wal}"
RETENTION_DAYS="${RETENTION_DAYS:-7}"
COMPRESSION="${COMPRESSION:-true}"
ENCRYPTION="${ENCRYPTION:-true}"
ENCRYPTION_KEY="${ENCRYPTION_KEY:-}"
# 导出PostgreSQL环境变量
export PGHOST PGPORT PGUSER PGPASSWORD
# 日志函数
log() {
echo "[$(date '+%Y-%m-%d %H:%M:%S')] $1" | tee -a "$LOG_FILE"
}
error() {
echo "[$(date '+%Y-%m-%d %H:%M:%S')] ERROR: $1" | tee -a "$LOG_FILE" >&2
}
# 加载配置文件
load_config() {
if [[ -f "$CONFIG_FILE" ]]; then
source "$CONFIG_FILE"
log "配置文件加载完成: $CONFIG_FILE"
else
log "使用默认配置"
fi
}
# 检查依赖
check_dependencies() {
local deps=("pg_dump" "pg_dumpall" "pg_basebackup" "psql" "gzip" "openssl")
for dep in "${deps[@]}"; do
if ! command -v "$dep" &> /dev/null; then
error "依赖项未找到: $dep"
exit 1
fi
done
log "依赖检查完成"
}
# 创建备份目录
create_backup_directories() {
local dirs=(
"$BACKUP_DIR/full"
"$BACKUP_DIR/incremental"
"$BACKUP_DIR/wal"
"$BACKUP_DIR/basebackup"
"$BACKUP_DIR/temp"
)
for dir in "${dirs[@]}"; do
mkdir -p "$dir"
done
log "备份目录创建完成"
}
# 获取数据库列表
get_databases() {
psql -t -c "SELECT datname FROM pg_database WHERE datistemplate = false AND datname != 'postgres';" | grep -v '^$'
}
# 逻辑备份(pg_dump)
logical_backup() {
local database="$1"
local timestamp=$(date '+%Y%m%d_%H%M%S')
    # 自定义格式(--format=custom)输出的是二进制转储而非纯 SQL,使用 .dump 扩展名更准确
    local backup_file="$BACKUP_DIR/full/${database}_logical_${timestamp}.dump"
log "开始逻辑备份: $database"
# 执行备份
pg_dump \
--verbose \
--format=custom \
--compress=6 \
--no-owner \
--no-privileges \
--create \
--clean \
--if-exists \
--file="$backup_file" \
"$database"
    if [[ $? -eq 0 ]]; then
        log "逻辑备份完成: $backup_file"
        # 自定义格式已内置压缩(--compress=6),这里仅在需要时追加 gzip;压缩后文件名变为 *.gz
        if [[ "$COMPRESSION" == "true" ]]; then
            compress_backup "$backup_file"
            backup_file="${backup_file}.gz"
        fi
        # 加密备份文件(加密后文件名追加 .enc)
        if [[ "$ENCRYPTION" == "true" ]]; then
            encrypt_backup "$backup_file"
            backup_file="${backup_file}.enc"
        fi
        # 验证备份
        verify_backup "$backup_file"
        # 生成备份元数据
        generate_backup_metadata "$backup_file" "logical" "$database"
    else
        error "逻辑备份失败: $database"
        return 1
    fi
}
# 物理备份(pg_basebackup)
physical_backup() {
local timestamp=$(date '+%Y%m%d_%H%M%S')
local backup_dir="$BACKUP_DIR/basebackup/basebackup_${timestamp}"
log "开始物理备份"
mkdir -p "$backup_dir"
# 执行基础备份
pg_basebackup \
--verbose \
--progress \
--format=tar \
--gzip \
--compress=6 \
--checkpoint=fast \
--wal-method=stream \
--pgdata="$backup_dir"
if [[ $? -eq 0 ]]; then
log "物理备份完成: $backup_dir"
# 加密备份文件
if [[ "$ENCRYPTION" == "true" ]]; then
for tar_file in "$backup_dir"/*.tar.gz; do
if [[ -f "$tar_file" ]]; then
encrypt_backup "$tar_file"
fi
done
fi
# 生成备份元数据
generate_backup_metadata "$backup_dir" "physical" "cluster"
else
error "物理备份失败"
return 1
fi
}
# WAL归档备份
wal_archive_backup() {
local wal_source_dir="$1"
local timestamp=$(date '+%Y%m%d_%H%M%S')
local archive_dir="$WAL_ARCHIVE_DIR/${timestamp}"
log "开始WAL归档备份"
mkdir -p "$archive_dir"
# 复制WAL文件
if [[ -d "$wal_source_dir" ]]; then
rsync -av --progress "$wal_source_dir"/ "$archive_dir"/
if [[ $? -eq 0 ]]; then
log "WAL归档完成: $archive_dir"
# 压缩WAL文件
if [[ "$COMPRESSION" == "true" ]]; then
            # WAL 段文件没有 .wal 扩展名(文件名形如 000000010000000000000001),按普通文件压缩
            find "$archive_dir" -type f ! -name "*.gz" -exec gzip {} \;
fi
# 生成备份元数据
generate_backup_metadata "$archive_dir" "wal" "cluster"
else
error "WAL归档失败"
return 1
fi
else
error "WAL源目录不存在: $wal_source_dir"
return 1
fi
}
# 压缩备份文件
compress_backup() {
local backup_file="$1"
log "压缩备份文件: $backup_file"
gzip "$backup_file"
if [[ $? -eq 0 ]]; then
log "压缩完成: ${backup_file}.gz"
else
error "压缩失败: $backup_file"
return 1
fi
}
# 加密备份文件
encrypt_backup() {
local backup_file="$1"
if [[ -z "$ENCRYPTION_KEY" ]]; then
error "加密密钥未设置"
return 1
fi
log "加密备份文件: $backup_file"
openssl enc -aes-256-cbc -salt -in "$backup_file" -out "${backup_file}.enc" -k "$ENCRYPTION_KEY"
if [[ $? -eq 0 ]]; then
rm "$backup_file"
log "加密完成: ${backup_file}.enc"
else
error "加密失败: $backup_file"
return 1
fi
}
# 验证备份文件
verify_backup() {
local backup_file="$1"
log "验证备份文件: $backup_file"
# 检查文件是否存在且不为空
if [[ ! -f "$backup_file" ]] || [[ ! -s "$backup_file" ]]; then
error "备份文件无效: $backup_file"
return 1
fi
# 生成校验和
local checksum=$(sha256sum "$backup_file" | awk '{print $1}')
echo "$checksum" > "${backup_file}.sha256"
log "备份验证完成,校验和: $checksum"
}
# 生成备份元数据
generate_backup_metadata() {
local backup_file="$1"
local backup_type="$2"
local database="$3"
local timestamp=$(date '+%Y-%m-%d %H:%M:%S')
local metadata_file="${backup_file}.metadata"
cat > "$metadata_file" << EOF
{
"backup_type": "$backup_type",
"database": "$database",
"timestamp": "$timestamp",
"backup_file": "$backup_file",
"file_size": $(stat -c%s "$backup_file" 2>/dev/null || echo "0"),
"postgresql_version": "$(psql -t -c "SELECT version();" | head -n 1 | xargs)",
"compression": "$COMPRESSION",
"encryption": "$ENCRYPTION",
"checksum": "$(cat "${backup_file}.sha256" 2>/dev/null || echo "N/A")"
}
EOF
log "备份元数据生成完成: $metadata_file"
}
# 恢复数据库
restore_database() {
local backup_file="$1"
local target_database="$2"
local target_host="${3:-$PGHOST}"
local target_port="${4:-$PGPORT}"
log "开始恢复数据库: $target_database"
# 检查备份文件
if [[ ! -f "$backup_file" ]]; then
error "备份文件不存在: $backup_file"
return 1
fi
# 解密备份文件(如果需要)
local restore_file="$backup_file"
if [[ "$backup_file" == *.enc ]]; then
restore_file="${backup_file%.enc}"
log "解密备份文件: $backup_file"
openssl enc -aes-256-cbc -d -in "$backup_file" -out "$restore_file" -k "$ENCRYPTION_KEY"
fi
# 解压备份文件(如果需要)
if [[ "$restore_file" == *.gz ]]; then
log "解压备份文件: $restore_file"
gunzip "$restore_file"
restore_file="${restore_file%.gz}"
fi
# 创建目标数据库
PGHOST="$target_host" PGPORT="$target_port" createdb "$target_database" 2>/dev/null || true
# 恢复数据
log "恢复数据到数据库: $target_database"
    # 不使用 --create:--create 会按转储中记录的原库名重建数据库,
    # 而这里希望把数据恢复到指定的 $target_database 中
    PGHOST="$target_host" PGPORT="$target_port" pg_restore \
        --verbose \
        --clean \
        --if-exists \
        --no-owner \
        --dbname="$target_database" \
        "$restore_file"
if [[ $? -eq 0 ]]; then
log "数据库恢复完成: $target_database"
# 验证恢复结果
local table_count=$(PGHOST="$target_host" PGPORT="$target_port" psql -t -d "$target_database" \
-c "SELECT count(*) FROM information_schema.tables WHERE table_schema = 'public';" | xargs)
log "恢复验证 - 表数量: $table_count"
else
error "数据库恢复失败: $target_database"
return 1
fi
# 清理临时文件
if [[ "$restore_file" != "$backup_file" ]]; then
rm -f "$restore_file"
fi
}
# 时间点恢复
point_in_time_recovery() {
local target_time="$1"
local base_backup_dir="$2"
local wal_archive_dir="$3"
local recovery_dir="$4"
log "开始时间点恢复到: $target_time"
# 创建恢复目录
mkdir -p "$recovery_dir"
# 解压基础备份
if [[ -f "$base_backup_dir/base.tar.gz" ]]; then
tar -xzf "$base_backup_dir/base.tar.gz" -C "$recovery_dir"
else
error "基础备份文件不存在"
return 1
fi
    # 创建恢复配置
    # 注意:recovery.conf 仅适用于 PostgreSQL 12 之前的版本;
    # PostgreSQL 12+ 应把下列参数写入 postgresql.conf(或 postgresql.auto.conf),
    # 并在数据目录中创建空的 recovery.signal 文件
    cat > "$recovery_dir/recovery.conf" << EOF
restore_command = 'cp $wal_archive_dir/%f %p'
recovery_target_time = '$target_time'
recovery_target_timeline = 'latest'
EOF
    log "时间点恢复配置完成: $recovery_dir"
    log "请手动启动PostgreSQL实例进行恢复"
}
# 清理过期备份
cleanup_old_backups() {
log "开始清理过期备份"
# 清理逻辑备份
    find "$BACKUP_DIR/full" -name "*.dump*" -mtime +"$RETENTION_DAYS" -delete
find "$BACKUP_DIR/full" -name "*.metadata" -mtime +$RETENTION_DAYS -delete
find "$BACKUP_DIR/full" -name "*.sha256" -mtime +$RETENTION_DAYS -delete
# 清理物理备份
find "$BACKUP_DIR/basebackup" -type d -mtime +$RETENTION_DAYS -exec rm -rf {} +
# 清理WAL归档
    # WAL 段文件没有固定扩展名,这里按修改时间清理归档目录下的所有文件
    find "$WAL_ARCHIVE_DIR" -type f -mtime +"$RETENTION_DAYS" -delete
log "过期备份清理完成"
}
# 主函数
main() {
case "${1:-backup}" in
"logical")
load_config
check_dependencies
create_backup_directories
if [[ $# -lt 2 ]]; then
echo "用法: $0 logical <database>"
exit 1
fi
logical_backup "$2"
;;
"physical")
load_config
check_dependencies
create_backup_directories
physical_backup
;;
"wal")
load_config
check_dependencies
create_backup_directories
if [[ $# -lt 2 ]]; then
echo "用法: $0 wal <wal_source_dir>"
exit 1
fi
wal_archive_backup "$2"
;;
"restore")
if [[ $# -lt 3 ]]; then
echo "用法: $0 restore <backup_file> <target_database> [target_host] [target_port]"
exit 1
fi
load_config
restore_database "$2" "$3" "$4" "$5"
;;
"pitr")
if [[ $# -lt 5 ]]; then
echo "用法: $0 pitr <target_time> <base_backup_dir> <wal_archive_dir> <recovery_dir>"
exit 1
fi
load_config
point_in_time_recovery "$2" "$3" "$4" "$5"
;;
"cleanup")
load_config
cleanup_old_backups
;;
*)
echo "用法: $0 {logical|physical|wal|restore|pitr|cleanup}"
echo " logical <database> - 逻辑备份"
echo " physical - 物理备份"
echo " wal <wal_source_dir> - WAL归档"
echo " restore <backup_file> <target_database> - 恢复数据库"
echo " pitr <time> <base_backup> <wal_dir> <recovery_dir> - 时间点恢复"
echo " cleanup - 清理过期备份"
exit 1
;;
esac
}
main "$@"
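WAL 归档要想连续,需要先在 postgresql.conf 中开启归档,再配合定时的基础备份。下面给出一个假设的最小配置与调度示例(路径需按实际环境调整):

```bash
# postgresql.conf 中与 WAL 归档相关的最小配置(示例,修改 archive_mode 需重启实例)
#   wal_level = replica
#   archive_mode = on
#   archive_command = 'test ! -f /backup/postgresql/wal/%f && cp %p /backup/postgresql/wal/%f'

# crontab 示例:每天凌晨 3 点做物理基础备份,每周日凌晨 5 点做一次逻辑备份
0 3 * * * postgres /usr/local/bin/postgresql_backup_system.sh physical
0 5 * * 0 postgres /usr/local/bin/postgresql_backup_system.sh logical production_db
```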
跨平台备份管理系统
1. 统一备份管理器
#!/usr/bin/env python3
# scripts/unified_backup_manager.py
import os
import json
import yaml
import logging
import subprocess
import threading
import time
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional
from dataclasses import dataclass
from enum import Enum
import schedule
import boto3
from azure.storage.blob import BlobServiceClient
from google.cloud import storage as gcs
class DatabaseType(Enum):
MYSQL = "mysql"
POSTGRESQL = "postgresql"
MONGODB = "mongodb"
REDIS = "redis"
ELASTICSEARCH = "elasticsearch"
class BackupType(Enum):
FULL = "full"
INCREMENTAL = "incremental"
DIFFERENTIAL = "differential"
LOG = "log"
class BackupStatus(Enum):
PENDING = "pending"
RUNNING = "running"
COMPLETED = "completed"
FAILED = "failed"
CANCELLED = "cancelled"
@dataclass
class BackupJob:
id: str
database_type: DatabaseType
backup_type: BackupType
database_name: str
schedule: str
retention_days: int
compression: bool
encryption: bool
storage_locations: List[str]
status: BackupStatus = BackupStatus.PENDING
created_at: datetime = None
started_at: datetime = None
completed_at: datetime = None
error_message: str = None
backup_size: int = 0
backup_files: List[str] = None
def __post_init__(self):
if self.created_at is None:
self.created_at = datetime.now()
if self.backup_files is None:
self.backup_files = []
class UnifiedBackupManager:
def __init__(self, config_file: str):
self.config = self._load_config(config_file)
self.logger = self._setup_logging()
self.jobs: Dict[str, BackupJob] = {}
self.running_jobs: Dict[str, threading.Thread] = {}
# 初始化云存储客户端
self._init_cloud_storage()
        # 调度器状态(停止标志不能命名为 stop_scheduler,否则会覆盖同名方法)
        self.scheduler_thread = None
        self._stop_scheduler = False
def _load_config(self, config_file: str) -> Dict[str, Any]:
"""加载配置文件"""
with open(config_file, 'r', encoding='utf-8') as f:
if config_file.endswith('.yaml') or config_file.endswith('.yml'):
return yaml.safe_load(f)
else:
return json.load(f)
def _setup_logging(self) -> logging.Logger:
"""设置日志记录"""
logger = logging.getLogger('UnifiedBackupManager')
logger.setLevel(logging.INFO)
# 文件处理器
file_handler = logging.FileHandler(
self.config.get('log_file', '/var/log/unified_backup.log')
)
file_formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
file_handler.setFormatter(file_formatter)
logger.addHandler(file_handler)
# 控制台处理器
console_handler = logging.StreamHandler()
console_handler.setFormatter(file_formatter)
logger.addHandler(console_handler)
return logger
def _init_cloud_storage(self):
"""初始化云存储客户端"""
self.cloud_clients = {}
# AWS S3
if 'aws' in self.config.get('cloud_storage', {}):
aws_config = self.config['cloud_storage']['aws']
self.cloud_clients['s3'] = boto3.client(
's3',
aws_access_key_id=aws_config.get('access_key_id'),
aws_secret_access_key=aws_config.get('secret_access_key'),
region_name=aws_config.get('region', 'us-west-2')
)
# Azure Blob Storage
if 'azure' in self.config.get('cloud_storage', {}):
azure_config = self.config['cloud_storage']['azure']
self.cloud_clients['azure'] = BlobServiceClient(
account_url=f"https://{azure_config['account_name']}.blob.core.windows.net",
credential=azure_config['account_key']
)
# Google Cloud Storage
if 'gcp' in self.config.get('cloud_storage', {}):
gcp_config = self.config['cloud_storage']['gcp']
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = gcp_config['credentials_file']
self.cloud_clients['gcs'] = gcs.Client()
def create_backup_job(self, job_config: Dict[str, Any]) -> str:
"""创建备份任务"""
job_id = f"{job_config['database_type']}_{job_config['database_name']}_{int(time.time())}"
job = BackupJob(
id=job_id,
database_type=DatabaseType(job_config['database_type']),
backup_type=BackupType(job_config['backup_type']),
database_name=job_config['database_name'],
schedule=job_config['schedule'],
retention_days=job_config.get('retention_days', 7),
compression=job_config.get('compression', True),
encryption=job_config.get('encryption', True),
storage_locations=job_config.get('storage_locations', ['local'])
)
self.jobs[job_id] = job
self.logger.info(f"备份任务创建成功: {job_id}")
# 注册调度任务
self._schedule_job(job)
return job_id
def _schedule_job(self, job: BackupJob):
"""注册调度任务"""
if job.schedule == 'manual':
return
# 解析调度表达式并注册
if job.schedule.startswith('cron:'):
# 简化的cron支持
cron_expr = job.schedule[5:]
# 这里可以实现更复杂的cron解析
schedule.every().day.at("02:00").do(self._execute_backup_job, job.id)
elif job.schedule.startswith('interval:'):
# 间隔调度
interval = job.schedule[9:]
if interval.endswith('h'):
hours = int(interval[:-1])
schedule.every(hours).hours.do(self._execute_backup_job, job.id)
elif interval.endswith('d'):
days = int(interval[:-1])
schedule.every(days).days.do(self._execute_backup_job, job.id)
def execute_backup_job(self, job_id: str) -> bool:
"""执行备份任务"""
if job_id not in self.jobs:
self.logger.error(f"备份任务不存在: {job_id}")
return False
job = self.jobs[job_id]
if job.status == BackupStatus.RUNNING:
self.logger.warning(f"备份任务正在运行: {job_id}")
return False
# 在新线程中执行备份
backup_thread = threading.Thread(
target=self._execute_backup_job,
args=(job_id,)
)
backup_thread.start()
self.running_jobs[job_id] = backup_thread
return True
def _execute_backup_job(self, job_id: str):
"""执行备份任务的内部方法"""
job = self.jobs[job_id]
job.status = BackupStatus.RUNNING
job.started_at = datetime.now()
self.logger.info(f"开始执行备份任务: {job_id}")
try:
# 根据数据库类型执行相应的备份
if job.database_type == DatabaseType.MYSQL:
backup_files = self._backup_mysql(job)
elif job.database_type == DatabaseType.POSTGRESQL:
backup_files = self._backup_postgresql(job)
elif job.database_type == DatabaseType.MONGODB:
backup_files = self._backup_mongodb(job)
elif job.database_type == DatabaseType.REDIS:
backup_files = self._backup_redis(job)
elif job.database_type == DatabaseType.ELASTICSEARCH:
backup_files = self._backup_elasticsearch(job)
else:
raise ValueError(f"不支持的数据库类型: {job.database_type}")
job.backup_files = backup_files
job.backup_size = sum(os.path.getsize(f) for f in backup_files if os.path.exists(f))
# 上传到存储位置
self._upload_backups(job)
# 清理过期备份
self._cleanup_old_backups(job)
job.status = BackupStatus.COMPLETED
job.completed_at = datetime.now()
self.logger.info(f"备份任务完成: {job_id}")
except Exception as e:
job.status = BackupStatus.FAILED
job.error_message = str(e)
job.completed_at = datetime.now()
self.logger.error(f"备份任务失败: {job_id}, 错误: {e}")
finally:
# 清理运行中的任务记录
if job_id in self.running_jobs:
del self.running_jobs[job_id]
def _backup_mysql(self, job: BackupJob) -> List[str]:
"""MySQL备份"""
db_config = self.config['databases']['mysql']
backup_dir = os.path.join(self.config['backup_base_dir'], 'mysql')
os.makedirs(backup_dir, exist_ok=True)
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
backup_file = os.path.join(backup_dir, f"{job.database_name}_{timestamp}.sql")
# 构建mysqldump命令
cmd = [
'mysqldump',
f"--host={db_config['host']}",
f"--port={db_config['port']}",
f"--user={db_config['user']}",
f"--password={db_config['password']}",
'--single-transaction',
'--routines',
'--triggers',
'--events',
job.database_name
]
# 执行备份
with open(backup_file, 'w') as f:
result = subprocess.run(cmd, stdout=f, stderr=subprocess.PIPE, text=True)
if result.returncode != 0:
raise Exception(f"MySQL备份失败: {result.stderr}")
backup_files = [backup_file]
# 压缩
if job.compression:
compressed_file = f"{backup_file}.gz"
subprocess.run(['gzip', backup_file], check=True)
backup_files = [compressed_file]
# 加密
if job.encryption:
encrypted_files = []
for file in backup_files:
encrypted_file = f"{file}.enc"
self._encrypt_file(file, encrypted_file)
os.remove(file)
encrypted_files.append(encrypted_file)
backup_files = encrypted_files
return backup_files
def _backup_postgresql(self, job: BackupJob) -> List[str]:
"""PostgreSQL备份"""
db_config = self.config['databases']['postgresql']
backup_dir = os.path.join(self.config['backup_base_dir'], 'postgresql')
os.makedirs(backup_dir, exist_ok=True)
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
backup_file = os.path.join(backup_dir, f"{job.database_name}_{timestamp}.dump")
# 设置环境变量
env = os.environ.copy()
env.update({
'PGHOST': db_config['host'],
'PGPORT': str(db_config['port']),
'PGUSER': db_config['user'],
'PGPASSWORD': db_config['password']
})
# 构建pg_dump命令
cmd = [
'pg_dump',
'--verbose',
'--format=custom',
'--compress=6',
'--no-owner',
'--no-privileges',
'--file', backup_file,
job.database_name
]
# 执行备份
result = subprocess.run(cmd, env=env, stderr=subprocess.PIPE, text=True)
if result.returncode != 0:
raise Exception(f"PostgreSQL备份失败: {result.stderr}")
backup_files = [backup_file]
# 加密
if job.encryption:
encrypted_file = f"{backup_file}.enc"
self._encrypt_file(backup_file, encrypted_file)
os.remove(backup_file)
backup_files = [encrypted_file]
return backup_files
def _backup_mongodb(self, job: BackupJob) -> List[str]:
"""MongoDB备份"""
db_config = self.config['databases']['mongodb']
backup_dir = os.path.join(self.config['backup_base_dir'], 'mongodb')
os.makedirs(backup_dir, exist_ok=True)
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
backup_path = os.path.join(backup_dir, f"{job.database_name}_{timestamp}")
# 构建mongodump命令
cmd = [
'mongodump',
'--host', f"{db_config['host']}:{db_config['port']}",
'--db', job.database_name,
'--out', backup_path
]
if 'username' in db_config:
cmd.extend(['--username', db_config['username']])
cmd.extend(['--password', db_config['password']])
# 执行备份
result = subprocess.run(cmd, stderr=subprocess.PIPE, text=True)
if result.returncode != 0:
raise Exception(f"MongoDB备份失败: {result.stderr}")
# 压缩备份目录
archive_file = f"{backup_path}.tar.gz"
subprocess.run(['tar', '-czf', archive_file, '-C', backup_dir, os.path.basename(backup_path)], check=True)
# 删除原始目录
subprocess.run(['rm', '-rf', backup_path], check=True)
backup_files = [archive_file]
# 加密
if job.encryption:
encrypted_file = f"{archive_file}.enc"
self._encrypt_file(archive_file, encrypted_file)
os.remove(archive_file)
backup_files = [encrypted_file]
return backup_files
def _backup_redis(self, job: BackupJob) -> List[str]:
"""Redis备份"""
db_config = self.config['databases']['redis']
backup_dir = os.path.join(self.config['backup_base_dir'], 'redis')
os.makedirs(backup_dir, exist_ok=True)
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
backup_file = os.path.join(backup_dir, f"redis_{timestamp}.rdb")
# 使用redis-cli执行BGSAVE
cmd = [
'redis-cli',
'-h', db_config['host'],
'-p', str(db_config['port']),
'BGSAVE'
]
if 'password' in db_config:
cmd.extend(['-a', db_config['password']])
# 执行后台保存
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode != 0:
raise Exception(f"Redis BGSAVE失败: {result.stderr}")
# 等待备份完成
time.sleep(5)
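        # 注意:固定等待只是简化处理;更可靠的做法是先记录 BGSAVE 前的 LASTSAVE 时间戳,
        # 再轮询 redis-cli LASTSAVE,直到时间戳更新后才复制 RDB 文件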
# 复制RDB文件
rdb_source = db_config.get('rdb_file', '/var/lib/redis/dump.rdb')
subprocess.run(['cp', rdb_source, backup_file], check=True)
backup_files = [backup_file]
# 压缩和加密
if job.compression:
compressed_file = f"{backup_file}.gz"
subprocess.run(['gzip', backup_file], check=True)
backup_files = [compressed_file]
if job.encryption:
encrypted_files = []
for file in backup_files:
encrypted_file = f"{file}.enc"
self._encrypt_file(file, encrypted_file)
os.remove(file)
encrypted_files.append(encrypted_file)
backup_files = encrypted_files
return backup_files
def _backup_elasticsearch(self, job: BackupJob) -> List[str]:
"""Elasticsearch备份"""
# 这里实现Elasticsearch快照备份
# 由于篇幅限制,这里只是一个简化的实现
backup_dir = os.path.join(self.config['backup_base_dir'], 'elasticsearch')
os.makedirs(backup_dir, exist_ok=True)
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
snapshot_name = f"{job.database_name}_{timestamp}"
# 这里应该调用Elasticsearch API创建快照
# 简化实现,返回空列表
return []
def _encrypt_file(self, input_file: str, output_file: str):
"""加密文件"""
encryption_key = self.config.get('encryption_key', 'default_key')
cmd = [
'openssl', 'enc', '-aes-256-cbc', '-salt',
'-in', input_file,
'-out', output_file,
'-k', encryption_key
]
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode != 0:
raise Exception(f"文件加密失败: {result.stderr}")
def _upload_backups(self, job: BackupJob):
"""上传备份文件到存储位置"""
for location in job.storage_locations:
if location == 'local':
continue # 本地存储无需上传
elif location.startswith('s3://'):
self._upload_to_s3(job, location)
elif location.startswith('azure://'):
self._upload_to_azure(job, location)
elif location.startswith('gcs://'):
self._upload_to_gcs(job, location)
def _upload_to_s3(self, job: BackupJob, s3_url: str):
"""上传到AWS S3"""
if 's3' not in self.cloud_clients:
self.logger.warning("S3客户端未配置")
return
bucket_name = s3_url.replace('s3://', '').split('/')[0]
s3_prefix = '/'.join(s3_url.replace('s3://', '').split('/')[1:])
s3_client = self.cloud_clients['s3']
for backup_file in job.backup_files:
file_name = os.path.basename(backup_file)
s3_key = f"{s3_prefix}/{job.database_type.value}/{file_name}"
try:
s3_client.upload_file(backup_file, bucket_name, s3_key)
self.logger.info(f"文件上传到S3成功: {s3_key}")
except Exception as e:
self.logger.error(f"S3上传失败: {e}")
def _upload_to_azure(self, job: BackupJob, azure_url: str):
"""上传到Azure Blob Storage"""
# 实现Azure上传逻辑
pass
def _upload_to_gcs(self, job: BackupJob, gcs_url: str):
"""上传到Google Cloud Storage"""
# 实现GCS上传逻辑
pass
def _cleanup_old_backups(self, job: BackupJob):
"""清理过期备份"""
cutoff_date = datetime.now() - timedelta(days=job.retention_days)
# 清理本地备份
backup_dir = os.path.join(
self.config['backup_base_dir'],
job.database_type.value
)
if os.path.exists(backup_dir):
for file in os.listdir(backup_dir):
file_path = os.path.join(backup_dir, file)
if os.path.isfile(file_path):
file_mtime = datetime.fromtimestamp(os.path.getmtime(file_path))
if file_mtime < cutoff_date:
os.remove(file_path)
self.logger.info(f"删除过期备份: {file_path}")
def get_job_status(self, job_id: str) -> Optional[BackupJob]:
"""获取任务状态"""
return self.jobs.get(job_id)
def list_jobs(self) -> List[BackupJob]:
"""列出所有任务"""
return list(self.jobs.values())
def cancel_job(self, job_id: str) -> bool:
"""取消任务"""
if job_id not in self.jobs:
return False
job = self.jobs[job_id]
if job.status == BackupStatus.RUNNING:
# 尝试停止运行中的线程
if job_id in self.running_jobs:
# 注意:Python线程无法强制停止,这里只是标记
job.status = BackupStatus.CANCELLED
return True
return False
def start_scheduler(self):
"""启动调度器"""
def run_scheduler():
            while not self._stop_scheduler:
schedule.run_pending()
time.sleep(60) # 每分钟检查一次
self.scheduler_thread = threading.Thread(target=run_scheduler)
self.scheduler_thread.start()
self.logger.info("备份调度器已启动")
    def stop_scheduler(self):
        """停止调度器"""
        self._stop_scheduler = True
if self.scheduler_thread:
self.scheduler_thread.join()
self.logger.info("备份调度器已停止")
def main():
import argparse
parser = argparse.ArgumentParser(description='统一备份管理器')
parser.add_argument('--config', required=True, help='配置文件路径')
parser.add_argument('--action', choices=['start', 'create', 'execute', 'status', 'list'],
default='start', help='执行的操作')
parser.add_argument('--job-config', help='任务配置文件(用于create操作)')
parser.add_argument('--job-id', help='任务ID(用于execute和status操作)')
args = parser.parse_args()
# 创建备份管理器
manager = UnifiedBackupManager(args.config)
if args.action == 'start':
# 启动调度器
manager.start_scheduler()
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
manager.stop_scheduler()
elif args.action == 'create':
if not args.job_config:
print("错误:需要提供任务配置文件")
exit(1)
with open(args.job_config, 'r') as f:
job_config = json.load(f)
job_id = manager.create_backup_job(job_config)
print(f"任务创建成功,ID: {job_id}")
elif args.action == 'execute':
if not args.job_id:
print("错误:需要提供任务ID")
exit(1)
success = manager.execute_backup_job(args.job_id)
if success:
print(f"任务 {args.job_id} 开始执行")
else:
print(f"任务 {args.job_id} 执行失败")
elif args.action == 'status':
if not args.job_id:
print("错误:需要提供任务ID")
exit(1)
job = manager.get_job_status(args.job_id)
if job:
print(f"任务状态: {job.status.value}")
print(f"创建时间: {job.created_at}")
if job.started_at:
print(f"开始时间: {job.started_at}")
if job.completed_at:
print(f"完成时间: {job.completed_at}")
if job.error_message:
print(f"错误信息: {job.error_message}")
else:
print(f"任务 {args.job_id} 不存在")
elif args.action == 'list':
jobs = manager.list_jobs()
print(f"共有 {len(jobs)} 个任务:")
for job in jobs:
print(f" {job.id}: {job.database_type.value}/{job.database_name} - {job.status.value}")
if __name__ == "__main__":
main()
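管理器通过任务配置描述一次备份,字段与 create_backup_job 读取的键一一对应。下面是一个假设的任务配置与 CLI 用法示例(S3 桶名、配置文件路径均为假设值);需要注意,任务目前只保存在进程内存中,因此 create、execute、start 等操作实际上需要在同一进程中组合使用,若要跨进程管理,需要自行为任务增加持久化:

```bash
# 任务配置示例(字段对应 create_backup_job 的入参)
cat > /tmp/mysql_full_job.json << 'EOF'
{
  "database_type": "mysql",
  "backup_type": "full",
  "database_name": "production_db",
  "schedule": "interval:1d",
  "retention_days": 7,
  "compression": true,
  "encryption": true,
  "storage_locations": ["local", "s3://company-database-backups/prod"]
}
EOF

# 注册任务(命令会打印生成的任务 ID),再以常驻方式启动调度器
python3 unified_backup_manager.py --config backup_manager.yaml --action create --job-config /tmp/mysql_full_job.json
python3 unified_backup_manager.py --config backup_manager.yaml --action start
```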
总结
本文深入探讨了数据库备份与恢复的完整解决方案,涵盖了以下核心内容:
核心要点
- 备份策略设计
  - 多层次备份架构(全量、增量、差异、日志)
  - 灵活的调度策略和保留策略
  - 压缩和加密支持
- 多数据库支持
  - MySQL的逻辑和物理备份
  - PostgreSQL的pg_dump和基础备份
  - MongoDB的mongodump和oplog
  - Redis的RDB和AOF备份
- 高级功能
  - 时间点恢复(PITR)
  - 跨平台备份管理
  - 云存储集成
  - 自动化验证和监控
- 最佳实践
  - 定期备份验证
  - 灾难恢复演练
  - 监控和告警机制
  - 安全性和合规性考虑
通过本文提供的脚本和工具,可以构建一个完整的企业级数据库备份与恢复系统,确保数据的安全性和业务的连续性。
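最后,结合策略文件中 test_restore: true、test_frequency: weekly 的设定,恢复演练也可以脚本化:定期把最新备份恢复到独立的演练实例,再用验证工具比对。下面是一个假设的演练骨架(主机名、路径均为示例,且假设备份开启了压缩与加密):

```bash
#!/bin/bash
# 每周恢复演练骨架(示例):恢复最新全量备份到演练实例,并做一致性验证
set -euo pipefail

# 找到最新的全量备份(假设开启了压缩与加密,文件以 .sql.gz.enc 结尾)
LATEST_BACKUP=$(ls -t /backup/mysql/full/production_db_full_*.sql.gz.enc 2>/dev/null | head -n 1 || true)
[[ -n "$LATEST_BACKUP" ]] || { echo "未找到可用备份"; exit 1; }

# 1) 恢复到独立的演练实例(主机名为示例),库名与源库保持一致
/usr/local/bin/mysql_backup_system.sh restore "$LATEST_BACKUP" production_db db-drill.internal 3306

# 2) 比对源库与演练实例上的同名库(validation.json 中的 target_database 指向演练实例)
python3 mysql_recovery_validator.py --config /etc/mysql-backup/validation.json --database production_db
```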