基于Shardingsphere + ElasticSearch 实现3000万级用户数据分库分表+读写分离实现思路

年前在公司中做了个分库表的服务,将过程中出现的问题以及技术栈记录下来

为什么要分库分表

该系统是个身份认证系统,主要面向全省个人与法人用户,所以用户量也比较大,改造目的是性能问题、数据安全、数据库高可用等问题困扰,数据库架构为单库,单表,数据量为2000w+,有主从备份、无读写分离。在每日用户办事高峰期会导致主库写入、查询压力同步升高,由于数据量过大而造成数据库性能降低的随着业务增长单库单表已经满足不了现有需求,开始考虑分库分表。

什么是ShardingSphere

Apache ShardingSphere 是一套开源的分布式数据库解决方案组成的生态圈,它由 JDBC、Proxy 和 Sidecar(规划中)这 3 款既能够独立部署,又支持混合部署配合使用的产品组成。 它们均提供标准化的数据水平扩展、分布式事务和分布式治理等功能,可适用于如 Java 同构、异构语言、云原生等各种多样化的应用场景。 Shardingjdbc

如何分库分表

如何分库需要对系统的业务有足够的了解,才能着手进行库表的拆分,主流拆分方式有以下四种:

垂直分表:操作数据库中某张表,把这张表中的一部分字段数据存到一张新表里面,再把这张表另一部分字段数据存到另外一张表里面 垂直分库:把单一数据库按照业务进行划分,专库专表 水平分表:在同一个数据库中建立多张表,根据一定的规则划分主键ID,将不同的数据存放在多个表中 水平分库:将一张表建立在多个数据库,根据一定的规则划分主键ID,将数据平均存放到每一个库

我这次拆分采用垂直分库+水平分表的方式。由于系统登录业务的特殊性导致不能使用原始自增的用户id来作为路由键,只能重新生成全局的分布式id,然后加入ElasticSearch作为用户登录名、手机号、身份证号等多种登录方式的索引库,ES中维护用户登录标识与分布式ID的索引。登录时根据登录方式将登录名转换为分布式ID然后通过路由算法进行查询用户的具体信息。

根据业务明确了如何拆分以及实现思路,那么接下来就进行实现!

技术栈:

SpringBoot 2.3.2.RELEASE

SpringCloud Hoxton.SR10

MyBatis-Plus 3.4.0

Sharding-JDBC 4.1.1

ElasticSearch 7.6.2

alibaba druid 1.1.21

集成Shardingjdbc和ElasticSearch

1、引入相关依赖

<!--elasticsearch-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>


<!--  shardingjdbc-->
<dependency>
    <groupId>org.apache.shardingsphere</groupId>
    <artifactId>sharding-jdbc-spring-boot-starter</artifactId>
</dependency>

<!-- shardingjdbc xa transaction-->
<dependency>
    <groupId>org.apache.shardingsphere</groupId>
    <artifactId>sharding-transaction-xa-core</artifactId>
</dependency>

2、制定shardingjdbc分库分表规则、读写分离配置

#全局springboot、cloud相关通用配置。拉取配置时会拉取改配置文件,如有每个服务都有重复配置项,请添加到该配置文件中
spring:
  elasticsearch:
    rest:
      uris: 10.18.32.41:9200,10.18.32.48:9200
      username: ****
      password: ****
  shardingsphere:
    props:
      sql:
        show: true
    datasource:
      #共有数据源
      names: ds0,ds1
      ds0:
        driver-class-name: com.mysql.cj.jdbc.Driver
        type: com.alibaba.druid.pool.DruidDataSource
        url: jdbc:mysql://****:3306/shard-jdbc-01?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=GMT%2b8&rewriteBatchedStatements=true
        username: ENC(e4e3IQQEOQeWBriSmSc/Tw==)
        password: ENC(ce3IsuRlKwDbT9QRK6yBhCeSf3Y7PMvf)
        maxPoolSize: 100
        minPoolSize: 5
        validationQuery: SELECT 1 FROM DUAL
        minEvictableIdleTimeMillis: 300000
        minIdle: 5
        testOnBorrow: false
        testOnReturn: false
        testWhileIdle: true
        timeBetweenEvictionRunsMillis: 800000
        initialSize: 5
        logSlowSql: true
        maxActive: 20
        maxWait: 60000
      ds1:
        driver-class-name: com.mysql.cj.jdbc.Driver
        type: com.alibaba.druid.pool.DruidDataSource
        url: jdbc:mysql://****:3306/shard-jdbc-02?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=GMT%2b8&rewriteBatchedStatements=true
        username: ENC(e4e3IQQEOQeWBriSmSc/Tw==)
        password: ENC(ce3IsuRlKwDbT9QRK6yBhCeSf3Y7PMvf)
        maxPoolSize: 100
        minPoolSize: 5
        validationQuery: SELECT 1 FROM DUAL
        minEvictableIdleTimeMillis: 300000
        minIdle: 5
        testOnBorrow: false
        testOnReturn: false
        testWhileIdle: true
        timeBetweenEvictionRunsMillis: 600000
        initialSize: 5
        logSlowSql: true
        maxActive: 20
        maxWait: 60000
    sharding:
	  masterslave:
        name: ms
        #master 写
        master-data-source-name: ds0
        #slave 读
        slave-data-source-names: ds1
        load-balance-algorithm-type: round_robin
      default-data-source-name: ds0
      tables:
        complat_outsideuser:
          key-generator:
            column: id
            type: SNOWFLAKE
            props:
              worker.id: 1
              max.vibration.offset: 1
          actual-data-nodes: ds$->{0..1}.complat_outsideuser_$->{0..2}
          default-database-strategy: ds0
          default-table-strategy: complat_outsideuser
          #分库策略
          database-strategy:
            standard:
              sharding-column: id
              precise-algorithm-class-name: com.zdww.uids.uns.config.shardjdbc.algorithm.database.MyPreciseDatabaseShardingAlgorithm
          #分表策略
          table-strategy:
            standard:
              sharding-column: id
              precise-algorithm-class-name: com.zdww.uids.uns.config.shardjdbc.algorithm.table.MyPreciseTableShardingAlgorithm

3、编写Hash路由键路由算法

库路由:

public class MyPreciseDatabaseShardingAlgorithm implements PreciseShardingAlgorithm<String> {

    private static final String DATA_SOURCE_SUFFIX = "ds";


    private static final Integer DATA_BASE_LIMIT = 2;

    @Override
    public String doSharding(Collection<String> availableTargetNames, PreciseShardingValue<String> shardingValue) {

        //分布式主键
        Integer hashedSnowFlakeId =  HashUtil.rsHash(shardingValue.getValue());

        String dataSource = DATA_SOURCE_SUFFIX + hashedSnowFlakeId % DATA_BASE_LIMIT;

        if (!availableTargetNames.contains(dataSource)){

            log.error("不存在可用的数据源{}",dataSource);
        }
        return dataSource;
    }

表路由:

public class MyPreciseTableShardingAlgorithm implements PreciseShardingAlgorithm<String> {
    @Override
    public String doSharding(Collection<String> availableTargetNames, PreciseShardingValue<String> shardingValue) {

        //log.error("collection:{},shardingValues:{}", availableTargetNames,shardingValue);
        //字段 value
        Integer hashedSnowFlakeId =  HashUtil.rsHash(shardingValue.getValue());
        //逻辑表名
        String logicTableName = shardingValue.getLogicTableName();
        //截取分布式主键的最后两位
        //Integer substringColumnValue = Integer.parseInt(columnValue.substring(columnValue.length() - 2));
        //取余
        String finalRealTableName = logicTableName + "_" + hashedSnowFlakeId % 3;
        //判断根据分片规则生成的物理表名是否可用
        if (!availableTargetNames.contains(finalRealTableName)){
            log.error("未找到物理表,生成的物理表为{},已赋值默认表【complat_outsideuser】",finalRealTableName);
            finalRealTableName = "t_user_1";
        }
        return finalRealTableName;
    }
}

4、通过编写自定义注解将用户唯一标识字段转换为分布式ID。

/**
 *
 * 用户唯一标识转换为分布式主键
 * @author ZaNgVVB:)
 */
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface UserKeyConvertDistributeKey {

    /**
     * 用户唯一标识
     *
     * @return
     */
    String requestKeyName();
}
/**
 * @Company: ******</ p>
 * @Project: uids-parent</p>
 * @ClassName: UserKeyConvertDistributeKey</ p>
 * @Description: 通过用户唯一标识(身份证、手机号、登录名、wxOpenId,upayId,aliUserId)转换为分布式主键 </p>
 * @CreateDate: 2021/7/20  14:40</p>
 * @Author: ZaNgVVB:)</ p>
 * @Version: 1.0.0</ p>
 * @Copyright: Copyright(c) 2021</p>
 */
@Profile({"prod","dev"})
@Slf4j
@Aspect
@Order(10)
@Component
public class UserKeyConvertDistributeKeyAspect extends BaseAspect {


    @Autowired
    private ElasticsearchRestTemplate elasticsearchRestTemplate;

    @Autowired
    private UidsPubNaturalUserMapper uidsPubNaturalUserMapper;

    @Autowired
    private ElasticSearchUtil elasticSearchUtil;

    @Pointcut("@annotation(com.zdww.uids.uns.aspect.UserKeyConvertDistributeKey)")
    private void MethodPointcut() {}


    //private String[] columnNames = {"loginName","idCardNum","mobile","wxOpenId","aliUserId","upayId"};

    @Around("MethodPointcut()")
    public Object doBefore(ProceedingJoinPoint joinPoint) {

		//该部分由于是公司核心代码不方便发出,如有兴趣可询问实现思路

    }


    /**
     * 根据用户输入的唯一标识多字段从ES中查询分布式主键
     * @param userUniqueKey
     * @return
	 * @author ZaNgVVB:)
     */
    private EsComplatOutSideUserMapping queryDistributeKeyByUserKey(String userUniqueKey,String requestKeyName){

        NativeSearchQuery searchQuery  = new NativeSearchQueryBuilder()
                .withQuery(QueryBuilders.termQuery(requestKeyName+ ".keyword",userUniqueKey))
                .build();
        SearchHits<EsComplatOutSideUserMapping> searchHits = elasticsearchRestTemplate.search(searchQuery, EsComplatOutSideUserMapping.class);
        List<SearchHit<EsComplatOutSideUserMapping>> hits = searchHits.getSearchHits();
        if (hits.size() <= 0) {
            List<ComplatOutsideUser> complatOutsideUsers = queryDataDb(requestKeyName, userUniqueKey);
            if (complatOutsideUsers == null){
                return null;
            }
           return dataDbResult(complatOutsideUsers);
        }else if (hits.size() > 1){
            List<EsComplatOutSideUserBaseMapping> esComplatOutSideUserMappings = new ArrayList<>();
            hits.forEach(hit -> esComplatOutSideUserMappings.add(hit.getContent()));
            throw new TooManyUserAccountException(esComplatOutSideUserMappings);
        }
        return hits.get(0).getContent();
    }

}

po出的代码并非全代码,请勿直接拷贝使用。

5、开放对分库分表服务的基础CRUD Rest接口或者是RPC接口。该部分就不贴代码了。

总结

在做分库分表之前一定要对需求有充分的了解,不可盲目拆分,以及业务数据一致性,需不需要分布式事务等问题都要考虑到,不然非但达不到要求还会埋雷。如果采用hash分片的方式就要考虑无限增长数据的扩容性,以及历史数据迁移再分片等问题。例如采用一致性hash解决后期增加分片再hash困难和多服务器节点的数据负载不均衡等问题。

Comment