目录
- 一.发送的几种方式
- 二.主要源码
- 三.自定义负载均衡
- 四.消息传递保障
- 五.一些参数配置
- 五.Producer发送数据打印日志重复
一.发送的几种方式
在producer端,存在2个线程,一个是producer主线程,用户端调用send消息时,是在主线程执行的,数据被缓存到RecordAccumulator中,send方法即刻返回,也就是说此时并不能确定消息是否真正的发送到broker。另外一个是sender IO线程,其不断轮询RecordAccumulator,满足一定条件后,就进行真正的网络IO发送,使用的是异步非阻塞的NIO。主线程的send方法提供了一个用于回调的参数,当sender线程发送完后,回调函数将被调用,可以用来处理成功,失败或异常的逻辑。
主要就是主线程不会参与io,实际的io需要新的线程去做,则主线程和新线程是异步的关系。
参考文章:https://www.cnblogs.com/benfly/p/10000034.html
1.异步发送
for(int i=0 ; i<10 ; i++){
ProducerRecord<String,String> record =
new ProducerRecord<>(TOPIC_NAME,"key-"+i,"value-"+i);
producer.send(record);
}
2.阻塞异步发送,主线程会被get()函数阻塞
for(int i=0;i<10;i++){
String key = "key-"+i;
ProducerRecord<String,String> record =
new ProducerRecord<>(TOPIC_NAME,key,"value-"+i);
Future<RecordMetadata> send = producer.send(record);
RecordMetadata recordMetadata = send.get();
System.out.println(key + "partition : "+recordMetadata.partition()+" , offset : "+recordMetadata.offset());
}
3.异步回调发送
for(int i=0;i<10;i++){
ProducerRecord<String,String> record =
new ProducerRecord<>(TOPIC_NAME,"key-"+i,"value-"+i);
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
System.out.println(
"partition : "+recordMetadata.partition()+" , offset : "+recordMetadata.offset());
}
});
}
二.主要源码
KafkaProducer:
1.metricConfig
Map<String, String> metricTags = Collections.singletonMap("client-id", clientId);
MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG))
.timeWindow(config.getLong(ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS)
.recordLevel(Sensor.RecordingLevel.forName(config.getString(ProducerConfig.METRICS_RECORDING_LEVEL_CONFIG)))
.tags(metricTags);
2.加载负载均衡器
this.partitioner = config.getConfiguredInstance(ProducerConfig.PARTITIONER_CLASS_CONFIG, Partitioner.class);
3.初始化Serializer
if (keySerializer == null) {
this.keySerializer = config.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
Serializer.class);
this.keySerializer.configure(config.originals(), true);
} else {
config.ignore(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
this.keySerializer = keySerializer;
}
if (valueSerializer == null) {
this.valueSerializer = config.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
Serializer.class);
this.valueSerializer.configure(config.originals(), false);
} else {
config.ignore(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
this.valueSerializer = valueSerializer;
}
4.初始化accumulator,类似于计数器
this.accumulator = new RecordAccumulator(logContext,
config.getInt(ProducerConfig.BATCH_SIZE_CONFIG),
this.compressionType,
lingerMs(config),
retryBackoffMs,
deliveryTimeoutMs,
metrics,
PRODUCER_METRIC_GROUP_NAME,
time,
apiVersions,
transactionManager,
new BufferPool(this.totalMemorySize, config.getInt(ProducerConfig.BATCH_SIZE_CONFIG), metrics, time, PRODUCER_METRIC_GROUP_NAME));
5.启动newSender,守护线程
this.sender = newSender(logContext, kafkaClient, this.metadata);
可以得到结论:1.producer线程安全 2.批量发送,不会一条一条发送。
producer.send(record):
1.计算分区
int partition = partition(record, serializedKey, serializedValue, cluster);
2.计算批次 accumulator.append
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
serializedValue, headers, interceptCallback, remainingWaitMs);
大致流程图:
三.自定义负载均衡
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,“com.chengyanban.kafka_study.producer.SamplePartition”);
public class SamplePartition implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
String keyStr = key + "";
String keyInt = keyStr.substring(4);
System.out.println("keyStr : " + keyStr + "keyInt : " + keyInt);
int i = Integer.parseInt(keyInt);
return i%2; //两个分区
}
四.消息传递保障
properties.put(ProducerConfig.ACKS_CONFIG,“all”);
(1)acks=0: 设置为 0 表示 producer 不需要等待任何确认收到的信息。副本将立即加到socket buffer 并认为已经发送。没有任何保障可以保证此种情况下 server 已经成功接收数据,同时重试配置不会发生作用(因为客户端不知道是否失败)回馈的 offset 会总是设置为-1;
(2)acks=1: 这意味着至少要等待 leader已经成功将数据写入本地 log,但是并没有等待所有 follower 是否成功写入。这种情况下,如果 follower 没有成功备份数据,而此时 leader又挂掉,则消息会丢失。
(3)acks=all: 这意味着 leader 需要等待所有备份都成功写入日志,这种策略会保证只要有一个备份存活就不会丢失数据。这是最强的保证。
(4)其他的设置,例如 acks=2 也是可以的,这将需要给定的 acks 数量,但是这种策略一般很少用。
五.一些参数配置
Properties properties = new Properties();
//批次最大字节数 16KB
properties.put(ProducerConfig.BATCH_SIZE_CONFIG,"16384");
//一旦我们获得某个 partition 的batch.size,他将会立即发送而不顾这项设置,然而如果我们获得消息字节数比这项设置要小的多,我们需要“linger”特定的时间以获取更多的消息。
properties.put(ProducerConfig.LINGER_MS_CONFIG,"1");
//缓存数据的内存大小,先在客户端缓存,然后再一批一批发送 32MB
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,"33554432");
五.Producer发送数据打印日志重复
参考文章:https://www.cnblogs.com/yangxusun9/p/12561986.html