1. Introduction
Apache Kafka originated at LinkedIn, became an open-source Apache project in 2011, and graduated to a top-level Apache project in 2012. Kafka is written in Scala and Java. It is a fault-tolerant messaging system based on the publish/subscribe model, and it is fast, scalable, and distributed by design.
2. Installing Kafka
- Install Java
- Install ZooKeeper
  - Start command:
    zkServer.sh start
  - Stop command:
    zkServer.sh stop
- Install Kafka
  - Start command:
    kafka-server-start.sh config/server.properties
  - Stop command:
    kafka-server-stop.sh
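Before running the client code below, the target topic should exist (unless the broker has automatic topic creation enabled). A minimal sketch of creating it programmatically with the `AdminClient` from `kafka-clients`, assuming a single broker running at `localhost:9092`:

```java
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;

public class TopicCreate {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumes a local broker

        try (AdminClient admin = AdminClient.create(props)) {
            // One partition, replication factor 1 -- enough for a single-broker demo.
            NewTopic topic = new NewTopic("topic-demo", 1, (short) 1);
            admin.createTopics(Collections.singleton(topic)).all().get();
        }
    }
}
```

The `kafka-topics.sh` script that ships with Kafka can achieve the same thing from the command line.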
3. Code in Practice
Add the Maven dependency:

```xml
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.0.0</version>
</dependency>
```
Producer client code:

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ProducerFastStart {
    public static final String brokerList = "localhost:9092";
    public static final String topic = "topic-demo";

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("bootstrap.servers", brokerList);

        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, "hello, Kafka!");
        try {
            producer.send(record);
        } catch (Exception e) {
            e.printStackTrace();
        }
        producer.close();
    }
}
```
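The `producer.send(record)` call above is fire-and-forget: it returns a `Future<RecordMetadata>` that is never inspected, so delivery failures go unnoticed. A sketch of the two other common send modes, again assuming a broker at `localhost:9092` and the `topic-demo` topic:

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class ProducerSendModes {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            ProducerRecord<String, String> record = new ProducerRecord<>("topic-demo", "hello, Kafka!");

            // Synchronous send: get() blocks until the broker acknowledges the write.
            RecordMetadata metadata = producer.send(record).get();
            System.out.println("partition=" + metadata.partition() + ", offset=" + metadata.offset());

            // Asynchronous send: the callback runs when the send completes or fails.
            producer.send(record, (meta, exception) -> {
                if (exception != null) {
                    exception.printStackTrace();
                }
            });
        }
    }
}
```

The synchronous form trades throughput for a per-record delivery guarantee; the callback form keeps throughput while still surfacing errors.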
Consumer client code:

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ConsumerFastStart {
    public static final String brokerList = "localhost:9092";
    public static final String topic = "topic-demo";
    public static final String groupId = "group.demo";

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("bootstrap.servers", brokerList);
        properties.put("group.id", groupId);

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Collections.singletonList(topic));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.value());
            }
        }
    }
}
```
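The consumer above relies on Kafka's default automatic offset commits (`enable.auto.commit=true`), which can acknowledge records before they are actually processed. A sketch of the same loop with manual offset management, under the same localhost-broker assumption:

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ConsumerManualCommit {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "group.demo");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("enable.auto.commit", "false"); // take over offset management

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("topic-demo"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.value());
                }
                // Commit only after the whole batch has been processed, so a crash
                // mid-batch causes reprocessing (at-least-once) rather than data loss.
                if (!records.isEmpty()) {
                    consumer.commitSync();
                }
            }
        }
    }
}
```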