业务背景
原大数据团队不再维护DB数据同步至es的服务,由我们业务团队自己维护。实现方案:使用canal监听DB binlog,将数据写入es
问题描述
为啥要看es线程池呢?因为线上突然疯狂报错es线程池被打满,但竟然看不懂该如何修改es线程池配置。线上异常堆栈如下:
EsRejectedExecutionException[rejected execution of org.elasticsearch.transport.TransportService$7@4d334adf on EsThreadPoolExecutor[bulk, queue capacity = 2000, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@3c0a3704[Running, pool size = 16, active threads = 16, queued tasks = 2009, completed tasks = 284167676]]]
at org.elasticsearch.common.util.concurrent.EsAbortPolicy.rejectedExecution(EsAbortPolicy.java:50)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
at org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor.doExecute(EsThreadPoolExecutor.java:94)
at org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor.execute(EsThreadPoolExecutor.java:89)
at org.elasticsearch.transport.TransportService.sendLocalRequest(TransportService.java:614)
at org.elasticsearch.transport.TransportService.access$000(TransportService.java:73)
at org.elasticsearch.transport.TransportService$3.sendRequest(TransportService.java:133)
at org.elasticsearch.transport.TransportService.sendRequestInternal(TransportService.java:562)
at org.elasticsearch.transport.TransportService.sendRequest(TransportService.java:495)
at org.elasticsearch.transport.TransportService.sendRequest(TransportService.java:483)
at org.elasticsearch.action.support.replication.TransportReplicationAction$ReroutePhase.performAction(TransportReplicationAction.java:751)
at org.elasticsearch.action.support.replication.TransportReplicationAction$ReroutePhase.performLocalAction(TransportReplicationAction.java:670)
at org.elasticsearch.action.support.replication.TransportReplicationAction$ReroutePhase.doRun(TransportReplicationAction.java:658)
at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37)
at org.elasticsearch.action.support.replication.TransportReplicationAction.doExecute(TransportReplicationAction.java:147)
at org.elasticsearch.action.support.replication.TransportReplicationAction.doExecute(TransportReplicationAction.java:93)
at org.elasticsearch.action.support.TransportAction$RequestFilterChain.proceed(TransportAction.java:170)
at org.elasticsearch.action.support.TransportAction.execute(TransportAction.java:142)
at org.elasticsearch.action.support.replication.TransportReplicationAction$OperationTransportHandler.messageReceived(TransportReplicationAction.java:222)
at org.elasticsearch.action.support.replication.TransportReplicationAction$OperationTransportHandler.messageReceived(TransportReplicationAction.java:219)
at org.elasticsearch.transport.RequestHandlerRegistry.processMessageReceived(RequestHandlerRegistry.java:69)
at org.elasticsearch.transport.TcpTransport$RequestHandler.doRun(TcpTransport.java:1488)
at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37)
at org.elasticsearch.common.util.concurrent.EsExecutors$1.execute(EsExecutors.java:109)
at org.elasticsearch.transport.TcpTransport.handleRequest(TcpTransport.java:1445)
at org.elasticsearch.transport.TcpTransport.messageReceived(TcpTransport.java:1329)
at org.elasticsearch.transport.netty4.Netty4MessageChannelHandler.channelRead(Netty4MessageChannelHandler.java:74)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:349)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:341)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:293)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:280)
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:396)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:248)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:349)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:341)
at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:349)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:341)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1334)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:349)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:926)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:129)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:642)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysPlain(NioEventLoop.java:527)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:481)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:441)
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
at java.lang.Thread.run(Thread.java:745)
问题分析
分析个锤子哦,很明显es线程池被打满了,且线程池配置相当过分,只有16的size,es的线程池默认值也是很过分啦
服务中es客户端初始化代码
// Build one transport address per configured ES host; every host shares the same port.
InetSocketTransportAddress[] addresses = new InetSocketTransportAddress[hostNames.length];
for (int i = 0; i < hostNames.length; i++) {
addresses[i] = new InetSocketTransportAddress(
new InetSocketAddress(hostNames[i], esOutputConfig.getPort()));
}
// Only the cluster name is configured -- no thread-pool settings, so the client
// falls back to ES defaults (bulk pool size = bounded processor count, queue = 200).
Settings.Builder builder = Settings.builder()
.put("cluster.name", esOutputConfig.getClusterName());
Settings settings = builder.build();
// PreBuiltTransportClient builds its own internal ThreadPool from these settings.
transportClient = new PreBuiltTransportClient(settings);
transportClient.addTransportAddresses(addresses);
transportClients.add(transportClient);
问题解决
调大线程池配置。
起初由于不知道怎样配置es线程池数量,且查资料也没查到,临时方案:增加es客户端数量(变相调增线程池大小)。
查看es代码,增加配置如下
// V1 attempt: raise only thread_pool.bulk.size. This alone does NOT take effect
// for the bulk pool -- the value is validated against a hard cap derived from the
// "processors" setting (see applyHardSizeLimit), so 100 is rejected on a 16-core box.
Settings.Builder builder = Settings.builder()
.put("cluster.name", esOutputConfig.getClusterName());
if (taskConfig.getName().contains("order_detail_shard")
|| taskConfig.getName().contains("order_shard")) {
builder.put("thread_pool.bulk.size", 100);
看到V1版本就应该知道结论了吧,没错,没生效,线上依然疯狂报错线程被打满,size依然是16
查看es客户端线程池源码
线程池代码,N多个线程池,重点在于要搞懂es配置Settings的使用方法
// Quoted verbatim from org.elasticsearch.threadpool.ThreadPool.
// Builds one ExecutorBuilder per named pool; default sizes derive from the
// "processors" setting, not from thread_pool.* keys alone.
public ThreadPool(final Settings settings, final ExecutorBuilder<?>... customBuilders) {
super(settings);
assert Node.NODE_NAME_SETTING.exists(settings);
final Map<String, ExecutorBuilder> builders = new HashMap<>();
// boundedNumberOfProcessors = min(32, CPU cores) unless overridden via "processors".
final int availableProcessors = EsExecutors.boundedNumberOfProcessors(settings);
final int halfProcMaxAt5 = halfNumberOfProcessorsMaxFive(availableProcessors);
final int halfProcMaxAt10 = halfNumberOfProcessorsMaxTen(availableProcessors);
final int genericThreadPoolMax = boundedBy(4 * availableProcessors, 128, 512);
builders.put(Names.GENERIC, new ScalingExecutorBuilder(Names.GENERIC, 4, genericThreadPoolMax, TimeValue.timeValueSeconds(30)));
// INDEX and BULK are fixed-size pools: default size = availableProcessors, queue = 200.
builders.put(Names.INDEX, new FixedExecutorBuilder(settings, Names.INDEX, availableProcessors, 200));
builders.put(Names.BULK, new FixedExecutorBuilder(settings, Names.BULK, availableProcessors, 200)); // now that we reuse
...
}
我们业务场景主要使用的写入线程池,BULK线程池:FixedExecutorBuilder
// Convenience overload: derives the settings prefix "thread_pool.<name>" automatically,
// so the size key becomes e.g. "thread_pool.bulk.size".
FixedExecutorBuilder(final Settings settings, final String name, final int size, final int queueSize) {
this(settings, name, size, queueSize, "thread_pool." + name);
}
size为availableProcessors参数
availableProcessors参数取值逻辑
// Setting key "processors": default = min(32, runtime CPU cores), hard minimum 1.
// Overriding it in the client Settings raises the cap used for BULK/INDEX pool sizes.
public static final Setting<Integer> PROCESSORS_SETTING =
Setting.intSetting("processors", Math.min(32, Runtime.getRuntime().availableProcessors()), 1, Property.NodeScope);
// Resolves the effective processor count, honoring a user-supplied "processors" override.
public static int boundedNumberOfProcessors(Settings settings) {
return PROCESSORS_SETTING.get(settings);
}
初始化Setting,该Setting构造器(intSetting)参数含义:
- 配置key:processors
- 默认值提供者函数:与入参无关,直接返回默认值
- 配置解析函数:将字符串入参转为Integer,且不得小于最小值,否则抛出参数异常
- 属性
// Builds an int Setting: the default-value supplier ignores its argument and returns
// defaultValue; the parser converts the raw string and rejects values below minValue.
public static Setting<Integer> intSetting(String key, int defaultValue, int minValue, Property... properties) {
return new Setting<>(key, (s) -> Integer.toString(defaultValue), (s) -> parseInt(s, minValue, key), properties);
}
即availableProcessors的取值逻辑为:取32与cpu核心数的最小值作为默认值,且配置下限硬编码为1
FixedExecutorBuilder构造器
- settings:构造客户端时由开发者自定义的配置信息
- name:配置名称,例如:Names.BULK
- size:大小
- prefix:配置前缀
// Quoted verbatim from org.elasticsearch.threadpool.FixedExecutorBuilder.
// settings: user-supplied client settings; name: pool name (e.g. "bulk");
// size: default pool size (availableProcessors); prefix: "thread_pool.<name>".
public FixedExecutorBuilder(final Settings settings, final String name, final int size, final int queueSize, final String prefix) {
super(name);
final String sizeKey = settingsKey(prefix, "size");
this.sizeSetting =
new Setting<>(
sizeKey,
s -> Integer.toString(size),
// Parser enforces 1 <= value <= applyHardSizeLimit(settings, name): for the
// BULK/INDEX pools the hard limit is 1 + processors, so raising
// thread_pool.bulk.size alone cannot exceed it.
s -> Setting.parseInt(s, 1, applyHardSizeLimit(settings, name), sizeKey),
Setting.Property.NodeScope);
// Queue size has no comparable hard cap.
final String queueSizeKey = settingsKey(prefix, "queue_size");
this.queueSizeSetting =
Setting.intSetting(queueSizeKey, queueSize, Setting.Property.NodeScope);
}
暂不关心队列配置,sizeSetting配置构造器参数含义:
- sizeKey:配置key
- 默认值提供者函数:与入参无关,直接返回默认值,即:availableProcessors
- parser:配置解析函数,根据配置的字符串解析出结果类型数据,
- 属性
配置解析函数parser
- 最小值:硬编码为1
- 最大值:applyHardSizeLimit,如果配置名称为:BULK、INDEX,则最大值为availableProcessors+1,否则为Integer最大值
- sizeKey:前缀+size拼接,实际规则:“thread_pool.”+name+“.size”,例如:“thread_pool.bulk.size”
构建线程池:FixedExecutorBuilder
根据开发者提供的settings配置构建线程池
final ExecutorBuilder.ExecutorSettings executorSettings = entry.getValue().getSettings(settings);
final ExecutorHolder executorHolder = entry.getValue().build(executorSettings, threadContext);
获取配置:org.elasticsearch.threadpool.ExecutorBuilder#getSettings,线程池构建者实现类:org.elasticsearch.threadpool.FixedExecutorBuilder#getSettings
// Resolves the effective pool size / queue size from the user settings,
// falling back to the defaults baked into sizeSetting / queueSizeSetting.
@Override
FixedExecutorSettings getSettings(Settings settings) {
final String nodeName = Node.NODE_NAME_SETTING.get(settings);
final int size = sizeSetting.get(settings);
final int queueSize = queueSizeSetting.get(settings);
return new FixedExecutorSettings(nodeName, size, queueSize);
}
Setting获取配置逻辑
// Reads the raw string value, then runs it through the parser -- this is the
// point where an out-of-range thread_pool.bulk.size is rejected by parseInt.
public T get(Settings settings) {
String value = getRaw(settings);
try {
return parser.apply(value);
} ...
}
获取配置值
- 配置key为:sizeSetting.getKey(),即:thread_pool.bulk.size
- 默认值提供者函数:与入参无关,直接返回默认值,即:availableProcessors,有效cpu核心数
// 1. settings: the caller-supplied configuration
// 2. getKey(): this Setting instance's key, e.g. sizeSetting's "thread_pool.bulk.size"
// 3. defaultValue: this Setting instance's default-value supplier, e.g. sizeSetting's
public String getRaw(Settings settings) {
checkDeprecation(settings);
// An explicitly configured value wins; otherwise the computed default is used.
return settings.get(getKey(), defaultValue.apply(settings));
}
此时不会返回默认值,因为我们配置了thread_pool.bulk.size=100
配置解析函数解析配置值,配置最小值为1,最大值为availableProcessors+1,线上机器为16核,最大值为:17
// The size parser: minimum 1, maximum determined by applyHardSizeLimit below.
s -> Setting.parseInt(s, 1, applyHardSizeLimit(settings, name), sizeKey);
// applyHardSizeLimit
// BULK and INDEX pools are capped at 1 + effective processor count
// (boundedNumberOfProcessors honors the "processors" override); every other
// pool accepts any positive size.
private int applyHardSizeLimit(final Settings settings, final String name) {
if (name.equals(ThreadPool.Names.BULK) || name.equals(ThreadPool.Names.INDEX)) {
return 1 + EsExecutors.boundedNumberOfProcessors(settings);
} else {
return Integer.MAX_VALUE;
}
}
因此修改的配置没有生效,因为100(thread_pool.bulk.size)超过硬上限17(1+有效cpu核心数16),解析时直接抛出参数异常
总结
es客户端配置,对于bulk、index两个线程池的大小如果想要修改不能单单修改线程池配置,还需要一并调大processors配置(覆盖客户端感知的有效cpu核心数),修改后的代码如下,问题解决
// Final fix: raise the bulk pool size AND override "processors" so the hard cap
// (1 + processors) no longer rejects the larger pool size.
// NOTE(review): processors=200 deliberately overstates the real core count;
// confirm the host can actually sustain 100 bulk threads before relying on this.
Settings.Builder builder = Settings.builder()
.put("cluster.name", esOutputConfig.getClusterName());
if (taskConfig.getName().contains("order_detail_shard")
|| taskConfig.getName().contains("order_shard")) {
builder.put("thread_pool.bulk.size", 100)
.put("thread_pool.bulk.queue_size", 100)
.put("processors", 200);