前两天csdn提醒我又多了一个粉丝,又激发了写作的动力,不要点开看我的粉丝数,哈哈!
正常情况下,kafka的消费线程数据是分区(patition)一对一,单个patition是kafka并行操作的最小单元,kafka只允许单个partition的数据被一个consumer线程消费,例如我们做20个分区,实际上就对应着20个消费线程,当我们做一些活动的时候,就会有发生消息量猛增,而我们的消费线程有限,处理消息的能力有可能跟不上,导致大量的消息堆积处理不完。
这时我们可能就需求要优化,加大处理能力,多数人可能会想到增加分区,分区是可以增加,但是不可能一直无限向上增加,我们这里参用多线程的方案。
package com.imcbb;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
import java.util.concurrent.*;
@Service
public class KafkaConsumer {
private static Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);
ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 3, 10, TimeUnit.SECONDS,
new SynchronousQueue<>(),
new ThreadFactoryBuilder().setNameFormat("KThread-%d").build(),
(r, executor) -> {
logger.warn("Ops,Rejected!");
try {
executor.getQueue().put(r);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// new ThreadPoolExecutor.CallerRunsPolicy()
);
@KafkaListener(topics = "myTopic")
public void listen(ConsumerRecord<?, ?> cr) {
executor.execute(() -> {
logger.info("---------" + cr.toString());
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
}
在上面的代码中,我们创建了一个线程池来消费一个分区的消息,这里有两个需要特别注意的点:
1.线程池里存放task的队列这里我们使用了SynchronousQueue阻塞队列,这个队列很有趣,是一个无容量队列,具体有兴趣的大家再上网查这里不做过多的解释,使用SynchronousQueue是因为,如果系统发生异常宕机或者应用发版重启时防止队列堆积的消息丢失(当然这也无法完全避免在线程工作中,系统发生异常导致线程挂掉,如果要求可用性更高,可考虑先存入数据库,redis中,再做补偿处理机制)。
2.当消息过多,线程池也忙不过时,我们这里有两种处理办法
2.1 让监听消费线程阻塞,使用new ThreadPoolExecutor.CallerRunsPolicy(),此时消息将不会再消费
2.1 自定义拒绝策略,把任务再放回去
(r, executor) -> {
logger.warn("Ops,Rejected!");
try {
executor.getQueue().put(r);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
以上两种方案都可以解决问题,选一种使用即可,两种方案都无法保证处理消息的顺序。
使用第二种有一个好处,就是可以通过日志查看消费端的一个消费能力,看看有没有进到拒绝里,来适当的调整线程数。
以上就是多线程消费的办法,这个在我们生产环境经过考验的,哈哈!
文章简单帖了些代码,完整写了一个demo给大家,可以自行下载参考:https://github.com/kevinmails/kafka-consumer-demo
前提要在本机上安装一下kafka,官方有安装手册(最小可用版本,本地测试够用了),如果大家看官方留言搞不定,留言给我,我再写一篇安装文章。官方有安装手册:https://kafka.apache.org/quickstart
希望对大家有帮助!
参考:https://www.confluent.io/blog/how-choose-number-topics-partitions-kafka-cluster/