上篇blog安装了可视化的监控工具后,就到了我们最常用的环节,也就是通过代码来控制Kafka,使用API来调用。Kafka文档地址为Kafka官方文档,接下来我们会充分使用到官方文档中的示例,本篇blog分为如下几个部分:
- 环境准备:创建一个java project,用来进行kafka代码的编写
- 生产者API:探讨生产者的发送方式,使用不同的生产者接口发送【同步发送、异步发送】
- 消费者API:探讨生产者的发送方式,使用不同的生产者接口发送【offset提交】
接下来按照如下流程来一起学习吧,奥利给!
环境准备
首先新建一个java project,打开idea新建一个maven项目:
然后引入kafka的的maven依赖:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>kafka</artifactId>
<version>1.0-SNAPSHOT</version>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>7</source>
<target>7</target>
</configuration>
</plugin>
</plugins>
</build>
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.6.0</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.9.5</version>
</dependency>
</dependencies>
</project>
生产者API
在官方文档中,我们可以看到Kafka的消费者API列表生产者API,这些都是当前Kafka支持的生产者相关的API,有如下四种构造方法:
也有如下13种方法【非抽象的实例方法】:接下来分成几个模式分别介绍下
发送方式
发送方式分为两种,同步发送和异步发送,主体的发送流程二者是相同的,主体流程如下:
- 首先创建ProducerRecord对象,此对象除了包括需要发送的数据value之外还必须指定topic,另外也可以指定key和分区。当发送ProducerRecord的时候,生产者做的第一件事就是把key和value序列化为ByteArrays,以便它们可以通过网络发送。
- 接下来,数据会被发送到分区器。如果在ProducerRecord中指定了一个分区,那么分区器会直接返回指定的分区;否则,分区器通常会基于ProducerRecord的key值计算出一个分区。一旦分区被确定,生产者就知道数据会被发送到哪个topic和分区。然后数据会被添加到同一批发送到相同topic和分区的数据里面,一个单独的线程会负责把那些批数据发送到对应的brokers。
- 当broker接收到数据的时候,如果数据已被成功写入到Kafka,会返回一个包含topic、分区和偏移量offset的RecordMetadata对象;如果broker写入数据失败,会返回一个异常信息给生产者。当生产者接收到异常信息时会尝试重新发送数据,如果尝试失败则抛出异常。
Kafka 的 Producer 发送消息采用的是异步发送的方式。在消息发送的过程中,涉及到了两个线程 ——main 线程和 Sender 线程,以及 一个线程共享变量 ——RecordAccumulator。main 线程将消息发送给RecordAccumulator,Sender 线程不断从 RecordAccumulator 中拉取消息发送到 Kafka broker
只有数据积累到 batch.size
之后,sender 才会发送数据。如果数据迟迟未达到 batch.size,sender 等待 linger.time
之后就会发送数据,也就是发往broker的数据是一批一批过去的。
异步发送
异步发送的含义是:消息的发送者只是将消息发送过去,并不关心消息的发送状态,如果leader在发送ack后宕机的话,重复发送的消息将不能保证原来的顺序。最好选用带回调函数的方法。
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Properties;
public class Producer {
public static void main(String[] args) throws InterruptedException {
Properties props = new Properties();
//设置kafka集群的地址
props.put("bootstrap.servers", "192.168.5.101:9092,192.168.5.102:9092,192.168.5.103:9092");
//ack模式,all是最慢但最安全的
props.put("acks", "-1");
//失败重试次数
props.put("retries", 1);
//每个分区未发送消息总字节大小(单位:字节),超过设置的值就会提交数据到服务端
props.put("batch.size", 10);
//props.put("max.request.size",10);
//消息在缓冲区保留的时间,超过设置的值就会被提交到服务端
props.put("linger.ms", 10000);
//整个Producer用到总内存的大小,如果缓冲区满了会提交数据到服务端
//buffer.memory要大于batch.size,否则会报申请内存不足的错误
props.put("buffer.memory", 10240);
//序列化器
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
org.apache.kafka.clients.producer.Producer producer=new KafkaProducer(props);
for (int i = 0; i < 100; i++)
producer.send(new ProducerRecord<String, String>("tml-second", Integer.toString(i), "tml-second消息:"+i), new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
System.out.println("消息发送状态监测");
}
});
producer.close();
}
}
我们可以从机器上看到消息记录
为了更准确一些,我们用命令消费一下:
同步发送
同步发送用的比较少,唯一的不同就是他要求发送时按照顺序,如果当条数据发送失败,那么就阻塞线程,这样就保证了消息的严格顺序【即使在重试状态下发送的消息】
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class Producer {
public static void main(String[] args) throws InterruptedException, ExecutionException {
Properties props = new Properties();
//设置kafka集群的地址
props.put("bootstrap.servers", "192.168.5.101:9092,192.168.5.102:9092,192.168.5.103:9092");
//ack模式,all是最慢但最安全的
props.put("acks", "-1");
//失败重试次数
props.put("retries", 1);
//每个分区未发送消息总字节大小(单位:字节),超过设置的值就会提交数据到服务端
props.put("batch.size", 10);
//props.put("max.request.size",10);
//消息在缓冲区保留的时间,超过设置的值就会被提交到服务端
props.put("linger.ms", 10000);
//整个Producer用到总内存的大小,如果缓冲区满了会提交数据到服务端
//buffer.memory要大于batch.size,否则会报申请内存不足的错误
props.put("buffer.memory", 10240);
//序列化器
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
org.apache.kafka.clients.producer.Producer producer=new KafkaProducer(props);
for (int i = 0; i < 100; i++)
producer.send(new ProducerRecord<String, String>("tml-second", Integer.toString(i), "tml-second消息:"+i)).get();
producer.close();
}
}
防止消息重复提交
在生产者策略的时候我们提到过,需要防止消息重复提交,也即精准一次提交,我们有两种级别,一种是幂等模式【一个broker的会话周期精准一次】,另一种是事务模式【全局的精准一次】。
幂等模式
代码写法类似,只需要给配置里加一个配置项
//幂等模式
props.put("enable.idempotence", true);
一旦设置了该属性,那么retries默认是Integer.MAX_VALUE ,acks默认是all【-1】。
事务模式
事务模式的写法略有不同:
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class Producer {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.5.101:9092,192.168.5.102:9092,192.168.5.103:9092");
props.put("transactional.id", "my_transactional_id");
org.apache.kafka.clients.producer.Producer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
producer.initTransactions();
try {
//数据发送必须在beginTransaction()和commitTransaction()中间,否则会报状态不对的异常
producer.beginTransaction();
for (int i = 0; i < 100; i++)
producer.send(new ProducerRecord<>("tml-second", Integer.toString(i), Integer.toString(i)));
producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
// 这些异常不能被恢复,因此必须要关闭并退出Producer
producer.close();
} catch (KafkaException e) {
// 出现其它异常,终止事务
producer.abortTransaction();
}
producer.close();
}
}
消费者API
在官方文档中,我们可以看到Kafka的消费者API列表消费者API,有构造方法,和实例方法。构造方法有如下四种:
也有45种方法【非抽象的实例方法】以及4种弃用方法。消费者提交方式有以下几种:
- 自动提交:kafka管理offset的提交
- 手动提交:手动同步提交和手动异步提交
按照这种结构我们看下提交方式。
自动提交offset
提交的代码如下:
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;
public class Consumer {
public static void main(String[] args) {
Properties props = new Properties();
//设置kafka集群的地址
props.put("bootstrap.servers", "192.168.5.101:9092,192.168.5.102:9092,192.168.5.103:9092");
//设置消费者组,组名字自定义,组名字相同的消费者在一个组
props.put("group.id", "tml-group");
//开启offset自动提交
props.put("enable.auto.commit", "true");
//自动提交时间间隔
props.put("auto.commit.interval.ms", "1000");
//序列化器
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
//实例化一个消费者
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
//消费者订阅主题,可以订阅多个主题
consumer.subscribe(Arrays.asList("tml-second"));
//死循环不停的从broker中拿数据
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
}
可以看到提交的效果
手动同步提交offset
通常从Kafka拿到的消息是要做业务处理,而且业务处理完成才算真正消费成功,所以需要客户端控制offset提交时间
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
public class Consumer {
public static void main(String[] args) {
Properties props = new Properties();
//设置kafka集群的地址
props.put("bootstrap.servers", "192.168.5.101:9092,192.168.5.102:9092,192.168.5.103:9092");
//设置消费者组,组名字自定义,组名字相同的消费者在一个组
props.put("group.id", "tml_group");
//开启offset自动提交
props.put("enable.auto.commit", "false");
//序列化器
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
//实例化一个消费者
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
//消费者订阅主题,可以订阅多个主题
consumer.subscribe(Arrays.asList("tml-second"));
final int minBatchSize = 50;
List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
buffer.add(record);
}
if (buffer.size() >= minBatchSize) {
//insertIntoDb(buffer);
for (ConsumerRecord bf : buffer) {
System.out.printf("offset = %d, key = %s, value = %s%n", bf.offset(), bf.key(), bf.value());
}
consumer.commitSync();
buffer.clear();
}
}
}
}
手动异步提交offset
虽然同步提交 offset 更可靠一些,但是由于其会阻塞当前线程,直到提交成功。因此吞吐量会收到很大的影响。因此更多的情况下,会选用异步提交 offset 的方式
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import java.util.*;
public class Consumer {
public static void main(String[] args) {
Properties props = new Properties();
//设置kafka集群的地址
props.put("bootstrap.servers", "192.168.5.101:9092,192.168.5.102:9092,192.168.5.103:9092");
//设置消费者组,组名字自定义,组名字相同的消费者在一个组
props.put("group.id", "tml_group");
//开启offset自动提交
props.put("enable.auto.commit", "false");
//序列化器
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
//实例化一个消费者
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
//消费者订阅主题,可以订阅多个主题
consumer.subscribe(Arrays.asList("tml-second"));
final int minBatchSize = 50;
List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
buffer.add(record);
}
if (buffer.size() >= minBatchSize) {
//insertIntoDb(buffer);
for (ConsumerRecord bf : buffer) {
System.out.printf("offset = %d, key = %s, value = %s%n", bf.offset(), bf.key(), bf.value());
}
consumer.commitAsync(new OffsetCommitCallback() {
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
if (exception != null) {
System.err.println("Commit failed for" +
offsets);
}
}
});
buffer.clear();
}
}
}
}
趟了无数的坑,终于把Kafka学习完了,接下来开始Redis之旅,开始由业务架构向基础架构渗透,上可接客户,中可玩儿平台,下可探基础。完成SaaS、PaaS以及IaaS的闭环
部分内容来自 https://blog.csdn.net/wangzhanzheng/article/details/80801059