Kafka FAQ

I. Building a custom Kafka development image

Goals

  1. Starts up quickly
  2. Includes the necessary functionality

Dockerfile

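# Base image with a Java 8 runtime
FROM java:8-jre
MAINTAINER Louz
# Download Kafka 0.10.0.1 (Scala 2.11 build) and unpack it into /usr/local/
RUN curl -s http://apache.fayea.com/kafka/0.10.0.1/kafka_2.11-0.10.0.1.tgz | tar -xz -C /usr/local/
# Create a version-independent symlink at /usr/local/kafka
RUN cd /usr/local && ln -s kafka_2.11-0.10.0.1 kafka
WORKDIR /usr/local/kafka
# Start ZooKeeper in the background, then run the Kafka broker in the foreground
CMD bin/zookeeper-server-start.sh config/zookeeper.properties &\
bin/kafka-server-start.sh config/server.properties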

Run the build command

docker build -f dockerfile -t louz/kafka0100 .

Start the container

# Name the container kafka and set its hostname to kafka (so that client programs can reach it later)
docker run -d --name kafka -h kafka -p 9092:9092 louz/kafka0100

II. Accessing Kafka

Create a topic

docker exec kafka bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

Start a consumer

docker exec kafka bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

Start a producer and send messages

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ProducerDemo {
    // Lazily created producer instance, reused across calls
    private static KafkaProducer<String, String> producer;

    public static void main(String[] args) {
        Producer<String, String> producer = getKafkaProducer();
        String topic = "test";
        // Send 100 messages: NO.0 through NO.99
        for (int i = 0; i < 100; i++) {
            ProducerRecord<String, String> record = new ProducerRecord<>(topic, "NO." + Integer.toString(i));
            producer.send(record);
        }
        // close() waits for buffered records to be sent before returning
        producer.close();
    }

    public static Producer<String, String> getKafkaProducer() {
        if (producer == null) {
            Properties props = new Properties();
            // The hostname here matches the -h argument of docker run,
            // and it must be mapped to an IP address in the hosts file
            props.put("bootstrap.servers", "kafka:9092");
            props.put("acks", "all");
            props.put("retries", 0);
            props.put("batch.size", 16384);
            props.put("linger.ms", 1);
            props.put("buffer.memory", 33554432);
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            producer = new KafkaProducer<>(props);
        }
        return producer;
    }
}

The consumer console should show output similar to the following:

$ docker exec -it kafka bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
NO.0
NO.1
NO.2
NO.3
NO.4
......

III. About consumers

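Reference: 【原创】探讨kafka的分区数与多线程消费 ([Original] On Kafka partition count and multi-threaded consumption)
In Kafka, each partition can be consumed by at most one consumer thread within a given consumer group. As a consequence, consumer threads beyond the number of partitions in a topic receive no partitions and sit idle.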
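
To make the one-consumer-per-partition rule concrete, here is a minimal sketch that models it in plain Java without connecting to Kafka. The class PartitionAssignmentSketch and its assign helper are hypothetical names invented for this illustration, and the simple round-robin distribution is an assumption chosen for clarity; it is not Kafka's actual assignment algorithm.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified model of partition ownership inside ONE consumer group.
// Hypothetical illustration only; Kafka's real assignors are more involved.
class PartitionAssignmentSketch {

    // Gives each partition (0..partitionCount-1) exactly one owner,
    // handing partitions out to the consumer threads in round-robin order.
    static Map<String, List<Integer>> assign(List<String> consumers, int partitionCount) {
        if (consumers.isEmpty()) {
            throw new IllegalArgumentException("at least one consumer is required");
        }
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (String consumer : consumers) {
            assignment.put(consumer, new ArrayList<>());
        }
        for (int partition = 0; partition < partitionCount; partition++) {
            String owner = consumers.get(partition % consumers.size());
            assignment.get(owner).add(partition);
        }
        return assignment;
    }

    public static void main(String[] args) {
        List<String> threads = Arrays.asList("thread-0", "thread-1", "thread-2");
        // 2 partitions, 3 threads: one thread ends up with nothing to consume
        System.out.println(assign(threads, 2));
        // 5 partitions, 3 threads: every thread is busy, some own two partitions
        System.out.println(assign(threads, 5));
    }
}
```

In this model a partition never appears in more than one thread's list, and with 2 partitions and 3 threads the list for thread-2 is empty. This is why adding consumer threads beyond the partition count of a topic does not increase consumption parallelism.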