MongoDB Distributed Architecture Design and Practice: A Complete Path from Replica Sets to Sharded Clusters


As a leading NoSQL document database, MongoDB is widely used in modern application architectures. As data volumes grow and business logic becomes more complex, a single MongoDB instance can no longer meet high-availability and high-performance requirements. This article takes a deep look at the design principles and practical deployment of MongoDB's distributed architecture, from basic replica sets to full sharded clusters.

Overview of MongoDB Distributed Architecture

Comparison of MongoDB Cluster Architectures

Architecture | Characteristics | Typical scenarios | Pros | Cons
Standalone | Single MongoDB instance | Development, testing, small apps | Simple to run, fast | No high availability, capacity-bound
Replica set | One primary with multiple secondaries, automatic failover | Medium-scale apps | High availability, read scaling | Write throughput cannot scale out
Sharded cluster | Horizontal sharding across distributed storage | Large-scale apps | Horizontal scaling, high performance | High complexity and operational cost
Hybrid | Sharding + replica sets | Enterprise apps | High availability plus high performance | Complex architecture, heavy resource usage

Evolution of MongoDB Distributed Architecture

graph TB
    subgraph "单机模式"
        A[MongoDB Server]
        A1[Application]
        A1 --> A
    end
    
    subgraph "副本集模式"
        B[Primary]
        B1[Secondary 1]
        B2[Secondary 2]
        B3[Application]
        B --> B1
        B --> B2
        B3 --> B
        B3 -.-> B1
        B3 -.-> B2
    end
    
    subgraph "分片集群模式"
        C[mongos]
        C1[Config Server RS]
        C2[Shard 1 RS]
        C3[Shard 2 RS]
        C4[Shard 3 RS]
        C5[Application]
        C --> C1
        C --> C2
        C --> C3
        C --> C4
        C5 --> C
    end

MongoDB Replica Set Configuration and Management

Replica Set Architecture Design

A MongoDB replica set is a group of mongod servers that maintain the same data set, providing redundancy and high availability.

Replica Set Member Types

  1. Primary: receives all write operations
  2. Secondary: replicates the primary's data and can serve reads
  3. Arbiter: votes in elections but stores no data
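
Viewed from a client, these roles mostly matter for routing. Below is a minimal pymongo sketch (host addresses, database, and collection names are placeholders): the driver discovers the primary and secondaries from the topology, sends writes to the primary, and a read preference lets reads land on secondaries.

from pymongo import MongoClient, ReadPreference

# Connect to the replica set; the driver monitors all members
client = MongoClient(
    "mongodb://192.168.1.10:27017,192.168.1.11:27017,192.168.1.12:27017/"
    "?replicaSet=rs0"
)

print("primary:    ", client.primary)      # (host, port) of the current primary
print("secondaries:", client.secondaries)  # set of secondary (host, port) pairs

# Writes always go to the primary; with SECONDARY_PREFERRED this read
# may be served by a secondary
db = client.get_database("testdb", read_preference=ReadPreference.SECONDARY_PREFERRED)
print(db.testcollection.find_one())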

Replica Set Configuration File

# mongod-primary.conf
systemLog:
  destination: file
  logAppend: true
  path: /var/log/mongodb/mongod-primary.log
  logRotate: reopen

storage:
  dbPath: /var/lib/mongodb/primary
  journal:
    enabled: true
  wiredTiger:
    engineConfig:
      cacheSizeGB: 4
      journalCompressor: snappy
      directoryForIndexes: true
    collectionConfig:
      blockCompressor: snappy
    indexConfig:
      prefixCompression: true

processManagement:
  fork: true
  pidFilePath: /var/run/mongodb/mongod-primary.pid

net:
  port: 27017
  bindIp: 0.0.0.0
  maxIncomingConnections: 1000

security:
  authorization: enabled
  keyFile: /etc/mongodb/mongodb-keyfile

replication:
  replSetName: "rs0"
  oplogSizeMB: 2048

setParameter:
  enableLocalhostAuthBypass: false
  authenticationMechanisms: SCRAM-SHA-1,SCRAM-SHA-256

operationProfiling:
  mode: slowOp
  slowOpThresholdMs: 100

Replica Set Deployment Script

#!/bin/bash
# scripts/deploy_mongodb_replica_set.sh

set -euo pipefail

# Configuration (placeholders - adjust for your environment)
REPLICA_SET_NAME="rs0"
MONGODB_VERSION="6.0"   # 6.0 ships mongosh; the legacy mongo shell is removed
MONGODB_PORT="27017"
MONGODB_USER="admin"
MONGODB_PASSWORD="your_secure_password"
KEYFILE_CONTENT="your_keyfile_content_here"

# Node layout
declare -A NODES=(
    ["primary"]="192.168.1.10"
    ["secondary1"]="192.168.1.11"
    ["secondary2"]="192.168.1.12"
)

# Logging helper
log() {
    echo "[$(date '+%Y-%m-%d %H:%M:%S')] $1" >&2
}

# Generate the mongod config file for a node
create_mongodb_config() {
    local node_type="$1"
    local node_ip="$2"
    local config_file="/tmp/mongod-${node_type}.conf"
    
    cat > "${config_file}" << EOF
systemLog:
  destination: file
  logAppend: true
  path: /var/log/mongodb/mongod-${node_type}.log
  logRotate: reopen

storage:
  dbPath: /var/lib/mongodb/${node_type}
  journal:
    enabled: true
  wiredTiger:
    engineConfig:
      cacheSizeGB: 4
      journalCompressor: snappy
      directoryForIndexes: true
    collectionConfig:
      blockCompressor: snappy
    indexConfig:
      prefixCompression: true

processManagement:
  fork: true
  pidFilePath: /var/run/mongodb/mongod-${node_type}.pid

net:
  port: ${MONGODB_PORT}
  bindIp: 0.0.0.0
  maxIncomingConnections: 1000

security:
  authorization: enabled
  keyFile: /etc/mongodb/mongodb-keyfile

replication:
  replSetName: "${REPLICA_SET_NAME}"
  oplogSizeMB: 2048

setParameter:
  enableLocalhostAuthBypass: false
  authenticationMechanisms: SCRAM-SHA-1,SCRAM-SHA-256

operationProfiling:
  mode: slowOp
  slowOpThresholdMs: 100
EOF
    
    echo "${config_file}"
}

# Create the keyfile
create_keyfile() {
    local keyfile="/tmp/mongodb-keyfile"
    
    echo "${KEYFILE_CONTENT}" > "${keyfile}"
    chmod 600 "${keyfile}"
    
    echo "${keyfile}"
}

# Deploy a MongoDB node
deploy_mongodb_node() {
    local node_type="$1"
    local node_ip="$2"
    
    log "Deploying MongoDB node: ${node_type} (${node_ip})"
    
    # Generate the config file
    local config_file
    config_file=$(create_mongodb_config "${node_type}" "${node_ip}")
    
    # Create the keyfile
    local keyfile
    keyfile=$(create_keyfile)
    
    # Copy files to the target node
    scp "${config_file}" "root@${node_ip}:/etc/mongod.conf"
    scp "${keyfile}" "root@${node_ip}:/etc/mongodb/mongodb-keyfile"
    
    # Create the required directories on the target node
    ssh "root@${node_ip}" "mkdir -p /var/lib/mongodb/${node_type} /var/log/mongodb /var/run/mongodb /etc/mongodb"
    
    # Set permissions
    ssh "root@${node_ip}" "chmod 600 /etc/mongodb/mongodb-keyfile"
    ssh "root@${node_ip}" "chown -R mongod:mongod /var/lib/mongodb /var/log/mongodb /var/run/mongodb /etc/mongodb"
    
    # Start MongoDB
    ssh "root@${node_ip}" "systemctl start mongod"
    ssh "root@${node_ip}" "systemctl enable mongod"
    
    # Check that it started
    sleep 5
    if ssh "root@${node_ip}" "systemctl is-active mongod" | grep -q "active"; then
        log "MongoDB node ${node_type} started successfully"
    else
        log "ERROR: MongoDB node ${node_type} failed to start"
        exit 1
    fi
    
    # Clean up temporary files
    rm -f "${config_file}" "${keyfile}"
}

# Initialize the replica set
initialize_replica_set() {
    log "Initializing the replica set..."
    
    local primary_ip="${NODES[primary]}"
    
    # Wait for MongoDB to finish starting
    sleep 10
    
    # Build the replica set configuration
    local rs_config=$(cat << EOF
rs.initiate({
  _id: "${REPLICA_SET_NAME}",
  members: [
    { _id: 0, host: "${NODES[primary]}:${MONGODB_PORT}", priority: 2 },
    { _id: 1, host: "${NODES[secondary1]}:${MONGODB_PORT}", priority: 1 },
    { _id: 2, host: "${NODES[secondary2]}:${MONGODB_PORT}", priority: 1 }
  ]
})
EOF
)
    
    # With keyFile auth enabled, rs.initiate() and the first createUser()
    # rely on the localhost exception, so run them on the primary host itself
    echo "${rs_config}" | ssh "root@${primary_ip}" "mongosh --port ${MONGODB_PORT} --quiet"
    
    # Wait for the replica set to initialize
    log "Waiting for replica set initialization..."
    sleep 30
    
    # Create the administrator user
    local create_user_script=$(cat << EOF
db.getSiblingDB("admin").createUser({
  user: "${MONGODB_USER}",
  pwd: "${MONGODB_PASSWORD}",
  roles: [
    { role: "root", db: "admin" }
  ]
})
EOF
)
    
    echo "${create_user_script}" | ssh "root@${primary_ip}" "mongosh --port ${MONGODB_PORT} --quiet"
    
    log "Replica set initialization complete"
}

# Verify replica set status
verify_replica_set() {
    log "Verifying replica set status..."
    
    local primary_ip="${NODES[primary]}"
    
    # Check replica set status
    local rs_status
    rs_status=$(mongosh --host "${primary_ip}:${MONGODB_PORT}" -u "${MONGODB_USER}" -p "${MONGODB_PASSWORD}" --authenticationDatabase admin --eval "rs.status()" --quiet)
    
    echo "${rs_status}"
    
    # Check replica set configuration
    local rs_config
    rs_config=$(mongosh --host "${primary_ip}:${MONGODB_PORT}" -u "${MONGODB_USER}" -p "${MONGODB_PASSWORD}" --authenticationDatabase admin --eval "rs.conf()" --quiet)
    
    echo "${rs_config}"
}

# Smoke-test the replica set
test_replica_set() {
    log "Testing replica set functionality..."
    
    local primary_ip="${NODES[primary]}"
    
    # Write a test document through the primary
    local test_write=$(cat << 'EOF'
db.getSiblingDB("testdb").testcollection.insertOne({
  name: "test_document",
  timestamp: new Date(),
  data: "This is a test document for replica set"
})
EOF
)
    
    mongosh --host "${primary_ip}:${MONGODB_PORT}" -u "${MONGODB_USER}" -p "${MONGODB_PASSWORD}" --authenticationDatabase admin --eval "${test_write}"
    
    # Wait for replication to catch up
    sleep 5
    
    # Read the document back from a secondary (rs.slaveOk() no longer
    # exists in mongosh; set a read preference instead)
    local secondary_ip="${NODES[secondary1]}"
    local test_read=$(cat << 'EOF'
db.getMongo().setReadPref("secondaryPreferred")
db.getSiblingDB("testdb").testcollection.findOne({name: "test_document"})
EOF
)
    
    local read_result
    read_result=$(mongosh --host "${secondary_ip}:${MONGODB_PORT}" -u "${MONGODB_USER}" -p "${MONGODB_PASSWORD}" --authenticationDatabase admin --eval "${test_read}" --quiet)
    
    if echo "${read_result}" | grep -q "test_document"; then
        log "Replica set functional test passed"
    else
        log "ERROR: replica set functional test failed"
        exit 1
    fi
}

# Main entry point
main() {
    log "Starting MongoDB replica set deployment..."
    
    # Deploy every node
    for node_type in "${!NODES[@]}"; do
        deploy_mongodb_node "${node_type}" "${NODES[${node_type}]}"
    done
    
    # Initialize the replica set
    initialize_replica_set
    
    # Verify replica set status
    verify_replica_set
    
    # Smoke-test the replica set
    test_replica_set
    
    log "MongoDB replica set deployment finished!"
}

# Run main
main "$@"

MongoDB Data Distribution Strategies

Shard Key Selection Principles

Choosing the right shard key is the single most important factor in sharded cluster performance:

Shard Key Characteristics

  1. High cardinality: the shard key should have enough distinct values
  2. Low frequency: no single value should dominate the distribution
  3. Non-monotonic: avoid monotonically increasing keys, which funnel all inserts to one shard (write hotspots)

Common Shard Key Strategies

// 1. Hashed shard key - good for monotonically increasing fields
db.users.createIndex({ "user_id": "hashed" })
sh.shardCollection("myapp.users", { "user_id": "hashed" })

// 2. Ranged shard key - good for range queries
db.orders.createIndex({ "order_date": 1, "customer_id": 1 })
sh.shardCollection("myapp.orders", { "order_date": 1, "customer_id": 1 })

// 3. Compound shard key - improves query targeting
db.products.createIndex({ "category": 1, "product_id": 1 })
sh.shardCollection("myapp.products", { "category": 1, "product_id": 1 })

// 4. Tag-aware (zone) sharding - geographic data placement
sh.addShardTag("shard1rs", "US-EAST")
sh.addShardTag("shard2rs", "US-WEST")
sh.addTagRange("myapp.users", { "region": "east" }, { "region": "east\uffff" }, "US-EAST")
sh.addTagRange("myapp.users", { "region": "west" }, { "region": "west\uffff" }, "US-WEST")

Data Distribution Monitoring Script

#!/usr/bin/env python3
# scripts/mongodb_shard_distribution_monitor.py

import pymongo
import json
import time
import argparse
from datetime import datetime
from collections import defaultdict

class MongoShardMonitor:
    def __init__(self, mongos_uri, username, password):
        self.client = pymongo.MongoClient(
            mongos_uri,
            username=username,
            password=password,
            authSource='admin'
        )
        self.admin_db = self.client.admin
        
    def get_shard_distribution(self, database_name, collection_name):
        """Get the per-shard distribution of a collection."""
        try:
            # collStats must run against the database that owns the
            # collection, not against admin
            stats = self.client[database_name].command("collStats", collection_name)
            
            shard_distribution = {}
            
            if stats.get('sharded'):
                for shard_name, shard_stats in stats['shards'].items():
                    shard_distribution[shard_name] = {
                        'count': shard_stats.get('count', 0),
                        'size': shard_stats.get('size', 0),
                        'avgObjSize': shard_stats.get('avgObjSize', 0),
                        'storageSize': shard_stats.get('storageSize', 0),
                        'indexes': shard_stats.get('nindexes', 0),
                        'indexSize': shard_stats.get('totalIndexSize', 0)
                    }
            else:
                # Unsharded collection
                shard_distribution['unsharded'] = {
                    'count': stats.get('count', 0),
                    'size': stats.get('size', 0),
                    'avgObjSize': stats.get('avgObjSize', 0),
                    'storageSize': stats.get('storageSize', 0),
                    'indexes': stats.get('nindexes', 0),
                    'indexSize': stats.get('totalIndexSize', 0)
                }
                
            return shard_distribution
            
        except Exception as e:
            print(f"Error getting shard distribution for {database_name}.{collection_name}: {e}")
            return {}
    
    def get_chunk_distribution(self, database_name, collection_name):
        """Get the chunk distribution for a collection."""
        ns = f"{database_name}.{collection_name}"
        try:
            config_db = self.client.config
            
            # MongoDB 5.0+ keys config.chunks by collection UUID instead
            # of the namespace string, so resolve the UUID first
            coll_meta = config_db.collections.find_one({"_id": ns})
            if coll_meta and "uuid" in coll_meta:
                chunk_filter = {"uuid": coll_meta["uuid"]}
            else:
                chunk_filter = {"ns": ns}
            
            chunk_distribution = defaultdict(int)
            for chunk in config_db.chunks.find(chunk_filter):
                chunk_distribution[chunk['shard']] += 1
                
            return dict(chunk_distribution)
            
        except Exception as e:
            print(f"Error getting chunk distribution for {ns}: {e}")
            return {}
    
    def get_balancer_status(self):
        """Get the balancer status."""
        try:
            balancer_status = self.admin_db.command("balancerStatus")
            return balancer_status
        except Exception as e:
            print(f"Error getting balancer status: {e}")
            return {}
    
    def get_shard_list(self):
        """Get the list of shards."""
        try:
            shards = list(self.admin_db.command("listShards")['shards'])
            return shards
        except Exception as e:
            print(f"Error getting shard list: {e}")
            return []
    
    def analyze_distribution_balance(self, shard_distribution):
        """分析分布均衡性"""
        if not shard_distribution:
            return {}
            
        total_docs = sum(shard['count'] for shard in shard_distribution.values())
        total_size = sum(shard['size'] for shard in shard_distribution.values())
        
        if total_docs == 0:
            return {}
            
        shard_count = len(shard_distribution)
        expected_docs_per_shard = total_docs / shard_count
        expected_size_per_shard = total_size / shard_count
        
        balance_analysis = {
            'total_documents': total_docs,
            'total_size_bytes': total_size,
            'shard_count': shard_count,
            'expected_docs_per_shard': expected_docs_per_shard,
            'expected_size_per_shard': expected_size_per_shard,
            'shard_details': {}
        }
        
        for shard_name, stats in shard_distribution.items():
            doc_variance = abs(stats['count'] - expected_docs_per_shard) / expected_docs_per_shard * 100
            size_variance = abs(stats['size'] - expected_size_per_shard) / expected_size_per_shard * 100 if expected_size_per_shard > 0 else 0
            
            balance_analysis['shard_details'][shard_name] = {
                'documents': stats['count'],
                'size_bytes': stats['size'],
                'doc_percentage': stats['count'] / total_docs * 100,
                'size_percentage': stats['size'] / total_size * 100 if total_size > 0 else 0,
                'doc_variance_percent': doc_variance,
                'size_variance_percent': size_variance,
                'balance_score': (doc_variance + size_variance) / 2
            }
            
        return balance_analysis
    
    def generate_report(self, databases_collections):
        """生成分布报告"""
        report = {
            'timestamp': datetime.now().isoformat(),
            'cluster_info': {
                'shards': self.get_shard_list(),
                'balancer_status': self.get_balancer_status()
            },
            'collections': {}
        }
        
        for db_name, collections in databases_collections.items():
            for collection_name in collections:
                collection_key = f"{db_name}.{collection_name}"
                
                print(f"Analyzing {collection_key}...")
                
                shard_distribution = self.get_shard_distribution(db_name, collection_name)
                chunk_distribution = self.get_chunk_distribution(db_name, collection_name)
                balance_analysis = self.analyze_distribution_balance(shard_distribution)
                
                report['collections'][collection_key] = {
                    'shard_distribution': shard_distribution,
                    'chunk_distribution': chunk_distribution,
                    'balance_analysis': balance_analysis
                }
                
        return report
    
    def print_summary_report(self, report):
        """Print a summary report."""
        print("\n" + "="*80)
        print("MongoDB Sharded Cluster Distribution Report")
        print("="*80)
        print(f"Generated at: {report['timestamp']}")
        
        # Cluster info
        print(f"\nCluster info:")
        print(f"  Shard count: {len(report['cluster_info']['shards'])}")
        
        balancer = report['cluster_info']['balancer_status']
        if balancer:
            print(f"  Balancer: {'enabled' if balancer.get('mode') == 'full' else 'disabled'}")
            if 'inBalancerRound' in balancer:
                print(f"  Balancing in progress: {'yes' if balancer['inBalancerRound'] else 'no'}")
        
        # Collection analysis
        print(f"\nCollection analysis:")
        for collection_name, collection_data in report['collections'].items():
            print(f"\n  Collection: {collection_name}")
            
            balance_analysis = collection_data['balance_analysis']
            if balance_analysis:
                print(f"    Total documents: {balance_analysis['total_documents']:,}")
                print(f"    Total size: {balance_analysis['total_size_bytes']:,} bytes")
                print(f"    Shard count: {balance_analysis['shard_count']}")
                
                # Per-shard details
                print(f"    Shard distribution:")
                for shard_name, shard_details in balance_analysis['shard_details'].items():
                    print(f"      {shard_name}:")
                    print(f"        Documents: {shard_details['documents']:,} ({shard_details['doc_percentage']:.1f}%)")
                    print(f"        Size: {shard_details['size_bytes']:,} bytes ({shard_details['size_percentage']:.1f}%)")
                    print(f"        Balance score: {shard_details['balance_score']:.1f}")
                    
                    # Warn about imbalanced shards
                    if shard_details['balance_score'] > 20:
                        print(f"        ⚠️  Warning: uneven distribution!")
            
            # Chunk distribution
            chunk_distribution = collection_data['chunk_distribution']
            if chunk_distribution:
                print(f"    Chunk distribution:")
                for shard_name, chunk_count in chunk_distribution.items():
                    print(f"      {shard_name}: {chunk_count} chunks")
    
    def save_report(self, report, filename):
        """Save the report to a file."""
        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(report, f, indent=2, ensure_ascii=False, default=str)
        print(f"\nReport saved to: {filename}")

def main():
    parser = argparse.ArgumentParser(description='MongoDB shard distribution monitoring tool')
    parser.add_argument('--mongos-uri', required=True, help='mongos connection URI')
    parser.add_argument('--username', required=True, help='MongoDB username')
    parser.add_argument('--password', required=True, help='MongoDB password')
    parser.add_argument('--databases', required=True, help='Databases and collections to monitor, format: db1:col1,col2;db2:col3')
    parser.add_argument('--output', help='Output file name prefix')
    parser.add_argument('--interval', type=int, default=0, help='Monitoring interval in seconds; 0 runs once')
    
    args = parser.parse_args()
    
    # Parse the database/collection spec
    databases_collections = {}
    for db_spec in args.databases.split(';'):
        db_name, collections_str = db_spec.split(':')
        databases_collections[db_name] = collections_str.split(',')
    
    monitor = MongoShardMonitor(args.mongos_uri, args.username, args.password)
    
    try:
        while True:
            report = monitor.generate_report(databases_collections)
            monitor.print_summary_report(report)
            
            if args.output:
                timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
                filename = f"{args.output}_{timestamp}.json"
                monitor.save_report(report, filename)
            
            if args.interval <= 0:
                break
                
            print(f"\n等待 {args.interval} 秒后进行下次检查...")
            time.sleep(args.interval)
            
    except KeyboardInterrupt:
        print("\nMonitoring stopped")
    except Exception as e:
        print(f"Error during monitoring: {e}")

if __name__ == "__main__":
    main()

MongoDB Performance Optimization

Index Optimization Strategies

// 1. Compound index optimization
// ESR rule: Equality, Sort, Range
db.orders.createIndex({
    "status": 1,        // Equality
    "created_at": -1,   // Sort
    "amount": 1         // Range
})

// 2. Partial index - reduces index size
// Note: partialFilterExpression supports only a restricted operator set
// ($exists: true, equality, ranges, $type, top-level $and); $ne is not allowed
db.users.createIndex(
    { "email": 1 },
    { 
        partialFilterExpression: { 
            "email": { $exists: true } 
        } 
    }
)

// 3. Sparse index - indexes only documents that contain the field
db.products.createIndex(
    { "discount_code": 1 },
    { sparse: true }
)

// 4. TTL index - automatically removes expired documents
db.sessions.createIndex(
    { "created_at": 1 },
    { expireAfterSeconds: 3600 }
)

// 5. Text index - full-text search
db.articles.createIndex({
    "title": "text",
    "content": "text"
}, {
    weights: {
        "title": 10,
        "content": 5
    },
    name: "article_text_index"
})
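
To confirm that a query actually uses an index such as the ESR example above, run explain() on a representative query. A minimal pymongo sketch (connection string and field values are placeholders):

from pymongo import MongoClient, DESCENDING

orders = MongoClient("mongodb://localhost:27017").myapp.orders

plan = (
    orders.find({"status": "paid", "amount": {"$gte": 100}})
    .sort("created_at", DESCENDING)
    .explain()
)

# Expect an IXSCAN stage on status_1_created_at_-1_amount_1; a COLLSCAN
# here means the index is not being used
print(plan["queryPlanner"]["winningPlan"])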

Query Optimization Script

#!/usr/bin/env python3
# scripts/mongodb_query_optimizer.py

import pymongo
import time
import json
from datetime import datetime, timedelta

class MongoQueryOptimizer:
    def __init__(self, connection_uri, username, password):
        self.client = pymongo.MongoClient(
            connection_uri,
            username=username,
            password=password,
            authSource='admin'
        )
    
    def analyze_slow_queries(self, database_name, threshold_ms=100):
        """Analyze slow queries via the database profiler."""
        db = self.client[database_name]
        
        # Enable the profiler (level 2 records all operations; slowms is
        # the threshold). PyMongo 4.x removed set_profiling_level, so use
        # the profile command directly.
        db.command("profile", 2, slowms=threshold_ms)
        
        print(f"Analyzing slow queries in database {database_name} (threshold: {threshold_ms}ms)")
        
        # Give the profiler time to collect some samples
        time.sleep(60)
        
        # Fetch the slowest recent operations
        slow_queries = list(db.system.profile.find({
            "ts": {"$gte": datetime.now() - timedelta(minutes=5)},
            "millis": {"$gte": threshold_ms}
        }).sort("millis", -1).limit(20))
        
        return slow_queries
    
    def suggest_indexes(self, slow_queries):
        """Suggest indexes based on slow queries."""
        index_suggestions = []
        
        for query in slow_queries:
            suggestion = {
                'collection': query.get('ns', '').split('.')[-1],
                'query_pattern': query.get('command', {}),
                'execution_time_ms': query.get('millis', 0),
                'suggested_indexes': []
            }
            
            # Analyze the query shape
            command = query.get('command', {})
            
            if 'find' in command:
                filter_fields = command.get('filter', {})
                sort_fields = command.get('sort', {})
                
                # Build a suggested compound index in ESR order
                index_fields = []
                
                # Equality predicates first
                for field, value in filter_fields.items():
                    if not isinstance(value, dict):
                        index_fields.append((field, 1))
                
                # Then sort fields
                for field, direction in sort_fields.items():
                    if (field, direction) not in index_fields:
                        index_fields.append((field, direction))
                
                # Finally range predicates
                for field, value in filter_fields.items():
                    if isinstance(value, dict) and any(op in value for op in ['$gt', '$gte', '$lt', '$lte']):
                        if (field, 1) not in index_fields:
                            index_fields.append((field, 1))
                
                if index_fields:
                    suggestion['suggested_indexes'].append({
                        'fields': index_fields,
                        'type': 'compound'
                    })
            
            if suggestion['suggested_indexes']:
                index_suggestions.append(suggestion)
        
        return index_suggestions
    
    def analyze_index_usage(self, database_name, collection_name):
        """Analyze index usage statistics."""
        db = self.client[database_name]
        collection = db[collection_name]
        
        # Per-index access counters
        index_stats = list(collection.aggregate([
            {"$indexStats": {}}
        ]))
        
        # Materialize the cursor so it can be scanned once per index below
        indexes = list(collection.list_indexes())
        
        analysis = {
            'collection': f"{database_name}.{collection_name}",
            'indexes': [],
            'recommendations': []
        }
        
        for index_stat in index_stats:
            index_name = index_stat['name']
            usage_count = index_stat['accesses']['ops']
            
            # Find the matching index definition
            index_def = None
            for idx in indexes:
                if idx['name'] == index_name:
                    index_def = idx
                    break
            
            index_info = {
                'name': index_name,
                'usage_count': usage_count,
                'definition': dict(index_def['key']) if index_def else {},
                'size_bytes': 0  # would need collStats to fill in
            }
            
            # Flag indexes that are never used
            if usage_count == 0 and index_name != '_id_':
                analysis['recommendations'].append({
                    'type': 'unused_index',
                    'index_name': index_name,
                    'suggestion': f"Consider dropping unused index: {index_name}"
                })
            
            analysis['indexes'].append(index_info)
        
        return analysis
    
    def generate_optimization_report(self, databases):
        """Generate an optimization report."""
        report = {
            'timestamp': datetime.now().isoformat(),
            'databases': {}
        }
        
        for db_name in databases:
            print(f"Analyzing database: {db_name}")
            
            db_report = {
                'slow_queries': [],
                'index_suggestions': [],
                'collections': {}
            }
            
            # Analyze slow queries
            slow_queries = self.analyze_slow_queries(db_name)
            db_report['slow_queries'] = slow_queries
            
            # Generate index suggestions
            index_suggestions = self.suggest_indexes(slow_queries)
            db_report['index_suggestions'] = index_suggestions
            
            # Analyze index usage for every collection
            db = self.client[db_name]
            collections = db.list_collection_names()
            
            for collection_name in collections:
                if not collection_name.startswith('system.'):
                    print(f"  Analyzing collection: {collection_name}")
                    collection_analysis = self.analyze_index_usage(db_name, collection_name)
                    db_report['collections'][collection_name] = collection_analysis
            
            report['databases'][db_name] = db_report
        
        return report
    
    def print_optimization_report(self, report):
        """Print the optimization report."""
        print("\n" + "="*80)
        print("MongoDB Query Optimization Report")
        print("="*80)
        print(f"Generated at: {report['timestamp']}")
        
        for db_name, db_report in report['databases'].items():
            print(f"\nDatabase: {db_name}")
            print("-" * 40)
            
            # Slow query statistics
            slow_queries = db_report['slow_queries']
            print(f"Slow queries: {len(slow_queries)}")
            
            if slow_queries:
                print("Top 5 slowest queries:")
                for i, query in enumerate(slow_queries[:5]):
                    collection = query.get('ns', '').split('.')[-1]
                    duration = query.get('millis', 0)
                    print(f"  {i+1}. collection: {collection}, duration: {duration}ms")
            
            # Index suggestions
            index_suggestions = db_report['index_suggestions']
            if index_suggestions:
                print(f"\nIndex suggestions ({len(index_suggestions)}):")
                for suggestion in index_suggestions:
                    collection = suggestion['collection']
                    print(f"  Collection: {collection}")
                    for idx_suggestion in suggestion['suggested_indexes']:
                        fields = idx_suggestion['fields']
                        print(f"    Suggested index: {fields}")
            
            # Per-collection index analysis
            print(f"\nCollection index analysis:")
            for collection_name, collection_analysis in db_report['collections'].items():
                print(f"  Collection: {collection_name}")
                
                unused_indexes = [rec for rec in collection_analysis['recommendations'] 
                                if rec['type'] == 'unused_index']
                if unused_indexes:
                    print(f"    Unused indexes: {len(unused_indexes)}")
                    for rec in unused_indexes:
                        print(f"      - {rec['index_name']}")

def main():
    # Configuration (placeholders - adjust for your environment)
    MONGOS_URI = "mongodb://localhost:27017"
    USERNAME = "admin"
    PASSWORD = "your_password"
    DATABASES = ["myapp", "analytics"]
    
    optimizer = MongoQueryOptimizer(MONGOS_URI, USERNAME, PASSWORD)
    
    try:
        report = optimizer.generate_optimization_report(DATABASES)
        optimizer.print_optimization_report(report)
        
        # Save the report
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        filename = f"mongodb_optimization_report_{timestamp}.json"
        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(report, f, indent=2, ensure_ascii=False, default=str)
        
        print(f"\nDetailed report saved to: {filename}")
        
    except Exception as e:
        print(f"Error during optimization analysis: {e}")

if __name__ == "__main__":
    main()

Failover and Disaster Recovery

Automatic Failover Configuration

# Replica set failover options in mongod.conf
replication:
  replSetName: "rs0"
  enableMajorityReadConcern: true

# Election and heartbeat tuning lives in the replica set configuration
# document (rs.conf().settings) and is applied with rs.reconfig(), not
# in mongod.conf:
settings:
  electionTimeoutMillis: 10000
  heartbeatIntervalMillis: 2000
  heartbeatTimeoutSecs: 10
  catchUpTimeoutMillis: 60000
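
Server-side failover only helps if clients ride out the election without surfacing errors. A minimal pymongo sketch (URI, names, and retry policy are placeholders): retryable writes mask most primary stepdowns, and a short retry loop covers the rest (NotPrimaryError requires PyMongo 3.12+).

import time
from pymongo import MongoClient
from pymongo.errors import AutoReconnect, NotPrimaryError

client = MongoClient(
    "mongodb://192.168.1.10:27017,192.168.1.11:27017,192.168.1.12:27017/"
    "?replicaSet=rs0&retryWrites=true&w=majority"
)

def insert_with_retry(doc, attempts=5):
    # Retry briefly while a new primary is being elected
    for attempt in range(attempts):
        try:
            return client.testdb.events.insert_one(doc)
        except (AutoReconnect, NotPrimaryError):
            if attempt == attempts - 1:
                raise
            time.sleep(2 ** attempt)  # exponential backoff

insert_with_retry({"msg": "survives failover"})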

Backup and Restore Script

#!/bin/bash
# scripts/mongodb_backup_restore.sh

set -euo pipefail

# Configuration (placeholders - adjust for your environment).
# Keep the database name out of the URI path: mongodump would otherwise
# back up only that database. Use authSource instead.
MONGODB_URI="mongodb://admin:password@localhost:27017/?authSource=admin"
BACKUP_DIR="/backup/mongodb"
RETENTION_DAYS=7
S3_BUCKET="mongodb-backups"
ENCRYPTION_KEY="your-encryption-key"

# Logging helper
log() {
    echo "[$(date '+%Y-%m-%d %H:%M:%S')] $1" >&2
}

# Create a backup
create_backup() {
    local backup_type="$1"  # full or incremental
    local timestamp
    timestamp=$(date '+%Y%m%d_%H%M%S')
    local backup_path="${BACKUP_DIR}/${backup_type}_${timestamp}"
    
    log "Starting ${backup_type} backup..."
    
    mkdir -p "${backup_path}"
    
    case "${backup_type}" in
        "full")
            # Full backup of all databases
            mongodump --uri="${MONGODB_URI}" --out="${backup_path}" --gzip
            ;;
        "incremental")
            # Oplog-based incremental backup: dump the local.oplog.rs
            # entries newer than the last full backup (mongodump does not
            # allow combining --oplog with --query). Full backups are kept
            # as archives, so derive the timestamp from the archive name.
            local last_backup_time
            last_backup_time=$(find "${BACKUP_DIR}" -maxdepth 1 -name "full_*.tar.gz*" -type f \
                | sort | tail -1 | xargs -r basename | sed 's/^full_//; s/\.tar\.gz.*$//')
            
            if [[ -n "${last_backup_time}" ]]; then
                local start_time
                start_time=$(date -d "${last_backup_time:0:8} ${last_backup_time:9:2}:${last_backup_time:11:2}:${last_backup_time:13:2}" '+%s')
                
                mongodump --uri="${MONGODB_URI}" --out="${backup_path}" \
                    --db=local --collection=oplog.rs \
                    --query "{\"ts\": {\"\$gte\": {\"\$timestamp\": {\"t\": ${start_time}, \"i\": 0}}}}" --gzip
            else
                log "ERROR: no full backup found; cannot create an incremental backup"
                return 1
            fi
            ;;
    esac
    
    # Pack the backup
    tar -czf "${backup_path}.tar.gz" -C "${BACKUP_DIR}" "$(basename "${backup_path}")"
    rm -rf "${backup_path}"
    
    # Encrypt the backup
    if [[ -n "${ENCRYPTION_KEY}" ]]; then
        openssl enc -aes-256-cbc -salt -in "${backup_path}.tar.gz" \
            -out "${backup_path}.tar.gz.enc" -k "${ENCRYPTION_KEY}"
        rm "${backup_path}.tar.gz"
        backup_file="${backup_path}.tar.gz.enc"
    else
        backup_file="${backup_path}.tar.gz"
    fi
    
    # Upload to S3
    if command -v aws &> /dev/null && [[ -n "${S3_BUCKET}" ]]; then
        aws s3 cp "${backup_file}" "s3://${S3_BUCKET}/$(basename "${backup_file}")"
        log "Backup uploaded to S3: s3://${S3_BUCKET}/$(basename "${backup_file}")"
    fi
    
    log "${backup_type} backup complete: ${backup_file}"
    echo "${backup_file}"
}

# Restore a backup
restore_backup() {
    local backup_file="$1"
    local target_uri="$2"
    local restore_path="/tmp/mongodb_restore_$(date '+%Y%m%d_%H%M%S')"
    
    log "Restoring backup: ${backup_file}"
    
    mkdir -p "${restore_path}"
    
    # Decrypt the backup if needed
    if [[ "${backup_file}" == *.enc ]]; then
        openssl enc -aes-256-cbc -d -in "${backup_file}" \
            -out "${restore_path}/backup.tar.gz" -k "${ENCRYPTION_KEY}"
        backup_file="${restore_path}/backup.tar.gz"
    fi
    
    # Unpack the backup
    tar -xzf "${backup_file}" -C "${restore_path}"
    
    # Locate the dump directory
    local backup_dir
    backup_dir=$(find "${restore_path}" -type d -name "*_*" | head -1)
    
    if [[ -z "${backup_dir}" ]]; then
        log "ERROR: no valid backup directory found"
        return 1
    fi
    
    # Restore the data (--drop replaces existing collections)
    mongorestore --uri="${target_uri}" --dir="${backup_dir}" --gzip --drop
    
    # Clean up temporary files
    rm -rf "${restore_path}"
    
    log "Backup restore complete"
}

# Remove old backups
cleanup_old_backups() {
    log "Removing backups older than ${RETENTION_DAYS} days..."
    
    find "${BACKUP_DIR}" -name "*.tar.gz*" -mtime +${RETENTION_DAYS} -delete
    
    # Remove old backups from S3 as well
    if command -v aws &> /dev/null && [[ -n "${S3_BUCKET}" ]]; then
        local cutoff_date
        cutoff_date=$(date -d "${RETENTION_DAYS} days ago" '+%Y-%m-%d')
        
        aws s3 ls "s3://${S3_BUCKET}/" | while read -r line; do
            local file_date
            file_date=$(echo "${line}" | awk '{print $1}')
            local file_name
            file_name=$(echo "${line}" | awk '{print $4}')
            
            if [[ "${file_date}" < "${cutoff_date}" ]]; then
                aws s3 rm "s3://${S3_BUCKET}/${file_name}"
                log "Removed old backup from S3: ${file_name}"
            fi
        done
    fi
    
    log "Old backup cleanup complete"
}

# Verify a backup archive
verify_backup() {
    local backup_file="$1"
    local temp_restore_path="/tmp/mongodb_verify_$(date '+%Y%m%d_%H%M%S')"
    
    log "Verifying backup: ${backup_file}"
    
    mkdir -p "${temp_restore_path}"
    
    # Decrypt if necessary, then check archive integrity. Under
    # set -e a bare failing command would abort the script, so capture
    # the result explicitly.
    local failed=0
    if [[ "${backup_file}" == *.enc ]]; then
        { openssl enc -aes-256-cbc -d -in "${backup_file}" \
            -out "${temp_restore_path}/backup.tar.gz" -k "${ENCRYPTION_KEY}" \
          && tar -tzf "${temp_restore_path}/backup.tar.gz" > /dev/null; } || failed=1
    else
        tar -tzf "${backup_file}" > /dev/null || failed=1
    fi
    
    rm -rf "${temp_restore_path}"
    
    if [[ ${failed} -eq 0 ]]; then
        log "Backup verified successfully: ${backup_file}"
        return 0
    else
        log "ERROR: backup verification failed: ${backup_file}"
        return 1
    fi
}

# Main entry point
main() {
    local action="$1"
    
    case "${action}" in
        "backup")
            local backup_type="${2:-full}"
            mkdir -p "${BACKUP_DIR}"
            backup_file=$(create_backup "${backup_type}")
            verify_backup "${backup_file}"
            cleanup_old_backups
            ;;
        "restore")
            local backup_file="$2"
            local target_uri="${3:-${MONGODB_URI}}"
            restore_backup "${backup_file}" "${target_uri}"
            ;;
        "verify")
            local backup_file="$2"
            verify_backup "${backup_file}"
            ;;
        "cleanup")
            cleanup_old_backups
            ;;
        *)
            echo "Usage: $0 {backup|restore|verify|cleanup} [args...]"
            echo "  backup [full|incremental] - create a backup"
            echo "  restore <backup_file> [target_uri] - restore a backup"
            echo "  verify <backup_file> - verify a backup"
            echo "  cleanup - remove old backups"
            exit 1
            ;;
    esac
}

# Run main
main "$@"

Summary

MongoDB's distributed architecture gives modern applications powerful data storage and processing capabilities. With a sound architecture design, the right sharding strategy, thorough monitoring, and reliable backup and recovery, you can build a highly available, high-performance, and scalable MongoDB cluster.

Key Takeaways

  1. Architecture choice: pick the architecture mode that matches your business requirements
  2. Sharding strategy: choose an appropriate shard key to keep data evenly distributed
  3. Performance: improve throughput through index and query optimization
  4. Monitoring and alerting: build a thorough monitoring system to catch problems early
  5. Backup and recovery: define a solid backup strategy to keep data safe

Successfully running MongoDB's distributed architecture requires a deep understanding of how it works, a design tailored to your workload, and continuous tuning in production.

Replica Set Monitoring Script

#!/bin/bash
# scripts/mongodb_replica_set_monitor.sh

set -euo pipefail

# Configuration (placeholders - adjust for your environment)
REPLICA_SET_NAME="rs0"
MONGODB_NODES=("192.168.1.10:27017" "192.168.1.11:27017" "192.168.1.12:27017")
MONGODB_USER="admin"
MONGODB_PASSWORD="your_secure_password"
CHECK_INTERVAL="${CHECK_INTERVAL:-60}"
ALERT_THRESHOLD_LAG="${ALERT_THRESHOLD_LAG:-10}"

# Logging helper
log() {
    echo "[$(date '+%Y-%m-%d %H:%M:%S')] $1" >&2
}

# Check MongoDB connectivity
check_mongodb_connection() {
    local host_port="$1"
    
    if mongosh --host "${host_port}" -u "${MONGODB_USER}" -p "${MONGODB_PASSWORD}" --authenticationDatabase admin --eval "db.runCommand('ping')" --quiet > /dev/null 2>&1; then
        return 0
    else
        return 1
    fi
}

# Get replica set status
get_replica_set_status() {
    local primary_node=""
    
    # Locate the primary
    for node in "${MONGODB_NODES[@]}"; do
        if check_mongodb_connection "${node}"; then
            local is_master
            is_master=$(mongosh --host "${node}" -u "${MONGODB_USER}" -p "${MONGODB_PASSWORD}" --authenticationDatabase admin --eval "db.hello().isWritablePrimary" --quiet 2>/dev/null || echo "false")
            
            if [[ "${is_master}" == "true" ]]; then
                primary_node="${node}"
                break
            fi
        fi
    done
    
    if [[ -z "${primary_node}" ]]; then
        log "ERROR: unable to locate a primary node"
        send_alert "MongoDB replica set has no reachable primary"
        return 1
    fi
    
    log "Primary node: ${primary_node}"
    
    # Fetch the replica set status as JSON so it is easy to parse
    local rs_status
    rs_status=$(mongosh --host "${primary_node}" -u "${MONGODB_USER}" -p "${MONGODB_PASSWORD}" --authenticationDatabase admin --eval "JSON.stringify(rs.status())" --quiet 2>/dev/null || echo "")
    
    if [[ -z "${rs_status}" ]]; then
        log "ERROR: unable to fetch replica set status"
        send_alert "Failed to fetch MongoDB replica set status"
        return 1
    fi
    
    echo "${rs_status}"
}

# Check replica set health
check_replica_set_health() {
    log "Checking replica set health..."
    
    local rs_status
    rs_status=$(get_replica_set_status) || return 1
    
    # Count members by state from the JSON output
    # (state 1 = PRIMARY, state 2 = SECONDARY)
    local total_members
    local primary_count
    local secondary_count
    local healthy_members
    
    total_members=$(grep -o '"state":[0-9]*' <<< "${rs_status}" | wc -l)
    primary_count=$(grep -o '"state":1[,}]' <<< "${rs_status}" | wc -l)
    secondary_count=$(grep -o '"state":2[,}]' <<< "${rs_status}" | wc -l)
    healthy_members=$((primary_count + secondary_count))
    
    log "Replica set members: total=${total_members}, healthy=${healthy_members}, primary=${primary_count}, secondary=${secondary_count}"
    
    # There must be exactly one primary
    if [[ "${primary_count}" -ne 1 ]]; then
        log "ERROR: unexpected primary count: ${primary_count}"
        send_alert "MongoDB replica set has an unexpected primary count: ${primary_count}"
    fi
    
    # Check the ratio of healthy members
    if [[ "${total_members}" -gt 0 ]]; then
        local health_ratio
        health_ratio=$((healthy_members * 100 / total_members))
        
        if [[ "${health_ratio}" -lt 67 ]]; then
            log "WARNING: healthy member ratio too low: ${health_ratio}%"
            send_alert "MongoDB replica set healthy member ratio too low: ${health_ratio}%"
        fi
    fi
}

# Check replication lag
check_replication_lag() {
    log "Checking replication lag..."
    
    local primary_node=""
    
    # Locate the primary
    for node in "${MONGODB_NODES[@]}"; do
        if check_mongodb_connection "${node}"; then
            local is_master
            is_master=$(mongosh --host "${node}" -u "${MONGODB_USER}" -p "${MONGODB_PASSWORD}" --authenticationDatabase admin --eval "db.hello().isWritablePrimary" --quiet 2>/dev/null || echo "false")
            
            if [[ "${is_master}" == "true" ]]; then
                primary_node="${node}"
                break
            fi
        fi
    done
    
    if [[ -z "${primary_node}" ]]; then
        return 1
    fi
    
    # Check every secondary's lag against the primary's optime; both
    # optimes are read from the primary's view of rs.status()
    for node in "${MONGODB_NODES[@]}"; do
        if [[ "${node}" == "${primary_node}" ]]; then
            continue
        fi
        
        if check_mongodb_connection "${node}"; then
            local lag_seconds
            lag_seconds=$(mongosh --host "${primary_node}" -u "${MONGODB_USER}" -p "${MONGODB_PASSWORD}" --authenticationDatabase admin --eval "
                var primary = rs.status().members.find(m => m.self).optimeDate;
                var secondary = rs.status().members.find(m => m.name === '${node}').optimeDate;
                Math.floor((primary - secondary) / 1000);
            " --quiet 2>/dev/null || echo "-1")
            
            if [[ "${lag_seconds}" -eq -1 ]]; then
                log "WARNING: unable to compute replication lag for node ${node}"
            elif [[ "${lag_seconds}" -gt "${ALERT_THRESHOLD_LAG}" ]]; then
                log "WARNING: replication lag on node ${node} is too high: ${lag_seconds}s"
                send_alert "MongoDB replication lag too high: ${node} ${lag_seconds}s"
            else
                log "Replication lag on node ${node} is normal: ${lag_seconds}s"
            fi
        else
            log "ERROR: unable to connect to node ${node}"
            send_alert "MongoDB node unreachable: ${node}"
        fi
    done
}

# Check performance metrics
check_performance_metrics() {
    log "Checking performance metrics..."
    
    for node in "${MONGODB_NODES[@]}"; do
        if ! check_mongodb_connection "${node}"; then
            continue
        fi
        
        log "Performance metrics for node ${node}:"
        
        # Pull individual serverStatus fields instead of grepping the
        # full shell output
        local connections
        local memory_resident
        local memory_virtual
        
        connections=$(mongosh --host "${node}" -u "${MONGODB_USER}" -p "${MONGODB_PASSWORD}" --authenticationDatabase admin --eval "db.serverStatus().connections.current" --quiet 2>/dev/null || echo "0")
        memory_resident=$(mongosh --host "${node}" -u "${MONGODB_USER}" -p "${MONGODB_PASSWORD}" --authenticationDatabase admin --eval "db.serverStatus().mem.resident" --quiet 2>/dev/null || echo "0")
        memory_virtual=$(mongosh --host "${node}" -u "${MONGODB_USER}" -p "${MONGODB_PASSWORD}" --authenticationDatabase admin --eval "db.serverStatus().mem.virtual" --quiet 2>/dev/null || echo "0")
        
        log "  Connections: ${connections}"
        log "  Resident memory: ${memory_resident} MB"
        log "  Virtual memory: ${memory_virtual} MB"
        
        # Connection count threshold
        if [[ "${connections}" -gt 800 ]]; then
            log "WARNING: connection count on node ${node} too high: ${connections}"
            send_alert "MongoDB connection count too high: ${node} ${connections}"
        fi
        
        # Memory usage threshold
        if [[ "${memory_resident}" -gt 4096 ]]; then
            log "WARNING: memory usage on node ${node} too high: ${memory_resident} MB"
            send_alert "MongoDB memory usage too high: ${node} ${memory_resident}MB"
        fi
        
        echo "---"
    done
}

# Send an alert
send_alert() {
    local message="$1"
    
    # Send to Slack
    if [[ -n "${SLACK_WEBHOOK_URL:-}" ]]; then
        curl -X POST "${SLACK_WEBHOOK_URL}" \
            -H 'Content-type: application/json' \
            -d "{\"text\":\"🚨 MongoDB Alert: ${message}\"}" || true
    fi
    
    # Send email
    if command -v mail &> /dev/null; then
        echo "${message}" | mail -s "MongoDB Alert" "${ALERT_EMAIL:-admin@company.com}" || true
    fi
    
    log "Alert sent: ${message}"
}

# Main loop
main() {
    log "Starting MongoDB replica set monitoring"
    
    while true; do
        # Replica set health
        check_replica_set_health
        
        # Replication lag
        check_replication_lag
        
        # Performance metrics
        check_performance_metrics
        
        log "Monitoring cycle complete; sleeping ${CHECK_INTERVAL} seconds"
        sleep "${CHECK_INTERVAL}"
    done
}

# Signal handling
cleanup() {
    log "Monitor exiting"
    exit 0
}

trap cleanup SIGTERM SIGINT

# Start monitoring
main "$@"

MongoDB Sharded Cluster Architecture

Sharded Cluster Components

A MongoDB sharded cluster is composed of the following components, exercised by the sketch after this list:

  1. mongos: the query router and the clients' entry point
  2. Config Servers: a replica set storing the cluster metadata
  3. Shards: replica sets storing the actual data
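
A minimal pymongo sketch of how these components fit together (the mongos address, database, and shard key are placeholders): the client talks only to mongos, which reads the shard registry from the config servers and routes operations to the shards.

from pymongo import MongoClient

mongos = MongoClient("mongodb://192.168.1.50:27017")  # a mongos, not a mongod

# listShards returns the shard registry kept on the config servers
for shard in mongos.admin.command("listShards")["shards"]:
    print(shard["_id"], shard["host"])

# Enable sharding for a database, then shard a collection on a hashed key
mongos.admin.command("enableSharding", "myapp")
mongos.admin.command("shardCollection", "myapp.users", key={"user_id": "hashed"})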

Sharded Cluster Architecture Diagram

graph TB
    subgraph "应用层"
        APP1[Application 1]
        APP2[Application 2]
        APP3[Application 3]
    end
    
    subgraph "路由层"
        MONGOS1[mongos 1]
        MONGOS2[mongos 2]
        MONGOS3[mongos 3]
    end
    
    subgraph "配置服务器"
        CONFIG1[Config Server 1]
        CONFIG2[Config Server 2]
        CONFIG3[Config Server 3]
    end
    
    subgraph "分片1"
        SHARD1P[Shard1 Primary]
        SHARD1S1[Shard1 Secondary1]
        SHARD1S2[Shard1 Secondary2]
    end
    
    subgraph "分片2"
        SHARD2P[Shard2 Primary]
        SHARD2S1[Shard2 Secondary1]
        SHARD2S2[Shard2 Secondary2]
    end
    
    subgraph "分片3"
        SHARD3P[Shard3 Primary]
        SHARD3S1[Shard3 Secondary1]
        SHARD3S2[Shard3 Secondary2]
    end
    
    APP1 --> MONGOS1
    APP2 --> MONGOS2
    APP3 --> MONGOS3
    
    MONGOS1 --> CONFIG1
    MONGOS2 --> CONFIG2
    MONGOS3 --> CONFIG3
    
    MONGOS1 --> SHARD1P
    MONGOS1 --> SHARD2P
    MONGOS1 --> SHARD3P
    
    SHARD1P --> SHARD1S1
    SHARD1P --> SHARD1S2
    SHARD2P --> SHARD2S1
    SHARD2P --> SHARD2S2
    SHARD3P --> SHARD3S1
    SHARD3P --> SHARD3S2

Sharded Cluster Deployment Script

#!/bin/bash
# scripts/deploy_mongodb_sharded_cluster.sh

set -euo pipefail

# Configuration (placeholders - adjust for your environment)
MONGODB_VERSION="6.0"   # 6.0 ships mongosh; the legacy mongo shell is removed
MONGODB_USER="admin"
MONGODB_PASSWORD="your_secure_password"
KEYFILE_CONTENT="your_keyfile_content_here"

# Config server layout
CONFIG_REPLICA_SET="configReplSet"
declare -A CONFIG_SERVERS=(
    ["config1"]="192.168.1.20:27019"
    ["config2"]="192.168.1.21:27019"
    ["config3"]="192.168.1.22:27019"
)

# Shard layout
declare -A SHARD1_SERVERS=(
    ["shard1-primary"]="192.168.1.30:27018"
    ["shard1-secondary1"]="192.168.1.31:27018"
    ["shard1-secondary2"]="192.168.1.32:27018"
)

declare -A SHARD2_SERVERS=(
    ["shard2-primary"]="192.168.1.40:27018"
    ["shard2-secondary1"]="192.168.1.41:27018"
    ["shard2-secondary2"]="192.168.1.42:27018"
)

# mongos layout
declare -A MONGOS_SERVERS=(
    ["mongos1"]="192.168.1.50:27017"
    ["mongos2"]="192.168.1.51:27017"
    ["mongos3"]="192.168.1.52:27017"
)

# Logging helper
log() {
    echo "[$(date '+%Y-%m-%d %H:%M:%S')] $1" >&2
}

# Generate a config server config file
create_config_server_config() {
    local node_name="$1"
    local config_file="/tmp/mongod-${node_name}.conf"
    
    cat > "${config_file}" << EOF
systemLog:
  destination: file
  logAppend: true
  path: /var/log/mongodb/mongod-${node_name}.log
  logRotate: reopen

storage:
  dbPath: /var/lib/mongodb/${node_name}
  journal:
    enabled: true
  wiredTiger:
    engineConfig:
      cacheSizeGB: 2
      journalCompressor: snappy

processManagement:
  fork: true
  pidFilePath: /var/run/mongodb/mongod-${node_name}.pid

net:
  port: 27019
  bindIp: 0.0.0.0

security:
  authorization: enabled
  keyFile: /etc/mongodb/mongodb-keyfile

replication:
  replSetName: "${CONFIG_REPLICA_SET}"

sharding:
  clusterRole: configsvr
EOF
    
    echo "${config_file}"
}

# Generate a shard node config file
create_shard_config() {
    local node_name="$1"
    local shard_name="$2"
    local config_file="/tmp/mongod-${node_name}.conf"
    
    cat > "${config_file}" << EOF
systemLog:
  destination: file
  logAppend: true
  path: /var/log/mongodb/mongod-${node_name}.log
  logRotate: reopen

storage:
  dbPath: /var/lib/mongodb/${node_name}
  journal:
    enabled: true
  wiredTiger:
    engineConfig:
      cacheSizeGB: 4
      journalCompressor: snappy
      directoryForIndexes: true
    collectionConfig:
      blockCompressor: snappy

processManagement:
  fork: true
  pidFilePath: /var/run/mongodb/mongod-${node_name}.pid

net:
  port: 27018
  bindIp: 0.0.0.0

security:
  authorization: enabled
  keyFile: /etc/mongodb/mongodb-keyfile

replication:
  replSetName: "${shard_name}"

sharding:
  clusterRole: shardsvr
EOF
    
    echo "${config_file}"
}

# Generate the mongos config file
create_mongos_config() {
    local node_name="$1"
    local config_file="/tmp/mongos-${node_name}.conf"
    
    # Build the config server connection string (replSetName/host1,host2,...)
    local config_servers_str="${CONFIG_REPLICA_SET}/"
    local first=true
    for server in "${CONFIG_SERVERS[@]}"; do
        if [[ "${first}" == "true" ]]; then
            config_servers_str="${config_servers_str}${server}"
            first=false
        else
            config_servers_str="${config_servers_str},${server}"
        fi
    done
    
    cat > "${config_file}" << EOF
systemLog:
  destination: file
  logAppend: true
  path: /var/log/mongodb/mongos-${node_name}.log
  logRotate: reopen

processManagement:
  fork: true
  pidFilePath: /var/run/mongodb/mongos-${node_name}.pid

net:
  port: 27017
  bindIp: 0.0.0.0

security:
  keyFile: /etc/mongodb/mongodb-keyfile

sharding:
  configDB: ${config_servers_str}
EOF
    
    echo "${config_file}"
}

# Deploy the config servers
deploy_config_servers() {
    log "Deploying config servers..."
    
    # Deploy each config server
    for node_name in "${!CONFIG_SERVERS[@]}"; do
        local server_addr="${CONFIG_SERVERS[${node_name}]}"
        local server_ip
        server_ip=$(echo "${server_addr}" | cut -d: -f1)
        
        log "Deploying config server: ${node_name} (${server_ip})"
        
        # Generate the config file
        local config_file
        config_file=$(create_config_server_config "${node_name}")
        
        # Create the keyfile
        local keyfile="/tmp/mongodb-keyfile"
        echo "${KEYFILE_CONTENT}" > "${keyfile}"
        chmod 600 "${keyfile}"
        
        # Copy files to the target node
        scp "${config_file}" "root@${server_ip}:/etc/mongod.conf"
        scp "${keyfile}" "root@${server_ip}:/etc/mongodb/mongodb-keyfile"
        
        # Create directories and set permissions
        ssh "root@${server_ip}" "mkdir -p /var/lib/mongodb/${node_name} /var/log/mongodb /var/run/mongodb /etc/mongodb"
        ssh "root@${server_ip}" "chmod 600 /etc/mongodb/mongodb-keyfile"
        ssh "root@${server_ip}" "chown -R mongod:mongod /var/lib/mongodb /var/log/mongodb /var/run/mongodb /etc/mongodb"
        
        # Start MongoDB
        ssh "root@${server_ip}" "systemctl start mongod"
        ssh "root@${server_ip}" "systemctl enable mongod"
        
        # Clean up temporary files
        rm -f "${config_file}" "${keyfile}"
    done
    
    # Wait for all config servers to come up
    sleep 10
    
    # Initialize the config server replica set
    local first_config_server
    first_config_server=$(echo "${CONFIG_SERVERS[config1]}" | cut -d: -f1)
    
    local rs_config=$(cat << EOF
rs.initiate({
  _id: "${CONFIG_REPLICA_SET}",
  configsvr: true,
  members: [
    { _id: 0, host: "${CONFIG_SERVERS[config1]}" },
    { _id: 1, host: "${CONFIG_SERVERS[config2]}" },
    { _id: 2, host: "${CONFIG_SERVERS[config3]}" }
  ]
})
EOF
)
    
    # With keyFile auth enabled, rs.initiate() and the first createUser()
    # rely on the localhost exception, so run them on the config server
    # host itself
    echo "${rs_config}" | ssh "root@${first_config_server}" "mongosh --port 27019 --quiet"
    
    # Wait for the replica set to initialize
    sleep 30
    
    # Create the administrator user
    local create_user_script=$(cat << EOF
db.getSiblingDB("admin").createUser({
  user: "${MONGODB_USER}",
  pwd: "${MONGODB_PASSWORD}",
  roles: [
    { role: "root", db: "admin" }
  ]
})
EOF
)
    
    echo "${create_user_script}" | ssh "root@${first_config_server}" "mongosh --port 27019 --quiet"
    
    log "Config servers deployed"
}

# Deploy one shard (a replica set)
deploy_shard() {
    local shard_name="$1"
    local -n shard_servers=$2
    
    log "Deploying shard: ${shard_name}..."
    
    # Deploy each shard node
    for node_name in "${!shard_servers[@]}"; do
        local server_addr="${shard_servers[${node_name}]}"
        local server_ip
        server_ip=$(echo "${server_addr}" | cut -d: -f1)
        
        log "Deploying shard node: ${node_name} (${server_ip})"
        
        # Generate the config file
        local config_file
        config_file=$(create_shard_config "${node_name}" "${shard_name}")
        
        # Create the keyfile
        local keyfile="/tmp/mongodb-keyfile"
        echo "${KEYFILE_CONTENT}" > "${keyfile}"
        chmod 600 "${keyfile}"
        
        # Copy files to the target node
        scp "${config_file}" "root@${server_ip}:/etc/mongod.conf"
        scp "${keyfile}" "root@${server_ip}:/etc/mongodb/mongodb-keyfile"
        
        # Create directories and set permissions
        ssh "root@${server_ip}" "mkdir -p /var/lib/mongodb/${node_name} /var/log/mongodb /var/run/mongodb /etc/mongodb"
        ssh "root@${server_ip}" "chmod 600 /etc/mongodb/mongodb-keyfile"
        ssh "root@${server_ip}" "chown -R mongod:mongod /var/lib/mongodb /var/log/mongodb /var/run/mongodb /etc/mongodb"
        
        # Start MongoDB
        ssh "root@${server_ip}" "systemctl start mongod"
        ssh "root@${server_ip}" "systemctl enable mongod"
        
        # Clean up temporary files
        rm -f "${config_file}" "${keyfile}"
    done
    
    # Wait for all shard nodes to come up
    sleep 10
    
    # Pick the designated primary's host for initiating the replica set
    local first_shard_server=""
    for node_name in "${!shard_servers[@]}"; do
        if [[ "${node_name}" == *"primary"* ]]; then
            first_shard_server=$(echo "${shard_servers[${node_name}]}" | cut -d: -f1)
            break
        fi
    done
    
    if [[ -z "${first_shard_server}" ]]; then
        first_shard_server=$(echo "${shard_servers[$(echo "${!shard_servers[@]}" | cut -d' ' -f1)]}" | cut -d: -f1)
    fi
    
    # Build the replica set member configuration
    local members_config=""
    local member_id=0
    for node_name in "${!shard_servers[@]}"; do
        local priority=1
        if [[ "${node_name}" == *"primary"* ]]; then
            priority=2
        fi
        
        if [[ -n "${members_config}" ]]; then
            members_config="${members_config},"
        fi
        members_config="${members_config}{ _id: ${member_id}, host: \"${shard_servers[${node_name}]}\", priority: ${priority} }"
        member_id=$((member_id + 1))
    done
    
    local rs_config=$(cat << EOF
rs.initiate({
  _id: "${shard_name}",
  members: [${members_config}]
})
EOF
)
    
    # With keyFile auth enabled, rs.initiate() relies on the localhost
    # exception, so run it on the shard primary host itself
    echo "${rs_config}" | ssh "root@${first_shard_server}" "mongosh --port 27018 --quiet"
    
    # Wait for the replica set to initialize
    sleep 30
    
    log "Shard ${shard_name} deployed"
}

# Deploy the mongos routers
deploy_mongos() {
    log "Deploying mongos..."
    
    for node_name in "${!MONGOS_SERVERS[@]}"; do
        local server_addr="${MONGOS_SERVERS[${node_name}]}"
        local server_ip
        server_ip=$(echo "${server_addr}" | cut -d: -f1)
        
        log "Deploying mongos: ${node_name} (${server_ip})"
        
        # Generate the config file
        local config_file
        config_file=$(create_mongos_config "${node_name}")
        
        # Create the keyfile
        local keyfile="/tmp/mongodb-keyfile"
        echo "${KEYFILE_CONTENT}" > "${keyfile}"
        chmod 600 "${keyfile}"
        
        # Copy files to the target node
        scp "${config_file}" "root@${server_ip}:/etc/mongos.conf"
        scp "${keyfile}" "root@${server_ip}:/etc/mongodb/mongodb-keyfile"
        
        # Create directories and set permissions
        ssh "root@${server_ip}" "mkdir -p /var/log/mongodb /var/run/mongodb /etc/mongodb"
        ssh "root@${server_ip}" "chmod 600 /etc/mongodb/mongodb-keyfile"
        ssh "root@${server_ip}" "chown -R mongod:mongod /var/log/mongodb /var/run/mongodb /etc/mongodb"
        
        # Start mongos
        ssh "root@${server_ip}" "mongos --config /etc/mongos.conf"
        
        # Clean up temporary files
        rm -f "${config_file}" "${keyfile}"
    done
    
    log "mongos deployed"
}

# Configure the sharded cluster
configure_sharded_cluster() {
    log "Configuring the sharded cluster..."
    
    # Connect to the first mongos
    local first_mongos
    first_mongos=$(echo "${MONGOS_SERVERS[mongos1]}" | cut -d: -f1)
    
    # Wait for mongos to finish starting
    sleep 10
    
    # Register the shards
    local add_shard1_cmd="sh.addShard(\"shard1rs/${SHARD1_SERVERS[shard1-primary]},${SHARD1_SERVERS[shard1-secondary1]},${SHARD1_SERVERS[shard1-secondary2]}\")"
    local add_shard2_cmd="sh.addShard(\"shard2rs/${SHARD2_SERVERS[shard2-primary]},${SHARD2_SERVERS[shard2-secondary1]},${SHARD2_SERVERS[shard2-secondary2]}\")"
    
    mongosh --host "${first_mongos}:27017" -u "${MONGODB_USER}" -p "${MONGODB_PASSWORD}" --authenticationDatabase admin --eval "${add_shard1_cmd}"
    mongosh --host "${first_mongos}:27017" -u "${MONGODB_USER}" -p "${MONGODB_PASSWORD}" --authenticationDatabase admin --eval "${add_shard2_cmd}"
    
    # Verify shard status
    mongosh --host "${first_mongos}:27017" -u "${MONGODB_USER}" -p "${MONGODB_PASSWORD}" --authenticationDatabase admin --eval "sh.status()"
    
    log "Sharded cluster configured"
}

# Main entry point
main() {
    log "Starting MongoDB sharded cluster deployment..."
    
    # Deploy the config servers
    deploy_config_servers
    
    # Deploy the shards
    deploy_shard "shard1rs" SHARD1_SERVERS
    deploy_shard "shard2rs" SHARD2_SERVERS
    
    # Deploy mongos
    deploy_mongos
    
    # Configure the sharded cluster
    configure_sharded_cluster
    
    log "MongoDB sharded cluster deployment finished!"
}

# Run main
main "$@"
