MySQL分库分表实践:从理论到落地的全面指南
随着业务数据量的爆发式增长,单库单表的MySQL架构往往无法满足高并发、大数据量的业务需求。分库分表作为解决数据库容量和性能瓶颈的关键技术,已成为大型互联网应用的标配。本文将从理论到实践,全面介绍MySQL分库分表的实施方案和最佳实践。
分库分表基础理论
为什么需要分库分表
在讨论具体实现方案前,我们需要明确分库分表的驱动因素:
| 驱动因素 | 表现 | 影响 | 解决方案 |
|---|---|---|---|
| 数据量过大 | 单表数据超过千万级 | 查询性能下降,索引效率降低 | 水平分表 |
| 写入压力大 | 高并发写入导致锁竞争 | 写入延迟增加,影响用户体验 | 水平分库 |
| 读取压力大 | 查询响应时间增加 | 系统整体性能下降 | 读写分离+分库 |
| 单机资源瓶颈 | CPU/内存/IO资源不足 | 数据库服务不稳定 | 分库+硬件扩容 |
分库分表的基本概念
graph TB
subgraph "分库分表模式"
A[原始数据库]
subgraph "垂直分库"
B1[用户库]
B2[订单库]
B3[商品库]
end
subgraph "垂直分表"
C1[用户基本信息表]
C2[用户详细信息表]
C3[用户行为表]
end
subgraph "水平分库"
D1[库0]
D2[库1]
D3[库n]
end
subgraph "水平分表"
E1[表0]
E2[表1]
E3[表n]
end
A --> B1
A --> B2
A --> B3
A --> C1
A --> C2
A --> C3
A --> D1
A --> D2
A --> D3
A --> E1
A --> E2
A --> E3
end
垂直拆分
- 垂直分库:按业务领域将不同表拆分到不同的数据库中
- 垂直分表:将一个表按字段拆分成多个表,每个表存储部分字段
水平拆分
- 水平分库:将同一个表的数据按照某个维度分散到不同的数据库中
- 水平分表:将同一个表的数据按照某个维度分散到同一个数据库的多个表中
分片策略设计
分片键选择
分片键的选择直接影响分库分表的效率和均衡性,常见的分片键选择策略:
| 分片键类型 | 优点 | 缺点 | 适用场景 |
|---|---|---|---|
| 自增ID | 分布均匀,易于扩展 | 不利于按ID查询 | 无特定查询模式的场景 |
| 用户ID | 同用户数据聚合,减少跨库查询 | 可能导致数据倾斜 | 用户中心、社交应用 |
| 订单号 | 订单相关数据聚合 | 可能热点集中 | 订单系统 |
| 时间戳 | 便于历史数据归档 | 新数据热点问题 | 日志系统、时间序列数据 |
| 地理位置 | 提高本地化查询效率 | 区域分布不均 | LBS应用 |
| 复合分片键 | 更精细的数据分布控制 | 增加路由复杂度 | 复杂查询场景 |
分片算法实现
哈希分片
/**
* 简单哈希分片算法
* @param shardingValue 分片键值
* @param shardingCount 分片数量
* @return 分片索引
*/
public int hashSharding(String shardingValue, int shardingCount) {
// 使用一致性哈希算法
int hashCode = shardingValue.hashCode();
// 确保非负数
if (hashCode < 0) {
hashCode = Math.abs(hashCode);
}
return hashCode % shardingCount;
}
范围分片
/**
* 范围分片算法
* @param shardingValue 分片键值(假设为整型)
* @param rangeMap 范围映射表
* @return 分片索引
*/
public int rangeSharding(long shardingValue, Map<Range, Integer> rangeMap) {
for (Map.Entry<Range, Integer> entry : rangeMap.entrySet()) {
Range range = entry.getKey();
if (range.contains(shardingValue)) {
return entry.getValue();
}
}
throw new IllegalArgumentException("No matching range found for value: " + shardingValue);
}
// 范围定义
class Range {
private long start;
private long end;
public Range(long start, long end) {
this.start = start;
this.end = end;
}
public boolean contains(long value) {
return value >= start && value <= end;
}
}
时间分片
/**
* 按月份分片算法
* @param dateStr 日期字符串,格式为yyyy-MM-dd
* @param shardingCount 分片数量
* @return 分片索引
*/
public int timeSharding(String dateStr, int shardingCount) throws ParseException {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
Date date = sdf.parse(dateStr);
Calendar cal = Calendar.getInstance();
cal.setTime(date);
// 获取年和月
int year = cal.get(Calendar.YEAR);
int month = cal.get(Calendar.MONTH) + 1;
// 计算月份索引,例如2023年1月为2023*12+1=24277
int monthIndex = year * 12 + month;
return monthIndex % shardingCount;
}
分库分表中间件选型
主流中间件对比
| 中间件 | 开发语言 | 特点 | 适用场景 | 社区活跃度 |
|---|---|---|---|---|
| ShardingSphere | Java | 功能全面,支持多种数据库,生态完善 | 企业级应用,复杂查询场景 | 高 |
| MyCat | Java | 轻量级,性能好,配置简单 | 中小型应用,简单查询场景 | 中 |
| Vitess | Go | 高性能,支持大规模部署 | 超大规模应用,云原生环境 | 高 |
| TDDL | Java | 阿里内部使用,功能丰富 | 阿里系应用 | 低 |
| Proxy SQL | C/C++ | 轻量级,专注于MySQL | 简单分片场景 | 中 |
ShardingSphere-JDBC配置示例
# application.yml
spring:
shardingsphere:
datasource:
names: ds0,ds1,ds2,ds3
ds0:
type: com.zaxxer.hikari.HikariDataSource
driver-class-name: com.mysql.cj.jdbc.Driver
jdbc-url: jdbc:mysql://192.168.0.1:3306/order_db_0
username: root
password: password
ds1:
type: com.zaxxer.hikari.HikariDataSource
driver-class-name: com.mysql.cj.jdbc.Driver
jdbc-url: jdbc:mysql://192.168.0.2:3306/order_db_1
username: root
password: password
ds2:
type: com.zaxxer.hikari.HikariDataSource
driver-class-name: com.mysql.cj.jdbc.Driver
jdbc-url: jdbc:mysql://192.168.0.3:3306/order_db_2
username: root
password: password
ds3:
type: com.zaxxer.hikari.HikariDataSource
driver-class-name: com.mysql.cj.jdbc.Driver
jdbc-url: jdbc:mysql://192.168.0.4:3306/order_db_3
username: root
password: password
sharding:
default-database-strategy:
standard:
sharding-column: user_id
precise-algorithm-class-name: com.example.algorithm.DatabaseShardingAlgorithm
tables:
t_order:
actual-data-nodes: ds${0..3}.t_order_${0..7}
database-strategy:
standard:
sharding-column: user_id
precise-algorithm-class-name: com.example.algorithm.DatabaseShardingAlgorithm
table-strategy:
standard:
sharding-column: order_id
precise-algorithm-class-name: com.example.algorithm.TableShardingAlgorithm
key-generator:
column: order_id
type: SNOWFLAKE
props:
worker.id: 123
t_order_item:
actual-data-nodes: ds${0..3}.t_order_item_${0..7}
database-strategy:
standard:
sharding-column: user_id
precise-algorithm-class-name: com.example.algorithm.DatabaseShardingAlgorithm
table-strategy:
standard:
sharding-column: order_id
precise-algorithm-class-name: com.example.algorithm.TableShardingAlgorithm
key-generator:
column: item_id
type: SNOWFLAKE
props:
worker.id: 123
binding-tables:
- t_order,t_order_item
broadcast-tables:
- t_config
props:
sql.show: true
自定义分片算法实现
package com.example.algorithm;
import org.apache.shardingsphere.api.sharding.standard.PreciseShardingAlgorithm;
import org.apache.shardingsphere.api.sharding.standard.PreciseShardingValue;
import java.util.Collection;
/**
* 数据库分片算法
*/
public class DatabaseShardingAlgorithm implements PreciseShardingAlgorithm<Long> {
@Override
public String doSharding(Collection<String> availableTargetNames, PreciseShardingValue<Long> shardingValue) {
Long userId = shardingValue.getValue();
// 对用户ID取模,确定分库索引
long dbIndex = userId % availableTargetNames.size();
for (String each : availableTargetNames) {
if (each.endsWith(String.valueOf(dbIndex))) {
return each;
}
}
throw new UnsupportedOperationException("找不到匹配的数据源");
}
}
/**
* 表分片算法
*/
public class TableShardingAlgorithm implements PreciseShardingAlgorithm<Long> {
@Override
public String doSharding(Collection<String> availableTargetNames, PreciseShardingValue<Long> shardingValue) {
Long orderId = shardingValue.getValue();
// 对订单ID取模,确定分表索引
long tableIndex = orderId % 8;
for (String each : availableTargetNames) {
if (each.endsWith(String.valueOf(tableIndex))) {
return each;
}
}
throw new UnsupportedOperationException("找不到匹配的数据表");
}
}
MyCat配置示例
<!-- server.xml -->
<server>
<system>
<property name="serverPort">8066</property>
<property name="managerPort">9066</property>
<property name="nonePasswordLogin">0</property>
<property name="useHandshakeV10">1</property>
<property name="useSqlStat">1</property>
<property name="useGlobleTableCheck">0</property>
<property name="sequnceHandlerType">2</property>
<property name="processorBufferPoolType">0</property>
<property name="handleDistributedTransactions">0</property>
<property name="useOffHeapForMerge">1</property>
<property name="memoryPageSize">1m</property>
<property name="spillsFileBufferSize">1k</property>
<property name="useStreamOutput">0</property>
<property name="systemReserveMemorySize">384m</property>
</system>
<user name="root">
<property name="password">123456</property>
<property name="schemas">TESTDB</property>
</user>
<user name="user">
<property name="password">user</property>
<property name="schemas">TESTDB</property>
<property name="readOnly">true</property>
</user>
</server>
<!-- schema.xml -->
<schema name="TESTDB" checkSQLschema="false" sqlMaxLimit="100">
<!-- 订单表,按用户ID分库,按订单ID分表 -->
<table name="t_order" primaryKey="order_id" dataNode="dn$0-3.t_order_$0-7" rule="sharding-by-user-order"/>
<!-- 订单明细表,与订单表绑定,保持分片一致性 -->
<table name="t_order_item" primaryKey="item_id" dataNode="dn$0-3.t_order_item_$0-7" rule="sharding-by-user-order"/>
<!-- 配置表,广播到所有节点 -->
<table name="t_config" primaryKey="id" type="global" dataNode="dn$0-3.t_config"/>
</schema>
<!-- 定义数据节点 -->
<dataNode name="dn0" dataHost="host1" database="db0"/>
<dataNode name="dn1" dataHost="host1" database="db1"/>
<dataNode name="dn2" dataHost="host2" database="db2"/>
<dataNode name="dn3" dataHost="host2" database="db3"/>
<!-- 定义数据主机 -->
<dataHost name="host1" maxCon="1000" minCon="10" balance="3" writeType="0" dbType="mysql" dbDriver="native">
<heartbeat>select user()</heartbeat>
<writeHost host="hostM1" url="192.168.0.1:3306" user="root" password="password">
<readHost host="hostS1" url="192.168.0.2:3306" user="root" password="password"/>
</writeHost>
</dataHost>
<dataHost name="host2" maxCon="1000" minCon="10" balance="3" writeType="0" dbType="mysql" dbDriver="native">
<heartbeat>select user()</heartbeat>
<writeHost host="hostM2" url="192.168.0.3:3306" user="root" password="password">
<readHost host="hostS2" url="192.168.0.4:3306" user="root" password="password"/>
</writeHost>
</dataHost>
<!-- rule.xml -->
<tableRule name="sharding-by-user-order">
<rule>
<columns>user_id,order_id</columns>
<algorithm>hash-mod</algorithm>
</rule>
</tableRule>
<function name="hash-mod" class="io.mycat.route.function.PartitionByMod">
<property name="count">4</property> <!-- 分库数量 -->
<property name="tableCount">8</property> <!-- 每个库中的分表数量 -->
</function>
分库分表数据迁移方案
全量数据迁移
使用Mydumper和Myloader工具
#!/bin/bash
# 全量数据导出脚本
# 配置参数
SOURCE_HOST="192.168.0.1"
SOURCE_USER="root"
SOURCE_PASS="password"
SOURCE_DB="original_db"
BACKUP_DIR="/data/mysql_backup/$(date +%Y%m%d)"
THREADS=8
# 创建备份目录
mkdir -p ${BACKUP_DIR}
# 使用Mydumper导出数据
mydumper \
--host=${SOURCE_HOST} \
--user=${SOURCE_USER} \
--password=${SOURCE_PASS} \
--database=${SOURCE_DB} \
--outputdir=${BACKUP_DIR} \
--threads=${THREADS} \
--compress \
--build-empty-files \
--verbose=3
# 检查导出结果
if [ $? -eq 0 ]; then
echo "数据导出成功,备份目录: ${BACKUP_DIR}"
else
echo "数据导出失败!"
exit 1
fi
# 数据导入脚本
for i in {0..3}; do
for j in {0..7}; do
TARGET_HOST="192.168.0.$((i+1))"
TARGET_DB="order_db_${i}"
TARGET_TABLE="t_order_${j}"
echo "正在导入数据到 ${TARGET_HOST}:${TARGET_DB}.${TARGET_TABLE}"
# 使用Myloader导入数据
myloader \
--host=${TARGET_HOST} \
--user=${SOURCE_USER} \
--password=${SOURCE_PASS} \
--database=${TARGET_DB} \
--directory=${BACKUP_DIR} \
--threads=${THREADS} \
--overwrite-tables \
--verbose=3
if [ $? -eq 0 ]; then
echo "导入到 ${TARGET_HOST}:${TARGET_DB}.${TARGET_TABLE} 成功"
else
echo "导入到 ${TARGET_HOST}:${TARGET_DB}.${TARGET_TABLE} 失败!"
exit 1
fi
done
done
echo "全量数据迁移完成!"
增量数据同步
使用Canal实现增量同步
// Canal客户端示例代码
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.*;
import com.alibaba.otter.canal.protocol.Message;
import java.net.InetSocketAddress;
import java.util.List;
public class CanalClient {
public static void main(String[] args) {
// 创建链接
CanalConnector connector = CanalConnectors.newSingleConnector(
new InetSocketAddress("127.0.0.1", 11111),
"example",
"canal",
"canal");
try {
// 连接Canal服务器
connector.connect();
// 订阅数据库表,格式为 {database}.{table}
connector.subscribe("original_db.t_order");
// 回滚到未进行ack的地方
connector.rollback();
while (true) {
// 获取指定数量的数据
Message message = connector.getWithoutAck(1000);
long batchId = message.getId();
if (batchId == -1 || message.getEntries().isEmpty()) {
// 没有数据,休眠一段时间
Thread.sleep(1000);
continue;
}
// 处理数据
processEntries(message.getEntries());
// 提交确认
connector.ack(batchId);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
// 关闭连接
connector.disconnect();
}
}
private static void processEntries(List<Entry> entries) {
for (Entry entry : entries) {
if (entry.getEntryType() != EntryType.ROWDATA) {
continue;
}
try {
RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
String tableName = entry.getHeader().getTableName();
EventType eventType = rowChange.getEventType();
System.out.println("================> binlog[" + entry.getHeader().getLogfileName() + ":"
+ entry.getHeader().getLogfileOffset() + "], name[" + tableName + "], eventType: " + eventType);
for (RowData rowData : rowChange.getRowDatasList()) {
if (eventType == EventType.DELETE) {
// 处理删除数据
processDeleteData(tableName, rowData.getBeforeColumnsList());
} else if (eventType == EventType.INSERT) {
// 处理插入数据
processInsertData(tableName, rowData.getAfterColumnsList());
} else if (eventType == EventType.UPDATE) {
// 处理更新数据
processUpdateData(tableName, rowData.getBeforeColumnsList(), rowData.getAfterColumnsList());
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
private static void processDeleteData(String tableName, List<Column> columns) {
// 根据分片规则,确定目标库表,执行删除操作
System.out.println("DELETE from " + tableName);
printColumns(columns);
}
private static void processInsertData(String tableName, List<Column> columns) {
// 根据分片规则,确定目标库表,执行插入操作
System.out.println("INSERT into " + tableName);
printColumns(columns);
}
private static void processUpdateData(String tableName, List<Column> beforeColumns, List<Column> afterColumns) {
// 根据分片规则,确定目标库表,执行更新操作
System.out.println("UPDATE " + tableName);
System.out.println("Before:");
printColumns(beforeColumns);
System.out.println("After:");
printColumns(afterColumns);
}
private static void printColumns(List<Column> columns) {
for (Column column : columns) {
System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
}
}
}
双写方案实现
/**
* 数据库双写服务
*/
@Service
public class DualWriteService {
@Autowired
private JdbcTemplate oldJdbcTemplate;
@Autowired
private JdbcTemplate newJdbcTemplate;
@Autowired
private ShardingService shardingService;
/**
* 双写订单数据
* @param order 订单对象
* @return 是否成功
*/
@Transactional(rollbackFor = Exception.class)
public boolean saveOrder(Order order) {
try {
// 写入旧库
saveToOldDatabase(order);
// 写入新库
saveToNewDatabase(order);
return true;
} catch (Exception e) {
log.error("双写订单数据失败", e);
throw e;
}
}
private void saveToOldDatabase(Order order) {
String sql = "INSERT INTO t_order (order_id, user_id, status, create_time) VALUES (?, ?, ?, ?)";
oldJdbcTemplate.update(sql,
order.getOrderId(),
order.getUserId(),
order.getStatus(),
order.getCreateTime());
}
private void saveToNewDatabase(Order order) {
// 计算分片位置
ShardingResult shardingResult = shardingService.calculateSharding(order.getUserId(), order.getOrderId());
// 构建实际的SQL
String dbIndex = shardingResult.getDbIndex();
String tableIndex = shardingResult.getTableIndex();
String actualTable = "t_order_" + tableIndex;
// 获取对应分片的JdbcTemplate
JdbcTemplate shardJdbcTemplate = getShardingJdbcTemplate(dbIndex);
String sql = "INSERT INTO " + actualTable + " (order_id, user_id, status, create_time) VALUES (?, ?, ?, ?)";
shardJdbcTemplate.update(sql,
order.getOrderId(),
order.getUserId(),
order.getStatus(),
order.getCreateTime());
}
private JdbcTemplate getShardingJdbcTemplate(String dbIndex) {
// 根据dbIndex获取对应的数据源JdbcTemplate
// 实际实现可能是从数据源池中获取
return newJdbcTemplate;
}
/**
* 分片结果
*/
@Data
public static class ShardingResult {
private String dbIndex;
private String tableIndex;
}
}
分库分表扩容方案
一致性哈希算法实现
/**
* 一致性哈希环实现
*/
public class ConsistentHash<T> {
// 虚拟节点数量
private final int numberOfReplicas;
// 哈希环
private final SortedMap<Integer, T> circle = new TreeMap<>();
/**
* 构造函数
* @param numberOfReplicas 虚拟节点数量
* @param nodes 物理节点列表
*/
public ConsistentHash(int numberOfReplicas, Collection<T> nodes) {
this.numberOfReplicas = numberOfReplicas;
for (T node : nodes) {
add(node);
}
}
/**
* 添加节点
* @param node 物理节点
*/
public void add(T node) {
for (int i = 0; i < numberOfReplicas; i++) {
// 对于每个物理节点,创建多个虚拟节点
String nodeKey = node.toString() + "-" + i;
int hash = getHash(nodeKey);
circle.put(hash, node);
}
}
/**
* 移除节点
* @param node 物理节点
*/
public void remove(T node) {
for (int i = 0; i < numberOfReplicas; i++) {
String nodeKey = node.toString() + "-" + i;
int hash = getHash(nodeKey);
circle.remove(hash);
}
}
/**
* 获取对象所在的节点
* @param key 对象键
* @return 节点
*/
public T get(Object key) {
if (circle.isEmpty()) {
return null;
}
int hash = getHash(key.toString());
// 如果环中没有大于等于该hash值的节点,则返回环的第一个节点
if (!circle.containsKey(hash)) {
SortedMap<Integer, T> tailMap = circle.tailMap(hash);
hash = tailMap.isEmpty() ? circle.firstKey() : tailMap.firstKey();
}
return circle.get(hash);
}
/**
* 计算哈希值
* @param key 键
* @return 哈希值
*/
private int getHash(String key) {
// 使用FNV1_32_HASH算法计算哈希值
final int p = 16777619;
int hash = (int) 2166136261L;
for (int i = 0; i < key.length(); i++) {
hash = (hash ^ key.charAt(i)) * p;
}
hash += hash << 13;
hash ^= hash >> 7;
hash += hash << 3;
hash ^= hash >> 17;
hash += hash << 5;
// 确保hash为正数
if (hash < 0) {
hash = Math.abs(hash);
}
return hash;
}
}
数据再平衡工具
/**
* 数据再平衡工具
*/
public class DataRebalancer {
private final JdbcTemplate sourceJdbcTemplate;
private final JdbcTemplate targetJdbcTemplate;
private final int batchSize;
public DataRebalancer(JdbcTemplate sourceJdbcTemplate, JdbcTemplate targetJdbcTemplate, int batchSize) {
this.sourceJdbcTemplate = sourceJdbcTemplate;
this.targetJdbcTemplate = targetJdbcTemplate;
this.batchSize = batchSize;
}
/**
* 执行数据迁移
* @param sourceTable 源表
* @param targetTable 目标表
* @param shardingKey 分片键
* @param startValue 起始值
* @param endValue 结束值
* @return 迁移的记录数
*/
public int migrateData(String sourceTable, String targetTable, String shardingKey,
Object startValue, Object endValue) {
int totalCount = 0;
int offset = 0;
while (true) {
// 分批查询数据
String querySql = String.format(
"SELECT * FROM %s WHERE %s >= ? AND %s < ? ORDER BY %s LIMIT %d OFFSET %d",
sourceTable, shardingKey, shardingKey, shardingKey, batchSize, offset);
List<Map<String, Object>> records = sourceJdbcTemplate.queryForList(
querySql, startValue, endValue);
if (records.isEmpty()) {
break;
}
// 批量插入目标表
batchInsert(targetTable, records);
totalCount += records.size();
offset += batchSize;
System.out.printf("已迁移 %d 条记录\n", totalCount);
}
return totalCount;
}
/**
* 批量插入数据
* @param targetTable 目标表
* @param records 记录列表
*/
private void batchInsert(String targetTable, List<Map<String, Object>> records) {
if (records.isEmpty()) {
return;
}
// 构建插入SQL
Map<String, Object> firstRecord = records.get(0);
StringBuilder columns = new StringBuilder();
StringBuilder placeholders = new StringBuilder();
for (String column : firstRecord.keySet()) {
if (columns.length() > 0) {
columns.append(", ");
placeholders.append(", ");
}
columns.append(column);
placeholders.append("?");
}
String insertSql = String.format(
"INSERT INTO %s (%s) VALUES (%s)",
targetTable, columns.toString(), placeholders.toString());
// 批量执行插入
targetJdbcTemplate.batchUpdate(insertSql, new BatchPreparedStatementSetter() {
@Override
public void setValues(PreparedStatement ps, int i) throws SQLException {
Map<String, Object> record = records.get(i);
int index = 1;
for (String column : firstRecord.keySet()) {
ps.setObject(index++, record.get(column));
}
}
@Override
public int getBatchSize() {
return records.size();
}
});
}
}
扩容流程自动化脚本
#!/bin/bash
# 分库分表扩容自动化脚本
set -e
# 配置参数
SOURCE_HOST="192.168.0.1"
SOURCE_USER="root"
SOURCE_PASS="password"
SOURCE_DB="order_db_0"
TARGET_HOST="192.168.0.5"
TARGET_USER="root"
TARGET_PASS="password"
TARGET_DB="order_db_4"
TABLE_PREFIX="t_order_"
SHARD_KEY="user_id"
BATCH_SIZE=1000
LOG_FILE="rebalance_$(date +%Y%m%d%H%M%S).log"
# 日志函数
log() {
echo "[$(date '+%Y-%m-%d %H:%M:%S')] $1" | tee -a ${LOG_FILE}
}
# 检查目标数据库是否存在
check_target_db() {
log "检查目标数据库 ${TARGET_DB} 是否存在..."
DB_EXISTS=$(mysql -h${TARGET_HOST} -u${TARGET_USER} -p${TARGET_PASS} -e "SHOW DATABASES LIKE '${TARGET_DB}';" | grep -c ${TARGET_DB} || true)
if [ ${DB_EXISTS} -eq 0 ]; then
log "创建目标数据库 ${TARGET_DB}..."
mysql -h${TARGET_HOST} -u${TARGET_USER} -p${TARGET_PASS} -e "CREATE DATABASE ${TARGET_DB} CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;"
else
log "目标数据库 ${TARGET_DB} 已存在"
fi
}
# 创建目标表结构
create_target_tables() {
log "创建目标表结构..."
for i in {0..7}; do
SOURCE_TABLE="${TABLE_PREFIX}${i}"
TARGET_TABLE="${TABLE_PREFIX}${i}"
log "复制表结构: ${SOURCE_DB}.${SOURCE_TABLE} -> ${TARGET_DB}.${TARGET_TABLE}"
# 获取表结构
TABLE_DDL=$(mysqldump -h${SOURCE_HOST} -u${SOURCE_USER} -p${SOURCE_PASS} --no-data ${SOURCE_DB} ${SOURCE_TABLE} | grep -v "^--" | grep -v "^/\*")
# 创建目标表
echo "${TABLE_DDL}" | mysql -h${TARGET_HOST} -u${TARGET_USER} -p${TARGET_PASS} ${TARGET_DB}
done
}
# 迁移数据
migrate_data() {
log "开始数据迁移..."
# 计算需要迁移的用户ID范围
# 假设我们要将原来4个库扩容到5个库,那么需要迁移1/5的数据
# 对于user_id % 4 = 0的数据,迁移user_id % 5 = 4的部分
for i in {0..7}; do
SOURCE_TABLE="${TABLE_PREFIX}${i}"
TARGET_TABLE="${TABLE_PREFIX}${i}"
log "迁移数据: ${SOURCE_DB}.${SOURCE_TABLE} -> ${TARGET_DB}.${TARGET_TABLE}"
# 统计需要迁移的记录数
TOTAL_COUNT=$(mysql -h${SOURCE_HOST} -u${SOURCE_USER} -p${SOURCE_PASS} -e "SELECT COUNT(*) FROM ${SOURCE_DB}.${SOURCE_TABLE} WHERE ${SHARD_KEY} % 5 = 4;" | tail -1)
log "需要迁移的记录数: ${TOTAL_COUNT}"
# 分批迁移数据
OFFSET=0
MIGRATED=0
while true; do
# 查询一批数据
DATA_FILE="/tmp/migrate_data_${i}_${OFFSET}.sql"
mysql -h${SOURCE_HOST} -u${SOURCE_USER} -p${SOURCE_PASS} -e "
SELECT * FROM ${SOURCE_DB}.${SOURCE_TABLE}
WHERE ${SHARD_KEY} % 5 = 4
LIMIT ${BATCH_SIZE} OFFSET ${OFFSET}
" > ${DATA_FILE}
# 检查是否还有数据
RECORD_COUNT=$(cat ${DATA_FILE} | wc -l)
if [ ${RECORD_COUNT} -le 1 ]; then
rm -f ${DATA_FILE}
break
fi
# 转换为INSERT语句
INSERT_FILE="/tmp/migrate_insert_${i}_${OFFSET}.sql"
echo "INSERT INTO ${TARGET_DB}.${TARGET_TABLE} VALUES " > ${INSERT_FILE}
# 处理数据文件,转换为INSERT语句
tail -n +2 ${DATA_FILE} | sed 's/^/(/' | sed 's/$/),/' | sed '$s/,$/;/' >> ${INSERT_FILE}
# 导入数据到目标库
mysql -h${TARGET_HOST} -u${TARGET_USER} -p${TARGET_PASS} < ${INSERT_FILE}
# 更新进度
MIGRATED=$((MIGRATED + RECORD_COUNT - 1))
log "已迁移 ${MIGRATED}/${TOTAL_COUNT} 条记录"
# 清理临时文件
rm -f ${DATA_FILE} ${INSERT_FILE}
# 更新偏移量
OFFSET=$((OFFSET + BATCH_SIZE))
done
log "表 ${SOURCE_TABLE} 数据迁移完成"
done
}
# 验证数据一致性
validate_data() {
log "验证数据一致性..."
for i in {0..7}; do
SOURCE_TABLE="${TABLE_PREFIX}${i}"
TARGET_TABLE="${TABLE_PREFIX}${i}"
log "验证表 ${SOURCE_TABLE} 的数据一致性"
# 比较源表和目标表的记录数
SOURCE_COUNT=$(mysql -h${SOURCE_HOST} -u${SOURCE_USER} -p${SOURCE_PASS} -e "SELECT COUNT(*) FROM ${SOURCE_DB}.${SOURCE_TABLE} WHERE ${SHARD_KEY} % 5 = 4;" | tail -1)
TARGET_COUNT=$(mysql -h${TARGET_HOST} -u${TARGET_USER} -p${TARGET_PASS} -e "SELECT COUNT(*) FROM ${TARGET_DB}.${TARGET_TABLE};" | tail -1)
if [ "${SOURCE_COUNT}" != "${TARGET_COUNT}" ]; then
log "错误: 表 ${SOURCE_TABLE} 的记录数不一致! 源表: ${SOURCE_COUNT}, 目标表: ${TARGET_COUNT}"
exit 1
fi
log "表 ${SOURCE_TABLE} 的记录数一致: ${SOURCE_COUNT}"
# 抽样检查数据内容
log "抽样检查数据内容..."
# 随机选择10条记录进行比较
SAMPLE_IDS=$(mysql -h${SOURCE_HOST} -u${SOURCE_USER} -p${SOURCE_PASS} -e "
SELECT id FROM ${SOURCE_DB}.${SOURCE_TABLE}
WHERE ${SHARD_KEY} % 5 = 4
ORDER BY RAND() LIMIT 10;
" | tail -n +2)
for ID in ${SAMPLE_IDS}; do
SOURCE_DATA=$(mysql -h${SOURCE_HOST} -u${SOURCE_USER} -p${SOURCE_PASS} -e "
SELECT * FROM ${SOURCE_DB}.${SOURCE_TABLE} WHERE id = ${ID};
" | tail -1)
TARGET_DATA=$(mysql -h${TARGET_HOST} -u${TARGET_USER} -p${TARGET_PASS} -e "
SELECT * FROM ${TARGET_DB}.${TARGET_TABLE} WHERE id = ${ID};
" | tail -1)
if [ "${SOURCE_DATA}" != "${TARGET_DATA}" ]; then
log "错误: ID=${ID} 的数据不一致!"
log "源数据: ${SOURCE_DATA}"
log "目标数据: ${TARGET_DATA}"
exit 1
fi
done
log "表 ${SOURCE_TABLE} 的数据内容一致"
done
log "数据一致性验证通过"
}
# 更新应用配置
update_app_config() {
log "更新应用配置..."
# 这里需要根据实际应用的配置方式进行修改
# 例如,更新ShardingSphere的配置文件
log "应用配置已更新"
}
# 清理源库数据
cleanup_source_data() {
log "清理源库数据..."
# 确认迁移完成后,删除源库中已迁移的数据
for i in {0..7}; do
SOURCE_TABLE="${TABLE_PREFIX}${i}"
log "清理表 ${SOURCE_TABLE} 中已迁移的数据"
mysql -h${SOURCE_HOST} -u${SOURCE_USER} -p${SOURCE_PASS} -e "
DELETE FROM ${SOURCE_DB}.${SOURCE_TABLE} WHERE ${SHARD_KEY} % 5 = 4;
"
done
log "源库数据清理完成"
}
# 主函数
main() {
log "开始分库分表扩容流程..."
# 检查目标数据库
check_target_db
# 创建目标表结构
create_target_tables
# 迁移数据
migrate_data
# 验证数据一致性
validate_data
# 更新应用配置
update_app_config
# 清理源库数据
cleanup_source_data
log "分库分表扩容流程完成!"
}
# 执行主函数
main
分库分表常见问题及解决方案
跨库事务处理
基于XA协议的分布式事务
/**
* 基于XA协议的分布式事务管理器
*/
@Service
public class XATransactionManager {
@Autowired
private DataSource dataSource;
/**
* 执行分布式事务
* @param action 事务操作
* @return 执行结果
*/
public <T> T executeInXATransaction(Supplier<T> action) throws Exception {
UserTransaction userTransaction = null;
try {
// 获取JTA事务管理器
userTransaction = (UserTransaction) new InitialContext().lookup("java:comp/UserTransaction");
// 开始事务
userTransaction.begin();
// 执行业务逻辑
T result = action.get();
// 提交事务
userTransaction.commit();
return result;
} catch (Exception e) {
// 回滚事务
if (userTransaction != null) {
try {
userTransaction.rollback();
} catch (Exception ex) {
ex.printStackTrace();
}
}
throw e;
}
}
}
基于TCC模式的分布式事务
/**
* TCC事务接口
*/
public interface TCCService<T> {
/**
* Try阶段:资源预留
* @param params 业务参数
* @return 执行结果
*/
boolean try_(T params);
/**
* Confirm阶段:确认执行
* @param params 业务参数
* @return 执行结果
*/
boolean confirm(T params);
/**
* Cancel阶段:取消执行
* @param params 业务参数
* @return 执行结果
*/
boolean cancel(T params);
}
/**
* 订单TCC实现
*/
@Service
public class OrderTCCServiceImpl implements TCCService<Order> {
@Autowired
private JdbcTemplate jdbcTemplate;
@Override
public boolean try_(Order order) {
try {
// 检查库存
int stock = jdbcTemplate.queryForObject(
"SELECT stock FROM t_product WHERE product_id = ?",
Integer.class,
order.getProductId());
if (stock < order.getQuantity()) {
return false;
}
// 冻结库存
jdbcTemplate.update(
"UPDATE t_product SET frozen_stock = frozen_stock + ? WHERE product_id = ?",
order.getQuantity(),
order.getProductId());
// 创建订单,状态为PENDING
jdbcTemplate.update(
"INSERT INTO t_order (order_id, user_id, product_id, quantity, status) VALUES (?, ?, ?, ?, 'PENDING')",
order.getOrderId(),
order.getUserId(),
order.getProductId(),
order.getQuantity());
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
@Override
public boolean confirm(Order order) {
try {
// 扣减库存
jdbcTemplate.update(
"UPDATE t_product SET stock = stock - ?, frozen_stock = frozen_stock - ? WHERE product_id = ?",
order.getQuantity(),
order.getQuantity(),
order.getProductId());
// 更新订单状态为CONFIRMED
jdbcTemplate.update(
"UPDATE t_order SET status = 'CONFIRMED' WHERE order_id = ?",
order.getOrderId());
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
@Override
public boolean cancel(Order order) {
try {
// 解冻库存
jdbcTemplate.update(
"UPDATE t_product SET frozen_stock = frozen_stock - ? WHERE product_id = ?",
order.getQuantity(),
order.getProductId());
// 更新订单状态为CANCELED
jdbcTemplate.update(
"UPDATE t_order SET status = 'CANCELED' WHERE order_id = ?",
order.getOrderId());
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
}
分页查询优化
/**
* 分页查询优化器
*/
@Service
public class ShardingPaginationOptimizer {
@Autowired
private List<JdbcTemplate> shardingJdbcTemplates;
/**
* 执行分片分页查询
* @param sql SQL语句
* @param params SQL参数
* @param pageNum 页码
* @param pageSize 每页大小
* @return 分页结果
*/
public <T> Page<T> queryWithPagination(String sql, Object[] params, int pageNum, int pageSize,
RowMapper<T> rowMapper) {
// 计算总记录数
int totalCount = countTotalRecords(sql, params);
// 计算总页数
int totalPages = (totalCount + pageSize - 1) / pageSize;
// 查询当前页数据
List<T> records = queryForPage(sql, params, pageNum, pageSize, rowMapper);
// 构建分页结果
Page<T> page = new Page<>();
page.setPageNum(pageNum);
page.setPageSize(pageSize);
page.setTotalCount(totalCount);
page.setTotalPages(totalPages);
page.setRecords(records);
return page;
}
/**
* 统计总记录数
*/
private int countTotalRecords(String sql, Object[] params) {
// 构建COUNT查询
String countSql = "SELECT COUNT(*) FROM (" + sql + ") AS t";
// 在所有分片上执行查询并汇总结果
int totalCount = 0;
for (JdbcTemplate jdbcTemplate : shardingJdbcTemplates) {
Integer count = jdbcTemplate.queryForObject(countSql, params, Integer.class);
if (count != null) {
totalCount += count;
}
}
return totalCount;
}
/**
* 查询分页数据
*/
private <T> List<T> queryForPage(String sql, Object[] params, int pageNum, int pageSize,
RowMapper<T> rowMapper) {
// 计算偏移量
int offset = (pageNum - 1) * pageSize;
// 添加分页限制
String pageSql = sql + " LIMIT " + offset + ", " + pageSize;
// 在所有分片上执行查询
List<T> allRecords = new ArrayList<>();
for (JdbcTemplate jdbcTemplate : shardingJdbcTemplates) {
List<T> records = jdbcTemplate.query(pageSql, params, rowMapper);
allRecords.addAll(records);
}
// 对结果进行排序和截取
// 注意:这里假设SQL中已经包含了ORDER BY子句
// 如果没有,需要在这里进行排序
// 截取需要的记录
int fromIndex = 0;
int toIndex = Math.min(pageSize, allRecords.size());
return allRecords.subList(fromIndex, toIndex);
}
/**
* 分页结果
*/
@Data
public static class Page<T> {
private int pageNum;
private int pageSize;
private int totalCount;
private int totalPages;
private List<T> records;
}
}
全局ID生成方案
/**
* 雪花算法ID生成器
*/
@Service
public class SnowflakeIdGenerator {
// 起始时间戳,可以设置为项目开始时间
private final long startTimestamp = 1609430400000L; // 2021-01-01 00:00:00
// 机器ID所占位数
private final long workerIdBits = 5L;
// 数据中心ID所占位数
private final long dataCenterIdBits = 5L;
// 序列号所占位数
private final long sequenceBits = 12L;
// 支持的最大机器ID
private final long maxWorkerId = ~(-1L << workerIdBits);
// 支持的最大数据中心ID
private final long maxDataCenterId = ~(-1L << dataCenterIdBits);
// 机器ID向左移12位
private final long workerIdShift = sequenceBits;
// 数据中心ID向左移17位
private final long dataCenterIdShift = sequenceBits + workerIdBits;
// 时间戳向左移22位
private final long timestampLeftShift = sequenceBits + workerIdBits + dataCenterIdBits;
// 序列号掩码,用于限定序列号的最大值
private final long sequenceMask = ~(-1L << sequenceBits);
// 工作机器ID
private long workerId;
// 数据中心ID
private long dataCenterId;
// 毫秒内序列
private long sequence = 0L;
// 上次生成ID的时间戳
private long lastTimestamp = -1L;
/**
* 构造函数
* @param workerId 工作机器ID
* @param dataCenterId 数据中心ID
*/
public SnowflakeIdGenerator(long workerId, long dataCenterId) {
if (workerId > maxWorkerId || workerId < 0) {
throw new IllegalArgumentException("Worker ID can't be greater than " + maxWorkerId + " or less than 0");
}
if (dataCenterId > maxDataCenterId || dataCenterId < 0) {
throw new IllegalArgumentException("DataCenter ID can't be greater than " + maxDataCenterId + " or less than 0");
}
this.workerId = workerId;
this.dataCenterId = dataCenterId;
}
/**
* 获取下一个ID
* @return ID
*/
public synchronized long nextId() {
long timestamp = timeGen();
// 如果当前时间小于上一次ID生成的时间戳,说明系统时钟回退过,抛出异常
if (timestamp < lastTimestamp) {
throw new RuntimeException("Clock moved backwards. Refusing to generate id for " + (lastTimestamp - timestamp) + " milliseconds");
}
// 如果是同一时间生成的,则进行毫秒内序列
if (lastTimestamp == timestamp) {
sequence = (sequence + 1) & sequenceMask;
// 毫秒内序列溢出
if (sequence == 0) {
// 阻塞到下一个毫秒,获得新的时间戳
timestamp = tilNextMillis(lastTimestamp);
}
} else {
// 时间戳改变,毫秒内序列重置
sequence = 0L;
}
// 上次生成ID的时间戳
lastTimestamp = timestamp;
// 移位并通过或运算拼到一起组成64位的ID
return ((timestamp - startTimestamp) << timestampLeftShift) |
(dataCenterId << dataCenterIdShift) |
(workerId << workerIdShift) |
sequence;
}
/**
* 阻塞到下一个毫秒,直到获得新的时间戳
* @param lastTimestamp 上次生成ID的时间戳
* @return 新的时间戳
*/
private long tilNextMillis(long lastTimestamp) {
long timestamp = timeGen();
while (timestamp <= lastTimestamp) {
timestamp = timeGen();
}
return timestamp;
}
/**
* 返回以毫秒为单位的当前时间
* @return 当前时间(毫秒)
*/
private long timeGen() {
return System.currentTimeMillis();
}
}
总结
MySQL分库分表是解决数据库容量和性能瓶颈的有效手段,但实施过程中需要考虑多个方面:
- 分片策略设计:选择合适的分片键和分片算法,确保数据分布均匀
- 中间件选型:根据业务需求和技术栈选择合适的分库分表中间件
- 数据迁移方案:制定完善的全量和增量数据迁移方案,确保数据一致性
- 扩容方案:设计灵活的扩容方案,支持业务持续增长
- 常见问题处理:解决跨库事务、分页查询、全局ID生成等常见问题
通过本文介绍的方案和最佳实践,可以帮助开发团队更好地实施MySQL分库分表,构建高性能、可扩展的数据库架构。