分布式多机扫库脚本

3周前 (09-03) wang JAVA 0评论 未收录 21℃ 浏览数:14

需求背景

需要扫描全库的商品数据,然后根据不同的条件执行业务。比较麻烦的就是如何去扫库。现在的商品表比较少,大概四百万条数据。但是分了八个库,1024张表。每张表大概有4k的数据。

解决思路

有八台机器,所以八机并行,每个处理一部分的数据,那么每个机器需要获得一个标示,然后通过标示去处理数据。那么现在的问题就是如何让机器获取标示。这个时候就想到了分布式任务调度。可以分片,

这样每台机器就可以获得到不同的分片。可以通过分片区处理一部分的数据。

private static List<String> getShardingTables(List<String> tables, 
    int shardingCount, Map<Integer, String> shardingMap) {
    // 如果分片数1个,或者分片map信息为空,直接返回整个table
    if (shardingCount <= 1 || MapUtils.isEmpty(shardingMap)) {
        return tables;
    }
    // 在设定分片数的时候会特意把分片数设为可以整除表数,如果不能整除说明设置的有问题,也返回整张表
    if (tables.size() % shardingCount != 0) {
        return tables;
    }
    // 每个分片要处理的表数量
    int index = tables.size() / shardingCount;
    // key为分片号,value为该分片要处理的表名称列表
    Map<Integer, List<String>> map = new HashMap<>();
    for (int i = 0; i < shardingCount; i++) {
        List<String> subList = tables.subList(i * index, (i + 1) * index);
        map.put(i, subList);
    }
    List<String> result = new ArrayList<>();
    // 从当前机子分到的分片中找到对应要处理的表名称列表合集
    for (Integer key : shardingMap.keySet()) {
        if (map.get(key) != null) {
            result.addAll(map.get(key));
        }
    }
    return result;
}

这样就可以多机并行处理。

那这样的效率不是最高,我们单机还可以并发。开一个线程池去多线程处理。我们在单机也需要将数据再次分区,分成每个线程去执行一部分。

<pre>private void doTotalDump(int shardingCount, Map<Integer, String> shardingMap) {
    // 拿到所有物理表名
    String s = itemDao.queryTablePartitions("Item");
    List<String> tablesAll = Hints.parseRouteCustom2List(s);
    List<String> tables = getShardingTables(tablesAll, shardingCount, shardingMap);
    if (null != tables && !tables.isEmpty()) {
        // 遍历物理表,找到符合当前分片的,进行处理
        logger.info("total dump...tables:{}", tables.toString());
        // 分线程操作
        int total = tables.size();
        // 每个线程操作的表数目
        // 如果无法除尽,四舍五入,然后最后一个线程操作剩下的表
        if (threadCount > total) {
            // 如果设置了超过总表数的线程,只使用总表数个
            threadCount = total;
        }
        int eachCount = Math.round(total / (float) threadCount);
        for (int i = 0; i < threadCount; i++) {
            List<String> tablei;
            if (i == threadCount - 1) {
                // 最后一个特殊处理
                // 前面几个线程已经处理过的数量
                int start = eachCount * (threadCount - 1);
                tablei = tables.subList(start, total);
            } else {
                tablei = tables.subList(i * eachCount, (i + 1) * eachCount);
            }
            List<String> finalTablei = tablei;
            threadPoolExecutor.execute(() -> operateTable(finalTablei));
        }
    }
}</pre>
博主

Just do it. Now or never.

相关推荐

嗨、骚年、快来消灭0回复。