当kafka分区不能再增加的情况下,使用多线程提升kafka消费能力(附源码)

   日期:2020-09-26     浏览:104    评论:0    
核心提示:正常情况下,kafka的消费线程数据是分区(patition)一对一,单个patition是kafka并行操作的最小单元,kafka只允许单个partition的数据被一个consumer线程消费,例如我们做20个分区,实际上就对应着20个消费线程,当我们做一些活动的时候,就会有发生消息量猛增,而我们的消费线程有限,处理消息的能力有可能跟不上,导致大量的消息堆积处理不完。这时我们可能就需求要优化,加大处理能力,多数人可能会想到增加分区,分区是可以增加,但是不可能一直无限向上增加,我们这里参用...

前两天csdn提醒我又多了一个粉丝,又激发了写作的动力,不要点开看我的粉丝数,哈哈!      

正常情况下,kafka的消费线程数据是分区(patition)一对一,单个patition是kafka并行操作的最小单元,kafka只允许单个partition的数据被一个consumer线程消费,例如我们做20个分区,实际上就对应着20个消费线程,当我们做一些活动的时候,就会有发生消息量猛增,而我们的消费线程有限,处理消息的能力有可能跟不上,导致大量的消息堆积处理不完。

这时我们可能就需求要优化,加大处理能力,多数人可能会想到增加分区,分区是可以增加,但是不可能一直无限向上增加,我们这里参用多线程的方案。

package com.imcbb;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

import java.util.concurrent.*;


@Service
public class KafkaConsumer {
    private static Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);


    ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 3, 10, TimeUnit.SECONDS,
            new SynchronousQueue<>(),
            new ThreadFactoryBuilder().setNameFormat("KThread-%d").build(),
            (r, executor) -> {
                logger.warn("Ops,Rejected!");
                try {
                    executor.getQueue().put(r);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

            }
//                        new ThreadPoolExecutor.CallerRunsPolicy()

    );

    @KafkaListener(topics = "myTopic")
    public void listen(ConsumerRecord<?, ?> cr) {

        executor.execute(() -> {
            logger.info("---------" + cr.toString());
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

    }
}

在上面的代码中,我们创建了一个线程池来消费一个分区的消息,这里有两个需要特别注意的点:

1.线程池里存放task的队列这里我们使用了SynchronousQueue阻塞队列,这个队列很有趣,是一个无容量队列,具体有兴趣的大家再上网查这里不做过多的解释,使用SynchronousQueue是因为,如果系统发生异常宕机或者应用发版重启时防止队列堆积的消息丢失(当然这也无法完全避免在线程工作中,系统发生异常导致线程挂掉,如果要求可用性更高,可考虑先存入数据库,redis中,再做补偿处理机制)。

2.当消息过多,线程池也忙不过时,我们这里有两种处理办法

2.1 让监听消费线程阻塞,使用new ThreadPoolExecutor.CallerRunsPolicy(),此时消息将不会再消费

2.1 自定义拒绝策略,把任务再放回去

(r, executor) -> {
    logger.warn("Ops,Rejected!");
    try {
        executor.getQueue().put(r);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }

}

以上两种方案都可以解决问题,选一种使用即可,两种方案都无法保证处理消息的顺序。

使用第二种有一个好处,就是可以通过日志查看消费端的一个消费能力,看看有没有进到拒绝里,来适当的调整线程数。

以上就是多线程消费的办法,这个在我们生产环境经过考验的,哈哈!

文章简单帖了些代码,完整写了一个demo给大家,可以自行下载参考:https://github.com/kevinmails/kafka-consumer-demo

前提要在本机上安装一下kafka,官方有安装手册(最小可用版本,本地测试够用了),如果大家看官方留言搞不定,留言给我,我再写一篇安装文章。官方有安装手册:https://kafka.apache.org/quickstart

希望对大家有帮助!

参考:https://www.confluent.io/blog/how-choose-number-topics-partitions-kafka-cluster/

 

 

 

 
打赏
 本文转载自:网络 
所有权利归属于原作者,如文章来源标示错误或侵犯了您的权利请联系微信13520258486
更多>最近资讯中心
更多>最新资讯中心
0相关评论

推荐图文
推荐资讯中心
点击排行
最新信息
新手指南
采购商服务
供应商服务
交易安全
关注我们
手机网站:
新浪微博:
微信关注:

13520258486

周一至周五 9:00-18:00
(其他时间联系在线客服)

24小时在线客服