使用docker搭建Kafka集群
@ sasaba | 星期四,四月 8 日,2021 年 | 2 分钟阅读 | 更新于 星期四,四月 8 日,2021 年

怎么用docker搭建kafka集群,用于测试。

工作中需要用到一个kafka集群做开发调试任务,因此记录下搭建配置文件。

未校验

配置

docker-compose.yaml

version: '2'
services:
    zookeeper:
        image: wurstmeister/zookeeper
        container_name: zookeeper
        restart: always
        ports:
          - "2181:2181"
          
    kafka1:
        image: wurstmeister/kafka
        container_name: kafka1
        restart: always
        ports:
          - "9092:9092"
        environment:
          - KAFKA_BROKER_ID=0
          - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
          - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://{宿主机IP}:9092
          - KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092

    kafka2:
        image: wurstmeister/kafka
        container_name: kafka2
        restart: always
        ports:
          - "9093:9093"
        environment:
          - KAFKA_BROKER_ID=1
          - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
          - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://{宿主机IP}:9093
          - KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9093

    kafka3:
        image: wurstmeister/kafka
        container_name: kafka3
        restart: always
        ports:
          - "9094:9094"
        environment:
          - KAFKA_BROKER_ID=2
          - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
          - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://{宿主机IP}:9094
          - KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9094

使用

附上go的测试脚本,基于Shopify/sarama。

生产者

package main

import (
	"fmt"
	"github.com/Shopify/sarama"
)

func main() {
	config := sarama.NewConfig()
	config.Producer.RequiredAcks = sarama.WaitForAll          //赋值为-1:这意味着producer在follower副本确认接收到数据后才算一次发送完成。
	config.Producer.Partitioner = sarama.NewRandomPartitioner //写到随机分区中,默认设置8个分区
	config.Producer.Return.Successes = true
	msg := &sarama.ProducerMessage{}
	msg.Topic = `nginx_log`
	msg.Value = sarama.StringEncoder("this is a good test")
	client, err := sarama.NewSyncProducer([]string{"10.1.1.81:9092"}, config)
	if err != nil {
		fmt.Println("producer close err, ", err)
		return
	}
	defer client.Close()
	pid, offset, err := client.SendMessage(msg)

	if err != nil {
		fmt.Println("send message failed, ", err)
		return
	}
	fmt.Printf("分区ID:%v, offset:%v \n", pid, offset)
}

消费者

package main

import (
	"fmt"
	"github.com/Shopify/sarama"

)

func main() {
	config := sarama.NewConfig()
	config.Consumer.Return.Errors = true
	config.Version = sarama.V0_11_0_2

	// consumer
	consumer, err := sarama.NewConsumer([]string{"10.1.1.81:9092"}, config)
	if err != nil {
		fmt.Printf("consumer_test create consumer error %s\n", err.Error())
		return
	}

	defer consumer.Close()

	partition_consumer, err := consumer.ConsumePartition("test", 0, sarama.OffsetOldest)
	if err != nil {
		fmt.Printf("try create partition_consumer error %s\n", err.Error())
		return
	}
	defer partition_consumer.Close()

	for {
		select {
		case msg := <-partition_consumer.Messages():
			fmt.Printf("msg offset: %d, partition: %d, timestamp: %s, value: %s\n",
				msg.Offset, msg.Partition, msg.Timestamp.String(), string(msg.Value))

		case err := <-partition_consumer.Errors():
			fmt.Printf("err :%s\n", err.Error())
		}
	}

}

有校验

目前认知还处于比较浅薄的阶段,所以先简单附上配置。 > docker-compose.yaml

version: '2'
services:
    zookeeper:
        image: wurstmeister/zookeeper
        container_name: zookeeper
        restart: always
        ports:
          - "2181:2181"
        environment:
          - ZOOKEEPER_CLIENT_PORT=2181
          - ZOOKEEPER_TICK_TIME=2000
          - ZOOKEEPER_MAXCLIENTCNXNS=0
          - ZOOKEEPER_AUTHPROVIDER.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
          - ZOOKEEPER_REQUIRECLIENTAUTHSCHEME=sasl
          - ZOOKEEPER_JAASLOGINRENEW=3600000
          - KAFKA_OPTS=-Djava.security.auth.login.config=/etc/kafka/secrets/zk_server_jaas.conf
        volumes:
          - ./zk_server_jaas.conf:/etc/kafka/secrets/zk_server_jaas.conf
          
    kafka1:
        image: wurstmeister/kafka
        container_name: kafka1
        restart: always
        depends_on:
          - zookeeper
        ports:
             - "9092:9092"
        volumes:
          - ./server_jaas.conf:/etc/kafka/secrets/kafka_server_jaas.conf
        environment:
          - KAFKA_BROKER_ID=1
          - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
          - KAFKA_ADVERTISED_LISTENERS=SASL_PLAINTEXT://192.168.50.102:9092
          - KAFKA_LISTENERS=SASL_PLAINTEXT://0.0.0.0:9092
          - KAFKA_SECURITY_INTER_BROKER_PROTOCOL=SASL_PLAINTEXT
          - KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL=PLAIN
          - KAFKA_SASL_ENABLED_MECHANISMS=PLAIN
          - KAFKA_AUTHORIZER_CLASS_NAME=kafka.security.auth.SimpleAclAuthorizer
          - KAFKA_OPTS=-Djava.security.auth.login.config=/etc/kafka/secrets/kafka_server_jaas.conf
          # 超级用户对应KafkaServer里面的用户可以多个User:geting;User:alice
          - KAFKA_SUPER_USERS=User:geting

server_jaas.conf

KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    # 用于访问其他节点
    username="geting"
    password="geting"
    #表示创建一个新用户名alice密码alice
    user_geting="geting"
    user_alice="alice"; 
};

# 用于连接到zookeeper
KafkaClient {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="geting"
    password="geting";
};

zk_server_jaas.conf

zookeeper {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="geting"
    password="geting";
};

有校验 SHA256

需要在zookeeper中注册

kafka-configs.sh --zookeeper zookeeper:2181 --alter --add-config 'SCRAM-SHA-256=[password=admin-secret],SCRAM-SHA-512=[password=admin-secret]' --entity-type users --entity-name admin

docker-compose.yaml

  - KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL=SCRAM-SHA-256
  - KAFKA_SASL_ENABLED_MECHANISMS=SCRAM-SHA-256
  - KAFKA_SUPER_USERS=User:admin

server_jaas.conf

KafkaServer {
    org.apache.kafka.common.security.scram.ScramLoginModule required
    username="admin"
    password="admin-secret"
    user_geting="geting";
};

KafkaClient {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin";
};

zk_server_jaas.conf

zookeeper {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin";
};