一、背景
企业生产环境中,会出现大量依赖中间状态的实时任务,目前flink的状态存储有Memory、FileSystem 和 RocksDB 三种可选,且 RocksDB 是状态数据量较大(GB 到 TB 级别)时的唯一选择。RocksDB 的性能发挥非常仰赖调优,如果全部采用默认配置,读写性能有可能会很差。但是,RocksDB 的配置也是极为复杂的,可调整的参数多达百个,没有放之四海而皆准的优化方案。如果仅考虑 Flink 状态存储这一方面,我们仍然可以总结出一些相对普适的优化思路。本文先介绍一些基础知识,再列举方法。
二、状态简述
Flink的状态分为三种:
-
MemoryStateBackend:默认的方式,即基于JVM的堆内存进行存储,主要适用于本地开发和调试;
-
FsStateBackend:基于文件系统进行存储,可以是本地文件系统,也可以是HDFS等分布式文件系统。需要注意,虽然选择使用 FsStateBackend,但是正在进行的数据仍然存储在 TaskManager的内存中,只有在 checkpoint 时,才会将状态快照写入到指定文件系统上;
-
RocksDBStateBackend:Flink内置的第三方状态管理器,采用嵌入式的 key-value 型数据库RocksDB 来存储正在进行的数据。等到 checkpoint 时,再讲其中的数据持久化到指定的文件系统中,所以采用 RocksDBStateBackend 时,也需要配置持久化存储的文件系统。之所以这样做,是因为 RocksDB 作为嵌入式数据库安全性比较低,但是比起全文件系统的方式,其读取速度更快,比起全内存的方式,其存储空间更大,因此是一种比较均衡的方案。
在状态比较大的情况下,推荐使用 RocksDB,除了上面提到的特点,可以更方便地进行优化。下面的介绍和调优,主要以 RocksDB 状态为例。
三、RocksDB 大状态调优
3.1 开启State访问性能监控
Flink1.13中引入了 State 访问的性能监控,即 latency tracking state。此功能不局限于 State Backend 的类型,自定义实现的 State Backend 也可以复用次功能。
当然,State 访问的性能监控会产生一定的性能影响,所以,默认每 100 次做一次取样(sample),对不同的 State Backend 性能损失影响不同: 对于 RocksDB State Backend,性能损失大概在 1%左右 对于 Heap State Backend,性能损失最多可达 10%
开启监控的方式,可以在指令中加入
state.backend.latency-track.keyed-state-enabled: true #启用访问状态的性能监控 state.backend.latency-track.sample-interval: 100 #采样间隔 state.backend.latency-track.history-size: 128 #保留的采样数据个数,越大越精确 state.backend.latency-track.state-name-as-viriable: true #将状态名作为变量
正常情况下,开启第一个参数即可。
3.2 开启增量检查点和本地恢复
3.2.1 开启增量检查点
RocksDB 是目前唯一可用于支持有状态流处理应用程序增量检查点的状态后端,可以修改参数开启增量检查点:
state.backend.incremental: true #默认false,改为true 或代码中指定 new EmbeddedRocksDBStateBackend(true)
增量检查点,表示在 checkpoint 时,只备份和上个检查点相比,发生变化的检查点。在状态比较大的情况下,是否开启增量检查点,对性能的影响会非常大。比如,在程序运行很长时间之后,总的状态量达到1G,每次变化的状态只有100M甚至更低,那么在不开启增量备份的情况下,每次备份都要全量备份,也就是1G的状态量;如果开启了增量备份,每次只需要备份100M甚至更低;两者相比,增量备份检查点,可以大大节省备份的时间。
在项目的实际使用过程中,曾经经理过一次大的性能问题,在没有开启增量备份检查点的情况下,每次备份需要消耗几十秒的时间,这对于实时计算来说,简直是个灾难;在使用 RocksDBStateBackend,并开启增量备份检查点之后,每次备份只需要几秒甚至几十毫秒就可以完成,大大节省了状态备份的时间。
3.2.2 开启本地恢复
当Flink任务失败时,可以基于本地的状态信息进行恢复任务,可能不需要从 hdfs 拉取数据。本地恢复目前仅涵盖监控类型的状态后端(RocksDB),MemoryStateBackend 不支持本地恢复并忽略此选项。
开启指令
state.backend.local-recovery: true
3.3 调整预定义选项
Flink 为 RocksDB 提供了一些预定义的选项集合,比如 DEFAULT、SPINNING_DISK_OPTIMIZED、SPINING_DISK_OPTIMIZED_HIGH_MEM 或 FLASH_SSD_OPTIMIZED。
DEFAULT:啥都不配; SPINNING\_DISK\_OPTIMIZED:基于磁盘的优化; SPINING\_DISK\_OPTIMIZED\_HIGH\_MEM:基于磁盘和内存的优化; FLASH\_SSD\_OPTIMIZED:基于固态硬盘的优化;
一般使用 SPINING_DISK_OPTIMIZED_HIGH_MEM 即可,如果条件充足,可以指定 FLASH_SSD_OPTIMIZED。
3.3.1 配置方式
1)代码指定
EmbeddedRocksDBStateBackend embeddedRocksDBStateBackend = new EmbeddedRocksDBStateBackend(true); embeddedRocksDBStateBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM); env.setStateBackend(embeddedRocksDBStateBackend);
2)启动指令指定
-Dstate.backend.rocksdb.predefined-options: SPINING_DISK_OPTIMIZED_HIGH_MEM #机械硬盘+内存
3.4 其他高阶配置
3.4.1 增大block缓存
整个 RocksDB 共享一个 block cache(对应上图的Read Only Block Cache,最近读取的数据会放到 block cache 中),读取数据时内存的 cache 大小,直接影响数据读取效率;读取数据时,优先从内存读取,读取不到时,再从磁盘加载,所以,内存 cache 越大,缓存命中率越高。默认大小Wie 8MB,建议设置到 64~256MB,根据自身资源而定。
state.backend.rocksdb.block.cache-size: 64m #默认 8m
3.4.2 增大 write buffer 和 level 阈值大小
RocksDB 中,每个 State 使用一个 Column Family,每个 Column Family 使用单独的 write buffer,默认是64MB,建议调大。
调整 write buffer 时,通常要适当增加 L1 层的大小阈值 max-size-level-base,默认是 256 MB。该值太小,会导致能存档的 SST 文件过少,层级变多造成查找困难,需要更多层索引,才能命中需要的文件;值太大,造成文件过大,合并困难。建议设置为 target_file_size_base(默认64MB)的倍数,且不能太小,建议 5~10 倍,即 320~640MB。
state.backend.rocksdb.writebuffer.size: 128m state.backend.rocksdb.compaction.level.max-size-level-base: 320m
3.4.3 增大 write buffer 数量
每个 Column Family 对应的 write buffer 最大数量,实际上是内存中“ReadOnly MemTable(只读内存表)”的最大数量,默认值是2。对于机械磁盘来说,如果内存足够大,可以调大到 5 左右,人多力量大!
state.backend.rocksdb.writebuffer.count: 5
3.4.4 增大后台线程数和 write buffer 合并数
1)增大后台线程数
用于后台 flush 和合并 sst 文件的线程数,默认为 1,建议调大,机械硬盘用户可以改为 4 等更大的值,人多力量大!
state.backend.rocksdb.thread.num: 4
2)增大 write buffer 最小合并数
将数据从 write buffer 中 flush 到磁盘时,需要合并的 write buffer 最小数量,默认值为 1,可以调到 3,减小合并次数。
state.backend.rocksdb.writebuffer.number-to-merge: 3
3.4.5 开启分区索引功能
Flink 1.13 中对 RocksDB 增加了分区索引功能,复用了 RocksDB 的 partitioned Index & filter 功能,简单来说就是对 RocksDB 的partitioned Index 做了多级索引。也就是将内存中的最上层常驻,下层根据需要再 load 回来,这样就大大降低了数据 Swap 竞争。线上测试中,相对于内存较小的场景中,性能提升10倍左右。如果在内存管控下 RocksDB 性能不如预期的话,这个也能作为一个性能优化点。
state.backend.rocksdb.memory.partitioned-index-filters:true #默认 false
3.5 参数设定案例
yarn-session模式启动参数设置 nohup ./bin/yarn-session.sh -nm flink14-pv-event-nginx-parse -s 6 -tm 15360 -jm 2048 -D taskmanager.memory.size=0m -D taskmanager.memory.off-heap.enabled=true -D taskmanager.memory.jvm-overhead.min=2048m -D taskmanager.memory.jvm-overhead.max=3072m -D state.backend.rocksdb.predefined-options=SPINNING_DISK_OPTIMIZED -D state.backend.rocksdb.writebuffer.size=256m -D state.backend.rocksdb.writebuffer.count=5 -D state.backend.rocksdb.compaction.level.max-size-level-base=320m -D state.backend.rocksdb.writebuffer.number-to-merge=3 -D state.backend.rocksdb.memory.partitioned-index-filters=true -d &>/dev/null 2>&1 &
四、Checkpoint设置
Checkpoint 时间间隔,需要根据业务场景对时效性的要求而定。如果时效性要求不高,可以设置到分钟级别,比如5分钟、10分钟;如果对时效性要求很高,结合 flink 控制页面 Checkpoints 的Summary 中的 End to End Duration,通过最大值、最小值和平均值,合理设置时间间隔。注意,时间间隔需要比 End to End Duration 的时间要长,否则,可能会导致上一个 checkpoint 没结束,下一个 checkpoint 已经开始。为了避免这一情况的发生,除了设置时间间隔,两次 checkpoint 的最小时间间隔也可以起到作用,该配置决定在上一次checkpoint 结束之后,至少等待多长时间开始下一次的 checkpoint。
具体配置,可以参考下面的代码:
// 使⽤ RocksDBStateBackend 做为状态后端,并开启增量 Checkpoint RocksDBStateBackend rocksDBStateBackend = new RocksDBStateBackend("hdfs://hadoop01:8020/flink/checkpoints", true); env.setStateBackend(rocksDBStateBackend); // 开启 Checkpoint,间隔为 1 分钟 env.enableCheckpointing(TimeUnit.MINUTES.toMillis(1)); // 配置 Checkpoint CheckpointConfig checkpointConf = env.getCheckpointConfig(); checkpointConf.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE) // 最小间隔 2 分钟 checkpointConf.setMinPauseBetweenCheckpoints(TimeUnit.MINUTES.toMillis(2)) // 超时时间 10 分钟 checkpointConf.setCheckpointTimeout(TimeUnit.MINUTES.toMillis(10)); // 保存 checkpoint checkpointConf.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
五、调优实践总结
在flink1.10版本及以后,由于 TaskManager 内存模型重构,RocksDB 内存默认成为了堆外托管内存的一部分,可以免去一些手动调整的麻烦。如果性能仍然不佳,需要干预,则必须将 state.backend.rocksdb.memory.managed 参数设为 false 来禁用 RocksDB 内存托管,然后根据具体任务状态来判断需要调整那些参数。
下面是生产环境实践RocksDB状态调优效果对比,flink版本1.14
第一种默认使用RocksDB 内存托管方式启动任务
启动yarn session nohup ./bin/yarn-session.sh -nm flink14-pv-event-nginx-parse2 -s 6 -tm 15360 -jm 2048 -D state.backend.rocksdb.memory.managed=true -d &>/dev/null 2>&1 & 启动flink任务 nohup ./bin/flink run -c suishen.bigdata.pv.event.PvEventNginxLogParseAppRepair -p 1 -yid application_1642060369182_7487586 -d module-event-etl.jar &>/dev/null 2>&1 &
第二种禁用 RocksDB 内存托管,调整部分RocksDB参数启动任务
启动yarn session nohup ./bin/yarn-session.sh -nm flink14-pv-event-nginx-parse -s 6 -tm 15360 -jm 2048 -D taskmanager.memory.size=0m -D taskmanager.memory.off-heap.enabled=true -D taskmanager.memory.jvm-overhead.min=2048m -D taskmanager.memory.jvm-overhead.max=3072m -D state.backend.rocksdb.memory.managed=false -D state.backend.rocksdb.predefined-options=SPINNING_DISK_OPTIMIZED -D state.backend.rocksdb.writebuffer.size=256m -D state.backend.rocksdb.block.cache-size=512m -D state.backend.rocksdb.writebuffer.count=5 -D state.backend.rocksdb.compaction.level.max-size-level-base=64m -D state.backend.rocksdb.log.level=ERROR_LEVEL -D state.backend.rocksdb.writebuffer.number-to-merge=3 -D state.backend.rocksdb.options.max-manifest-file-size=32m -D state.backend.rocksdb.memory.partitioned-index-filters=true -d &>/dev/null 2>&1 & 启动flink任务 nohup ./bin/flink run -c suishen.bigdata.pv.event.PvEventNginxLogParseApp -p 1 -yid application_1642060369182_6869813 -d module-event-etl.jar &>/dev/null 2>&1 &
可以看到两种方式启动的任务状态大小区别还是比较明显的,对于大状态任务,设置合适的 RocksDB参数可以确保flink任务有良好的内存使用,从而既不会在容器(Docker/Kubernetes, Yarn等)环境中由于内存超用被杀掉,也不会因为内存利用率过低导致不必要的数据落盘或是缓存命中率下降,致使性能下降。
RocksDB调优总结:
-
对于一般使用状态的任务(状态小于GB)来说,直接使用默认设置即可(默认是使用RocksDB 内存托管方式管理状态)。
-
对于内存比较大的任务可以禁用 RocksDB 内存托管,手动根据任务运行状况调节合适参数。这里只是举例部分RocksDB内存参数设置,还有很多参数需要根据官方文档和业务状况结合判断。
-
参数调优只是一部分,最主要的调优方式还是减少状态大小,如设置状态过期时间、状态清理策略等等。
参考资料
作者介绍
- 徐伟奇 大数据开发工程师