数据库同步实践(suishen-cdc)

一、需求背景

需要将业务数据库的数据,向数仓同步,目前包括两种数据库:mongo、mysql。

二、总体实现方案

1、总体流程

alt

  • a、定时任务,加载数据库事件偏移信息,统一监听数据库变更事件;
  • b、本地缓存收集事件变更信息(一定的数据量、一定的收集时间);
  • c、收集达到阈值后,向消息队列发送消息;
  • d、发送消息成功后,使用redis记录最后一条消息的偏移信息;
  • e、消息队列批量读取事件,批量向数仓同步数据。

2、mongo实现方案

mongo-java-driver库,提供了Change Streams API来监听和获取实时变更(change)事件。通过Change Streams,可以监视集合中的插入、更新和删除等操作,并对这些变更事件做出响应。

@Test
public void tst() {  
    // 获取指定数据库连接
    MongoDatabase database = mongoTemplate.getDb().getMongoClient().getDatabase("database");
    // 过滤需要监听的表
    Document matchStage = new Document("$match", new Document("ns.coll", new Document("$in",
            Arrays.asList("colletion", "label"))));
    // 开启监听
    ChangeStreamIterable<Document> changeStream = database
            .watch(Arrays.asList(matchStage)).fullDocument(FullDocument.UPDATE_LOOKUP)
            // 设置事件偏移信息
            .resumeAfter(BsonDocument.parse("{\"_data\": \"8265AB99800000000129295A10048EBDB1DB4C23440CAD9BD906E9098378463C5F6964003C3134313334000004\"}"));

    for (ChangeStreamDocument<Document> document : changeStream) {
        // 根据操作类型进行相应的操作
        if (document.getOperationType() == OperationType.INSERT) {
            // 处理插入操作
            System.out.println(JSON.toJSONString(document.getFullDocument()));
        } else if (document.getOperationType() == OperationType.UPDATE) {
            // 处理更新操作
            System.out.println(JSON.toJSONString(document.getFullDocument()));
        } else if (document.getOperationType() == OperationType.DELETE) {
            // 处理删除操作
            System.out.println(JSON.toJSONString(document.getDocumentKey()));
        }
        // 获取偏移信息
        BsonDocument resumeToken = document.getResumeToken();

        System.out.println(resumeToken.toJson());
    }
}

注意点

  • mongo的changeStream支持订阅指定某些表,但是如果后续要新增监听的表,会导致就的偏移信息不可用,所以建议监听整个库,由业务对不需要处理的表过滤;
  • 一些表存在过期索引,对于这种数据库自动过期的数据变更是否需要处理,业务也要自行处理。
  • 对于偏移的更新,除了业务关心的数据变更事件以外,其余的事件也需要及时的更新偏移信息,避免重启后读取的数据量过大。

3、mysql实现方案

mysql-binlog-connector-java库可以连接到MySQL服务器并订阅binlog事件,监听和解析MySQL的二进制日志(binlog)。

public void tst() throws InterruptedException, IOException {  
    // 连接mysql
    BinaryLogClient client = new BinaryLogClient("127.0.0.1", 3306,
            "root", "password");
    // 设置偏移信息
    client.setBinlogFilename("mysql-bin.062888");
    client.setBinlogPosition(6080);

    client.registerEventListener(event -> {
        System.out.println(JSON.toJSONString(event));
        EventData data = event.getData();
        if (data instanceof WriteRowsEventData) {
            WriteRowsEventData writeRowsEventData = (WriteRowsEventData) data;
            System.out.println("Insert operation: " + JSON.toJSONString(writeRowsEventData.getRows()));
            // TODO: 处理插入操作
        } else if (data instanceof UpdateRowsEventData) {
            UpdateRowsEventData updateRowsEventData = (UpdateRowsEventData) data;
            System.out.println("Update operation: " + JSON.toJSONString(updateRowsEventData.getRows()));

            // TODO: 处理更新操作
        } else if (data instanceof DeleteRowsEventData) {
            DeleteRowsEventData deleteRowsEventData = (DeleteRowsEventData) data;
            System.out.println("Delete operation: " + JSON.toJSONString(deleteRowsEventData.getRows()));
            // TODO: 处理删除操作
        } else if (event.getHeader().getEventType() == EventType.TABLE_MAP) {
            // 处理表信息
            TableMapEventData eventData = event.getData();
            System.out.println("Database name: " + eventData.getDatabase());
            System.out.println("Table name: " + eventData.getTable());
        }
        long binlogPosition = client.getBinlogPosition();
        System.out.println(client.getBinlogFilename());
        System.out.println(binlogPosition);
    });

    client.connect();

}

注意点

  • 对同一数据库监听时,尽量指定serverId,一个serverId同一时间只能有一个客户端监听;
  • 无法单独订阅某一个库、某一个表和某一个事件,需要在handle处理中自行过滤;
  • 单独的事件中不包含语句所执行的库和表信息,只有一个tableId,需要监听TABLE_MAP事件,缓存tableId和具体表的映射,在具体的执行语句中通过此映射找到具体的表信息;
  • mysql对binlog有定期清理策略,需要注意binlog的缓存时间,避免重启时无法找到对应的binlog文件;
  • 对于偏移的处理,除了业务关心的数据变更事件以外,其余的事件也需要及时的更新偏移信息,避免重启后读取的数据量过大。
  • 注意mysql用户权限CREATE USER 'userxxx'@'%' IDENTIFIED BY 'Qwe123!!!'; GRANT SELECT, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON . TO 'userxxx' IDENTIFIED BY 'Qwe123!!!';
    FLUSH PRIVILEGES;

三、suishen-cdc核心实现

CdcDataDeque 事件本地缓存队列

这是一个线程安全的单向链表,提供的添加节点、查询头节点及重置头节点方法。

OffsetStorage 偏移量存储器

用户保存和读取数据库的偏移信息,默认提供了基于redis保存的RedisOffsetStorage存储器

DataProcessor 事件处理器

默认提供了基于suishen-queue消息队列的SuishenQueueDataProcessor实现。

当使用queue消息队列的SuishenQueueDataProcessor时,可以通过实现CdcDataOperator完成对具体的队列事件消费,默认提供了WeryaiCdcDataOperator向weryAi服务同步。

AbstractSynchronizer 数据库同步器

  • 继承Runnalbe接口,用于启动具体的监听任务;
  • 继承DisposableBean接口,用户停止监听任务;
  • 通过定时任务,指定时间内批量处理事件。
  • 当前提供了mongo同步的MongoSynchronizer同步器和mysql同步的MysqlSynchronizer同步器实现
public abstract class AbstractSynchronizer implements Runnable, DisposableBean {  
    // 本地缓存队列
    private final CdcDataDeque<CdcData> queue = new CdcDataDeque<>();
    // 偏移量存储器
    protected final OffsetStorage offsetStorage;
    // 事件处理器
    protected final DataProcessor dataProcessor;

    protected AbstractSynchronizer(OffsetStorage offsetStorage, DataProcessor dataProcessor) {
        this.offsetStorage = offsetStorage;
        this.dataProcessor = dataProcessor;
    }

    protected void handler(CdcData data) {
        log.info("cdc AbstractSynchronizer:{}", JSON.toJSONString(data));
        queue.put(data);
    }

    protected void handler(String offset) {
        if (StringUtils.isEmpty(offset)) {
            return;
        }
        log.info("cdc AbstractSynchronizer offset:{}", offset);
        queue.put(new CdcData().setOffset(offset));
    }

    /**
     * 定时任务,每5s消费一次
     */
    @PostConstruct
    public void consume() {
        ThreadPoolTaskScheduler scheduledThreadPoolExecutor = new ThreadPoolTaskScheduler();
        scheduledThreadPoolExecutor.setPoolSize(2);
        scheduledThreadPoolExecutor.initialize();

        scheduledThreadPoolExecutor.execute(this);
        scheduledThreadPoolExecutor.scheduleWithFixedDelay(() -> {
            CdcDataDeque.Node<CdcData> first = queue.get();
            if (first == null) {
                return;
            }
            String offset;
            List<CdcData> list = Lists.newArrayList(first.getItem());
            while (true) {
                CdcDataDeque.Node<CdcData> next = first.getNext();
                if (Objects.isNull(next)) {
                    break;
                }
                CdcData item = next.getItem();
                if (StringUtils.isNotEmpty(item.getId())) {
                    list.add(item);
                }
                offset = item.getOffset();
                // 限制一次性处理数据量
                if (list.size() >= 90) {
                    // 发送数据
                    dataProcessor.handle(list);
                    // 保存偏移
                    offsetStorage.setOffset(offset);
                    // 重置头节点
                    queue.resetFirst(next);
                    list = Lists.newArrayList();
                }
                first = next;
            }
            if (CollectionUtils.isNotEmpty(list)) {
                // 发送数据
                dataProcessor.handle(list);
            }
            // 保存偏移
            offsetStorage.setOffset(first.getItem().getOffset());
            // 重置头节点
            queue.resetFirst(first);
            // 释放对象
            list = null;
            first = null;
        }, 5000);
    }
}

作者介绍

  • 郑亚腾 资深服务端开发工程师

微鲤技术团队

微鲤技术团队承担了中华万年历、Maybe、蘑菇语音、微鲤游戏高达3亿用户的产品研发工作,并构建了完备的大数据平台、基础研发框架、基础运维设施。践行数据驱动理念,相信技术改变世界。