一、需求背景
需要将业务数据库的数据,向数仓同步,目前包括两种数据库:mongo、mysql。
二、总体实现方案
1、总体流程
- a、定时任务,加载数据库事件偏移信息,统一监听数据库变更事件;
- b、本地缓存收集事件变更信息(一定的数据量、一定的收集时间);
- c、收集达到阈值后,向消息队列发送消息;
- d、发送消息成功后,使用redis记录最后一条消息的偏移信息;
- e、消息队列批量读取事件,批量向数仓同步数据。
2、mongo实现方案
mongo-java-driver库,提供了Change Streams API来监听和获取实时变更(change)事件。通过Change Streams,可以监视集合中的插入、更新和删除等操作,并对这些变更事件做出响应。
@Test
public void tst() {
// 获取指定数据库连接
MongoDatabase database = mongoTemplate.getDb().getMongoClient().getDatabase("database");
// 过滤需要监听的表
Document matchStage = new Document("$match", new Document("ns.coll", new Document("$in",
Arrays.asList("colletion", "label"))));
// 开启监听
ChangeStreamIterable<Document> changeStream = database
.watch(Arrays.asList(matchStage)).fullDocument(FullDocument.UPDATE_LOOKUP)
// 设置事件偏移信息
.resumeAfter(BsonDocument.parse("{\"_data\": \"8265AB99800000000129295A10048EBDB1DB4C23440CAD9BD906E9098378463C5F6964003C3134313334000004\"}"));
for (ChangeStreamDocument<Document> document : changeStream) {
// 根据操作类型进行相应的操作
if (document.getOperationType() == OperationType.INSERT) {
// 处理插入操作
System.out.println(JSON.toJSONString(document.getFullDocument()));
} else if (document.getOperationType() == OperationType.UPDATE) {
// 处理更新操作
System.out.println(JSON.toJSONString(document.getFullDocument()));
} else if (document.getOperationType() == OperationType.DELETE) {
// 处理删除操作
System.out.println(JSON.toJSONString(document.getDocumentKey()));
}
// 获取偏移信息
BsonDocument resumeToken = document.getResumeToken();
System.out.println(resumeToken.toJson());
}
}
注意点
- mongo的changeStream支持订阅指定某些表,但是如果后续要新增监听的表,会导致就的偏移信息不可用,所以建议监听整个库,由业务对不需要处理的表过滤;
- 一些表存在过期索引,对于这种数据库自动过期的数据变更是否需要处理,业务也要自行处理。
- 对于偏移的更新,除了业务关心的数据变更事件以外,其余的事件也需要及时的更新偏移信息,避免重启后读取的数据量过大。
3、mysql实现方案
mysql-binlog-connector-java库可以连接到MySQL服务器并订阅binlog事件,监听和解析MySQL的二进制日志(binlog)。
public void tst() throws InterruptedException, IOException {
// 连接mysql
BinaryLogClient client = new BinaryLogClient("127.0.0.1", 3306,
"root", "password");
// 设置偏移信息
client.setBinlogFilename("mysql-bin.062888");
client.setBinlogPosition(6080);
client.registerEventListener(event -> {
System.out.println(JSON.toJSONString(event));
EventData data = event.getData();
if (data instanceof WriteRowsEventData) {
WriteRowsEventData writeRowsEventData = (WriteRowsEventData) data;
System.out.println("Insert operation: " + JSON.toJSONString(writeRowsEventData.getRows()));
// TODO: 处理插入操作
} else if (data instanceof UpdateRowsEventData) {
UpdateRowsEventData updateRowsEventData = (UpdateRowsEventData) data;
System.out.println("Update operation: " + JSON.toJSONString(updateRowsEventData.getRows()));
// TODO: 处理更新操作
} else if (data instanceof DeleteRowsEventData) {
DeleteRowsEventData deleteRowsEventData = (DeleteRowsEventData) data;
System.out.println("Delete operation: " + JSON.toJSONString(deleteRowsEventData.getRows()));
// TODO: 处理删除操作
} else if (event.getHeader().getEventType() == EventType.TABLE_MAP) {
// 处理表信息
TableMapEventData eventData = event.getData();
System.out.println("Database name: " + eventData.getDatabase());
System.out.println("Table name: " + eventData.getTable());
}
long binlogPosition = client.getBinlogPosition();
System.out.println(client.getBinlogFilename());
System.out.println(binlogPosition);
});
client.connect();
}
注意点
- 对同一数据库监听时,尽量指定serverId,一个serverId同一时间只能有一个客户端监听;
- 无法单独订阅某一个库、某一个表和某一个事件,需要在handle处理中自行过滤;
- 单独的事件中不包含语句所执行的库和表信息,只有一个tableId,需要监听TABLE_MAP事件,缓存tableId和具体表的映射,在具体的执行语句中通过此映射找到具体的表信息;
- mysql对binlog有定期清理策略,需要注意binlog的缓存时间,避免重启时无法找到对应的binlog文件;
- 对于偏移的处理,除了业务关心的数据变更事件以外,其余的事件也需要及时的更新偏移信息,避免重启后读取的数据量过大。
- 注意mysql用户权限CREATE USER 'userxxx'@'%' IDENTIFIED BY 'Qwe123!!!';
GRANT SELECT, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON . TO 'userxxx' IDENTIFIED BY 'Qwe123!!!';
FLUSH PRIVILEGES;
三、suishen-cdc核心实现
CdcDataDeque 事件本地缓存队列
这是一个线程安全的单向链表,提供的添加节点、查询头节点及重置头节点方法。
OffsetStorage 偏移量存储器
用户保存和读取数据库的偏移信息,默认提供了基于redis保存的RedisOffsetStorage存储器
DataProcessor 事件处理器
默认提供了基于suishen-queue消息队列的SuishenQueueDataProcessor实现。
当使用queue消息队列的SuishenQueueDataProcessor时,可以通过实现CdcDataOperator完成对具体的队列事件消费,默认提供了WeryaiCdcDataOperator向weryAi服务同步。
AbstractSynchronizer 数据库同步器
- 继承Runnalbe接口,用于启动具体的监听任务;
- 继承DisposableBean接口,用户停止监听任务;
- 通过定时任务,指定时间内批量处理事件。
- 当前提供了mongo同步的MongoSynchronizer同步器和mysql同步的MysqlSynchronizer同步器实现
public abstract class AbstractSynchronizer implements Runnable, DisposableBean {
// 本地缓存队列
private final CdcDataDeque<CdcData> queue = new CdcDataDeque<>();
// 偏移量存储器
protected final OffsetStorage offsetStorage;
// 事件处理器
protected final DataProcessor dataProcessor;
protected AbstractSynchronizer(OffsetStorage offsetStorage, DataProcessor dataProcessor) {
this.offsetStorage = offsetStorage;
this.dataProcessor = dataProcessor;
}
protected void handler(CdcData data) {
log.info("cdc AbstractSynchronizer:{}", JSON.toJSONString(data));
queue.put(data);
}
protected void handler(String offset) {
if (StringUtils.isEmpty(offset)) {
return;
}
log.info("cdc AbstractSynchronizer offset:{}", offset);
queue.put(new CdcData().setOffset(offset));
}
/**
* 定时任务,每5s消费一次
*/
@PostConstruct
public void consume() {
ThreadPoolTaskScheduler scheduledThreadPoolExecutor = new ThreadPoolTaskScheduler();
scheduledThreadPoolExecutor.setPoolSize(2);
scheduledThreadPoolExecutor.initialize();
scheduledThreadPoolExecutor.execute(this);
scheduledThreadPoolExecutor.scheduleWithFixedDelay(() -> {
CdcDataDeque.Node<CdcData> first = queue.get();
if (first == null) {
return;
}
String offset;
List<CdcData> list = Lists.newArrayList(first.getItem());
while (true) {
CdcDataDeque.Node<CdcData> next = first.getNext();
if (Objects.isNull(next)) {
break;
}
CdcData item = next.getItem();
if (StringUtils.isNotEmpty(item.getId())) {
list.add(item);
}
offset = item.getOffset();
// 限制一次性处理数据量
if (list.size() >= 90) {
// 发送数据
dataProcessor.handle(list);
// 保存偏移
offsetStorage.setOffset(offset);
// 重置头节点
queue.resetFirst(next);
list = Lists.newArrayList();
}
first = next;
}
if (CollectionUtils.isNotEmpty(list)) {
// 发送数据
dataProcessor.handle(list);
}
// 保存偏移
offsetStorage.setOffset(first.getItem().getOffset());
// 重置头节点
queue.resetFirst(first);
// 释放对象
list = null;
first = null;
}, 5000);
}
}
作者介绍
- 郑亚腾 资深服务端开发工程师