1 Spark Streaming读取Kafka的两种模式
Spark Streaming消费Kafka的数据有两种模式:Receiver和Direct模式,学习时候重点关注下Direct即可,因为在最新读取方式中已经不支持Receiver。
1.1 Receiver模式
在Spark 1.3之前,Spark Streaming消费Kafka中的数据采用基于Kafka高级消费API实现的Receiver模式,如图1所示。首先是Receiver从Kafka中消费数据并存储到Spark Executor中,然后由Spark Streaming启动的Job将处理数据。为了保证高可用,可以启用Spark Streaming的预写日志机制(Write Ahead Log,WAL),将接收到的Kafka数据同步写到外部存储中(如HDFS)。当系统发生故障适合,可以重新将数据加载到系统中,从而避免出现数据丢失的问题。
图 1(来自百度图片)
1.2 Direct模式
在Spark 1.3时,Spark采用无Receiver的Direct模式,如图2所示。首先Spark周期性地查询Kafka中自己订阅的Topic下每个分区的最新offset,通过对比上一批次的offset,从而获取本批次offset的范围。然后启动处理数据的Job, 最后使用Kafka低级消费API去拉取本批次的数据。
图 2(来自百度图片)
1.3 两种接收方式的对比
在最新的API中,Receiver模式已经被废弃。Spark的官网上是推荐Direct模式,理由如下:
1) 简化并行度
Receiver模式中,Kafka中的topic的partition,与Spark中的RDD的partition是没有关系的。所以在KafkaUtils.createStream()中提高partition的数量,只会增加一个Receiver中读取partition的线程数,不会增加Spark处理数据的并行度
Direct模式中,将创建与要使用的Kafka分区一样多的RDD分区,所有这些分区都将从Kafka并行读取数据。因此,Kafka和RDD分区之间存在一对一的映射,这更易于理解和调整。
2) 高效率
Receiver模式中为了保证高可用,需要启用WAL机制,在KafkaUtils.createStream()中设置的持久化级别是StorageLevel.MEMORY_AND_DISK_SER,将数据复制一份到外存中。在处理大数据情况下,很容易成为系统瓶颈。
Direct模式中,为了保证不丢失数据,不需要使用WAL这种重量级的保存机制,只需要将上一批次的Offset保存即可。如果系统挂掉,Spark会重新读取上一次保存的Offset,从而保证数据的零丢失。
3) 占用资源低
Receiver不处理数据,只是持续不断的接受数据,与其他Exector是异步的,但却会占用系统资源。在分配相同cup和内存资源的前提下,Receiver会占用Cpu时间,影响Spark的处理速度,如果数据量大,Receiver会占用更大内存。
2 使用Docker安装实验环境
以前博主一般是用虚拟机来从头到尾来安装各种大数据组件,每次学半天都是在安装,但是公司中大数据平台都是运维已经安装好的,我们每天要做的更多是在平台上开发业务,因此学习时候使用Docker快速安装就显得很有意义。
2.1 安装Reids
docker pull redislabs/rebloom
docker run -d -p 6379:6379 redislabs/rebloom
2.2 安装Zookeeper
docker pull wurstmeister/zookeeper
docker run -d --name zookeeper -p 2181:2181 -v /etc/localtime:/etc/localtime wurstmeister/zookeeper
2.3 安装Kafka
docker pull wurstmeister/kafka
docker run -d --name kafka -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=192.168.0.150:2181/kafka -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.0.150:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -v /etc/localtime:/etc/localtime wurstmeister/kafka
这里2个ip分别是zk和kafka所在设备的ip。
2.4 查看运行的容器
docker ps
2.5 注意事项
使用docker最好将镜像仓库配成阿里云,否词下载会很慢,配置方法网上很多,本文不再累述。
3 消费语义
在上Offset管理代码之前,有个重要概念需要交代下,就是消费语义。Spark Streaming从Kafka broker中拉取数据时候,有三种消费语意:
3.1 至少消费一次
从字面上看就是会有重复数据。Spark从Kafka拉取数据进行处理后,如果offset还没有保存而系统挂掉时候,就会重新从上一次的Offset处开始消费,会造成至少消费一次。
3.2 恰好消费一次
Kafka broker存储的数据只会被消费一次。实现这种模式主要有2种方法
a 在至少消费一次的基础上,下游系统自己做幂等。比如有2条uuid一样的数据被Spark处理且存在数据库,下游系统读取的时候要根据uuid去重。
b 使用数据库的事务。保证数据处理入库和提交offset这2个操作处于同一个事务之中。
3.3 最多消费一次
简单点说就是数据可能发生丢失。比如使用自动提交offset时,因为是定时提交offset,有可能刚拉去到一批数据,还没进行处理,系统就自动帮你提交了offset,而你在处理过程中挂了,这就造成了数据丢失。
4 使用Redis管理Kafka Offset(支持多Topiic)
4.1 向Kafka中写入数据
1)创建topic
import org.apache.kafka.clients.admin.*;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class CreatTopicService {
public static final String brokerList = "192.168.2.50:9092";
public static final String topic = "first";
public static void main(String[] args) {
Properties properties = new Properties();
properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
properties.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
AdminClient client = AdminClient.create(properties);
NewTopic newTopic = new NewTopic(topic, 2, (short) 1);
CreateTopicsResult result = client.createTopics(Collections.singleton(newTopic));
try {
result.all().get();
} catch (InterruptedException | ExecutionException e){
e.printStackTrace();
}finally{
if(client != null){
client.close() ;
}
}
}
}
2)向Kafka中写入数据
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Properties;
public class ProducerService {
public static final String brokerList = "localhost:9092";
public static final String topic = "first";
public static void main(String[] args) {
Properties properties = new Properties();
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("bootstrap.servers", brokerList);
//自己直生产者客户端参数并创建KafkaProducer实例
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
//构建所需妥发送的消息
ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, "Hello Spark!");
try {
//发送消息3种方法
//1)发后即忘
// producer.send(record);
//2)同步
// Future<RecordMetadata> future = producer.send(record);
// RecordMetadata metadata = future.get();
// System.out.println(metadata.topic() + "-" + metadata.partition() + ":" + metadata.offset());
//3)异步
for (int i = 0; i < 200; i++){
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null){
exception.printStackTrace();
}else{
System.out.println(metadata.topic() + "-" + metadata.partition() + ":" + metadata.offset());
}
}
});
}
}catch (Exception e){
e.printStackTrace();
}finally {
//关闭生产着客户端实例
if(producer != null){
producer.close();
}
}
}
}
4.2 Spark消费Kafka的代码
import com.dhhy.redis.RedisUtil
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, TaskContext}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.slf4j.LoggerFactory
object KafkaOffseManageDemo2 {
val LOG = LoggerFactory.getLogger(KafkaOffseManageDemo.getClass)
def getOffsets(topics: Array[String]): Map[TopicPartition, Long] = {
val fromOffsets = collection.mutable.Map.empty[TopicPartition, Long]
import scala.collection.JavaConversions._
val jedis = RedisUtil.getResource()
topics.foreach(topic => {
jedis.hgetAll(topic).foreach(kv =>{
fromOffsets += (new TopicPartition(topic, kv._1.toInt) -> kv._2.toLong)
})
})
RedisUtil.returnResource(jedis)
fromOffsets.toMap
}
def main(args: Array[String]): Unit = {
val LOG = LoggerFactory.getLogger(KafkaOffseManageDemo2.getClass)
//设置日志级别
Logger.getLogger("org").setLevel(Level.INFO)
//配置项
val conf = new SparkConf().setMaster("local[2]").setAppName("KafkaOffseManageDemo")
//创建流式上下文,设置批处理间隔
val ssc = new StreamingContext(conf, Seconds(5))
//kafka配置参数
val kafkaParams = Map[String, Object](
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "localhost:9092", //kafka集群地址
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer], //序列化类型,此处为字符类型
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer], //序列化类型,此处为字符类型
ConsumerConfig.GROUP_ID_CONFIG -> "KafkaOffseManageDemo1",//Kafka消费组l
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest", //读取最新offset
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean)// 是否向定时向zookeeper写入每个分区的offse
)
//消费的topic
//如果消费多个topic,则val topics = Array(topic1,topic2,…)
val topics = Array("first")
val offsets = getOffsets(topics)
// 初始化输入流
val stream = if(offsets.isEmpty){
KafkaUtils.createDirectStream[String, String](
ssc,
PreferConsistent,
Subscribe[String, String](topics, kafkaParams)
)
}else{
KafkaUtils.createDirectStream(ssc,
PreferConsistent,
Subscribe[String, String](topics, kafkaParams, offsets))
}
stream.foreachRDD{rdd =>
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
rdd.foreachPartition { iter =>
val jedis = RedisUtil.getResource()
val pipeline = jedis.pipelined()
iter.foreach { msg =>
msg.value().split(" ").foreach (f =>
if (f == "Hello"){
pipeline.incr("Hello")
}
)
}
val o = offsetRanges(TaskContext.get.partitionId)
//打印偏移量信息
println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
//保存Kafka的偏移量
pipeline.hset(o.topic, o.partition + "", o.untilOffset + "")
pipeline.sync()
RedisUtil.returnResource(jedis)
}
}
ssc.start()
ssc.awaitTermination()
}
}
4.3 测试结果
登陆redis的客户端,查看key为Hello的值,可以发现kafka 生产者一共写入了200个”Hello”
5 参考文献
1)Spark-Streaming获取kafka数据的两种方式-Receiver与Direct的方式
https://blog.csdn.net/kwu_ganymede/article/details/50314901
2) Apache spark官方文档
http://spark.apache.org/docs/latest/streaming-kafka-integration.html
3) 耿嘉安,Spark内核设计的艺术:架构设计与实现 第1版. 2018, 机械工业出版社.