跳转到主要内容

MySQL分库分表实践:从理论到落地的全面指南

博主
17 分钟
3419 字
--

AI 导读

深刻理解和准确把握"MySQL分库分表实践:从理论到落地的全面指南"这一重要概念的核心要义,本文从理论基础、实践应用和发展前景等多个维度进行了系统性阐述,为读者提供了全面而深入的分析视角。

内容由AI智能生成

MySQL分库分表实践:从理论到落地的全面指南

随着业务数据量的爆发式增长,单库单表的MySQL架构往往无法满足高并发、大数据量的业务需求。分库分表作为解决数据库容量和性能瓶颈的关键技术,已成为大型互联网应用的标配。本文将从理论到实践,全面介绍MySQL分库分表的实施方案和最佳实践。

分库分表基础理论

为什么需要分库分表

在讨论具体实现方案前,我们需要明确分库分表的驱动因素:

驱动因素 表现 影响 解决方案
数据量过大 单表数据超过千万级 查询性能下降,索引效率降低 水平分表
写入压力大 高并发写入导致锁竞争 写入延迟增加,影响用户体验 水平分库
读取压力大 查询响应时间增加 系统整体性能下降 读写分离+分库
单机资源瓶颈 CPU/内存/IO资源不足 数据库服务不稳定 分库+硬件扩容

分库分表的基本概念

graph TB
    subgraph "分库分表模式"
        A[原始数据库]
        
        subgraph "垂直分库"
            B1[用户库]
            B2[订单库]
            B3[商品库]
        end
        
        subgraph "垂直分表"
            C1[用户基本信息表]
            C2[用户详细信息表]
            C3[用户行为表]
        end
        
        subgraph "水平分库"
            D1[库0]
            D2[库1]
            D3[库n]
        end
        
        subgraph "水平分表"
            E1[表0]
            E2[表1]
            E3[表n]
        end
        
        A --> B1
        A --> B2
        A --> B3
        
        A --> C1
        A --> C2
        A --> C3
        
        A --> D1
        A --> D2
        A --> D3
        
        A --> E1
        A --> E2
        A --> E3
    end

垂直拆分

  • 垂直分库:按业务领域将不同表拆分到不同的数据库中
  • 垂直分表:将一个表按字段拆分成多个表,每个表存储部分字段

水平拆分

  • 水平分库:将同一个表的数据按照某个维度分散到不同的数据库中
  • 水平分表:将同一个表的数据按照某个维度分散到同一个数据库的多个表中

分片策略设计

分片键选择

分片键的选择直接影响分库分表的效率和均衡性,常见的分片键选择策略:

分片键类型 优点 缺点 适用场景
自增ID 分布均匀,易于扩展 不利于按ID查询 无特定查询模式的场景
用户ID 同用户数据聚合,减少跨库查询 可能导致数据倾斜 用户中心、社交应用
订单号 订单相关数据聚合 可能热点集中 订单系统
时间戳 便于历史数据归档 新数据热点问题 日志系统、时间序列数据
地理位置 提高本地化查询效率 区域分布不均 LBS应用
复合分片键 更精细的数据分布控制 增加路由复杂度 复杂查询场景

分片算法实现

哈希分片

/**
 * 简单哈希分片算法
 * @param shardingValue 分片键值
 * @param shardingCount 分片数量
 * @return 分片索引
 */
public int hashSharding(String shardingValue, int shardingCount) {
    // 使用一致性哈希算法
    int hashCode = shardingValue.hashCode();
    // 确保非负数
    if (hashCode < 0) {
        hashCode = Math.abs(hashCode);
    }
    return hashCode % shardingCount;
}

范围分片

/**
 * 范围分片算法
 * @param shardingValue 分片键值(假设为整型)
 * @param rangeMap 范围映射表
 * @return 分片索引
 */
public int rangeSharding(long shardingValue, Map<Range, Integer> rangeMap) {
    for (Map.Entry<Range, Integer> entry : rangeMap.entrySet()) {
        Range range = entry.getKey();
        if (range.contains(shardingValue)) {
            return entry.getValue();
        }
    }
    throw new IllegalArgumentException("No matching range found for value: " + shardingValue);
}

// 范围定义
class Range {
    private long start;
    private long end;
    
    public Range(long start, long end) {
        this.start = start;
        this.end = end;
    }
    
    public boolean contains(long value) {
        return value >= start && value <= end;
    }
}

时间分片

/**
 * 按月份分片算法
 * @param dateStr 日期字符串,格式为yyyy-MM-dd
 * @param shardingCount 分片数量
 * @return 分片索引
 */
public int timeSharding(String dateStr, int shardingCount) throws ParseException {
    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
    Date date = sdf.parse(dateStr);
    
    Calendar cal = Calendar.getInstance();
    cal.setTime(date);
    
    // 获取年和月
    int year = cal.get(Calendar.YEAR);
    int month = cal.get(Calendar.MONTH) + 1;
    
    // 计算月份索引,例如2023年1月为2023*12+1=24277
    int monthIndex = year * 12 + month;
    
    return monthIndex % shardingCount;
}

分库分表中间件选型

主流中间件对比

中间件 开发语言 特点 适用场景 社区活跃度
ShardingSphere Java 功能全面,支持多种数据库,生态完善 企业级应用,复杂查询场景
MyCat Java 轻量级,性能好,配置简单 中小型应用,简单查询场景
Vitess Go 高性能,支持大规模部署 超大规模应用,云原生环境
TDDL Java 阿里内部使用,功能丰富 阿里系应用
Proxy SQL C/C++ 轻量级,专注于MySQL 简单分片场景

ShardingSphere-JDBC配置示例

# application.yml
spring:
  shardingsphere:
    datasource:
      names: ds0,ds1,ds2,ds3
      ds0:
        type: com.zaxxer.hikari.HikariDataSource
        driver-class-name: com.mysql.cj.jdbc.Driver
        jdbc-url: jdbc:mysql://192.168.0.1:3306/order_db_0
        username: root
        password: password
      ds1:
        type: com.zaxxer.hikari.HikariDataSource
        driver-class-name: com.mysql.cj.jdbc.Driver
        jdbc-url: jdbc:mysql://192.168.0.2:3306/order_db_1
        username: root
        password: password
      ds2:
        type: com.zaxxer.hikari.HikariDataSource
        driver-class-name: com.mysql.cj.jdbc.Driver
        jdbc-url: jdbc:mysql://192.168.0.3:3306/order_db_2
        username: root
        password: password
      ds3:
        type: com.zaxxer.hikari.HikariDataSource
        driver-class-name: com.mysql.cj.jdbc.Driver
        jdbc-url: jdbc:mysql://192.168.0.4:3306/order_db_3
        username: root
        password: password
    
    sharding:
      default-database-strategy:
        standard:
          sharding-column: user_id
          precise-algorithm-class-name: com.example.algorithm.DatabaseShardingAlgorithm
      
      tables:
        t_order:
          actual-data-nodes: ds${0..3}.t_order_${0..7}
          database-strategy:
            standard:
              sharding-column: user_id
              precise-algorithm-class-name: com.example.algorithm.DatabaseShardingAlgorithm
          table-strategy:
            standard:
              sharding-column: order_id
              precise-algorithm-class-name: com.example.algorithm.TableShardingAlgorithm
          key-generator:
            column: order_id
            type: SNOWFLAKE
            props:
              worker.id: 123
        
        t_order_item:
          actual-data-nodes: ds${0..3}.t_order_item_${0..7}
          database-strategy:
            standard:
              sharding-column: user_id
              precise-algorithm-class-name: com.example.algorithm.DatabaseShardingAlgorithm
          table-strategy:
            standard:
              sharding-column: order_id
              precise-algorithm-class-name: com.example.algorithm.TableShardingAlgorithm
          key-generator:
            column: item_id
            type: SNOWFLAKE
            props:
              worker.id: 123
      
      binding-tables:
        - t_order,t_order_item
      
      broadcast-tables:
        - t_config
    
    props:
      sql.show: true

自定义分片算法实现

package com.example.algorithm;

import org.apache.shardingsphere.api.sharding.standard.PreciseShardingAlgorithm;
import org.apache.shardingsphere.api.sharding.standard.PreciseShardingValue;

import java.util.Collection;

/**
 * 数据库分片算法
 */
public class DatabaseShardingAlgorithm implements PreciseShardingAlgorithm<Long> {
    
    @Override
    public String doSharding(Collection<String> availableTargetNames, PreciseShardingValue<Long> shardingValue) {
        Long userId = shardingValue.getValue();
        // 对用户ID取模,确定分库索引
        long dbIndex = userId % availableTargetNames.size();
        
        for (String each : availableTargetNames) {
            if (each.endsWith(String.valueOf(dbIndex))) {
                return each;
            }
        }
        
        throw new UnsupportedOperationException("找不到匹配的数据源");
    }
}

/**
 * 表分片算法
 */
public class TableShardingAlgorithm implements PreciseShardingAlgorithm<Long> {
    
    @Override
    public String doSharding(Collection<String> availableTargetNames, PreciseShardingValue<Long> shardingValue) {
        Long orderId = shardingValue.getValue();
        // 对订单ID取模,确定分表索引
        long tableIndex = orderId % 8;
        
        for (String each : availableTargetNames) {
            if (each.endsWith(String.valueOf(tableIndex))) {
                return each;
            }
        }
        
        throw new UnsupportedOperationException("找不到匹配的数据表");
    }
}

MyCat配置示例

<!-- server.xml -->
<server>
    <system>
        <property name="serverPort">8066</property>
        <property name="managerPort">9066</property>
        <property name="nonePasswordLogin">0</property>
        <property name="useHandshakeV10">1</property>
        <property name="useSqlStat">1</property>
        <property name="useGlobleTableCheck">0</property>
        <property name="sequnceHandlerType">2</property>
        <property name="processorBufferPoolType">0</property>
        <property name="handleDistributedTransactions">0</property>
        <property name="useOffHeapForMerge">1</property>
        <property name="memoryPageSize">1m</property>
        <property name="spillsFileBufferSize">1k</property>
        <property name="useStreamOutput">0</property>
        <property name="systemReserveMemorySize">384m</property>
    </system>
    
    <user name="root">
        <property name="password">123456</property>
        <property name="schemas">TESTDB</property>
    </user>
    
    <user name="user">
        <property name="password">user</property>
        <property name="schemas">TESTDB</property>
        <property name="readOnly">true</property>
    </user>
</server>

<!-- schema.xml -->
<schema name="TESTDB" checkSQLschema="false" sqlMaxLimit="100">
    <!-- 订单表,按用户ID分库,按订单ID分表 -->
    <table name="t_order" primaryKey="order_id" dataNode="dn$0-3.t_order_$0-7" rule="sharding-by-user-order"/>
    
    <!-- 订单明细表,与订单表绑定,保持分片一致性 -->
    <table name="t_order_item" primaryKey="item_id" dataNode="dn$0-3.t_order_item_$0-7" rule="sharding-by-user-order"/>
    
    <!-- 配置表,广播到所有节点 -->
    <table name="t_config" primaryKey="id" type="global" dataNode="dn$0-3.t_config"/>
</schema>

<!-- 定义数据节点 -->
<dataNode name="dn0" dataHost="host1" database="db0"/>
<dataNode name="dn1" dataHost="host1" database="db1"/>
<dataNode name="dn2" dataHost="host2" database="db2"/>
<dataNode name="dn3" dataHost="host2" database="db3"/>

<!-- 定义数据主机 -->
<dataHost name="host1" maxCon="1000" minCon="10" balance="3" writeType="0" dbType="mysql" dbDriver="native">
    <heartbeat>select user()</heartbeat>
    <writeHost host="hostM1" url="192.168.0.1:3306" user="root" password="password">
        <readHost host="hostS1" url="192.168.0.2:3306" user="root" password="password"/>
    </writeHost>
</dataHost>

<dataHost name="host2" maxCon="1000" minCon="10" balance="3" writeType="0" dbType="mysql" dbDriver="native">
    <heartbeat>select user()</heartbeat>
    <writeHost host="hostM2" url="192.168.0.3:3306" user="root" password="password">
        <readHost host="hostS2" url="192.168.0.4:3306" user="root" password="password"/>
    </writeHost>
</dataHost>

<!-- rule.xml -->
<tableRule name="sharding-by-user-order">
    <rule>
        <columns>user_id,order_id</columns>
        <algorithm>hash-mod</algorithm>
    </rule>
</tableRule>

<function name="hash-mod" class="io.mycat.route.function.PartitionByMod">
    <property name="count">4</property> <!-- 分库数量 -->
    <property name="tableCount">8</property> <!-- 每个库中的分表数量 -->
</function>

分库分表数据迁移方案

全量数据迁移

使用Mydumper和Myloader工具

#!/bin/bash
# 全量数据导出脚本

# 配置参数
SOURCE_HOST="192.168.0.1"
SOURCE_USER="root"
SOURCE_PASS="password"
SOURCE_DB="original_db"
BACKUP_DIR="/data/mysql_backup/$(date +%Y%m%d)"
THREADS=8

# 创建备份目录
mkdir -p ${BACKUP_DIR}

# 使用Mydumper导出数据
mydumper \
  --host=${SOURCE_HOST} \
  --user=${SOURCE_USER} \
  --password=${SOURCE_PASS} \
  --database=${SOURCE_DB} \
  --outputdir=${BACKUP_DIR} \
  --threads=${THREADS} \
  --compress \
  --build-empty-files \
  --verbose=3

# 检查导出结果
if [ $? -eq 0 ]; then
  echo "数据导出成功,备份目录: ${BACKUP_DIR}"
else
  echo "数据导出失败!"
  exit 1
fi

# 数据导入脚本
for i in {0..3}; do
  for j in {0..7}; do
    TARGET_HOST="192.168.0.$((i+1))"
    TARGET_DB="order_db_${i}"
    TARGET_TABLE="t_order_${j}"
    
    echo "正在导入数据到 ${TARGET_HOST}:${TARGET_DB}.${TARGET_TABLE}"
    
    # 使用Myloader导入数据
    myloader \
      --host=${TARGET_HOST} \
      --user=${SOURCE_USER} \
      --password=${SOURCE_PASS} \
      --database=${TARGET_DB} \
      --directory=${BACKUP_DIR} \
      --threads=${THREADS} \
      --overwrite-tables \
      --verbose=3
      
    if [ $? -eq 0 ]; then
      echo "导入到 ${TARGET_HOST}:${TARGET_DB}.${TARGET_TABLE} 成功"
    else
      echo "导入到 ${TARGET_HOST}:${TARGET_DB}.${TARGET_TABLE} 失败!"
      exit 1
    fi
  done
done

echo "全量数据迁移完成!"

增量数据同步

使用Canal实现增量同步

// Canal客户端示例代码
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.*;
import com.alibaba.otter.canal.protocol.Message;

import java.net.InetSocketAddress;
import java.util.List;

public class CanalClient {
    
    public static void main(String[] args) {
        // 创建链接
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress("127.0.0.1", 11111), 
                "example", 
                "canal", 
                "canal");
        
        try {
            // 连接Canal服务器
            connector.connect();
            // 订阅数据库表,格式为 {database}.{table}
            connector.subscribe("original_db.t_order");
            // 回滚到未进行ack的地方
            connector.rollback();
            
            while (true) {
                // 获取指定数量的数据
                Message message = connector.getWithoutAck(1000);
                long batchId = message.getId();
                
                if (batchId == -1 || message.getEntries().isEmpty()) {
                    // 没有数据,休眠一段时间
                    Thread.sleep(1000);
                    continue;
                }
                
                // 处理数据
                processEntries(message.getEntries());
                
                // 提交确认
                connector.ack(batchId);
            }
            
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 关闭连接
            connector.disconnect();
        }
    }
    
    private static void processEntries(List<Entry> entries) {
        for (Entry entry : entries) {
            if (entry.getEntryType() != EntryType.ROWDATA) {
                continue;
            }
            
            try {
                RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
                String tableName = entry.getHeader().getTableName();
                EventType eventType = rowChange.getEventType();
                
                System.out.println("================> binlog[" + entry.getHeader().getLogfileName() + ":" 
                        + entry.getHeader().getLogfileOffset() + "], name[" + tableName + "], eventType: " + eventType);
                
                for (RowData rowData : rowChange.getRowDatasList()) {
                    if (eventType == EventType.DELETE) {
                        // 处理删除数据
                        processDeleteData(tableName, rowData.getBeforeColumnsList());
                    } else if (eventType == EventType.INSERT) {
                        // 处理插入数据
                        processInsertData(tableName, rowData.getAfterColumnsList());
                    } else if (eventType == EventType.UPDATE) {
                        // 处理更新数据
                        processUpdateData(tableName, rowData.getBeforeColumnsList(), rowData.getAfterColumnsList());
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    
    private static void processDeleteData(String tableName, List<Column> columns) {
        // 根据分片规则,确定目标库表,执行删除操作
        System.out.println("DELETE from " + tableName);
        printColumns(columns);
    }
    
    private static void processInsertData(String tableName, List<Column> columns) {
        // 根据分片规则,确定目标库表,执行插入操作
        System.out.println("INSERT into " + tableName);
        printColumns(columns);
    }
    
    private static void processUpdateData(String tableName, List<Column> beforeColumns, List<Column> afterColumns) {
        // 根据分片规则,确定目标库表,执行更新操作
        System.out.println("UPDATE " + tableName);
        System.out.println("Before:");
        printColumns(beforeColumns);
        System.out.println("After:");
        printColumns(afterColumns);
    }
    
    private static void printColumns(List<Column> columns) {
        for (Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
        }
    }
}

双写方案实现

/**
 * 数据库双写服务
 */
@Service
public class DualWriteService {
    
    @Autowired
    private JdbcTemplate oldJdbcTemplate;
    
    @Autowired
    private JdbcTemplate newJdbcTemplate;
    
    @Autowired
    private ShardingService shardingService;
    
    /**
     * 双写订单数据
     * @param order 订单对象
     * @return 是否成功
     */
    @Transactional(rollbackFor = Exception.class)
    public boolean saveOrder(Order order) {
        try {
            // 写入旧库
            saveToOldDatabase(order);
            
            // 写入新库
            saveToNewDatabase(order);
            
            return true;
        } catch (Exception e) {
            log.error("双写订单数据失败", e);
            throw e;
        }
    }
    
    private void saveToOldDatabase(Order order) {
        String sql = "INSERT INTO t_order (order_id, user_id, status, create_time) VALUES (?, ?, ?, ?)";
        oldJdbcTemplate.update(sql, 
                order.getOrderId(), 
                order.getUserId(), 
                order.getStatus(), 
                order.getCreateTime());
    }
    
    private void saveToNewDatabase(Order order) {
        // 计算分片位置
        ShardingResult shardingResult = shardingService.calculateSharding(order.getUserId(), order.getOrderId());
        
        // 构建实际的SQL
        String dbIndex = shardingResult.getDbIndex();
        String tableIndex = shardingResult.getTableIndex();
        String actualTable = "t_order_" + tableIndex;
        
        // 获取对应分片的JdbcTemplate
        JdbcTemplate shardJdbcTemplate = getShardingJdbcTemplate(dbIndex);
        
        String sql = "INSERT INTO " + actualTable + " (order_id, user_id, status, create_time) VALUES (?, ?, ?, ?)";
        shardJdbcTemplate.update(sql, 
                order.getOrderId(), 
                order.getUserId(), 
                order.getStatus(), 
                order.getCreateTime());
    }
    
    private JdbcTemplate getShardingJdbcTemplate(String dbIndex) {
        // 根据dbIndex获取对应的数据源JdbcTemplate
        // 实际实现可能是从数据源池中获取
        return newJdbcTemplate;
    }
    
    /**
     * 分片结果
     */
    @Data
    public static class ShardingResult {
        private String dbIndex;
        private String tableIndex;
    }
}

分库分表扩容方案

一致性哈希算法实现

/**
 * 一致性哈希环实现
 */
public class ConsistentHash<T> {
    
    // 虚拟节点数量
    private final int numberOfReplicas;
    // 哈希环
    private final SortedMap<Integer, T> circle = new TreeMap<>();
    
    /**
     * 构造函数
     * @param numberOfReplicas 虚拟节点数量
     * @param nodes 物理节点列表
     */
    public ConsistentHash(int numberOfReplicas, Collection<T> nodes) {
        this.numberOfReplicas = numberOfReplicas;
        for (T node : nodes) {
            add(node);
        }
    }
    
    /**
     * 添加节点
     * @param node 物理节点
     */
    public void add(T node) {
        for (int i = 0; i < numberOfReplicas; i++) {
            // 对于每个物理节点,创建多个虚拟节点
            String nodeKey = node.toString() + "-" + i;
            int hash = getHash(nodeKey);
            circle.put(hash, node);
        }
    }
    
    /**
     * 移除节点
     * @param node 物理节点
     */
    public void remove(T node) {
        for (int i = 0; i < numberOfReplicas; i++) {
            String nodeKey = node.toString() + "-" + i;
            int hash = getHash(nodeKey);
            circle.remove(hash);
        }
    }
    
    /**
     * 获取对象所在的节点
     * @param key 对象键
     * @return 节点
     */
    public T get(Object key) {
        if (circle.isEmpty()) {
            return null;
        }
        
        int hash = getHash(key.toString());
        
        // 如果环中没有大于等于该hash值的节点,则返回环的第一个节点
        if (!circle.containsKey(hash)) {
            SortedMap<Integer, T> tailMap = circle.tailMap(hash);
            hash = tailMap.isEmpty() ? circle.firstKey() : tailMap.firstKey();
        }
        
        return circle.get(hash);
    }
    
    /**
     * 计算哈希值
     * @param key 键
     * @return 哈希值
     */
    private int getHash(String key) {
        // 使用FNV1_32_HASH算法计算哈希值
        final int p = 16777619;
        int hash = (int) 2166136261L;
        for (int i = 0; i < key.length(); i++) {
            hash = (hash ^ key.charAt(i)) * p;
        }
        hash += hash << 13;
        hash ^= hash >> 7;
        hash += hash << 3;
        hash ^= hash >> 17;
        hash += hash << 5;
        
        // 确保hash为正数
        if (hash < 0) {
            hash = Math.abs(hash);
        }
        return hash;
    }
}

数据再平衡工具

/**
 * 数据再平衡工具
 */
public class DataRebalancer {
    
    private final JdbcTemplate sourceJdbcTemplate;
    private final JdbcTemplate targetJdbcTemplate;
    private final int batchSize;
    
    public DataRebalancer(JdbcTemplate sourceJdbcTemplate, JdbcTemplate targetJdbcTemplate, int batchSize) {
        this.sourceJdbcTemplate = sourceJdbcTemplate;
        this.targetJdbcTemplate = targetJdbcTemplate;
        this.batchSize = batchSize;
    }
    
    /**
     * 执行数据迁移
     * @param sourceTable 源表
     * @param targetTable 目标表
     * @param shardingKey 分片键
     * @param startValue 起始值
     * @param endValue 结束值
     * @return 迁移的记录数
     */
    public int migrateData(String sourceTable, String targetTable, String shardingKey, 
                          Object startValue, Object endValue) {
        int totalCount = 0;
        int offset = 0;
        
        while (true) {
            // 分批查询数据
            String querySql = String.format(
                "SELECT * FROM %s WHERE %s >= ? AND %s < ? ORDER BY %s LIMIT %d OFFSET %d",
                sourceTable, shardingKey, shardingKey, shardingKey, batchSize, offset);
            
            List<Map<String, Object>> records = sourceJdbcTemplate.queryForList(
                querySql, startValue, endValue);
            
            if (records.isEmpty()) {
                break;
            }
            
            // 批量插入目标表
            batchInsert(targetTable, records);
            
            totalCount += records.size();
            offset += batchSize;
            
            System.out.printf("已迁移 %d 条记录\n", totalCount);
        }
        
        return totalCount;
    }
    
    /**
     * 批量插入数据
     * @param targetTable 目标表
     * @param records 记录列表
     */
    private void batchInsert(String targetTable, List<Map<String, Object>> records) {
        if (records.isEmpty()) {
            return;
        }
        
        // 构建插入SQL
        Map<String, Object> firstRecord = records.get(0);
        StringBuilder columns = new StringBuilder();
        StringBuilder placeholders = new StringBuilder();
        
        for (String column : firstRecord.keySet()) {
            if (columns.length() > 0) {
                columns.append(", ");
                placeholders.append(", ");
            }
            columns.append(column);
            placeholders.append("?");
        }
        
        String insertSql = String.format(
            "INSERT INTO %s (%s) VALUES (%s)",
            targetTable, columns.toString(), placeholders.toString());
        
        // 批量执行插入
        targetJdbcTemplate.batchUpdate(insertSql, new BatchPreparedStatementSetter() {
            @Override
            public void setValues(PreparedStatement ps, int i) throws SQLException {
                Map<String, Object> record = records.get(i);
                int index = 1;
                for (String column : firstRecord.keySet()) {
                    ps.setObject(index++, record.get(column));
                }
            }
            
            @Override
            public int getBatchSize() {
                return records.size();
            }
        });
    }
}

扩容流程自动化脚本

#!/bin/bash
# 分库分表扩容自动化脚本

set -e

# 配置参数
SOURCE_HOST="192.168.0.1"
SOURCE_USER="root"
SOURCE_PASS="password"
SOURCE_DB="order_db_0"
TARGET_HOST="192.168.0.5"
TARGET_USER="root"
TARGET_PASS="password"
TARGET_DB="order_db_4"
TABLE_PREFIX="t_order_"
SHARD_KEY="user_id"
BATCH_SIZE=1000
LOG_FILE="rebalance_$(date +%Y%m%d%H%M%S).log"

# 日志函数
log() {
    echo "[$(date '+%Y-%m-%d %H:%M:%S')] $1" | tee -a ${LOG_FILE}
}

# 检查目标数据库是否存在
check_target_db() {
    log "检查目标数据库 ${TARGET_DB} 是否存在..."
    
    DB_EXISTS=$(mysql -h${TARGET_HOST} -u${TARGET_USER} -p${TARGET_PASS} -e "SHOW DATABASES LIKE '${TARGET_DB}';" | grep -c ${TARGET_DB} || true)
    
    if [ ${DB_EXISTS} -eq 0 ]; then
        log "创建目标数据库 ${TARGET_DB}..."
        mysql -h${TARGET_HOST} -u${TARGET_USER} -p${TARGET_PASS} -e "CREATE DATABASE ${TARGET_DB} CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;"
    else
        log "目标数据库 ${TARGET_DB} 已存在"
    fi
}

# 创建目标表结构
create_target_tables() {
    log "创建目标表结构..."
    
    for i in {0..7}; do
        SOURCE_TABLE="${TABLE_PREFIX}${i}"
        TARGET_TABLE="${TABLE_PREFIX}${i}"
        
        log "复制表结构: ${SOURCE_DB}.${SOURCE_TABLE} -> ${TARGET_DB}.${TARGET_TABLE}"
        
        # 获取表结构
        TABLE_DDL=$(mysqldump -h${SOURCE_HOST} -u${SOURCE_USER} -p${SOURCE_PASS} --no-data ${SOURCE_DB} ${SOURCE_TABLE} | grep -v "^--" | grep -v "^/\*")
        
        # 创建目标表
        echo "${TABLE_DDL}" | mysql -h${TARGET_HOST} -u${TARGET_USER} -p${TARGET_PASS} ${TARGET_DB}
    done
}

# 迁移数据
migrate_data() {
    log "开始数据迁移..."
    
    # 计算需要迁移的用户ID范围
    # 假设我们要将原来4个库扩容到5个库,那么需要迁移1/5的数据
    # 对于user_id % 4 = 0的数据,迁移user_id % 5 = 4的部分
    
    for i in {0..7}; do
        SOURCE_TABLE="${TABLE_PREFIX}${i}"
        TARGET_TABLE="${TABLE_PREFIX}${i}"
        
        log "迁移数据: ${SOURCE_DB}.${SOURCE_TABLE} -> ${TARGET_DB}.${TARGET_TABLE}"
        
        # 统计需要迁移的记录数
        TOTAL_COUNT=$(mysql -h${SOURCE_HOST} -u${SOURCE_USER} -p${SOURCE_PASS} -e "SELECT COUNT(*) FROM ${SOURCE_DB}.${SOURCE_TABLE} WHERE ${SHARD_KEY} % 5 = 4;" | tail -1)
        log "需要迁移的记录数: ${TOTAL_COUNT}"
        
        # 分批迁移数据
        OFFSET=0
        MIGRATED=0
        
        while true; do
            # 查询一批数据
            DATA_FILE="/tmp/migrate_data_${i}_${OFFSET}.sql"
            mysql -h${SOURCE_HOST} -u${SOURCE_USER} -p${SOURCE_PASS} -e "
                SELECT * FROM ${SOURCE_DB}.${SOURCE_TABLE} 
                WHERE ${SHARD_KEY} % 5 = 4
                LIMIT ${BATCH_SIZE} OFFSET ${OFFSET}
            " > ${DATA_FILE}
            
            # 检查是否还有数据
            RECORD_COUNT=$(cat ${DATA_FILE} | wc -l)
            if [ ${RECORD_COUNT} -le 1 ]; then
                rm -f ${DATA_FILE}
                break
            fi
            
            # 转换为INSERT语句
            INSERT_FILE="/tmp/migrate_insert_${i}_${OFFSET}.sql"
            echo "INSERT INTO ${TARGET_DB}.${TARGET_TABLE} VALUES " > ${INSERT_FILE}
            
            # 处理数据文件,转换为INSERT语句
            tail -n +2 ${DATA_FILE} | sed 's/^/(/' | sed 's/$/),/' | sed '$s/,$/;/' >> ${INSERT_FILE}
            
            # 导入数据到目标库
            mysql -h${TARGET_HOST} -u${TARGET_USER} -p${TARGET_PASS} < ${INSERT_FILE}
            
            # 更新进度
            MIGRATED=$((MIGRATED + RECORD_COUNT - 1))
            log "已迁移 ${MIGRATED}/${TOTAL_COUNT} 条记录"
            
            # 清理临时文件
            rm -f ${DATA_FILE} ${INSERT_FILE}
            
            # 更新偏移量
            OFFSET=$((OFFSET + BATCH_SIZE))
        done
        
        log "表 ${SOURCE_TABLE} 数据迁移完成"
    done
}

# 验证数据一致性
validate_data() {
    log "验证数据一致性..."
    
    for i in {0..7}; do
        SOURCE_TABLE="${TABLE_PREFIX}${i}"
        TARGET_TABLE="${TABLE_PREFIX}${i}"
        
        log "验证表 ${SOURCE_TABLE} 的数据一致性"
        
        # 比较源表和目标表的记录数
        SOURCE_COUNT=$(mysql -h${SOURCE_HOST} -u${SOURCE_USER} -p${SOURCE_PASS} -e "SELECT COUNT(*) FROM ${SOURCE_DB}.${SOURCE_TABLE} WHERE ${SHARD_KEY} % 5 = 4;" | tail -1)
        TARGET_COUNT=$(mysql -h${TARGET_HOST} -u${TARGET_USER} -p${TARGET_PASS} -e "SELECT COUNT(*) FROM ${TARGET_DB}.${TARGET_TABLE};" | tail -1)
        
        if [ "${SOURCE_COUNT}" != "${TARGET_COUNT}" ]; then
            log "错误: 表 ${SOURCE_TABLE} 的记录数不一致! 源表: ${SOURCE_COUNT}, 目标表: ${TARGET_COUNT}"
            exit 1
        fi
        
        log "表 ${SOURCE_TABLE} 的记录数一致: ${SOURCE_COUNT}"
        
        # 抽样检查数据内容
        log "抽样检查数据内容..."
        
        # 随机选择10条记录进行比较
        SAMPLE_IDS=$(mysql -h${SOURCE_HOST} -u${SOURCE_USER} -p${SOURCE_PASS} -e "
            SELECT id FROM ${SOURCE_DB}.${SOURCE_TABLE} 
            WHERE ${SHARD_KEY} % 5 = 4
            ORDER BY RAND() LIMIT 10;
        " | tail -n +2)
        
        for ID in ${SAMPLE_IDS}; do
            SOURCE_DATA=$(mysql -h${SOURCE_HOST} -u${SOURCE_USER} -p${SOURCE_PASS} -e "
                SELECT * FROM ${SOURCE_DB}.${SOURCE_TABLE} WHERE id = ${ID};
            " | tail -1)
            
            TARGET_DATA=$(mysql -h${TARGET_HOST} -u${TARGET_USER} -p${TARGET_PASS} -e "
                SELECT * FROM ${TARGET_DB}.${TARGET_TABLE} WHERE id = ${ID};
            " | tail -1)
            
            if [ "${SOURCE_DATA}" != "${TARGET_DATA}" ]; then
                log "错误: ID=${ID} 的数据不一致!"
                log "源数据: ${SOURCE_DATA}"
                log "目标数据: ${TARGET_DATA}"
                exit 1
            fi
        done
        
        log "表 ${SOURCE_TABLE} 的数据内容一致"
    done
    
    log "数据一致性验证通过"
}

# 更新应用配置
update_app_config() {
    log "更新应用配置..."
    
    # 这里需要根据实际应用的配置方式进行修改
    # 例如,更新ShardingSphere的配置文件
    
    log "应用配置已更新"
}

# 清理源库数据
cleanup_source_data() {
    log "清理源库数据..."
    
    # 确认迁移完成后,删除源库中已迁移的数据
    for i in {0..7}; do
        SOURCE_TABLE="${TABLE_PREFIX}${i}"
        
        log "清理表 ${SOURCE_TABLE} 中已迁移的数据"
        
        mysql -h${SOURCE_HOST} -u${SOURCE_USER} -p${SOURCE_PASS} -e "
            DELETE FROM ${SOURCE_DB}.${SOURCE_TABLE} WHERE ${SHARD_KEY} % 5 = 4;
        "
    done
    
    log "源库数据清理完成"
}

# 主函数
main() {
    log "开始分库分表扩容流程..."
    
    # 检查目标数据库
    check_target_db
    
    # 创建目标表结构
    create_target_tables
    
    # 迁移数据
    migrate_data
    
    # 验证数据一致性
    validate_data
    
    # 更新应用配置
    update_app_config
    
    # 清理源库数据
    cleanup_source_data
    
    log "分库分表扩容流程完成!"
}

# 执行主函数
main

分库分表常见问题及解决方案

跨库事务处理

基于XA协议的分布式事务

/**
 * 基于XA协议的分布式事务管理器
 */
@Service
public class XATransactionManager {
    
    @Autowired
    private DataSource dataSource;
    
    /**
     * 执行分布式事务
     * @param action 事务操作
     * @return 执行结果
     */
    public <T> T executeInXATransaction(Supplier<T> action) throws Exception {
        UserTransaction userTransaction = null;
        
        try {
            // 获取JTA事务管理器
            userTransaction = (UserTransaction) new InitialContext().lookup("java:comp/UserTransaction");
            
            // 开始事务
            userTransaction.begin();
            
            // 执行业务逻辑
            T result = action.get();
            
            // 提交事务
            userTransaction.commit();
            
            return result;
        } catch (Exception e) {
            // 回滚事务
            if (userTransaction != null) {
                try {
                    userTransaction.rollback();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
            throw e;
        }
    }
}

基于TCC模式的分布式事务

/**
 * TCC事务接口
 */
public interface TCCService<T> {
    
    /**
     * Try阶段:资源预留
     * @param params 业务参数
     * @return 执行结果
     */
    boolean try_(T params);
    
    /**
     * Confirm阶段:确认执行
     * @param params 业务参数
     * @return 执行结果
     */
    boolean confirm(T params);
    
    /**
     * Cancel阶段:取消执行
     * @param params 业务参数
     * @return 执行结果
     */
    boolean cancel(T params);
}

/**
 * 订单TCC实现
 */
@Service
public class OrderTCCServiceImpl implements TCCService<Order> {
    
    @Autowired
    private JdbcTemplate jdbcTemplate;
    
    @Override
    public boolean try_(Order order) {
        try {
            // 检查库存
            int stock = jdbcTemplate.queryForObject(
                "SELECT stock FROM t_product WHERE product_id = ?", 
                Integer.class, 
                order.getProductId());
            
            if (stock < order.getQuantity()) {
                return false;
            }
            
            // 冻结库存
            jdbcTemplate.update(
                "UPDATE t_product SET frozen_stock = frozen_stock + ? WHERE product_id = ?",
                order.getQuantity(),
                order.getProductId());
            
            // 创建订单,状态为PENDING
            jdbcTemplate.update(
                "INSERT INTO t_order (order_id, user_id, product_id, quantity, status) VALUES (?, ?, ?, ?, 'PENDING')",
                order.getOrderId(),
                order.getUserId(),
                order.getProductId(),
                order.getQuantity());
            
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }
    
    @Override
    public boolean confirm(Order order) {
        try {
            // 扣减库存
            jdbcTemplate.update(
                "UPDATE t_product SET stock = stock - ?, frozen_stock = frozen_stock - ? WHERE product_id = ?",
                order.getQuantity(),
                order.getQuantity(),
                order.getProductId());
            
            // 更新订单状态为CONFIRMED
            jdbcTemplate.update(
                "UPDATE t_order SET status = 'CONFIRMED' WHERE order_id = ?",
                order.getOrderId());
            
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }
    
    @Override
    public boolean cancel(Order order) {
        try {
            // 解冻库存
            jdbcTemplate.update(
                "UPDATE t_product SET frozen_stock = frozen_stock - ? WHERE product_id = ?",
                order.getQuantity(),
                order.getProductId());
            
            // 更新订单状态为CANCELED
            jdbcTemplate.update(
                "UPDATE t_order SET status = 'CANCELED' WHERE order_id = ?",
                order.getOrderId());
            
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }
}

分页查询优化

/**
 * 分页查询优化器
 */
@Service
public class ShardingPaginationOptimizer {
    
    @Autowired
    private List<JdbcTemplate> shardingJdbcTemplates;
    
    /**
     * 执行分片分页查询
     * @param sql SQL语句
     * @param params SQL参数
     * @param pageNum 页码
     * @param pageSize 每页大小
     * @return 分页结果
     */
    public <T> Page<T> queryWithPagination(String sql, Object[] params, int pageNum, int pageSize, 
                                          RowMapper<T> rowMapper) {
        // 计算总记录数
        int totalCount = countTotalRecords(sql, params);
        
        // 计算总页数
        int totalPages = (totalCount + pageSize - 1) / pageSize;
        
        // 查询当前页数据
        List<T> records = queryForPage(sql, params, pageNum, pageSize, rowMapper);
        
        // 构建分页结果
        Page<T> page = new Page<>();
        page.setPageNum(pageNum);
        page.setPageSize(pageSize);
        page.setTotalCount(totalCount);
        page.setTotalPages(totalPages);
        page.setRecords(records);
        
        return page;
    }
    
    /**
     * 统计总记录数
     */
    private int countTotalRecords(String sql, Object[] params) {
        // 构建COUNT查询
        String countSql = "SELECT COUNT(*) FROM (" + sql + ") AS t";
        
        // 在所有分片上执行查询并汇总结果
        int totalCount = 0;
        for (JdbcTemplate jdbcTemplate : shardingJdbcTemplates) {
            Integer count = jdbcTemplate.queryForObject(countSql, params, Integer.class);
            if (count != null) {
                totalCount += count;
            }
        }
        
        return totalCount;
    }
    
    /**
     * 查询分页数据
     */
    private <T> List<T> queryForPage(String sql, Object[] params, int pageNum, int pageSize, 
                                    RowMapper<T> rowMapper) {
        // 计算偏移量
        int offset = (pageNum - 1) * pageSize;
        
        // 添加分页限制
        String pageSql = sql + " LIMIT " + offset + ", " + pageSize;
        
        // 在所有分片上执行查询
        List<T> allRecords = new ArrayList<>();
        for (JdbcTemplate jdbcTemplate : shardingJdbcTemplates) {
            List<T> records = jdbcTemplate.query(pageSql, params, rowMapper);
            allRecords.addAll(records);
        }
        
        // 对结果进行排序和截取
        // 注意:这里假设SQL中已经包含了ORDER BY子句
        // 如果没有,需要在这里进行排序
        
        // 截取需要的记录
        int fromIndex = 0;
        int toIndex = Math.min(pageSize, allRecords.size());
        
        return allRecords.subList(fromIndex, toIndex);
    }
    
    /**
     * 分页结果
     */
    @Data
    public static class Page<T> {
        private int pageNum;
        private int pageSize;
        private int totalCount;
        private int totalPages;
        private List<T> records;
    }
}

全局ID生成方案

/**
 * 雪花算法ID生成器
 */
@Service
public class SnowflakeIdGenerator {
    
    // 起始时间戳,可以设置为项目开始时间
    private final long startTimestamp = 1609430400000L; // 2021-01-01 00:00:00
    
    // 机器ID所占位数
    private final long workerIdBits = 5L;
    // 数据中心ID所占位数
    private final long dataCenterIdBits = 5L;
    // 序列号所占位数
    private final long sequenceBits = 12L;
    
    // 支持的最大机器ID
    private final long maxWorkerId = ~(-1L << workerIdBits);
    // 支持的最大数据中心ID
    private final long maxDataCenterId = ~(-1L << dataCenterIdBits);
    
    // 机器ID向左移12位
    private final long workerIdShift = sequenceBits;
    // 数据中心ID向左移17位
    private final long dataCenterIdShift = sequenceBits + workerIdBits;
    // 时间戳向左移22位
    private final long timestampLeftShift = sequenceBits + workerIdBits + dataCenterIdBits;
    
    // 序列号掩码,用于限定序列号的最大值
    private final long sequenceMask = ~(-1L << sequenceBits);
    
    // 工作机器ID
    private long workerId;
    // 数据中心ID
    private long dataCenterId;
    // 毫秒内序列
    private long sequence = 0L;
    // 上次生成ID的时间戳
    private long lastTimestamp = -1L;
    
    /**
     * 构造函数
     * @param workerId 工作机器ID
     * @param dataCenterId 数据中心ID
     */
    public SnowflakeIdGenerator(long workerId, long dataCenterId) {
        if (workerId > maxWorkerId || workerId < 0) {
            throw new IllegalArgumentException("Worker ID can't be greater than " + maxWorkerId + " or less than 0");
        }
        if (dataCenterId > maxDataCenterId || dataCenterId < 0) {
            throw new IllegalArgumentException("DataCenter ID can't be greater than " + maxDataCenterId + " or less than 0");
        }
        this.workerId = workerId;
        this.dataCenterId = dataCenterId;
    }
    
    /**
     * 获取下一个ID
     * @return ID
     */
    public synchronized long nextId() {
        long timestamp = timeGen();
        
        // 如果当前时间小于上一次ID生成的时间戳,说明系统时钟回退过,抛出异常
        if (timestamp < lastTimestamp) {
            throw new RuntimeException("Clock moved backwards. Refusing to generate id for " + (lastTimestamp - timestamp) + " milliseconds");
        }
        
        // 如果是同一时间生成的,则进行毫秒内序列
        if (lastTimestamp == timestamp) {
            sequence = (sequence + 1) & sequenceMask;
            // 毫秒内序列溢出
            if (sequence == 0) {
                // 阻塞到下一个毫秒,获得新的时间戳
                timestamp = tilNextMillis(lastTimestamp);
            }
        } else {
            // 时间戳改变,毫秒内序列重置
            sequence = 0L;
        }
        
        // 上次生成ID的时间戳
        lastTimestamp = timestamp;
        
        // 移位并通过或运算拼到一起组成64位的ID
        return ((timestamp - startTimestamp) << timestampLeftShift) |
               (dataCenterId << dataCenterIdShift) |
               (workerId << workerIdShift) |
               sequence;
    }
    
    /**
     * 阻塞到下一个毫秒,直到获得新的时间戳
     * @param lastTimestamp 上次生成ID的时间戳
     * @return 新的时间戳
     */
    private long tilNextMillis(long lastTimestamp) {
        long timestamp = timeGen();
        while (timestamp <= lastTimestamp) {
            timestamp = timeGen();
        }
        return timestamp;
    }
    
    /**
     * 返回以毫秒为单位的当前时间
     * @return 当前时间(毫秒)
     */
    private long timeGen() {
        return System.currentTimeMillis();
    }
}

总结

MySQL分库分表是解决数据库容量和性能瓶颈的有效手段,但实施过程中需要考虑多个方面:

  1. 分片策略设计:选择合适的分片键和分片算法,确保数据分布均匀
  2. 中间件选型:根据业务需求和技术栈选择合适的分库分表中间件
  3. 数据迁移方案:制定完善的全量和增量数据迁移方案,确保数据一致性
  4. 扩容方案:设计灵活的扩容方案,支持业务持续增长
  5. 常见问题处理:解决跨库事务、分页查询、全局ID生成等常见问题

通过本文介绍的方案和最佳实践,可以帮助开发团队更好地实施MySQL分库分表,构建高性能、可扩展的数据库架构。

分享文章