微鲤大数据集群FLINK版本升级踩坑总结

1、背景

微鲤大数据Flink集群依托于华为云于2020年建设,Flink版本为1.10.0。Flink集群主要负责埋点日志实时处理、业务指标实时计算、实时ETL、智能推荐、广告竞价等工作。现有Flink Job 超百个,slot 1800+,日均处理数据超十亿,日均200GB左右数据。

在Flink使用过程中,我们遇到了以下问题,也正是这些问题驱使我们推进Flink版本升级的工作。

  1. Flink SQL支持度不完善,DDL语句支持度不完善。
  2. 反压时checkpoint对齐失败,持续反压造成恶性循环。
  3. Flink connector的支持度不够,需要自己实现一些连接器。
  4. Flink CDC生态的发展,我们希望借助Flink CDC来取代原有canal+kafka的复杂技术架构,并且Flink CDC能够支持更多数据库种类。借助Flink CDC的能力来升级我们的数据集成架构。
  5. 维表join情况下的正确性及性能问题。
  6. Flink on hive的能力欠缺,不能很好的在Flink中支持hive。
  7. 官方目前发布的Flink 最新release版本为1.16,版本落后较多,需要保持技术先进性。

经综合评估,决定将Flink的版本由Flink 1.10.0升级至Flink1.14.6。

2、升级计划

基于当前Flink集群的现状及重要性,我们对Flink版本的升级制定如下步骤。(由于涉及一些敏感信息,故只列出了基本步骤,细节不再赘述)

  1. 升级可行性评估
  2. 测试环境验证
    • Flink 1.14.6部署,相关能力验证。
    • Flink CDC 生态集成,功能验证。
    • Flink 压测
    • ......
  3. 确认升级失败回滚方案,Flink job异常监控方案,并实施落地。
  4. Flink job梳理,业务影响确认,优先级重要程度确认。
  5. 生产环境Flink1.14.6集群部署
  6. Flink job升级,代码修改。
  7. Flink job灰度迁移。
  8. 持续观察,问题处理。

3、踩坑总结

3.1 Flink创建yarn-session失败

在$FLINK_HOME/bin目录下,使用yarn-session命令创建一个新的yarn-session。

./yarn-session.sh -nm test -d

报错如下

Error: A JNI error has occurred, please check your installation and try again  
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/hadoop/yarn/exceptions/YarnException  
        at java.lang.Class.getDeclaredMethods0(Native Method)
        at java.lang.Class.privateGetDeclaredMethods(Class.java:2701)
        at java.lang.Class.privateGetMethodRecursive(Class.java:3048)
        at java.lang.Class.getMethod0(Class.java:3018)
        at java.lang.Class.getMethod(Class.java:1784)
        at sun.launcher.LauncherHelper.validateMainClass(LauncherHelper.java:544)
        at sun.launcher.LauncherHelper.checkAndLoadMain(LauncherHelper.java:526)
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.yarn.exceptions.YarnException  
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        ... 7 more

原因是缺少Hadoop的环境信息,需要在在/etc/profile中添加如下配置:

export HADOOP_CLASSPATH=`hadoop classpath`  

然后执行source /etc/profile,使环境变量生效即可。

使用local模型启动Flink集群,进行测试工作时,报错如下:

[root@node1 bin]# sh start-cluster.sh 
/data/flink/flink-1.15.2/bin/config.sh:行32: 未预期的符号 `<' 附近有语法错误
/data/flink/flink-1.15.2/bin/config.sh:行32: `    done < <(find "$FLINK_LIB_DIR" ! -type d -name '*.jar' -print0 | sort -z)'

因为config中的语法在sh中不支持,但是在bash中支持。所以直接使用./start-cluster.sh来启动即可。

关于bash和sh的区别:

你可以简单把sh理解成bash的子集,sh中的所有语法在bash中都支持,但是bash中语法不一定在sh中支持。并且sh在遇到某行代码错误时,不会继续向下解释执行,而bash即使遇到错误,也会继续向下执行。

3.3 使用hive方言执行DDL操作失败

此问题主要发生在使用CDH或HDP发行版本的Hadoop集群中。

从[Flink-hive-connector](https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/hive/overview/ )下载对应版本的hive连接器。将下载后的包放到$FLINK_HOME/lib目录下,启动sql-client。

然后使用如下命令创建一个hive table时报错

SET execution.checkpointing.interval = 3s;  
CREATE TABLE hive_table (  
  user_id STRING,
  order_amount DOUBLE
) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES (
  'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
  'sink.partition-commit.trigger'='partition-time',
  'sink.partition-commit.delay'='1 h',
  'sink.partition-commit.policy.kind'='metastore,success-file'
);

报错如下

从sql-client中抛出的异常信息有限,我们可以到$FLINK_HOME/log目录下查看flink-sql-client-xxx.log中的具体报错信息。报错如下:

Unrecognized Hadoop major version number: 3.0.0-cdh6.3.1。

此报错的原因为Flink默认使用的hive connector不能识别此hive的cdh版本号,所以需要我们自己对flink-sql-connector-hive pom.xml中的hive版本进行修改,修改你所使用的cdh-hive版本,然后重新执行打包编译。将打好的包替换到$FLINK_HOME/lib下,重启sql-client即可。

另外可能会遇到类似如下缺少类或方法的一些错误:

可以根据实际报错,将$HIVEHOME/lib目录下的 hive-common.jar hive-exec.jar等jar包放到$FLINKHOME/lib目录下,重启相关yarn-session后,再次提交即可。

3.4 NoClassDefFoundError或NosuchMethodError

这个报错是最常见的,缺少相关的类或方法。此报错基本是两个原因:

  1. 少包
  2. 版本冲突

可以使用如下命令检查lib目录下是否存在此方法或类,如果没有则找到此类所在jar包,并进行引入。

ls *.jar | while read jarfile; do  
 echo "$jarfile"
 jar -tf $jarfile | grep "xxx(你缺少的类或方法名)"
done  

如果在lib目录下找到此方法,则判断是否为冲突,如果是冲突的话按照实际情况解决冲突即可。

3.5 zk node限额满导致无法创建新的yarn-session

在没有任何改动的情况下,突然无法创建新的yarn-session,即使kill掉一些yarn-session也无法创建新的。并且从日志上没有相关的报错信息,只有一个zk的空指针异常如下:

怀疑是zk的服务异常,但是检查了zk的服务也正常,只是有一个node限额的告警,使用zk客户端检查node的限额使用情况。

#查看限额配置情况
bin/zkCli.sh -server xxx:2181 listquota /flink  
或者在进入客户端后,使用如下命令查看
get /zookeeper/quota/flink/zookeeper_limits

#查看当前限额使用情况,发现限额已满1000
get /zookeeper/quota/flink/zookeeper_stats  

发现zk的flink node限额已满1000,无法继续创建新的node。于是手动删除一些历史无效node,释放配额。

deleteall /flink/application_1642060369182_4949589  

重新提交创建yarn-session命令,成功。

事后措施

1、增加zk 的flink node 限额

2、禁止直接使用yarn application -kill命令停止session。

实测发现直接kill application的方式zk下的相关node 不会删除, 通过stop的方式,zk node会自动删除释放,进行相关的清理工作。

ster has been started in detached mode. In order to stop Flink gracefully, use the following command:  
$ echo "stop" | ./bin/yarn-session.sh -id application_1642060369182_4960459
If this should not be possible, then you can also kill Flink via YARN's web interface or via:  
$ yarn application -kill application_1642060369182_4960459
Note that killing Flink might not clean up all job artifacts and temporary files.  

3.6 Direct buffer memory oom

在yarn-session模式下,提交flink job成功,但是任务执行很快就失败终止了,具体报错信息如下

2023-02-03 19:19:02  
java.lang.OutOfMemoryError: Direct buffer memory. The direct out-of-memory error has occurred. This can mean two things: either job(s) require(s) a larger size of JVM direct memory or there is a direct memory leak. The direct memory can be allocated by user code or some of its dependencies. In this case 'taskmanager.memory.task.off-heap.size' configuration option should be increased. Flink framework and its dependencies also consume the direct memory, mostly for network communication. The most of network memory is managed by Flink and should not result in out-of-memory error. In certain special cases, in particular for jobs with high parallelism, the framework may require more direct memory which is not managed by Flink. In this case 'taskmanager.memory.framework.off-heap.size' configuration option should be increased. If the error persists then there is probably a direct memory leak in user code or some of its dependencies which has to be investigated and fixed. The task executor has to be shutdown...  
    at java.nio.Bits.reserveMemory(Bits.java:695)
    at java.nio.DirectByteBuffer.(DirectByteBuffer.java:123)
    at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
    at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:241)
    at sun.nio.ch.IOUtil.read(IOUtil.java:195)
    at .....

从报错中可以明确看到,直接内存发生oom了,可能是你的job需要更大的直接内存或者直接内存存在内存泄漏情况。

Flink的直接内存由以下三部分组成

  1. 框架堆外内存(Framework Off-heap Memory) :用于 Flink 框架的堆外内存(直接内存或本地内存)。
  2. 任务堆外内存(Task Off-heap Memory) :用于 Flink 应用的算子及用户代码的堆外内存(直接内存或本地内存)。
  3. 网络内存(Network Memory):用于任务之间数据传输的直接内存(例如网络传输缓冲)

根据报错,修改$FLINK_HOME/conf/flink-conf.yaml配置文件,适当增大框架堆外内存和任务堆外内存大小。

taskmanager.memory.framework.off-heap.size: 256m  
taskmanager.memory.task.off-heap.size: 128m  

3.7 netty.exception.RemoteTransportException错误

flink job在yarn-session模式下,任务运行频繁抛出netty.exception.RemoteTransportException异常,具体报错信息如下:

2023-02-07 10:31:38  
org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Connection unexpectedly closed by remote task manager 'xxx:46219'. This might indicate that the remote task manager was lost.  
    at org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.channelInactive(CreditBasedPartitionRequestClientHandler.java:136)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)。。。。。

taskmanager进程频繁被kill,跟踪jobmanager中的日志可以看到如下报错信息

2023-02-07 10:31:37,801 INFO  org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Worker container_e04_1642060369182_5707916_01_000077 is terminated. Diagnostics: Container container_e04_1642060369182_5707916_01_000077 marked as failed.  
 Exit code:-104.
 Diagnostics:[2023-02-07 10:31:37.085]Container [pid=4167,containerID=container_e04_1642060369182_5707916_01_000077] is running 12828672B beyond the 'PHYSICAL' memory limit. Current usage: 8.0 GB of 8 GB physical memory used; 10.2 GB of 40 GB virtual memory used. Killing container.
Dump of the process-tree for container_e04_1642060369182_5707916_01_000077 :  
    |- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS) SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE
    |- 4264 4167 4167 4167 (java) 468494 51912 10696712192 2099533 /opt/Bigdata/common/runtime0/jdk1.8.0_242//bin/java -Xmx3597035049 -Xms3597035049 -XX:MaxDirectMemorySize=880468305 -XX:MaxMetaspaceSize=268435456 -Dlog.file=/srv/BigData/data2/nm/containerlogs/application_1642060369182_5707916/container_e04_1642060369182_5707916_01_000077/taskmanager.log -Dlog4j.configuration=file:./log4j.properties -Dlog4j.configurationFile=file:./log4j.properties org.apache.flink.yarn.YarnTaskExecutorRunner -D taskmanager.memory.network.min=746250577b -D taskmanager.cpu.cores=6.0 -D taskmanager.memory.task.off-heap.size=0b -D taskmanager.memory.jvm-metaspace.size=268435456b -D external-resources=none -D taskmanager.memory.jvm-overhead.min=858993472b -D taskmanager.memory.framework.off-heap.size=134217728b -D taskmanager.memory.network.max=746250577b -D taskmanager.memory.framework.heap.size=134217728b -D taskmanager.memory.managed.size=2985002310b -D taskmanager.memory.task.heap.size=3462817321b -D taskmanager.numberOfTaskSlots=6 -D taskmanager.memory.jvm-overhead.max=858993472b --configDir . -Djobmanager.rpc.address=node-group-1tewe.df108301-a014-448f-a6df-68e3f3bea0fe.com -Dweb.port=0 -Djobmanager.memory.off-heap.size=134217728b -Dweb.tmpdir=/tmp/flink-web-76b43dd8-0ceb-44d8-b3fb-0fcbacd8b90b -Djobmanager.rpc.port=42559 -Drest.address=node-group-1tEWe.df108301-a014-448f-a6df-68e3f3bea0fe.com -Djobmanager.memory.jvm-overhead.max=429496736b -Djobmanager.memory.jvm-overhead.min=429496736b -Dtaskmanager.resource-id=container_e04_1642060369182_5707916_01_000077 -Dinternal.taskmanager.resource-id.metadata=node-group-105h0.df108301-a014-448f-a6df-68e3f3bea0fe.com:8041 -Djobmanager.memory.jvm-metaspace.size=268435456b -Djobmanager.memory.heap.size=3462817376b 
    |- 4167 4165 4167 4167 (bash) 0 2 219017216 751 /bin/bash -c /opt/Bigdata/common/runtime0/jdk1.8.0_242//bin/java -Xmx3597035049 -Xms3597035049 -XX:MaxDirectMemorySize=880468305 -XX:MaxMetaspaceSize=268435456 -Dlog.file=/srv/BigData/data2/nm/containerlogs/application_1642060369182_5707916/container_e04_1642060369182_5707916_01_000077/taskmanager.log -Dlog4j.configuration=file:./log4j.properties -Dlog4j.configurationFile=file:./log4j.properties org.apache.flink.yarn.YarnTaskExecutorRunner -D taskmanager.memory.network.min=746250577b -D taskmanager.cpu.cores=6.0 -D taskmanager.memory.task.off-heap.size=0b -D taskmanager.memory.jvm-metaspace.size=268435456b -D external-resources=none -D taskmanager.memory.jvm-overhead.min=858993472b -D taskmanager.memory.framework.off-heap.size=134217728b -D taskmanager.memory.network.max=746250577b -D taskmanager.memory.framework.heap.size=134217728b -D taskmanager.memory.managed.size=2985002310b -D taskmanager.memory.task.heap.size=3462817321b -D taskmanager.numberOfTaskSlots=6 -D taskmanager.memory.jvm-overhead.max=858993472b --configDir . -Djobmanager.rpc.address='node-group-1tewe.df108301-a014-448f-a6df-68e3f3bea0fe.com' -Dweb.port='0' -Djobmanager.memory.off-heap.size='134217728b' -Dweb.tmpdir='/tmp/flink-web-76b43dd8-0ceb-44d8-b3fb-0fcbacd8b90b' -Djobmanager.rpc.port='42559' -Drest.address='node-group-1tEWe.df108301-a014-448f-a6df-68e3f3bea0fe.com' -Djobmanager.memory.jvm-overhead.max='429496736b' -Djobmanager.memory.jvm-overhead.min='429496736b' -Dtaskmanager.resource-id='container_e04_1642060369182_5707916_01_000077' -Dinternal.taskmanager.resource-id.metadata='node-group-105h0.df108301-a014-448f-a6df-68e3f3bea0fe.com:8041' -Djobmanager.memory.jvm-metaspace.size='268435456b' -Djobmanager.memory.heap.size='3462817376b' 1> /srv/BigData/data2/nm/containerlogs/application_1642060369182_5707916/container_e04_1642060369182_5707916_01_000077/taskmanager.out 2> /srv/BigData/data2/nm/containerlogs/application_1642060369182_5707916/container_e04_1642060369182_5707916_01_000077/taskmanager.err 

[2023-02-07 10:31:37.108]Container killed on request. Exit code is 143
[2023-02-07 10:31:37.172]Container exited with a non-zero exit code 143. 

可以看到TaskManager的Container 内存使用超出限制,被kill了。

根据flink 的内存模型,我们有两种方式,第一种方式直接增大TaskManager的总内存,第二种方式就是减少其它堆内存使用,把更多的内存分配到TaskManager的任务堆内存上。

增大TaskManager的总内存可以通过修改$FLINK_HOME/conf/flink-conf.yaml配置文件,增大如下配置或者在创建yarn-session时指定更大的tm 值

# 默认1728m
taskmanager.memory.process.size:4096m  

减少其它堆内存使用,可以通过flink web-ui查看TaskManager的内存分配情况,看哪一块堆内存使用较小,然后适当调小其值。

我这里调整的是托管内存(Managed memory)

可以通过以下两种范式指定托管内存的大小:

当同时指定二者时,会优先采用指定的大小(Size)。 若二者均未指定,会根据默认占比进行计算,默认占比为0.4,可以适当减少占比分配。

3.8 各类方法过期或在高版本被移除

在flink社区自身版本升级过程中,存在大量的方法过期或者在高版本被移除。以下是一些示例:

  1. rocksDBStateBackend.enableTtlCompactionFilter()方法找不到。这个方法在高版本移除了,默认值就是开启状态。参阅
  2. split及select方法在高版本被删除 建议使用侧输出流进行替换
  3. checkpoint.enableExternalizedCheckpoints()方法过期,替换为checkpointConf.setExternalizedCheckpointCleanup()

flink 在版本迭代过程中的改动,可参阅具体版本的Flink发行说明,了解原因和使用建议。

4、注意事项

1、Flink在自身的版本迭代过程中,存在某些类或方法被标记为过期或删除,因此可能会涉及到你的代码修改,需要多加注意。

2、另外在高版本的Flink中,配置文件中的一些参数也被移除,具体可以参阅Flink配置说明。相关参数配置也需要注意,如果配置不正确或失效的参数,可能会造成job失败或性能不佳。

3、将你的job从低版本迁移至高版本的flink集群时,请提前做好回滚方案,并且必须使用savepoint的方式进行任务迁移。

4、回滚方案、异常监控方案必须前置完成,避免在出现问题的情况不能快速回滚,出现异常的情况下不能及时发现,影响业务造成事故。

5、runbook非常重要,提前准备好runbook,按照runbook顺序执行。在迁移过程中,精神高度紧绷的状态下,再进行代码修改,命令编辑,出现问题的概率会直线飙升。

6、选择业务低峰期操作,并通知相关业务方知晓操作时间和可能的影响,协同关注业务的健康度。

5、目前升级后的收益

  1. 借助于Flink CDC的生态能力,我们对现有数据集成架构进行了升级,构建了业务数据实时同步链路。大幅简化了数据集成架构,提高数据同步时效。

  2. 借助于Flink unaligned checkpoints,缓解了反压情况下的checkpoints失败问题,我们也在持续探索优化中。

  3. Flink SQL更简洁,更丰富,我们会在后续的开发中将Flink SQL作为主力,提高开发效率。

  4. Flink on hive的能力增强,DML、DDL语句支持度达到96%,可以丝滑的将hive SQL迁移至Flink执行,使Flink流批一体的技术演进有了可能。

6、参考资料

Flink官方文档

Flink版本发行说明

Flink版本升级指导

有赞实时计算 Flink 1.13 升级实践

Flink 1.10 升级预期收益评估

作者介绍

  • 作者介绍:冯成杨 资深大数据开发工程师

微鲤技术团队

微鲤技术团队承担了中华万年历、Maybe、蘑菇语音、微鲤游戏高达3亿用户的产品研发工作,并构建了完备的大数据平台、基础研发框架、基础运维设施。践行数据驱动理念,相信技术改变世界。