在初学kafkaAPI的时候,查阅大量大佬文章,发现开始都是要进行构建kafka依赖,但是由于博主对于maven依赖不熟悉以及对idea的操作略少,故在进行 构建maven依赖,写入pom.xml的时候出现错误,让博主很是烦心,但是本文的重点不在于纠结如何写依赖,关于maven依赖问题,博主会在有空的时候去研究,这个不急(是不是感觉博主好菜啊~~~~~)
因此本文直接把Linux中的kafka集群的j有关jar包,直接copy了出来,这样也能实现调用kafkaAPI,实现生产者与消费者案例(嘿嘿,博主聪明吧),借此告诉大家一个道理:解决问题要寻求多渠道解决,不要死磕在一条道上。好了,话不多说,直接进入主题!!!
首先,找到任一台kafka节点的根目录下,如博主的kafka路径为如下:
然后进入该目录下的libs目录:
ls查看,我们发现libs目录下有很多jar包:
本文所用到的jar包就在其中,故下一步我们就要把libs文件夹添加到我们Window上的idea依赖上。至于如何把libs文件夹传到Windows上,想必大家都有办法的吧,利用VM虚拟机的共享文件夹或者xftp等都可以实现Linux文件传到Window,博主用的是MobaXterm,故可以直接把libs文件夹拖到Windows,但是利用其它软件的话,需要提前压缩libs文件夹,然后再传到Windows!!!
接着还要修改kafka集群的端口,每个节点(博主的集群为:master,slave1,slave2)都要设置,进入kafka的config目录下,vim server.properties,先设置master节点:
在如下地方添加:port=9092
同理,另外两台节点(slave1,slave2)也同样设置:
slave1:
slave2:
然后在每个节点上启动kafka集群(既然要设置了端口号,就必须先启用kafka集群,方可让Java实现kafka生产者跟消费者功能。
---------------------------------------------------------------------------------------------------------------------------------------------------------------------
接下来进行具体代码实现环节:
首先找到提前传好的libs文件(如果你的libs文件夹没有解压的话,必须先解压)。
在idea中创建kafka工程,依次点击工具栏的File-->New-->Project。
然后创建maven
填写点击Next,设置工程名字,博主的是Kafka,然后点击Finish
接着把刚才传过来libs文件夹放到这个工程,工程路径不清楚的话,参考上面的路径,最后情况如下:
到这里就可以写代码了,(再次确保虚拟机中的kafka集群启动成功),在src下创建生产者类MyProducer跟消费者类
生产者代码如下:
import java.util.Properties; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer; public class MyProducer implements Runnable { private final KafkaProducer<String, String> producer; private final String topic; public MyProducer(String topicName) { Properties props = new Properties(); props.put("bootstrap.servers", "master:9092,slave1:9092,slave2:9092"); props.put("acks", "all"); props.put("retries", 0); props.put("batch.size", 16384); props.put("key.serializer", StringSerializer.class.getName()); props.put("value.serializer", StringSerializer.class.getName()); this.producer = new KafkaProducer<String, String>(props); this.topic = topicName; } public void run() { int messageNo = 1; try { for(;;) { String messageStr="你好,这是第"+messageNo+"条数据"; producer.send(new ProducerRecord<String, String>(topic, "Message", messageStr)); //生产了100条就打印 if(messageNo%100==0){ System.out.println("发送的信息:" + messageStr); } //生产1000条就退出 if(messageNo%1000==0){ System.out.println("成功发送了"+messageNo+"条"); break; } messageNo++; } } catch (Exception e) { e.printStackTrace(); } finally { producer.close(); } } public static void main(String args[]) { MyProducer test= new MyProducer("KAFKA_TEST"); Thread thread = new Thread(test); thread.start(); } }
消费者代码:
import java.util.Arrays; import java.util.Properties; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer; public class MyConsumer implements Runnable { private final KafkaConsumer<String, String> consumer; private ConsumerRecords<String, String> msgList; private final String topic; private static final String GROUPID = "groupA"; public MyConsumer(String topicName) { Properties props = new Properties(); props.put("bootstrap.servers", "master:9092,slave1:9092,slave2:9092"); props.put("group.id", GROUPID); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("session.timeout.ms", "30000"); props.put("auto.offset.reset", "earliest"); props.put("key.deserializer", StringDeserializer.class.getName()); props.put("value.deserializer", StringDeserializer.class.getName()); this.consumer = new KafkaConsumer<String, String>(props); this.topic = topicName; this.consumer.subscribe(Arrays.asList(topic)); } public void run() { int messageNo = 1; System.out.println("---------开始消费---------"); try { for (;;) { msgList = consumer.poll(1000); if(null!=msgList&&msgList.count()>0){ for (ConsumerRecord<String, String> record : msgList) { //消费100条就打印 ,但打印的数据不一定是这个规律的 if(messageNo%100==0){ System.out.println(messageNo+"=======receive: key = " + record.key() + ", value = " + record.value()+" offset==="+record.offset()); } //当消费了1000条就退出 if(messageNo%1000==0){ break; } messageNo++; } }else{ Thread.sleep(1000); } } } catch (InterruptedException e) { e.printStackTrace(); } finally { consumer.close(); } } public static void main(String args[]) { MyConsumer test1 = new MyConsumer("KAFKA_TEST"); Thread thread1 = new Thread(test1); thread1.start(); } }
其中在生产者demo跟消费者demo中的 配置“bootstrap.servers”,要跟你kafka节点的ip或者hostname对应,如下图
有关配置说明,博主不在这解释,有兴趣的同学可以去查阅资料(本文主要让小白掌握如何让idea跑kafka集群的生产者跟消费者)
最后便是run啦,先运行消费者demo,如下:
然后再是运行生产者demo,如下:
最后回过头看消费者终端,发现接收了信息,如下:
好啦,最后消费者出现上面的信息,代表Java实现了本文目标:生产者与消费者java代码实现实例。
感谢大家的阅读,有啥问题或者建议下方评论指出哦,有帮助请点点赞哈!!!