Kafka 消息框架入门三连:分布式部署

Published: 2020-02-17

Tags: Kafka

本文总阅读量

测试环境

编号 服务器IP 服务
1 192.168.50.252 Zookeeper,Kafka
2 192.168.50.176 Zookeeper,Kafka
3 192.168.50.24 Zookeeper,Kafka

添加防火墙端口

在每个服务器上开启 90922181端口。

firewall-cmd --add-port=9092/tcp --permanent
firewall-cmd --add-port=2181/tcp --permanent
firewall-cmd --reload

部署 Zookeeper

Kafka 是由 zookeeper 进行管理的,每个服务器都部署一个 Zookeeper。

docker run -d --restart="always" \
    --name zookeeper \
    -p 2181:2181 \
    -v /etc/localtime:/etc/localtime \
    wurstmeister/zookeeper

部署 Kafka

不同的参数为 KAFKA_BROKER_IDKAFKA_ADVERTISED_LISTENERS,前一个为就集群的编号,之后的环境变量是把 Kafka 的地址给 Zookeeper,用来告知生产者消费者使用的地址,如果不设置则使用 KAFKA_LISTENERS

服务器_01(192.168.50.252)

docker run -d --restart=always \
    --name kafka \
    -p 9092:9092 \
    -e KAFKA_BROKER_ID=0 \
    -e KAFKA_ZOOKEEPER_CONNECT=192.168.50.252:2181,192.168.50.176:2181,192.168.50.24:2181 \
    -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.50.252:9092 \
    -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \
    -v /opt/kafka:/kafka \
    -v /opt/etc/localtime:/etc/localtime \
    wurstmeister/kafka

服务器_02( 192.168.50.176)

docker run -d --restart=always \
    --name kafka \
    -p 9092:9092 \
    -e KAFKA_BROKER_ID=1 \
    -e KAFKA_ZOOKEEPER_CONNECT=192.168.50.252:2181,192.168.50.176:2181,192.168.50.24:2181 \
    -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.50.176:9092 \
    -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \
    -v /opt/kafka:/kafka \
    -v /opt/etc/localtime:/etc/localtime \
    wurstmeister/kafka

服务器_03( 192.168.50.24)

docker run -d --restart=always \
    --name kafka \
    -p 9092:9092 \
    -e KAFKA_BROKER_ID=2 \
    -e KAFKA_ZOOKEEPER_CONNECT=192.168.50.252:2181,192.168.50.176:2181,192.168.50.24:2181 \
    -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.50.24:9092 \
    -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \
    -v /opt/kafka:/kafka \
    -v /opt/etc/localtime:/etc/localtime \
    wurstmeister/kafka

创建主题

$KAFKA_HOME/bin/kafka-topics.sh --create \
--zookeeper 192.168.50.252:2181,192.168.50.176:2182,192.168.50.24:2182 \
--replication-factor 3 \
--partitions 1 \
--topic chat

或者

$KAFKA_HOME/bin/kafka-topics.sh --bootstrap-server 192.168.50.252:9092 --create --replication-factor 3 --partitions 1 --topic chat

--replication-factor 参数用于指定副本数,副本数需要小于等于集群节点数。

--partitions 参数用于指定分区数,分区数量决定消费群内最多支持多少用户同时读取消息。

如果创建一个副本数为2,分区数为3的主题(主题名:message),发送一些测试数据,是这样的:

服务器 文件夹名称
服务器_01 message-0,message-1
服务器_02 message-0,message-2
服务器_03 message-1,message-2

同理,如果创建一个副本数为3,分区数为3的主题(主题名:chat),发送一些测试数据,是这样的:

服务器 文件夹名称
服务器_01 chat-0,chat-1,chat-2
服务器_02 chat-0,chat-1,chat-2
服务器_03 chat-0,chat-1,chat-2

副本数即为每个分区的备份数量,分区数则代表这个主题的数据被存储在几个分区中。

测试脚本

测试脚本使用 Node.js 搭配 kafkajs 模块。

生产者,每秒钟发送一条测试数据到服务器。

// producer.js

const { Kafka } = require('kafkajs')
const moment = require('moment')

const sleep = () => new Promise((res, rej) => setTimeout(res, 1000));

function getRandomInt(min, max) {
    min = Math.ceil(min);
    max = Math.floor(max);
    return Math.floor(Math.random() * (max - min)) + min;
}

const kafka = new Kafka({
  clientId: 'aha-js-producer',
  brokers: ['127.0.0.1:9092']
})

;(async function () {
  const producer = kafka.producer()
  await producer.connect()
  while (true) {
    let msg_content = {
      id: '000001',
      data: getRandomInt(2800, 3600).toString(),
      time: moment().format('YYYY-MM-DD hh:mm:ss').toString()
    }
    await producer.send({
      topic: 'test',
      messages: [
        {
          value: JSON.stringify(msg_content)
        },
      ],
    })
    console.log(moment().format('YYYY-MM-DD hh:mm:ss'))
    await sleep();
  }

  await producer.disconnect()
})();

消费者,监听 test 主题,实时接收消息。

// consumer.js

const { Kafka } = require('kafkajs')

const kafka = new Kafka({
  clientId: 'aha-js-server',
  brokers: ['127.0.0.1:9092']
})

;(async function () {
  const consumer = kafka.consumer({ groupId: 'test-group' })

  await consumer.connect()
  await consumer.subscribe({ topic: 'test', fromBeginning: true })

  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      console.log({
        value: message.value.toString(),
      })
    },
  })
})();

参考

  1. Kafka集群部署指南