包阅导读总结
1. 关键词:Flink、TDMQ for Apache Pulsar、离线场景、部署、实践
2. 总结:本文主要介绍了 Flink 基于 TDMQ for Apache Pulsar 在离线场景的使用实践,包括 Flink 的部署、两个 Demo(Topic 复制和单词计数)的实现,还总结了 Flink Connector 用法及注意事项。
3. 主要内容:
– 背景
– 部署 Flink
– 配置 Flink 集群 JobManager 和 TaskManager 环境信息
– 部署 JobManager 和 TaskManager
– Demo:Topic 复制
– 代码逻辑
– 创建流入和流出 Topic
– 提交任务与发送消息验证
– Demo:单词计数
– 代码逻辑
– 序列化方式和 Batch Send 模式
– Flink Connector 用法总结
– 注意事项
思维导图:
文章地址:https://mp.weixin.qq.com/s/inuxvRa5Vxo6dmcTyIWX5Q
文章来源:mp.weixin.qq.com
作者:腾讯云开发者
发布时间:2024/7/2 0:05
语言:中文
总字数:3979字
预计阅读时间:16分钟
评分:90分
标签:Flink,Apache Pulsar,TDMQ,数据流处理,Docker 部署
以下为原文内容
本内容来源于用户推荐转载,旨在分享知识与观点,如有侵权请联系删除 联系邮箱 media@ilingban.com
👉目录
1背景
2部署 Flink
3Demo:Topic 复制
4Demo:单词计数
5Flink Connector 用法总结
6 注意事项
$FLINK_PROPERTIES=$'\njobmanager.rpc.address:jobmanager\ntaskmanager.memory.task.off-heap.size:1gb\ntaskmanager.memory.process.size:4gb'
$ docker network create flink-network
$dockerrun\
--rm \
--name=jobmanager \
--network flink-network \
--publish 8081:8081 \
--env FLINK_PROPERTIES="${FLINK_PROPERTIES}" \
flink:1.17.2-scala_2.12jobmanager
$ docker run \
--rm \
--name=taskmanager \
--network flink-network \
--env FLINK_PROPERTIES="${FLINK_PROPERTIES}" \
flink:1.17.2-scala_2.12 taskmanager
在本地编译打包 Pulsar 任务后,还需要使用 Flink Cli 提交本地任务到 Flink Docker 集群,从下方网址下载与当前 Docker 版本一致的 Flink 二进制文件并且解压到本地。
https://flink.apache.org/downloads/
参考 Flink Pulsar Connector 社区文档和 Oceanus 相关文档,Demo 使用 1.17 版本 Flink SDK 将命名空间的一个 Topic 消息全部复制到另一个 Topic 中,Demo 主要展示 Flink Connector 的基础用法,没有使用自定义序列化器及反序列化器,而是使用的是 Connector 内置的 String 序列化器。
https://cloud.tencent.com/document/product/849/85885#pulsar-source-.E5.92.8C-sink-.E7.A4.BA.E4.BE.8B
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/datastream/pulsar/#apache-pulsar-connector
publicstaticvoidmain(String[]args)throwsException{
final ParameterTool parameterTool = ParameterTool.fromArgs(args);
if (parameterTool.getNumberOfParameters() < 2) {
System.err.println("Missing parameters!");
return;
}
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000));
env.enableCheckpointing(60000);
env.getConfig().setGlobalJobParameters(parameterTool);
String brokerServiceUrl = parameterTool.getRequired("broker-service-url");
String inputTopic = parameterTool.getRequired("input-topic");
String outputTopic = parameterTool.getRequired("output-topic");
String subscriptionName = parameterTool.get("subscription-name", "testDuplicate");
String token = parameterTool.getRequired("token");
PulsarSource<String> source = PulsarSource.builder()
.setServiceUrl(brokerServiceUrl)
.setStartCursor(StartCursor.latest())
.setTopics(inputTopic)
.setDeserializationSchema(new SimpleStringSchema())
.setSubscriptionName(subscriptionName)
.setAuthentication("org.apache.pulsar.client.impl.auth.AuthenticationToken", token)
.build();
DataStream<String> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Pulsar Source");
PulsarSink<String> sink = PulsarSink.builder()
.setServiceUrl(brokerServiceUrl)
.setTopics(outputTopic)
.setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
.setAuthentication("org.apache.pulsar.client.impl.auth.AuthenticationToken", token)
.setConfig(PulsarSinkOptions.PULSAR_BATCHING_ENABLED, false)
.setSerializationSchema(new SimpleStringSchema())
.build();
stream.sinkTo(sink);
env.execute("Pulsar Streaming Message Duplication");
}
/usr/local/services/flink/flink-1.17.2
Job has been submitted with JobID c1bdab89c01ef16e00579bd2c6648859
/usr/local/services/pulsar/apache-pulsar-2.9.5/bin/pulsar-client\
--url http://pulsar-xxxxxx \
--auth-params token:eyJrZXlJZCI6InB1bHNhci1nOGFrajRlb3c4ejgiLCJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJwdWxzYXItZzhha2o0ZW93OHo4X2Rldi10ZG1xLW5pbmphemhvdS0xNzEzODU2OTI3LWRldmNsb3VkIn0.O89TuTl0PMca7tLN9aGvHvqt7QZ9yMrJh1z3VOz7EVc \
--auth-plugin org.apache.pulsar.client.impl.auth.AuthenticationToken \
produce \
-m "i am the bone of my sword" \
-n 5 \
pulsar-g8akj4eow8z8/dev-tdmq-ninjazhou-1713856927/ninjaDuplicationInput1
单词计数作为 Flink 中最常见的 Demo,能够比较好的阐述 Flink 的流处理思想。此 Demo 参考 StreamNative 的 Demo,使用 1.17 Flink SDK,将 Pulsar Topic 作为源和目标资源,统计源 Topic 消息中每个时间窗口各个单词出现的次数,并且将结果投递到目标 Topic 中。
https://github.com/streamnative/examples/blob/master/pulsar-flink/README.md
整体 Demo 项目文件见下方链接
pulsar-flink-example.zip
file:////tencent/api/attachments/s3/url?attachmentid=20260421
核心逻辑见下方代码,首先使用 ParameterTool 工具解析命令行中传入的参数,之后使用 Flink 内置的反序列化器解析消息体为字符串,在数据处理部分使用系统时间窗口统计时间窗内流入的消息,并且对于每个出现的单词汇聚生成 WordCount 对象,最后使用自定义的序列化器,将 WordCount 对象序列化为 Json 字节数组,投递到目标 Topic 中。
目前 TDMQ Pulsar Connector 支持 Pulsar Schema、Flink Schema 以及自定义序列化器三种方法将 Java 对象序列化为 Pulsar Sink 的字节数组消息体。推荐代码使用自定义序列化器的方式序列化定义的 WordCount 对象。
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/datastream/pulsar/#serializer
还需要注意默认 Sink 配置是开启 Batch Send 模式的,在控制台消息查询时,Batch Message 只会查询到 Batch 中的第一条消息,不利于对照消息数量,Demo 中关闭了 Batch Send 功能。
public class PulsarStreamingWordCount {
private static final Logger LOG = LoggerFactory.getLogger(PulsarStreamingWordCount.class);
public static void main(String[] args) throws Exception {
final ParameterTool parameterTool = ParameterTool.fromArgs(args);
if (parameterTool.getNumberOfParameters() < 2) {
System.err.println("Missing parameters!");
return;
}
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000));
env.enableCheckpointing(60000);
env.getConfig().setGlobalJobParameters(parameterTool);
String brokerServiceUrl = parameterTool.getRequired("broker-service-url");
String inputTopic = parameterTool.getRequired("input-topic");
String outputTopic = parameterTool.getRequired("output-topic");
String subscriptionName = parameterTool.get("subscription-name", "WordCountTest");
String token = parameterTool.getRequired("token");
int timeWindowSecond = parameterTool.getInt("time-window", 60);
PulsarSource<String> source = PulsarSource.builder()
.setServiceUrl(brokerServiceUrl)
.setStartCursor(StartCursor.latest())
.setTopics(inputTopic)
.setDeserializationSchema(new SimpleStringSchema())
.setSubscriptionName(subscriptionName)
.setAuthentication("org.apache.pulsar.client.impl.auth.AuthenticationToken", token)
.build();
DataStream<String> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Pulsar Source");
DataStream<WordCount> wc = stream
.flatMap((FlatMapFunction<String, WordCount>) (line, collector) -> {
LOG.info("current line = {}, word list = {}", line, line.split("\\s"));
for (String word : line.split("\\s")) {
collector.collect(new WordCount(word, 1, null));
}
})
.returns(WordCount.class)
.keyBy(WordCount::getWord)
.window(TumblingProcessingTimeWindows.of(Time.seconds(timeWindowSecond)))
.reduce((ReduceFunction<WordCount>) (c1, c2) -> {
WordCount reducedWordCount = new WordCount(c1.getWord(), c1.getCount() + c2.getCount(), null);
LOG.info("previous [{}] [{}], current wordCount {}", c1, c2, reducedWordCount);
return reducedWordCount;
});
PulsarSink<WordCount> sink = PulsarSink.builder()
.setServiceUrl(brokerServiceUrl)
.setTopics(outputTopic)
.setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
.setAuthentication("org.apache.pulsar.client.impl.auth.AuthenticationToken", token)
.setConfig(PulsarSinkOptions.PULSAR_BATCHING_ENABLED, false)
.setSerializationSchema(new PulsarSerializationSchema<WordCount>() {
private ObjectMapper objectMapper;
@Override
public void open(
SerializationSchema.InitializationContext initializationContext,
PulsarSinkContext sinkContext,
SinkConfiguration sinkConfiguration)
throws Exception {
objectMapper = new ObjectMapper();
}
@Override
public PulsarMessage<?> serialize(WordCount wordCount, PulsarSinkContext sinkContext) {
byte[] wordCountBytes;
wordCount.setSinkDateTime(LocalDateTime.now().toString());
try {
wordCountBytes = objectMapper.writeValueAsBytes(wordCount);
} catch (Exception exception) {
wordCountBytes = exception.getMessage().getBytes();
}
return PulsarMessage.builder(wordCountBytes).build();
}
})
.build();
wc.sinkTo(sink);
env.execute("Pulsar Streaming WordCount");
}
}
/usr/local/services/flink/flink-1.17.2
Job has been submitted with JobID 6f608d95506f96c3eac012386f840655
/usr/local/services/pulsar/apache-pulsar-2.9.5/bin/pulsar-client\
--url http://pulsar-xxx \
--auth-params token:eyJrZXlJZCI6InB1bHNhci1nOGFrajRlb3c4ejgiLCJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJwdWxzYXItZzhha2o0ZW93OHo4X2Rldi10ZG1xLW5pbmphemhvdS0xNzEzODU2OTI3LWRldmNsb3VkIn0.O89TuTl0PMca7tLN9aGvHvqt7QZ9yMrJh1z3VOz7EVc \
--auth-plugin org.apache.pulsar.client.impl.auth.AuthenticationToken \
produce \
-m "i am the bone of my sword" \
-n 5 \
pulsar-g8akj4eow8z8/dev-tdmq-ninjazhou-1713856927/ninjaWordCountInput1
/usr/local/services/pulsar/apache-pulsar-2.9.5/bin/pulsar-client \
--url http://pulsar-g8akj4eow8z8.sap-8ywks40k.tdmq.ap-gz.qcloud.tencenttdmq.com:8080 \
--auth-params token:eyJrZXlJZCI6InB1bHNhci1nOGFrajRlb3c4ejgiLCJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJwdWxzYXItZzhha2o0ZW93OHo4X2Rldi10ZG1xLW5pbmphemhvdS0xNzEzODU2OTI3LWRldmNsb3VkIn0.O89TuTl0PMca7tLN9aGvHvqt7QZ9yMrJh1z3VOz7EVc \
--auth-plugin org.apache.pulsar.client.impl.auth.AuthenticationToken \
produce \
-m "test1" \
-n 3 \
pulsar-g8akj4eow8z8/dev-tdmq-ninjazhou-1713856927/ninjaWordCountInput1
目前 Flink 插件生产和消费经过调研,在不进行管控改造以及非标操作的情况下,能满足基本的 TDMQ Pulsar 版使用需求。截至现在 Apache Flink 已经发布 1.20 版本,目前推荐使用 Apache Flink 1.15-1.17 对应 Pulsar Connector,不推荐使用 1.15 以下版本,1.18 及以上版本可以参考 1.17 版本使用。
下面介绍 1.15 和 1.17 版本 Pulsar Flink Connector 主要配置。Flink 版本对应的 Flink Connector 依赖可以在 Pulsar Connector Dependencies 处获取。
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/datastream/pulsar/#dependency
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-base</artifactId>
<version>1.17.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-pulsar</artifactId>
<version>4.1.0-1.17</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>1.17.2</version>
</dependency>
PulsarSource<String>source=PulsarSource.builder()
.setServiceUrl(brokerServiceUrl)
.setStartCursor(StartCursor.latest())
.setTopics(inputTopic)
.setDeserializationSchema(new SimpleStringSchema())
.setSubscriptionName(subscriptionName)
.setAuthentication("org.apache.pulsar.client.impl.auth.AuthenticationToken", token)
.build();
PulsarSink<String> sink = PulsarSink.builder()
.setServiceUrl(brokerServiceUrl)
.setTopics(outputTopic)
.setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
.setAuthentication("org.apache.pulsar.client.impl.auth.AuthenticationToken", token)
.setSerializationSchema(new SimpleStringSchema())
.build();
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-base</artifactId>
<version>1.15.4</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-pulsar</artifactId>
<version>1.15.4</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>1.15.4</version>
</dependency>
PulsarSource<String>source=PulsarSource.builder()
.setServiceUrl(brokerServiceUrl)
.setAdminUrl(brokerServiceUrl)
.setStartCursor(StartCursor.latest())
.setTopics(inputTopic)
.setDeserializationSchema(PulsarDeserializationSchema.flinkSchema(new SimpleStringSchema()))
.setSubscriptionName(subscriptionName)
.setSubscriptionType(SubscriptionType.Exclusive)
.setConfig(PulsarOptions.PULSAR_AUTH_PLUGIN_CLASS_NAME, "org.apache.pulsar.client.impl.auth.AuthenticationToken")
.setConfig(PulsarOptions.PULSAR_AUTH_PARAMS, token)
.setConfig(PulsarSourceOptions.PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE, true)
.build();
PulsarSink<String> sink = PulsarSink.builder()
.setServiceUrl(brokerServiceUrl)
.setAdminUrl(brokerServiceUrl)
.setTopics(outputTopic)
.setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
.setConfig(PulsarOptions.PULSAR_AUTH_PLUGIN_CLASS_NAME, "org.apache.pulsar.client.impl.auth.AuthenticationToken")
.setConfig(PulsarOptions.PULSAR_AUTH_PARAMS, token)
.setSerializationSchema(PulsarSerializationSchema.flinkSchema(new SimpleStringSchema()))
.build();
-
由于 Connector Pulsar 会使用到堆外内存,并且默认任务的堆外内存为 0,因此执行 Pulsar Job 需要显式声明堆外内存大小 taskmanager.memory.task.off-heap.size,例如 1gb。
-
SetSerializationSchema 反序列化提供了两种已经实现的方法,一种是使用 Pulsar 内置 Schema,另一种是使用 Flink 的 Schema。但这两种方法都会造成业务代码与 Schema 耦合。目前建议实现 PulsarSerializationSchema接口,主要需要实现 Serialize(IN Element, PulsarSinkContext SinkContext) 方法,将传入的 IN 对象自定义序列化为 Byte 数组。
-
目前 Sink 默认开启 Enable_batch 批量投递模式,会将消息打包后投递。如果想要关闭批量投递功能,可以配置 SetConfig(PulsarSinkOptions.PULSAR_BATCHING_ENABLED, False)。
-
Flink 时间窗口支持两种 时间获取方式 ,一种直接使用任务的系统时间 ProcessTime,另一种是事件自带时间 EventTime。但目前 Source 只支持解析消息 Payload 中的内容,将 Payload 中的内容解析成 Pulsar Schema 对象或者自定义的 Class 对象,而无法解析 Message 中 Properties 中的其他属性,例如 消息上传时间 Publish_Time。如果需要解析 Message 中的 Properties,根据文档 需要在继承类中 实现 PulsarDeserializationSchema.getProducedType() 方法。这个方法实现较为繁琐,需要声明每个反序列化后的属性,因此目前建议直接使用 ProcessTime 作为时间窗口时间。
-
1.16 及以下版本 Flink Source 的 SetSubscriptionType 方法还保留了 Shared 和 Key_Shared 订阅模式,这两种订阅模式依赖事务 Ack 消息,并且只有当任务 Checkpoint 更新时才会统一提交事务和 Ack。但由于目前 TDMQ Pulsar 没有开放事务功能,因此当前不能同时配置 SetSubscriptionType(SubscriptionType.Shared) 和 SetConfig(PulsarSourceOptions.PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE, False) 参数。
-
Oceanus 内置 Pulsar Connector 是基于 StreamNative 版本,适配 Flink 1.13-1.14 版本的 Connector,这两个版本较老,与新版本存在较多 API 不兼容,如果使用 Oceanus 内置版本 Pulsar Connector 与高版本 Flink,可能需要较多代码改造。
📢📢欢迎加入腾讯云开发者社群,享前沿资讯、大咖干货,找兴趣搭子,交同城好友,更有鹅厂招聘机会、限量周边好礼等你来~
(长按图片立即扫码)