使用 Kafka 和 Docker 开发事件驱动的应用程序

随着微服务的兴起,事件驱动的架构变得越来越流行。Apache Kafka 是一个分布式事件流平台,通常是这些架构的核心。然而,为开发目的设置和部署自己的 Kafka 实例通常很棘手。幸运的是,Docker 和容器让这一切变得容易得多。

在本指南中,您将学习如何

  1. 使用 Docker 启动 Kafka 集群
  2. 将非容器化的应用连接到集群
  3. 将容器化的应用连接到集群
  4. 部署 Kafka-UI 以帮助进行故障排除和调试

先决条件

要学习本操作指南,需要满足以下先决条件

启动 Kafka

Kafka 3.3 开始,Kafka 的部署得到了极大的简化,这要归功于 KRaft(Kafka Raft),不再需要 Zookeeper。有了 KRaft,为本地开发设置 Kafka 实例变得容易得多。随着 Kafka 3.8 的发布,一个新的 kafka-native Docker 镜像现已可用,其启动速度显著加快,内存占用也更低。

提示

本指南将使用 apache/kafka 镜像,因为它包含了许多用于管理和操作 Kafka 的实用脚本。不过,您可能希望使用 apache/kafka-native 镜像,因为它启动更快,所需资源更少。

启动 Kafka

按照以下步骤启动一个基本的 Kafka 集群。本示例将启动一个集群,将端口 9092 暴露到主机上,以便本地运行的应用程序可以连接到它。

  1. 通过运行以下命令启动 Kafka 容器

    $ docker run -d --name=kafka -p 9092:9092 apache/kafka
    
  2. 镜像拉取完成后,您将在几秒钟内拥有一个运行中的 Kafka 实例。

  3. apache/kafka 镜像在 /opt/kafka/bin 目录中附带了几个有用的脚本。运行以下命令以验证集群是否已启动并运行,并获取其集群 ID

    $ docker exec -ti kafka /opt/kafka/bin/kafka-cluster.sh cluster-id --bootstrap-server :9092
    

    这样做会产生类似以下的输出

    Cluster ID: 5L6g3nShT-eMCtK--X86sw
  4. 通过运行以下命令创建一个示例主题并生产(或发布)几条消息

    $ docker exec -ti kafka /opt/kafka/bin/kafka-console-producer.sh --bootstrap-server :9092 --topic demo
    

    运行后,您可以每行输入一条消息。例如,每行输入几条消息。一些示例可能是

    First message

    Second message

    enter 键发送最后一条消息,完成后按 ctrl+c。消息将被发布到 Kafka。

  5. 通过消费消息来确认消息已发布到集群中

    $ docker exec -ti kafka /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server :9092 --topic demo --from-beginning
    

    然后您应该会在输出中看到您的消息

    First message
    Second message

    如果需要,您可以打开另一个终端并发布更多消息,并看到它们出现在消费者中。

    完成后,按 ctrl+c 停止消费消息。

您现在拥有一个本地运行的 Kafka 集群,并且已经验证了您可以连接到它。

从未容器化的应用连接到 Kafka

现在您已经展示了可以从命令行连接到 Kafka 实例,是时候从应用程序连接到集群了。在本例中,您将使用一个简单的 Node 项目,该项目使用 KafkaJS 库。

由于集群在本地运行并暴露在端口 9092,应用程序可以连接到 localhost:9092 上的集群(因为它目前是在本地而不是在容器中运行)。连接后,这个示例应用程序将记录它从 demo 主题消费的消息。此外,当它在开发模式下运行时,如果主题不存在,它也会创建该主题。

  1. 如果您没有运行上一步的 Kafka 集群,请运行以下命令启动一个 Kafka 实例

    $ docker run -d --name=kafka -p 9092:9092 apache/kafka
    
  2. GitHub 仓库克隆到本地。

    $ git clone https://github.com/dockersamples/kafka-development-node.git
    
  3. 进入项目目录。

    cd kafka-development-node/app
    
  4. 使用 yarn 安装依赖项。

    $ yarn install
    
  5. 使用 yarn dev 启动应用程序。这会将 NODE_ENV 环境变量设置为 development 并使用 nodemon 来监视文件更改。

    $ yarn dev
    
  6. 应用程序现在正在运行,它会将接收到的消息记录到控制台。在一个新的终端中,使用以下命令发布几条消息

    $ docker exec -ti kafka /opt/kafka/bin/kafka-console-producer.sh --bootstrap-server :9092 --topic demo
    

    然后向集群发送一条消息

    Test message

    完成后,请记得按 ctrl+c 停止生产消息。

同时从容器和本地应用连接到 Kafka

现在您有了一个通过其暴露端口连接到 Kafka 的应用程序,是时候探索从另一个容器连接到 Kafka 需要进行哪些更改了。为此,您现在将从容器中而不是本地运行该应用程序。

但在开始之前,了解 Kafka 监听器的工作原理以及这些监听器如何帮助客户端连接是很重要的。

理解 Kafka 监听器

当客户端连接到 Kafka 集群时,它实际上连接到一个“broker”。虽然 broker 有很多角色,但其中之一是支持客户端的负载均衡。当客户端连接时,broker 会返回一组连接 URL,客户端应该使用这些 URL 来生产或消费消息。这些连接 URL 是如何配置的呢?

每个 Kafka 实例都有一组监听器和宣告监听器。“监听器”是 Kafka 绑定的地址,而“宣告监听器”则配置了客户端应该如何连接到集群。客户端收到的连接 URL 是基于客户端连接到哪个监听器。

定义监听器

为了更好地理解,我们来看看如何配置 Kafka 以支持两种连接方式

  1. 主机连接(通过主机的映射端口传入的连接) - 这些需要使用 localhost 连接
  2. Docker 连接(从 Docker 网络内部传入的连接) - 这些不能使用 localhost 连接,而应使用 Kafka 服务的网络别名(或 DNS 地址)

由于客户端需要通过两种不同的方式进行连接,因此需要两个不同的监听器 - HOSTDOCKERHOST 监听器会告诉客户端使用 localhost:9092 连接,而 DOCKER 监听器会通知客户端使用 kafka:9093 连接。请注意,这意味着 Kafka 同时在端口 9092 和 9093 上监听。但是,只有主机监听器需要暴露给主机。

Diagram showing the DOCKER and HOST listeners and how they are exposed to the host and Docker networks

为了进行此设置,Kafka 的 compose.yaml 文件需要一些额外的配置。一旦您开始覆盖某些默认设置,您还需要指定其他一些选项才能使 KRaft 模式工作。

services:
  kafka:
    image: apache/kafka-native
    ports:
      - "9092:9092"
    environment:
      # Configure listeners for both docker and host communication
      KAFKA_LISTENERS: CONTROLLER://:9091,HOST://0.0.0.0:9092,DOCKER://0.0.0.0:9093
      KAFKA_ADVERTISED_LISTENERS: HOST://:9092,DOCKER://kafka:9093
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,DOCKER:PLAINTEXT,HOST:PLAINTEXT

      # Settings required for KRaft mode
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@localhost:9091

      # Listener to use for broker-to-broker communication
      KAFKA_INTER_BROKER_LISTENER_NAME: DOCKER

      # Required for a single node cluster
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

请按照以下步骤尝试一下。

  1. 如果您正在运行上一步的 Node 应用程序,请在终端中按 ctrl+c 将其停止。

  2. 如果您正在运行上一节的 Kafka 集群,请使用以下命令停止该容器

    $ docker rm -f kafka
    
  3. 在克隆的项目目录的根目录下运行以下命令来启动 Compose 堆栈

    $ docker compose up
    

    片刻之后,应用程序将启动并运行。

  4. 堆栈中还有另一个服务可以用来发布消息。通过访问 https://:3000 来打开它。当您输入消息并提交表单时,您应该会看到应用程序接收到消息的日志记录。

    这有助于演示容器化方法如何轻松地添加额外的服务来帮助测试和排查您的应用程序。

添加集群可视化工具

一旦您开始在开发环境中使用容器,您就会意识到添加仅专注于辅助开发的其他服务是多么容易,例如可视化工具和其他支持服务。既然您已经运行了 Kafka,那么可视化 Kafka 集群中发生的事情可能会很有帮助。为此,您可以运行 Kafbat UI Web 应用程序

要将其添加到您自己的项目中(它已经包含在演示应用程序中),您只需将以下配置添加到您的 Compose 文件中即可

services:
  kafka-ui:
    image: kafbat/kafka-ui:main
    ports:
      - 8080:8080
    environment:
      DYNAMIC_CONFIG_ENABLED: "true"
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9093
    depends_on:
      - kafka

然后,一旦 Compose 堆栈启动,您就可以在浏览器中打开 https://:8080 并四处浏览以查看有关集群的更多详细信息、检查消费者、发布测试消息等等。

使用 Kafka进行测试

如果您有兴趣了解如何轻松地将 Kafka 集成到您的集成测试中,请查看使用 Testcontainers 测试 Spring Boot Kafka 监听器指南。本指南将教您如何使用 Testcontainers 来管理测试中 Kafka 容器的生命周期。

结论

通过使用 Docker,您可以简化使用 Kafka 开发和测试事件驱动应用程序的过程。容器简化了设置和部署开发所需各种服务的过程。一旦它们在 Compose 中定义,团队中的每个人都可以从其易用性中受益。

如果您之前错过了,所有的示例应用程序代码都可以在 dockersamples/kafka-development-node 中找到。

© . This site is unofficial and not affiliated with Kubernetes or Docker Inc.