Ch09-Elasticsearch 之 threadpool

Ch09-Elasticsearch 之 threadpool

August 30, 2021
Elasticsearch
elasticsearch

Elasticsearch 之线程池。

1. 实现原理 #

线程池的实现还是比较简单的,本质上都是对 Java 各种线程池的再封装,然后再将这些封装后的线程池放入到一个 Map 中维护起来。如果有任务需要的话,那么再从 Map 中取出/创建指定类型的线程池便可。线程池的初始化是在 Node 刚刚启动的时候,就完成的。在 org.elasticsearch.threadpool.ThreadPool 中有个变量 private final Map<String, ExecutorHolder> executors,而 executors 的构建方式如下所示。

public class ThreadPool implements ReportingService<ThreadPoolInfo>, Scheduler {
  private final Map<String, ExecutorHolder> executors;
  private final Map<String, ExecutorBuilder> builders;

  public ThreadPool(final Settings settings, final ExecutorBuilder<?>... customBuilders) {
    final Map<String, ExecutorBuilder> builders = new HashMap<>();
    builders.put(Names.GET, new FixedExecutorBuilder(settings, Names.GET, allocatedProcessors, 1000));
    builders.put(Names.FLUSH, new ScalingExecutorBuilder(Names.FLUSH, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5)));
    ...
    this.builders = Collections.unmodifiableMap(builders);

    final Map<String, ExecutorHolder> executors = new HashMap<>();
    for (final Map.Entry<String, ExecutorBuilder> entry : builders.entrySet()) {
        final ExecutorBuilder.ExecutorSettings executorSettings = entry.getValue().getSettings(settings);
        // 这里会将 ExecutorBuilder 通过 build 方法构建为 ExecutorHolder
        final ExecutorHolder executorHolder = entry.getValue().build(executorSettings, threadContext);
        if (executors.containsKey(executorHolder.info.getName())) {
            throw new IllegalStateException("duplicate executors with name [" + executorHolder.info.getName() + "] registered");
        }
        logger.debug("created thread pool: {}", entry.getValue().formatInfo(executorHolder.info));
        executors.put(entry.getKey(), executorHolder);
   }


    this.executors = unmodifiableMap(executors);
  }
}

比如创建一个 fixed 类型的线程池,其流程如下所示。

threadpool-process

2. 分类 #

org.elasticsearch.threadpool.ThreadPool

2.1 按照用途 #

名称 说明
index.merge.policy.floor_segment 默认 2MB,小于这个大小的 segment,优先被归并。
same 在调用者线程执行,不转移到新的线程池。
generic 用于通用的操作 (例如,节点发现),线程池类型为 scaling。
listener 主要用于 Java 客户端线程监听器被设置为 true 时执行动作。线程池类型为 scaling,最大线程数为 min(10, (处理器数量)/2)。
get 用于 get 操作。线程池类型为 fixed, 大小为处理器的数量,队列大小为 1000。
analyze
write
search 用于count/search/suggest操作。线程池类型为fixed, 大小为 int((处理器数量 3)/2)+1,队列大小为 1000。
snapshot_throttled
management 管理工作的线程池,例如,Node info、Node tats、List tasks 等。
flush 用于索引数据的 flush 操作。
refresh 用于 refresh 操作。线程池类型为 scaling, 线程空闲保持存活时间为 5min,最大线程数为 min(10, (处理器数量)/2)。
warme 用于 segment warm-up 操作。线程池类型为 scaling,线程保持存活时间为 5min,最大线程数为 min(5, (处理器数量)/2)。
snapshot 用于 snaphostrestore 操作。线程池类型为 scaling,线程保持存活时间为 5min,最大线程数为 min(5, (处理器数量)/2)。
force_merge 顾名思义,用于 Lucene 分段的 force merge。
fetch_shard_started 用于 TransportNodesAction.
fetch_shard_store 用于 TransportNodesListShardStoreMetaData。
system_read
system_write

2.2 按照种类来分 #

名称 说明
direct 对用户并不可见,当某个任务不需要在独立的线程执行,又想被线程池管理时,于是诞生了这种特殊类型的线程池:在调用者线程中执行任务
fixed 拥有固定数量的线程来处理请求,当线程空闲时不会销毁,当所有线程都繁忙时,请求被添加到队列中
fixed_auto_queue_size fixed 类型的线程池相似,该线程池的线程数量为固定值,但是队列类型不一样。其队列大小根据利特尔法则 ( Little’s Law) 自动调整大小。
scaling 线程数量是动态的,介于 core 和 max 参数之间变化。线程池的最小线程数为配置的 core 大小,随着请求的增加,当 core 数量的线程全都繁忙时,线程数逐渐增大到 max 数量。max 是线程池可拥有的线程数。上限。当线程空闲时,线程数从 max 大小逐渐降低到 core 大小。