MongoDB Distributed Architecture Design and Practice: A Complete Path from Replica Sets to Sharded Clusters
As a leading NoSQL document database, MongoDB is widely used in modern application architectures. As data volumes grow and business logic becomes more complex, a standalone MongoDB instance can no longer meet high-availability and high-performance requirements. This article takes a close look at the design principles and practical deployment of MongoDB's distributed architecture, from basic replica sets to full sharded clusters.
Overview of MongoDB Distributed Architecture
MongoDB Cluster Architecture Comparison
| Architecture | Characteristics | Typical Scenarios | Pros | Cons |
|---|---|---|---|---|
| Standalone | Single MongoDB instance | Development/testing, small applications | Simple, fast | No high availability, limited capacity |
| Replica set | One primary, multiple secondaries, automatic failover | Medium-scale applications | High availability, read scaling | Writes do not scale out |
| Sharded cluster | Horizontal sharding, distributed storage | Large-scale applications | Horizontal scaling, high throughput | High complexity, higher operational cost |
| Hybrid | Sharding + replica sets | Enterprise applications | High availability plus performance | Complex architecture, resource-heavy |
Evolution of MongoDB Distributed Architecture
graph TB
subgraph "单机模式"
A[MongoDB Server]
A1[Application]
A1 --> A
end
subgraph "副本集模式"
B[Primary]
B1[Secondary 1]
B2[Secondary 2]
B3[Application]
B --> B1
B --> B2
B3 --> B
B3 -.-> B1
B3 -.-> B2
end
subgraph "分片集群模式"
C[mongos]
C1[Config Server RS]
C2[Shard 1 RS]
C3[Shard 2 RS]
C4[Shard 3 RS]
C5[Application]
C --> C1
C --> C2
C --> C3
C --> C4
C5 --> C
end
MongoDB Replica Set Configuration and Management
Replica Set Architecture Design
A MongoDB replica set is a group of MongoDB servers that maintain the same data set, providing redundancy and high availability.
Replica Set Member Types
- Primary: receives all write operations
- Secondary: replicates the primary's data and can serve reads (see the connection sketch after this list)
- Arbiter: votes in elections but stores no data
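Which member a request reaches is decided by the driver's read preference, not by the application picking hosts. A minimal pymongo sketch, assuming the example hosts and credentials used by the deployment script later in this section:
from pymongo import MongoClient, ReadPreference
# Seed list of replica set members; the driver discovers the full topology.
client = MongoClient(
    "mongodb://admin:your_secure_password@192.168.1.10:27017,"
    "192.168.1.11:27017,192.168.1.12:27017/"
    "?replicaSet=rs0&authSource=admin",
    w="majority",  # acknowledge writes on a majority of members
)
# Writes always route to the primary.
db = client.get_database("myapp")
db.events.insert_one({"type": "login"})
# Reads can be offloaded to secondaries (eventually consistent).
ro_db = client.get_database("myapp", read_preference=ReadPreference.SECONDARY_PREFERRED)
doc = ro_db.events.find_one({"type": "login"})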
Replica Set Configuration File
# mongod-primary.conf
systemLog:
destination: file
logAppend: true
path: /var/log/mongodb/mongod-primary.log
logRotate: reopen
storage:
dbPath: /var/lib/mongodb/primary
journal:
enabled: true
wiredTiger:
engineConfig:
cacheSizeGB: 4
journalCompressor: snappy
directoryForIndexes: true
collectionConfig:
blockCompressor: snappy
indexConfig:
prefixCompression: true
processManagement:
fork: true
pidFilePath: /var/run/mongodb/mongod-primary.pid
net:
port: 27017
bindIp: 0.0.0.0
maxIncomingConnections: 1000
security:
authorization: enabled
keyFile: /etc/mongodb/mongodb-keyfile
replication:
replSetName: "rs0"
oplogSizeMB: 2048
setParameter:
enableLocalhostAuthBypass: false
authenticationMechanisms: SCRAM-SHA-1,SCRAM-SHA-256
operationProfiling:
mode: slowOp
slowOpThresholdMs: 100
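The oplogSizeMB setting above bounds how long a member can stay offline and still catch up from the oplog rather than needing a full resync. A small pymongo sketch to measure the current oplog window (the connection string is illustrative):
from pymongo import MongoClient
client = MongoClient("mongodb://admin:your_secure_password@192.168.1.10:27017/?authSource=admin")
oplog = client.local["oplog.rs"]
# Oldest and newest entries in natural (insertion) order.
first = oplog.find_one(sort=[("$natural", 1)])
last = oplog.find_one(sort=[("$natural", -1)])
# 'ts' is a BSON Timestamp; .time is seconds since the epoch.
window_seconds = last["ts"].time - first["ts"].time
print(f"oplog window: {window_seconds / 3600:.1f} hours")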
Replica Set Deployment Script
#!/bin/bash
# scripts/deploy_mongodb_replica_set.sh
set -euo pipefail
# Configuration
REPLICA_SET_NAME="rs0"
MONGODB_VERSION="6.0"
MONGODB_PORT="27017"
MONGODB_USER="admin"
MONGODB_PASSWORD="your_secure_password"
KEYFILE_CONTENT="your_keyfile_content_here"
# Node layout
declare -A NODES=(
["primary"]="192.168.1.10"
["secondary1"]="192.168.1.11"
["secondary2"]="192.168.1.12"
)
# Logging helper
log() {
echo "[$(date '+%Y-%m-%d %H:%M:%S')] $1" >&2
}
# Generate a mongod config file
create_mongodb_config() {
local node_type="$1"
local node_ip="$2"
local config_file="/tmp/mongod-${node_type}.conf"
cat > "${config_file}" << EOF
systemLog:
destination: file
logAppend: true
path: /var/log/mongodb/mongod-${node_type}.log
logRotate: reopen
storage:
dbPath: /var/lib/mongodb/${node_type}
journal:
enabled: true
wiredTiger:
engineConfig:
cacheSizeGB: 4
journalCompressor: snappy
directoryForIndexes: true
collectionConfig:
blockCompressor: snappy
indexConfig:
prefixCompression: true
processManagement:
fork: true
pidFilePath: /var/run/mongodb/mongod-${node_type}.pid
net:
port: ${MONGODB_PORT}
bindIp: 0.0.0.0
maxIncomingConnections: 1000
security:
authorization: enabled
keyFile: /etc/mongodb/mongodb-keyfile
replication:
replSetName: "${REPLICA_SET_NAME}"
oplogSizeMB: 2048
setParameter:
enableLocalhostAuthBypass: false
authenticationMechanisms: SCRAM-SHA-1,SCRAM-SHA-256
operationProfiling:
mode: slowOp
slowOpThresholdMs: 100
EOF
echo "${config_file}"
}
# Create the keyfile
create_keyfile() {
local keyfile="/tmp/mongodb-keyfile"
echo "${KEYFILE_CONTENT}" > "${keyfile}"
chmod 600 "${keyfile}"
echo "${keyfile}"
}
# Deploy a single MongoDB node
deploy_mongodb_node() {
local node_type="$1"
local node_ip="$2"
    log "Deploying MongoDB node: ${node_type} (${node_ip})"
    # Generate the config file
local config_file
config_file=$(create_mongodb_config "${node_type}" "${node_ip}")
    # Create the keyfile
local keyfile
keyfile=$(create_keyfile)
    # Copy files to the target node
scp "${config_file}" "root@${node_ip}:/etc/mongod.conf"
scp "${keyfile}" "root@${node_ip}:/etc/mongodb/mongodb-keyfile"
    # Create the required directories on the target node
ssh "root@${node_ip}" "mkdir -p /var/lib/mongodb/${node_type} /var/log/mongodb /var/run/mongodb /etc/mongodb"
    # Set permissions
ssh "root@${node_ip}" "chmod 600 /etc/mongodb/mongodb-keyfile"
ssh "root@${node_ip}" "chown -R mongod:mongod /var/lib/mongodb /var/log/mongodb /var/run/mongodb /etc/mongodb"
    # Start MongoDB
ssh "root@${node_ip}" "systemctl start mongod"
ssh "root@${node_ip}" "systemctl enable mongod"
    # Check startup status
sleep 5
if ssh "root@${node_ip}" "systemctl is-active mongod" | grep -q "active"; then
        log "MongoDB node ${node_type} started successfully"
else
        log "ERROR: MongoDB node ${node_type} failed to start"
exit 1
fi
    # Clean up temporary files
rm -f "${config_file}" "${keyfile}"
}
# Initialize the replica set
initialize_replica_set() {
    log "Initializing the replica set..."
local primary_ip="${NODES[primary]}"
    # Wait for MongoDB to finish starting
sleep 10
    # Build the replica set configuration
local rs_config=$(cat << EOF
rs.initiate({
_id: "${REPLICA_SET_NAME}",
members: [
{ _id: 0, host: "${NODES[primary]}:${MONGODB_PORT}", priority: 2 },
{ _id: 1, host: "${NODES[secondary1]}:${MONGODB_PORT}", priority: 1 },
{ _id: 2, host: "${NODES[secondary2]}:${MONGODB_PORT}", priority: 1 }
]
})
EOF
)
    # Connect to the designated primary and initiate the replica set
    # (MongoDB 6.0 ships mongosh; the legacy mongo shell was removed)
    mongosh --host "${primary_ip}:${MONGODB_PORT}" --eval "${rs_config}"
    # Wait for replica set initialization to complete
    log "Waiting for replica set initialization to complete..."
sleep 30
    # Create the administrator user ('use admin' is not valid inside --eval,
    # so switch databases with getSiblingDB instead)
    local create_user_script=$(cat << EOF
db.getSiblingDB("admin").createUser({
    user: "${MONGODB_USER}",
    pwd: "${MONGODB_PASSWORD}",
    roles: [
        { role: "root", db: "admin" }
    ]
})
EOF
)
    mongosh --host "${primary_ip}:${MONGODB_PORT}" --eval "${create_user_script}"
    log "Replica set initialization complete"
}
# Verify the replica set status
verify_replica_set() {
    log "Verifying replica set status..."
local primary_ip="${NODES[primary]}"
    # Check the replica set status
    local rs_status
    rs_status=$(mongosh --host "${primary_ip}:${MONGODB_PORT}" -u "${MONGODB_USER}" -p "${MONGODB_PASSWORD}" --authenticationDatabase admin --eval "rs.status()" --quiet)
echo "${rs_status}"
    # Check the replica set configuration
    local rs_config
    rs_config=$(mongosh --host "${primary_ip}:${MONGODB_PORT}" -u "${MONGODB_USER}" -p "${MONGODB_PASSWORD}" --authenticationDatabase admin --eval "rs.conf()" --quiet)
echo "${rs_config}"
}
# Test replica set functionality
test_replica_set() {
    log "Testing replica set functionality..."
local primary_ip="${NODES[primary]}"
    # Write a test document on the primary ('use' is a shell-only helper,
    # so switch databases with getSiblingDB instead)
    local test_write=$(cat << 'EOF'
db.getSiblingDB("testdb").testcollection.insertOne({
    name: "test_document",
    timestamp: new Date(),
    data: "This is a test document for replica set"
})
EOF
)
    mongosh --host "${primary_ip}:${MONGODB_PORT}" -u "${MONGODB_USER}" -p "${MONGODB_PASSWORD}" --authenticationDatabase admin --eval "${test_write}"
    # Wait for replication to catch up
    sleep 5
    # Read the document back from a secondary (rs.slaveOk() is obsolete;
    # set a read preference instead)
    local secondary_ip="${NODES[secondary1]}"
    local test_read=$(cat << 'EOF'
db.getMongo().setReadPref("secondaryPreferred")
db.getSiblingDB("testdb").testcollection.findOne({name: "test_document"})
EOF
)
    local read_result
    read_result=$(mongosh --host "${secondary_ip}:${MONGODB_PORT}" -u "${MONGODB_USER}" -p "${MONGODB_PASSWORD}" --authenticationDatabase admin --eval "${test_read}" --quiet)
if echo "${read_result}" | grep -q "test_document"; then
        log "Replica set functional test passed"
else
        log "ERROR: replica set functional test failed"
exit 1
fi
}
# Main
main() {
    log "Starting MongoDB replica set deployment..."
    # Deploy all nodes
for node_type in "${!NODES[@]}"; do
deploy_mongodb_node "${node_type}" "${NODES[${node_type}]}"
done
    # Initialize the replica set
initialize_replica_set
    # Verify the replica set status
verify_replica_set
    # Test replica set functionality
test_replica_set
    log "MongoDB replica set deployment complete!"
}
# Run main
main "$@"
MongoDB Data Distribution Strategy
Shard Key Selection Principles
Choosing the right shard key is the single biggest factor in sharded cluster performance:
Shard Key Characteristics
- High cardinality: the shard key should have many distinct values
- Low frequency: no single key value should dominate the data
- Non-monotonic: avoid keys that only ever increase, which funnel all inserts to one shard (a write hot spot)
Common Shard Key Strategies
// 1. Hashed shard key - good for monotonically increasing fields
db.users.createIndex({ "user_id": "hashed" })
sh.shardCollection("myapp.users", { "user_id": "hashed" })
// 2. Ranged shard key - good for range queries
db.orders.createIndex({ "order_date": 1, "customer_id": 1 })
sh.shardCollection("myapp.orders", { "order_date": 1, "customer_id": 1 })
// 3. Compound shard key - improves query targeting
db.products.createIndex({ "category": 1, "product_id": 1 })
sh.shardCollection("myapp.products", { "category": 1, "product_id": 1 })
// 4. Zone (tag-aware) sharding - geographic data placement
sh.addShardTag("shard1rs", "US-EAST")
sh.addShardTag("shard2rs", "US-WEST")
sh.addTagRange("myapp.users", { "region": "east" }, { "region": "east\uffff" }, "US-EAST")
sh.addTagRange("myapp.users", { "region": "west" }, { "region": "west\uffff" }, "US-WEST")
Data Distribution Monitoring Script
#!/usr/bin/env python3
# scripts/mongodb_shard_distribution_monitor.py
import pymongo
import json
import time
import argparse
from datetime import datetime
from collections import defaultdict
class MongoShardMonitor:
def __init__(self, mongos_uri, username, password):
self.client = pymongo.MongoClient(
mongos_uri,
username=username,
password=password,
authSource='admin'
)
self.admin_db = self.client.admin
    def get_shard_distribution(self, database_name, collection_name):
        """Return the per-shard distribution of a collection."""
try:
            # collStats must run against the database that owns the collection
            stats = self.client[database_name].command("collStats", collection_name)
shard_distribution = {}
if 'sharded' in stats and stats['sharded']:
for shard_name, shard_stats in stats['shards'].items():
shard_distribution[shard_name] = {
'count': shard_stats.get('count', 0),
'size': shard_stats.get('size', 0),
'avgObjSize': shard_stats.get('avgObjSize', 0),
'storageSize': shard_stats.get('storageSize', 0),
'indexes': shard_stats.get('nindexes', 0),
'indexSize': shard_stats.get('totalIndexSize', 0)
}
else:
                # Unsharded collection
shard_distribution['unsharded'] = {
'count': stats.get('count', 0),
'size': stats.get('size', 0),
'avgObjSize': stats.get('avgObjSize', 0),
'storageSize': stats.get('storageSize', 0),
'indexes': stats.get('nindexes', 0),
'indexSize': stats.get('totalIndexSize', 0)
}
return shard_distribution
except Exception as e:
print(f"Error getting shard distribution for {database_name}.{collection_name}: {e}")
return {}
    def get_chunk_distribution(self, database_name, collection_name):
        """Return the number of chunks per shard."""
try:
            config_db = self.client.config
            # Since MongoDB 5.0, config.chunks references collections by UUID,
            # so resolve the namespace through config.collections first
            coll_meta = config_db.collections.find_one(
                {"_id": f"{database_name}.{collection_name}"})
            if coll_meta is None:
                return {}
            chunks = list(config_db.chunks.find({"uuid": coll_meta["uuid"]}))
chunk_distribution = defaultdict(int)
for chunk in chunks:
shard = chunk['shard']
chunk_distribution[shard] += 1
return dict(chunk_distribution)
except Exception as e:
print(f"Error getting chunk distribution for {database_name}.{collection_name}: {e}")
return {}
    def get_balancer_status(self):
        """Return the balancer status."""
try:
balancer_status = self.admin_db.command("balancerStatus")
return balancer_status
except Exception as e:
print(f"Error getting balancer status: {e}")
return {}
    def get_shard_list(self):
        """Return the list of shards."""
try:
shards = list(self.admin_db.command("listShards")['shards'])
return shards
except Exception as e:
print(f"Error getting shard list: {e}")
return []
    def analyze_distribution_balance(self, shard_distribution):
        """Analyze how evenly data is distributed across shards."""
if not shard_distribution:
return {}
total_docs = sum(shard['count'] for shard in shard_distribution.values())
total_size = sum(shard['size'] for shard in shard_distribution.values())
if total_docs == 0:
return {}
shard_count = len(shard_distribution)
expected_docs_per_shard = total_docs / shard_count
expected_size_per_shard = total_size / shard_count
balance_analysis = {
'total_documents': total_docs,
'total_size_bytes': total_size,
'shard_count': shard_count,
'expected_docs_per_shard': expected_docs_per_shard,
'expected_size_per_shard': expected_size_per_shard,
'shard_details': {}
}
for shard_name, stats in shard_distribution.items():
doc_variance = abs(stats['count'] - expected_docs_per_shard) / expected_docs_per_shard * 100
size_variance = abs(stats['size'] - expected_size_per_shard) / expected_size_per_shard * 100 if expected_size_per_shard > 0 else 0
balance_analysis['shard_details'][shard_name] = {
'documents': stats['count'],
'size_bytes': stats['size'],
'doc_percentage': stats['count'] / total_docs * 100,
'size_percentage': stats['size'] / total_size * 100 if total_size > 0 else 0,
'doc_variance_percent': doc_variance,
'size_variance_percent': size_variance,
'balance_score': (doc_variance + size_variance) / 2
}
return balance_analysis
    def generate_report(self, databases_collections):
        """Build the distribution report."""
report = {
'timestamp': datetime.now().isoformat(),
'cluster_info': {
'shards': self.get_shard_list(),
'balancer_status': self.get_balancer_status()
},
'collections': {}
}
for db_name, collections in databases_collections.items():
for collection_name in collections:
collection_key = f"{db_name}.{collection_name}"
print(f"Analyzing {collection_key}...")
shard_distribution = self.get_shard_distribution(db_name, collection_name)
chunk_distribution = self.get_chunk_distribution(db_name, collection_name)
balance_analysis = self.analyze_distribution_balance(shard_distribution)
report['collections'][collection_key] = {
'shard_distribution': shard_distribution,
'chunk_distribution': chunk_distribution,
'balance_analysis': balance_analysis
}
return report
    def print_summary_report(self, report):
        """Print a human-readable summary report."""
        print("\n" + "="*80)
        print("MongoDB Sharded Cluster Distribution Report")
        print("="*80)
        print(f"Generated at: {report['timestamp']}")
        # Cluster info
        print(f"\nCluster info:")
        print(f"  Shards: {len(report['cluster_info']['shards'])}")
        balancer = report['cluster_info']['balancer_status']
        if balancer:
            print(f"  Balancer: {'enabled' if balancer.get('mode') == 'full' else 'disabled'}")
            if 'inBalancerRound' in balancer:
                print(f"  Balancing in progress: {'yes' if balancer['inBalancerRound'] else 'no'}")
        # Per-collection analysis
        print(f"\nCollection analysis:")
for collection_name, collection_data in report['collections'].items():
            print(f"\n  Collection: {collection_name}")
balance_analysis = collection_data['balance_analysis']
if balance_analysis:
                print(f"    Total documents: {balance_analysis['total_documents']:,}")
                print(f"    Total size: {balance_analysis['total_size_bytes']:,} bytes")
                print(f"    Shards: {balance_analysis['shard_count']}")
                # Per-shard details
                print(f"    Shard distribution:")
                for shard_name, shard_details in balance_analysis['shard_details'].items():
                    print(f"      {shard_name}:")
                    print(f"        Documents: {shard_details['documents']:,} ({shard_details['doc_percentage']:.1f}%)")
                    print(f"        Size: {shard_details['size_bytes']:,} bytes ({shard_details['size_percentage']:.1f}%)")
                    print(f"        Balance score: {shard_details['balance_score']:.1f}")
                    # Flag shards that deviate noticeably from the mean
                    if shard_details['balance_score'] > 20:
                        print(f"        ⚠️ Warning: uneven distribution!")
            # Chunk distribution
            chunk_distribution = collection_data['chunk_distribution']
            if chunk_distribution:
                print(f"    Chunk distribution:")
                for shard_name, chunk_count in chunk_distribution.items():
                    print(f"      {shard_name}: {chunk_count} chunks")
    def save_report(self, report, filename):
        """Save the report to a JSON file."""
with open(filename, 'w', encoding='utf-8') as f:
json.dump(report, f, indent=2, ensure_ascii=False, default=str)
        print(f"\nReport saved to: {filename}")
def main():
    parser = argparse.ArgumentParser(description='MongoDB shard distribution monitoring tool')
    parser.add_argument('--mongos-uri', required=True, help='mongos connection URI')
    parser.add_argument('--username', required=True, help='MongoDB username')
    parser.add_argument('--password', required=True, help='MongoDB password')
    parser.add_argument('--databases', required=True, help='Databases and collections to monitor, e.g. db1:col1,col2;db2:col3')
    parser.add_argument('--output', help='Output file name prefix')
    parser.add_argument('--interval', type=int, default=0, help='Monitoring interval in seconds; 0 runs once')
args = parser.parse_args()
    # Parse the databases and collections to monitor
databases_collections = {}
for db_spec in args.databases.split(';'):
db_name, collections_str = db_spec.split(':')
databases_collections[db_name] = collections_str.split(',')
monitor = MongoShardMonitor(args.mongos_uri, args.username, args.password)
try:
while True:
report = monitor.generate_report(databases_collections)
monitor.print_summary_report(report)
if args.output:
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
filename = f"{args.output}_{timestamp}.json"
monitor.save_report(report, filename)
if args.interval <= 0:
break
            print(f"\nWaiting {args.interval} seconds before the next check...")
time.sleep(args.interval)
except KeyboardInterrupt:
        print("\nMonitoring stopped")
except Exception as e:
        print(f"Error during monitoring: {e}")
if __name__ == "__main__":
main()
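For one-off checks or scheduled jobs, the MongoShardMonitor class above can also be driven directly instead of through the CLI (URI and credentials are placeholders):
monitor = MongoShardMonitor("mongodb://mongos1:27017", "admin", "your_password")
report = monitor.generate_report({"myapp": ["users", "orders"]})
monitor.print_summary_report(report)
monitor.save_report(report, "shard_report.json")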
MongoDB Performance Optimization
Index Optimization Strategies
// 1. Compound index optimization
// ESR rule: Equality, Sort, Range
db.orders.createIndex({
"status": 1, // Equality
"created_at": -1, // Sort
"amount": 1 // Range
})
// 2. Partial index - keeps the index small
db.users.createIndex(
{ "email": 1 },
{
partialFilterExpression: {
"email": { $exists: true, $ne: null }
}
}
)
// 3. Sparse index - only indexes documents that contain the field
db.products.createIndex(
{ "discount_code": 1 },
{ sparse: true }
)
// 4. TTL index - automatically removes expired data
db.sessions.createIndex(
{ "created_at": 1 },
{ expireAfterSeconds: 3600 }
)
// 5. Text index - full-text search
db.articles.createIndex({
"title": "text",
"content": "text"
}, {
weights: {
"title": 10,
"content": 5
},
name: "article_text_index"
})
Query Optimization Script
#!/usr/bin/env python3
# scripts/mongodb_query_optimizer.py
import pymongo
import time
import json
from datetime import datetime, timedelta
class MongoQueryOptimizer:
def __init__(self, connection_uri, username, password):
self.client = pymongo.MongoClient(
connection_uri,
username=username,
password=password,
authSource='admin'
)
    def analyze_slow_queries(self, database_name, threshold_ms=100):
        """Collect slow queries via the database profiler."""
        db = self.client[database_name]
        # Enable the profiler through the profile command
        db.command("profile", 2, slowms=threshold_ms)
        print(f"Analyzing slow queries in {database_name} (threshold: {threshold_ms}ms)")
        # Let the profiler collect some data
        time.sleep(60)
        # Fetch the slow queries
        slow_queries = list(db.system.profile.find({
            "ts": {"$gte": datetime.utcnow() - timedelta(minutes=5)},
            "millis": {"$gte": threshold_ms}
        }).sort("millis", -1).limit(20))
return slow_queries
    def suggest_indexes(self, slow_queries):
        """Suggest indexes based on observed slow queries."""
index_suggestions = []
for query in slow_queries:
suggestion = {
'collection': query.get('ns', '').split('.')[-1],
'query_pattern': query.get('command', {}),
'execution_time_ms': query.get('millis', 0),
'suggested_indexes': []
}
            # Analyze the query shape
command = query.get('command', {})
if 'find' in command:
filter_fields = command.get('filter', {})
sort_fields = command.get('sort', {})
                # Build a suggested compound index (ESR order)
index_fields = []
                # Equality fields first
for field, value in filter_fields.items():
if not isinstance(value, dict):
index_fields.append((field, 1))
                # Then sort fields
for field, direction in sort_fields.items():
if (field, direction) not in index_fields:
index_fields.append((field, direction))
                # Range fields last
for field, value in filter_fields.items():
if isinstance(value, dict) and any(op in value for op in ['$gt', '$gte', '$lt', '$lte']):
if (field, 1) not in index_fields:
index_fields.append((field, 1))
if index_fields:
suggestion['suggested_indexes'].append({
'fields': index_fields,
'type': 'compound'
})
if suggestion['suggested_indexes']:
index_suggestions.append(suggestion)
return index_suggestions
    def analyze_index_usage(self, database_name, collection_name):
        """Analyze index usage for a collection."""
db = self.client[database_name]
collection = db[collection_name]
        # Per-index usage statistics
index_stats = list(collection.aggregate([
{"$indexStats": {}}
]))
        # Materialize the index definitions; the cursor is iterated repeatedly below
        indexes = list(collection.list_indexes())
analysis = {
'collection': f"{database_name}.{collection_name}",
'indexes': [],
'recommendations': []
}
for index_stat in index_stats:
index_name = index_stat['name']
usage_count = index_stat['accesses']['ops']
            # Find the matching index definition
index_def = None
for idx in indexes:
if idx['name'] == index_name:
index_def = idx
break
index_info = {
'name': index_name,
'usage_count': usage_count,
'definition': index_def['key'] if index_def else {},
                'size_bytes': 0  # would need collStats to populate
}
            # Flag indexes that are never used
if usage_count == 0 and index_name != '_id_':
analysis['recommendations'].append({
'type': 'unused_index',
'index_name': index_name,
                    'suggestion': f"Consider dropping unused index: {index_name}"
})
analysis['indexes'].append(index_info)
return analysis
    def generate_optimization_report(self, databases):
        """Build the optimization report."""
report = {
'timestamp': datetime.now().isoformat(),
'databases': {}
}
for db_name in databases:
            print(f"Analyzing database: {db_name}")
db_report = {
'slow_queries': [],
'index_suggestions': [],
'collections': {}
}
            # Slow queries
slow_queries = self.analyze_slow_queries(db_name)
db_report['slow_queries'] = slow_queries
            # Index suggestions
index_suggestions = self.suggest_indexes(slow_queries)
db_report['index_suggestions'] = index_suggestions
            # Index usage per collection
db = self.client[db_name]
collections = db.list_collection_names()
for collection_name in collections:
if not collection_name.startswith('system.'):
                    print(f"  Analyzing collection: {collection_name}")
collection_analysis = self.analyze_index_usage(db_name, collection_name)
db_report['collections'][collection_name] = collection_analysis
report['databases'][db_name] = db_report
return report
    def print_optimization_report(self, report):
        """Print the optimization report."""
        print("\n" + "="*80)
        print("MongoDB Query Optimization Report")
        print("="*80)
        print(f"Generated at: {report['timestamp']}")
for db_name, db_report in report['databases'].items():
            print(f"\nDatabase: {db_name}")
print("-" * 40)
            # Slow query statistics
            slow_queries = db_report['slow_queries']
            print(f"Slow queries: {len(slow_queries)}")
if slow_queries:
                print("Top 5 slowest queries:")
for i, query in enumerate(slow_queries[:5]):
collection = query.get('ns', '').split('.')[-1]
duration = query.get('millis', 0)
                    print(f"  {i+1}. collection: {collection}, duration: {duration}ms")
            # Index suggestions
index_suggestions = db_report['index_suggestions']
if index_suggestions:
                print(f"\nIndex suggestions ({len(index_suggestions)}):")
for suggestion in index_suggestions:
collection = suggestion['collection']
                    print(f"  Collection: {collection}")
for idx_suggestion in suggestion['suggested_indexes']:
fields = idx_suggestion['fields']
                        print(f"    Suggested index: {fields}")
            # Per-collection index analysis
            print(f"\nCollection index analysis:")
for collection_name, collection_analysis in db_report['collections'].items():
                print(f"  Collection: {collection_name}")
unused_indexes = [rec for rec in collection_analysis['recommendations']
if rec['type'] == 'unused_index']
if unused_indexes:
                    print(f"    Unused indexes: {len(unused_indexes)}")
for rec in unused_indexes:
print(f" - {rec['index_name']}")
def main():
    # Configuration
MONGOS_URI = "mongodb://localhost:27017"
USERNAME = "admin"
PASSWORD = "your_password"
DATABASES = ["myapp", "analytics"]
optimizer = MongoQueryOptimizer(MONGOS_URI, USERNAME, PASSWORD)
try:
report = optimizer.generate_optimization_report(DATABASES)
optimizer.print_optimization_report(report)
        # Save the full report
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
filename = f"mongodb_optimization_report_{timestamp}.json"
with open(filename, 'w', encoding='utf-8') as f:
json.dump(report, f, indent=2, ensure_ascii=False, default=str)
        print(f"\nDetailed report saved to: {filename}")
except Exception as e:
        print(f"Error during optimization analysis: {e}")
if __name__ == "__main__":
main()
Failover and Disaster Recovery
Automatic Failover Configuration
# Replica set options in mongod.conf
replication:
  replSetName: "rs0"
  enableMajorityReadConcern: true
// Election timing is not part of mongod.conf; it lives in the replica set
// configuration document and is applied with rs.reconfig():
var cfg = rs.conf()
cfg.settings = Object.assign(cfg.settings || {}, {
  electionTimeoutMillis: 10000,
  heartbeatIntervalMillis: 2000,
  heartbeatTimeoutSecs: 10,
  catchUpTimeoutMillis: 60000
})
rs.reconfig(cfg)
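Even with fast elections, an application sees a brief window of "not primary" errors while a new primary steps up. A hedged pymongo sketch of client-side handling (hosts and collection names are illustrative):
import time
from pymongo import MongoClient
from pymongo.errors import AutoReconnect, NotPrimaryError
client = MongoClient(
    "mongodb://admin:your_secure_password@192.168.1.10:27017,192.168.1.11:27017,"
    "192.168.1.12:27017/?replicaSet=rs0&authSource=admin",
    retryWrites=True,  # the driver retries an interrupted write once
)
def insert_with_retry(doc, attempts=5):
    # Fallback loop for operations not covered by retryable writes.
    for attempt in range(attempts):
        try:
            return client.myapp.events.insert_one(doc)
        except (AutoReconnect, NotPrimaryError):
            time.sleep(2 ** attempt)  # back off while the election completes
    raise RuntimeError("primary did not recover in time")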
Backup and Restore Script
#!/bin/bash
# scripts/mongodb_backup_restore.sh
set -euo pipefail
# Configuration
MONGODB_URI="mongodb://admin:password@localhost:27017/admin"
BACKUP_DIR="/backup/mongodb"
RETENTION_DAYS=7
S3_BUCKET="mongodb-backups"
ENCRYPTION_KEY="your-encryption-key"
# Logging helper
log() {
echo "[$(date '+%Y-%m-%d %H:%M:%S')] $1" >&2
}
# Create a backup
create_backup() {
local backup_type="$1" # full, incremental
local timestamp
timestamp=$(date '+%Y%m%d_%H%M%S')
local backup_path="${BACKUP_DIR}/${backup_type}_${timestamp}"
    log "Creating ${backup_type} backup..."
mkdir -p "${backup_path}"
case "${backup_type}" in
"full")
            # Full backup
mongodump --uri="${MONGODB_URI}" --out="${backup_path}" --gzip
;;
"incremental")
            # Incremental backup: dump oplog entries newer than the last full backup
local last_backup_time
last_backup_time=$(find "${BACKUP_DIR}" -name "full_*" -type d | sort | tail -1 | xargs basename | cut -d_ -f2-)
if [[ -n "${last_backup_time}" ]]; then
local start_time
start_time=$(date -d "${last_backup_time:0:8} ${last_backup_time:9:2}:${last_backup_time:11:2}:${last_backup_time:13:2}" '+%s')
                # mongodump's --query requires a specific db/collection, so dump
                # local.oplog.rs with an extended-JSON timestamp filter; note that
                # --db/--collection must not conflict with a database in the URI
                mongodump --uri="${MONGODB_URI}" \
                    --db local --collection oplog.rs \
                    --query "{\"ts\": {\"\$gte\": {\"\$timestamp\": {\"t\": ${start_time}, \"i\": 0}}}}" \
                    --out="${backup_path}" --gzip
else
                log "ERROR: no full backup found; cannot create an incremental backup"
return 1
fi
;;
esac
    # Compress the backup
tar -czf "${backup_path}.tar.gz" -C "${BACKUP_DIR}" "$(basename "${backup_path}")"
rm -rf "${backup_path}"
    # Encrypt the backup
if [[ -n "${ENCRYPTION_KEY}" ]]; then
openssl enc -aes-256-cbc -salt -in "${backup_path}.tar.gz" \
-out "${backup_path}.tar.gz.enc" -k "${ENCRYPTION_KEY}"
rm "${backup_path}.tar.gz"
backup_file="${backup_path}.tar.gz.enc"
else
backup_file="${backup_path}.tar.gz"
fi
    # Upload to S3
if command -v aws &> /dev/null && [[ -n "${S3_BUCKET}" ]]; then
aws s3 cp "${backup_file}" "s3://${S3_BUCKET}/$(basename "${backup_file}")"
        log "Backup uploaded to S3: s3://${S3_BUCKET}/$(basename "${backup_file}")"
fi
    log "${backup_type} backup complete: ${backup_file}"
echo "${backup_file}"
}
# Restore a backup
restore_backup() {
local backup_file="$1"
local target_uri="$2"
local restore_path="/tmp/mongodb_restore_$(date '+%Y%m%d_%H%M%S')"
    log "Restoring backup: ${backup_file}"
mkdir -p "${restore_path}"
    # Decrypt the backup
if [[ "${backup_file}" == *.enc ]]; then
openssl enc -aes-256-cbc -d -in "${backup_file}" \
-out "${restore_path}/backup.tar.gz" -k "${ENCRYPTION_KEY}"
backup_file="${restore_path}/backup.tar.gz"
fi
    # Extract the backup
tar -xzf "${backup_file}" -C "${restore_path}"
    # Locate the backup directory
local backup_dir
backup_dir=$(find "${restore_path}" -type d -name "*_*" | head -1)
if [[ -z "${backup_dir}" ]]; then
        log "ERROR: no valid backup directory found"
return 1
fi
    # Restore the data
mongorestore --uri="${target_uri}" --dir="${backup_dir}" --gzip --drop
    # Clean up temporary files
rm -rf "${restore_path}"
    log "Backup restore complete"
}
# Remove old backups
cleanup_old_backups() {
    log "Removing backups older than ${RETENTION_DAYS} days..."
find "${BACKUP_DIR}" -name "*.tar.gz*" -mtime +${RETENTION_DAYS} -delete
    # Remove old backups from S3
if command -v aws &> /dev/null && [[ -n "${S3_BUCKET}" ]]; then
local cutoff_date
cutoff_date=$(date -d "${RETENTION_DAYS} days ago" '+%Y-%m-%d')
aws s3 ls "s3://${S3_BUCKET}/" | while read -r line; do
local file_date
file_date=$(echo "${line}" | awk '{print $1}')
local file_name
file_name=$(echo "${line}" | awk '{print $4}')
if [[ "${file_date}" < "${cutoff_date}" ]]; then
aws s3 rm "s3://${S3_BUCKET}/${file_name}"
                log "Deleted old S3 backup: ${file_name}"
fi
done
fi
    log "Old backup cleanup complete"
}
# Verify a backup
verify_backup() {
local backup_file="$1"
local temp_restore_path="/tmp/mongodb_verify_$(date '+%Y%m%d_%H%M%S')"
    log "Verifying backup: ${backup_file}"
mkdir -p "${temp_restore_path}"
    # Decrypt if needed, then test the archive; capture tar's status explicitly
    # because 'set -e' would otherwise abort before the check below runs
    local tar_ok=0
    if [[ "${backup_file}" == *.enc ]]; then
        openssl enc -aes-256-cbc -d -in "${backup_file}" \
            -out "${temp_restore_path}/backup.tar.gz" -k "${ENCRYPTION_KEY}"
        tar -tzf "${temp_restore_path}/backup.tar.gz" > /dev/null || tar_ok=$?
    else
        tar -tzf "${backup_file}" > /dev/null || tar_ok=$?
    fi
    if [[ ${tar_ok} -eq 0 ]]; then
        log "Backup verified: ${backup_file}"
        rm -rf "${temp_restore_path}"
        return 0
    else
        log "ERROR: backup verification failed: ${backup_file}"
        rm -rf "${temp_restore_path}"
        return 1
    fi
}
# Main
main() {
local action="$1"
case "${action}" in
"backup")
local backup_type="${2:-full}"
mkdir -p "${BACKUP_DIR}"
backup_file=$(create_backup "${backup_type}")
verify_backup "${backup_file}"
cleanup_old_backups
;;
"restore")
local backup_file="$2"
local target_uri="${3:-${MONGODB_URI}}"
restore_backup "${backup_file}" "${target_uri}"
;;
"verify")
local backup_file="$2"
verify_backup "${backup_file}"
;;
"cleanup")
cleanup_old_backups
;;
*)
        echo "Usage: $0 {backup|restore|verify|cleanup} [args...]"
        echo "  backup [full|incremental]           - create a backup"
        echo "  restore <backup_file> [target_uri]  - restore a backup"
        echo "  verify <backup_file>                - verify a backup"
        echo "  cleanup                             - remove old backups"
exit 1
;;
esac
}
# Run main
main "$@"
Summary
MongoDB's distributed architecture gives modern applications powerful data storage and processing capabilities. With a sound architecture, the right sharding strategy, thorough monitoring, and reliable backup and recovery, you can build a MongoDB cluster that is highly available, performant, and scalable.
Key Takeaways
- Architecture choice: pick the architecture that matches your business requirements
- Sharding strategy: choose a shard key that distributes data evenly
- Performance: improve throughput through index and query optimization
- Monitoring and alerting: build out monitoring so problems surface early
- Backup and recovery: define a solid backup strategy to keep data safe
Running MongoDB successfully at scale requires a deep understanding of how it works, a design tailored to your workload, and continuous tuning in production.
Replica Set Monitoring Script
#!/bin/bash
# scripts/mongodb_replica_set_monitor.sh
set -euo pipefail
# Configuration
REPLICA_SET_NAME="rs0"
MONGODB_NODES=("192.168.1.10:27017" "192.168.1.11:27017" "192.168.1.12:27017")
MONGODB_USER="admin"
MONGODB_PASSWORD="your_secure_password"
CHECK_INTERVAL="${CHECK_INTERVAL:-60}"
ALERT_THRESHOLD_LAG="${ALERT_THRESHOLD_LAG:-10}"
# Logging helper
log() {
echo "[$(date '+%Y-%m-%d %H:%M:%S')] $1" >&2
}
# Check MongoDB connectivity
check_mongodb_connection() {
local host_port="$1"
    if mongosh --host "${host_port}" -u "${MONGODB_USER}" -p "${MONGODB_PASSWORD}" --authenticationDatabase admin --eval "db.runCommand('ping')" --quiet > /dev/null 2>&1; then
return 0
else
return 1
fi
}
# Get the replica set status
get_replica_set_status() {
local primary_node=""
    # Locate the primary
for node in "${MONGODB_NODES[@]}"; do
if check_mongodb_connection "${node}"; then
            local is_primary
            is_primary=$(mongosh --host "${node}" -u "${MONGODB_USER}" -p "${MONGODB_PASSWORD}" --authenticationDatabase admin --eval "db.hello().isWritablePrimary" --quiet 2>/dev/null || echo "false")
            if [[ "${is_primary}" == "true" ]]; then
primary_node="${node}"
break
fi
fi
done
if [[ -z "${primary_node}" ]]; then
        log "ERROR: unable to find a primary"
        send_alert "MongoDB replica set has no reachable primary"
return 1
fi
    log "Primary: ${primary_node}"
    # Fetch the replica set status as JSON so it can be parsed reliably
    local rs_status
    rs_status=$(mongosh --host "${primary_node}" -u "${MONGODB_USER}" -p "${MONGODB_PASSWORD}" --authenticationDatabase admin --eval "JSON.stringify(rs.status())" --quiet 2>/dev/null || echo "")
if [[ -z "${rs_status}" ]]; then
        log "ERROR: unable to fetch replica set status"
        send_alert "Failed to fetch MongoDB replica set status"
return 1
fi
echo "${rs_status}"
}
# Check replica set health
check_replica_set_health() {
    log "Checking replica set health..."
    local rs_status
    # Capture the status directly in the condition; with 'set -e' a plain
    # assignment followed by a $? check would never reach the check
    if ! rs_status=$(get_replica_set_status); then
        return 1
    fi
    # Tally member states from the JSON status output
    local states
    states=$(echo "${rs_status}" | grep -o '"state":[0-9]*' | cut -d: -f2)
    local healthy_members=0
    local total_members=0
    local primary_count=0
    local secondary_count=0
    for state in ${states}; do
        total_members=$((total_members + 1))
        if [[ "${state}" -eq 1 ]]; then
            # Primary
            healthy_members=$((healthy_members + 1))
            primary_count=$((primary_count + 1))
        elif [[ "${state}" -eq 2 ]]; then
            # Secondary
            healthy_members=$((healthy_members + 1))
            secondary_count=$((secondary_count + 1))
        fi
    done
    log "Replica set members: total=${total_members}, healthy=${healthy_members}, primary=${primary_count}, secondaries=${secondary_count}"
    # Check the primary count
    if [[ "${primary_count}" -ne 1 ]]; then
        log "ERROR: unexpected primary count: ${primary_count}"
        send_alert "MongoDB replica set has an unexpected primary count: ${primary_count}"
    fi
    # Check the ratio of healthy members
    if [[ "${total_members}" -gt 0 ]]; then
        local health_ratio
        health_ratio=$((healthy_members * 100 / total_members))
        if [[ "${health_ratio}" -lt 67 ]]; then
            log "WARNING: healthy member ratio too low: ${health_ratio}%"
            send_alert "MongoDB replica set healthy member ratio too low: ${health_ratio}%"
        fi
    fi
}
# Check replication lag
check_replication_lag() {
    log "Checking replication lag..."
local primary_node=""
    # Locate the primary
for node in "${MONGODB_NODES[@]}"; do
if check_mongodb_connection "${node}"; then
            local is_primary
            is_primary=$(mongosh --host "${node}" -u "${MONGODB_USER}" -p "${MONGODB_PASSWORD}" --authenticationDatabase admin --eval "db.hello().isWritablePrimary" --quiet 2>/dev/null || echo "false")
            if [[ "${is_primary}" == "true" ]]; then
primary_node="${node}"
break
fi
fi
done
if [[ -z "${primary_node}" ]]; then
return 1
fi
    # Get the primary's optime
    local primary_optime
    primary_optime=$(mongosh --host "${primary_node}" -u "${MONGODB_USER}" -p "${MONGODB_PASSWORD}" --authenticationDatabase admin --eval "rs.status().members.find(m => m.self).optimeDate" --quiet 2>/dev/null || echo "")
    if [[ -z "${primary_optime}" ]]; then
        log "WARNING: unable to read the primary's optime"
        return 1
fi
    # Check each secondary's lag
for node in "${MONGODB_NODES[@]}"; do
if [[ "${node}" == "${primary_node}" ]]; then
continue
fi
        if check_mongodb_connection "${node}"; then
            local secondary_optime
            secondary_optime=$(mongosh --host "${node}" -u "${MONGODB_USER}" -p "${MONGODB_PASSWORD}" --authenticationDatabase admin --eval "rs.status().members.find(m => m.self).optimeDate" --quiet 2>/dev/null || echo "")
            if [[ -n "${secondary_optime}" ]]; then
                # Compute the lag on the primary by comparing member optimes
                local lag_seconds
                lag_seconds=$(mongosh --host "${primary_node}" -u "${MONGODB_USER}" -p "${MONGODB_PASSWORD}" --authenticationDatabase admin --eval "
                var primary = rs.status().members.find(m => m.self).optimeDate;
                var secondary = rs.status().members.find(m => m.name === '${node}').optimeDate;
                Math.floor((primary - secondary) / 1000);
                " --quiet 2>/dev/null || echo "-1")
                if [[ "${lag_seconds}" -eq -1 ]]; then
                    log "WARNING: unable to compute replication lag for ${node}"
                elif [[ "${lag_seconds}" -gt "${ALERT_THRESHOLD_LAG}" ]]; then
                    log "WARNING: replication lag on ${node} too high: ${lag_seconds}s"
                    send_alert "MongoDB replication lag too high: ${node} ${lag_seconds}s"
                else
                    log "Replication lag on ${node} is normal: ${lag_seconds}s"
                fi
fi
else
        log "ERROR: unable to connect to node ${node}"
        send_alert "MongoDB node unreachable: ${node}"
fi
done
}
# Check performance metrics
check_performance_metrics() {
    log "Checking performance metrics..."
for node in "${MONGODB_NODES[@]}"; do
if ! check_mongodb_connection "${node}"; then
continue
fi
        log "Node ${node} performance metrics:"
        # Query the key serverStatus metrics directly, so no output parsing is needed
        local connections
        local memory_resident
        local memory_virtual
        connections=$(mongosh --host "${node}" -u "${MONGODB_USER}" -p "${MONGODB_PASSWORD}" --authenticationDatabase admin --eval "db.serverStatus().connections.current" --quiet 2>/dev/null || echo "0")
        memory_resident=$(mongosh --host "${node}" -u "${MONGODB_USER}" -p "${MONGODB_PASSWORD}" --authenticationDatabase admin --eval "db.serverStatus().mem.resident" --quiet 2>/dev/null || echo "0")
        memory_virtual=$(mongosh --host "${node}" -u "${MONGODB_USER}" -p "${MONGODB_PASSWORD}" --authenticationDatabase admin --eval "db.serverStatus().mem.virtual" --quiet 2>/dev/null || echo "0")
        log "  Connections: ${connections}"
        log "  Resident memory: ${memory_resident} MB"
        log "  Virtual memory: ${memory_virtual} MB"
        # Connection count threshold
        if [[ "${connections}" -gt 800 ]]; then
            log "WARNING: node ${node} has too many connections: ${connections}"
            send_alert "MongoDB node connection count too high: ${node} ${connections}"
        fi
        # Memory usage threshold
        if [[ "${memory_resident}" -gt 4096 ]]; then
            log "WARNING: node ${node} memory usage too high: ${memory_resident} MB"
            send_alert "MongoDB node memory usage too high: ${node} ${memory_resident}MB"
        fi
echo "---"
done
}
# Send an alert
send_alert() {
local message="$1"
    # Send to Slack
if [[ -n "${SLACK_WEBHOOK_URL:-}" ]]; then
curl -X POST "${SLACK_WEBHOOK_URL}" \
-H 'Content-type: application/json' \
-d "{\"text\":\"🚨 MongoDB Alert: ${message}\"}" || true
fi
    # Send email
if command -v mail &> /dev/null; then
echo "${message}" | mail -s "MongoDB Alert" "${ALERT_EMAIL:-admin@company.com}" || true
fi
log "Alert sent: ${message}"
}
# Main loop
main() {
    log "Starting MongoDB replica set monitoring"
while true; do
        # Replica set health
check_replica_set_health
        # Replication lag
check_replication_lag
        # Performance metrics
check_performance_metrics
        log "Monitoring cycle complete; sleeping for ${CHECK_INTERVAL}s"
sleep "${CHECK_INTERVAL}"
done
}
# Signal handling
cleanup() {
    log "Monitor exiting"
exit 0
}
trap cleanup SIGTERM SIGINT
# Start monitoring
main "$@"
MongoDB Sharded Cluster Architecture
Sharded Cluster Components
A MongoDB sharded cluster consists of the following components:
- mongos: the query router and the entry point for clients (see the connection sketch after this list)
- Config Servers: a replica set that stores the cluster metadata
- Shards: replica sets that store the actual data
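Applications should list several mongos routers in the connection string so that routing survives the loss of any one of them. A minimal pymongo sketch using the example addresses from the deployment script below:
from pymongo import MongoClient
# No replicaSet option here: the endpoints are mongos routers, not mongod
# members, and the driver spreads requests across the ones it can reach.
client = MongoClient(
    "mongodb://admin:your_secure_password@192.168.1.50:27017,"
    "192.168.1.51:27017,192.168.1.52:27017/?authSource=admin"
)
print(client.admin.command("ping"))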
Sharded Cluster Architecture Diagram
graph TB
subgraph "应用层"
APP1[Application 1]
APP2[Application 2]
APP3[Application 3]
end
subgraph "路由层"
MONGOS1[mongos 1]
MONGOS2[mongos 2]
MONGOS3[mongos 3]
end
subgraph "配置服务器"
CONFIG1[Config Server 1]
CONFIG2[Config Server 2]
CONFIG3[Config Server 3]
end
subgraph "分片1"
SHARD1P[Shard1 Primary]
SHARD1S1[Shard1 Secondary1]
SHARD1S2[Shard1 Secondary2]
end
subgraph "分片2"
SHARD2P[Shard2 Primary]
SHARD2S1[Shard2 Secondary1]
SHARD2S2[Shard2 Secondary2]
end
subgraph "分片3"
SHARD3P[Shard3 Primary]
SHARD3S1[Shard3 Secondary1]
SHARD3S2[Shard3 Secondary2]
end
APP1 --> MONGOS1
APP2 --> MONGOS2
APP3 --> MONGOS3
MONGOS1 --> CONFIG1
MONGOS2 --> CONFIG2
MONGOS3 --> CONFIG3
MONGOS1 --> SHARD1P
MONGOS1 --> SHARD2P
MONGOS1 --> SHARD3P
SHARD1P --> SHARD1S1
SHARD1P --> SHARD1S2
SHARD2P --> SHARD2S1
SHARD2P --> SHARD2S2
SHARD3P --> SHARD3S1
SHARD3P --> SHARD3S2
Sharded Cluster Deployment Script
#!/bin/bash
# scripts/deploy_mongodb_sharded_cluster.sh
set -euo pipefail
# Configuration
MONGODB_VERSION="6.0"
MONGODB_USER="admin"
MONGODB_PASSWORD="your_secure_password"
KEYFILE_CONTENT="your_keyfile_content_here"
# Config server layout
CONFIG_REPLICA_SET="configReplSet"
declare -A CONFIG_SERVERS=(
["config1"]="192.168.1.20:27019"
["config2"]="192.168.1.21:27019"
["config3"]="192.168.1.22:27019"
)
# Shard layout
declare -A SHARD1_SERVERS=(
["shard1-primary"]="192.168.1.30:27018"
["shard1-secondary1"]="192.168.1.31:27018"
["shard1-secondary2"]="192.168.1.32:27018"
)
declare -A SHARD2_SERVERS=(
["shard2-primary"]="192.168.1.40:27018"
["shard2-secondary1"]="192.168.1.41:27018"
["shard2-secondary2"]="192.168.1.42:27018"
)
# mongos layout
declare -A MONGOS_SERVERS=(
["mongos1"]="192.168.1.50:27017"
["mongos2"]="192.168.1.51:27017"
["mongos3"]="192.168.1.52:27017"
)
# Logging helper
log() {
echo "[$(date '+%Y-%m-%d %H:%M:%S')] $1" >&2
}
# Generate a config server config file
create_config_server_config() {
local node_name="$1"
local config_file="/tmp/mongod-${node_name}.conf"
cat > "${config_file}" << EOF
systemLog:
destination: file
logAppend: true
path: /var/log/mongodb/mongod-${node_name}.log
logRotate: reopen
storage:
dbPath: /var/lib/mongodb/${node_name}
journal:
enabled: true
wiredTiger:
engineConfig:
cacheSizeGB: 2
journalCompressor: snappy
processManagement:
fork: true
pidFilePath: /var/run/mongodb/mongod-${node_name}.pid
net:
port: 27019
bindIp: 0.0.0.0
security:
authorization: enabled
keyFile: /etc/mongodb/mongodb-keyfile
replication:
replSetName: "${CONFIG_REPLICA_SET}"
sharding:
clusterRole: configsvr
EOF
echo "${config_file}"
}
# Generate a shard config file
create_shard_config() {
local node_name="$1"
local shard_name="$2"
local config_file="/tmp/mongod-${node_name}.conf"
cat > "${config_file}" << EOF
systemLog:
destination: file
logAppend: true
path: /var/log/mongodb/mongod-${node_name}.log
logRotate: reopen
storage:
dbPath: /var/lib/mongodb/${node_name}
journal:
enabled: true
wiredTiger:
engineConfig:
cacheSizeGB: 4
journalCompressor: snappy
directoryForIndexes: true
collectionConfig:
blockCompressor: snappy
processManagement:
fork: true
pidFilePath: /var/run/mongodb/mongod-${node_name}.pid
net:
port: 27018
bindIp: 0.0.0.0
security:
authorization: enabled
keyFile: /etc/mongodb/mongodb-keyfile
replication:
replSetName: "${shard_name}"
sharding:
clusterRole: shardsvr
EOF
echo "${config_file}"
}
# Generate a mongos config file
create_mongos_config() {
local node_name="$1"
local config_file="/tmp/mongos-${node_name}.conf"
    # Build the config server connection string
local config_servers_str="${CONFIG_REPLICA_SET}/"
local first=true
for server in "${CONFIG_SERVERS[@]}"; do
if [[ "${first}" == "true" ]]; then
config_servers_str="${config_servers_str}${server}"
first=false
else
config_servers_str="${config_servers_str},${server}"
fi
done
cat > "${config_file}" << EOF
systemLog:
destination: file
logAppend: true
path: /var/log/mongodb/mongos-${node_name}.log
logRotate: reopen
processManagement:
fork: true
pidFilePath: /var/run/mongodb/mongos-${node_name}.pid
net:
port: 27017
bindIp: 0.0.0.0
security:
keyFile: /etc/mongodb/mongodb-keyfile
sharding:
configDB: ${config_servers_str}
EOF
echo "${config_file}"
}
# Deploy the config servers
deploy_config_servers() {
    log "Deploying config servers..."
    # Deploy each config server
for node_name in "${!CONFIG_SERVERS[@]}"; do
local server_addr="${CONFIG_SERVERS[${node_name}]}"
local server_ip
server_ip=$(echo "${server_addr}" | cut -d: -f1)
        log "Deploying config server: ${node_name} (${server_ip})"
        # Generate the config file
local config_file
config_file=$(create_config_server_config "${node_name}")
        # Create the keyfile
local keyfile="/tmp/mongodb-keyfile"
echo "${KEYFILE_CONTENT}" > "${keyfile}"
chmod 600 "${keyfile}"
        # Copy files to the target node
scp "${config_file}" "root@${server_ip}:/etc/mongod.conf"
scp "${keyfile}" "root@${server_ip}:/etc/mongodb/mongodb-keyfile"
        # Create directories and set permissions
ssh "root@${server_ip}" "mkdir -p /var/lib/mongodb/${node_name} /var/log/mongodb /var/run/mongodb /etc/mongodb"
ssh "root@${server_ip}" "chmod 600 /etc/mongodb/mongodb-keyfile"
ssh "root@${server_ip}" "chown -R mongod:mongod /var/lib/mongodb /var/log/mongodb /var/run/mongodb /etc/mongodb"
        # Start MongoDB
ssh "root@${server_ip}" "systemctl start mongod"
ssh "root@${server_ip}" "systemctl enable mongod"
        # Clean up temporary files
rm -f "${config_file}" "${keyfile}"
done
    # Wait for all config servers to start
sleep 10
    # Initialize the config server replica set
local first_config_server
first_config_server=$(echo "${CONFIG_SERVERS[config1]}" | cut -d: -f1)
local rs_config=$(cat << EOF
rs.initiate({
_id: "${CONFIG_REPLICA_SET}",
configsvr: true,
members: [
{ _id: 0, host: "${CONFIG_SERVERS[config1]}" },
{ _id: 1, host: "${CONFIG_SERVERS[config2]}" },
{ _id: 2, host: "${CONFIG_SERVERS[config3]}" }
]
})
EOF
)
    mongosh --host "${first_config_server}:27019" --eval "${rs_config}"
    # Wait for the replica set to finish initializing
    sleep 30
    # Create the administrator user ('use admin' is not valid inside --eval)
    local create_user_script=$(cat << EOF
db.getSiblingDB("admin").createUser({
    user: "${MONGODB_USER}",
    pwd: "${MONGODB_PASSWORD}",
    roles: [
        { role: "root", db: "admin" }
    ]
})
EOF
)
    mongosh --host "${first_config_server}:27019" --eval "${create_user_script}"
    log "Config servers deployed"
}
# Deploy one shard (a replica set)
deploy_shard() {
local shard_name="$1"
local -n shard_servers=$2
    log "Deploying shard: ${shard_name}..."
    # Deploy each shard node
for node_name in "${!shard_servers[@]}"; do
local server_addr="${shard_servers[${node_name}]}"
local server_ip
server_ip=$(echo "${server_addr}" | cut -d: -f1)
        log "Deploying shard node: ${node_name} (${server_ip})"
        # Generate the config file
local config_file
config_file=$(create_shard_config "${node_name}" "${shard_name}")
        # Create the keyfile
local keyfile="/tmp/mongodb-keyfile"
echo "${KEYFILE_CONTENT}" > "${keyfile}"
chmod 600 "${keyfile}"
        # Copy files to the target node
scp "${config_file}" "root@${server_ip}:/etc/mongod.conf"
scp "${keyfile}" "root@${server_ip}:/etc/mongodb/mongodb-keyfile"
        # Create directories and set permissions
ssh "root@${server_ip}" "mkdir -p /var/lib/mongodb/${node_name} /var/log/mongodb /var/run/mongodb /etc/mongodb"
ssh "root@${server_ip}" "chmod 600 /etc/mongodb/mongodb-keyfile"
ssh "root@${server_ip}" "chown -R mongod:mongod /var/lib/mongodb /var/log/mongodb /var/run/mongodb /etc/mongodb"
        # Start MongoDB
ssh "root@${server_ip}" "systemctl start mongod"
ssh "root@${server_ip}" "systemctl enable mongod"
        # Clean up temporary files
rm -f "${config_file}" "${keyfile}"
done
    # Wait for all shard nodes to start
sleep 10
    # Initialize the shard replica set
local first_shard_server=""
for node_name in "${!shard_servers[@]}"; do
if [[ "${node_name}" == *"primary"* ]]; then
first_shard_server=$(echo "${shard_servers[${node_name}]}" | cut -d: -f1)
break
fi
done
if [[ -z "${first_shard_server}" ]]; then
first_shard_server=$(echo "${shard_servers[$(echo "${!shard_servers[@]}" | cut -d' ' -f1)]}" | cut -d: -f1)
fi
    # Build the replica set member configuration
local members_config=""
local member_id=0
for node_name in "${!shard_servers[@]}"; do
local priority=1
if [[ "${node_name}" == *"primary"* ]]; then
priority=2
fi
if [[ -n "${members_config}" ]]; then
members_config="${members_config},"
fi
members_config="${members_config}{ _id: ${member_id}, host: \"${shard_servers[${node_name}]}\", priority: ${priority} }"
member_id=$((member_id + 1))
done
local rs_config=$(cat << EOF
rs.initiate({
_id: "${shard_name}",
members: [${members_config}]
})
EOF
)
    mongosh --host "${first_shard_server}:27018" --eval "${rs_config}"
    # Wait for the replica set to finish initializing
sleep 30
    log "Shard ${shard_name} deployed"
}
# Deploy the mongos routers
deploy_mongos() {
    log "Deploying mongos..."
for node_name in "${!MONGOS_SERVERS[@]}"; do
local server_addr="${MONGOS_SERVERS[${node_name}]}"
local server_ip
server_ip=$(echo "${server_addr}" | cut -d: -f1)
        log "Deploying mongos: ${node_name} (${server_ip})"
        # Generate the config file
local config_file
config_file=$(create_mongos_config "${node_name}")
        # Create the keyfile
local keyfile="/tmp/mongodb-keyfile"
echo "${KEYFILE_CONTENT}" > "${keyfile}"
chmod 600 "${keyfile}"
        # Copy files to the target node
scp "${config_file}" "root@${server_ip}:/etc/mongos.conf"
scp "${keyfile}" "root@${server_ip}:/etc/mongodb/mongodb-keyfile"
        # Create directories and set permissions
ssh "root@${server_ip}" "mkdir -p /var/log/mongodb /var/run/mongodb /etc/mongodb"
ssh "root@${server_ip}" "chmod 600 /etc/mongodb/mongodb-keyfile"
ssh "root@${server_ip}" "chown -R mongod:mongod /var/log/mongodb /var/run/mongodb /etc/mongodb"
        # Start mongos
ssh "root@${server_ip}" "mongos --config /etc/mongos.conf"
        # Clean up temporary files
rm -f "${config_file}" "${keyfile}"
done
    log "mongos deployment complete"
}
# Configure the sharded cluster
configure_sharded_cluster() {
    log "Configuring the sharded cluster..."
    # Connect to the first mongos
local first_mongos
first_mongos=$(echo "${MONGOS_SERVERS[mongos1]}" | cut -d: -f1)
    # Wait for mongos to come up
sleep 10
    # Add the shards
local add_shard1_cmd="sh.addShard(\"shard1rs/${SHARD1_SERVERS[shard1-primary]},${SHARD1_SERVERS[shard1-secondary1]},${SHARD1_SERVERS[shard1-secondary2]}\")"
local add_shard2_cmd="sh.addShard(\"shard2rs/${SHARD2_SERVERS[shard2-primary]},${SHARD2_SERVERS[shard2-secondary1]},${SHARD2_SERVERS[shard2-secondary2]}\")"
    mongosh --host "${first_mongos}:27017" -u "${MONGODB_USER}" -p "${MONGODB_PASSWORD}" --authenticationDatabase admin --eval "${add_shard1_cmd}"
    mongosh --host "${first_mongos}:27017" -u "${MONGODB_USER}" -p "${MONGODB_PASSWORD}" --authenticationDatabase admin --eval "${add_shard2_cmd}"
    # Verify the shard status
    mongosh --host "${first_mongos}:27017" -u "${MONGODB_USER}" -p "${MONGODB_PASSWORD}" --authenticationDatabase admin --eval "sh.status()"
    log "Sharded cluster configuration complete"
}
# Main
main() {
    log "Starting MongoDB sharded cluster deployment..."
    # Deploy the config servers
deploy_config_servers
    # Deploy the shards
deploy_shard "shard1rs" SHARD1_SERVERS
deploy_shard "shard2rs" SHARD2_SERVERS
    # Deploy the mongos routers
deploy_mongos
    # Configure the sharded cluster
configure_sharded_cluster
    log "MongoDB sharded cluster deployment complete!"
}
# Run main
main "$@"