0%

Kafka初试

一、简介

Apache Kafka起源于LinkedIn,后来于2011年成为开源Apache项目,然后于2012年成为First-class Apache项目。Kafka是用Scala和Java编写的。 Apache Kafka是基于发布订阅的容错消息系统。 它是快速,可扩展和设计分布。


二、安装Kafka

  1. 安装Java
  2. 安装Zookeeper
    • 启动命令: zkServer.sh start
    • 停止命令: zkServer.sh stop
  3. 安装Kafka
    • 启动命令: kafka-server-start.sh config/server.properties
    • 启动命令: kafka-server-stop.sh config/server.properties

三、代码实践

添加Maven依赖

1
2
3
4
5
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.0.0</version>
</dependency>

生产者客户端代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class ProducerFastStart {
public static final String brokerList = "localhost:9092";
public static final String topic = "topic-demo";
public static void main(String[] args) {
Properties properties = new Properties();
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("bootstrap.servers", brokerList);
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
ProducerRecord<String, String> record = new ProducerRecord<>(topic, "hello, Kafka!");
try{
producer.send(record);
}catch (Exception e){
e.printStackTrace();
}
producer.close();
}
}

消费者客户端代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class ConsumerFastStart {
public static final String brokerList = "localhost:9092";
public static final String topic = "topic-demo";
public static final String groupId = "group.demo";
public static void main(String[] args) {
Properties properties = new Properties();
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("bootstrap.servers", brokerList);
properties.put("group.id", groupId);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Collections.singletonList(topic));
while (true){
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for(ConsumerRecord<String, String> record : records){
System.out.println(record.value());
}
}
}
}