通过 KoP 将 Kafka 应用迁移到 Pulsar
版权声明:原文出自 streamnative/kop ,由 Redisant 进行整理和翻译
什么是 KoP
KoP(Pulsar on Kafka)通过在 Pulsar Broker 上引入 Kafka 协议处理程序,为 Apache Pulsar 带来原生 Apache Kafka 协议支持。 通过将 KoP 协议处理程序添加到您现有的 Pulsar 集群,您可以将现有的 Kafka 应用程序和服务迁移到 Pulsar,而无需修改代码。 这使 Kafka 应用程序能够利用 Pulsar 的强大功能,例如:
- 通过企业级多租户简化运营
- 使用rebalance-free架构简化操作
- 使用 Apache BookKeeper 分层存储
- 使用 Pulsar Functions 进行Serverless事件处理
KoP 作为 Pulsar 协议处理插件,在 Pulsar broker 启动时加载。 它通过在 Apache Pulsar 上提供原生 Kafka 协议支持,帮助减少人们采用 Pulsar 实现业务的障碍。
通过整合两个流行的事件流生态系统,KoP 解锁了新的用例。 您可以利用每个生态系统的优势,使用 Apache Pulsar 构建一个真正统一的事件流平台,以加速实时应用程序和服务的开发。
KoP 利用 Pulsar 已有的组件(例如主题发现、分布式日志库 - ManagedLedger、游标等)在 Pulsar 上实现了 Kafka wire 协议。
下图说明了 KoP 是如何在 Pulsar 中实现的:
安装 KoP
如果您有 Apache Pulsar 集群,则可以通过直接下载 KoP 协议处理程序并将其安装到 Pulsar Broker,在现有 Pulsar 集群上启用 Kafka-on-Pulsar。 它需要三个步骤:
- 下载 KoP 协议处理程序,然后将其复制到您的 Pulsar
protocols
目录。 - 在 Pulsar
broker.conf
或standalone.conf
文件中设置 KoP 协议处理程序的配置。 - 重启 Pulsar broker 以加载 KoP 协议处理程序。
然后你可以启动你的Broker并使用 KoP。 以下是每个步骤的详细说明。
下载 KoP 协议处理程序
您可以在这里直接下载 KoP 协议处理程序。
chen_ubuntu@LAPTOP-IH0640SI:~/start_pulsar/apache-pulsar-2.10.3/protocols$ ls
pulsar-protocol-handler-kafka-2.10.3.3.nar
配置 KoP
将 .nar
文件复制到 Pulsar protocols
目录后,您需要通过在 Pulsar 配置文件 broker.conf
或 standalone.conf
中添加配置来配置 Pulsar broker 以插件形式运行 KoP 协议处理程序。
-
在
broker.conf
或standalone.conf
文件中设置 KoP 协议处理程序的配置。messagingProtocols=kafka protocolHandlerDirectory=./protocols allowAutoTopicCreationType=partitioned narExtractionDirectory=./unpacked
属性名 默认值 建议值 messagingProtocols kafka protocolHandlerDirectory ./protocols Location of KoP NAR file allowAutoTopicCreationType non-partitioned partitioned narExtractionDirectory /tmp/pulsar-nar Location of unpacked KoP NAR file 默认情况下,
allowAutoTopicCreationType
设置为未分区。 由于主题在 Kafka 中默认是分区的,因此最好避免为 Kafka 客户端创建非分区主题,除非 Kafka 客户端需要与现有的非分区主题进行交互。默认情况下,
/tmp/pulsar-nar
目录位于/tmp
目录下。 如果我们将 KoP NAR 文件解包到/tmp
目录,一些类可能会被系统自动删除,这将产生一个ClassNotFoundException
或NoClassDefFoundError
错误。 因此,建议将narExtractionDirectory
选项设置为其他路径。 -
设置 Kafka listeners
# Use `kafkaListeners` here for KoP 2.8.0 because `listeners` is marked as deprecated from KoP 2.8.0 kafkaListeners=PLAINTEXT://127.0.0.1:9092 # This config is not required unless you want to expose another address to the Kafka client. # If it’s not configured, it will be the same with `kafkaListeners` config by default kafkaAdvertisedListeners=PLAINTEXT://127.0.0.1:9092
kafkaListeners
是一个以逗号分隔的侦听器列表以及 Kafka 绑定以进行侦听的主机/IP 和端口。kafkaAdvertisedListeners
是一个以逗号分隔的侦听器列表及其主机/IP 和端口。 -
如下设置偏移量管理,因为 KoP 的偏移量管理取决于 “Broker Entry Metadata”。 KoP 2.8.0 或更高版本需要它。
brokerEntryMetadataInterceptors=org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor
-
禁止删除非活动主题。 这不是必需的,但在 KoP 中非常重要。 目前,Pulsar 会删除分区主题的非活动分区,而不会删除分区主题的元数据。 在这种情况下,KoP 无法创建丢失的分区。
brokerDeleteInactiveTopicsEnabled=false
-
启动 Pulsar
./bin/pulsar-daemon standalone
测试 KoP
-
使用 Kafka Assistant 连接到 KoP
-
创建主题并发送一些消息
-
使用 Pulsar Assistant 连接到 Pulsar Broker 并接收消息