线程池 ThreadPool

池化技术:为了优化资源的使用(线程池、连接池、对象池...)

优点:防止频繁创建和销毁线程,实现了线程复用,提高了响应速度,方便管理线程

三大方法、七大参数、四种拒绝策略

阿里巴巴开发手册

一、三大方法

package com.chstack.learning.concurrent.pool;

import java.util.concurrent.*;

public class Demo01 {

    // Executors 工具类:包含三大方法
    public static void main(String[] args) {
        // 单个线程
        ExecutorService singlePool = Executors.newSingleThreadExecutor();
        // 创建一个固定大小的线程池
        ExecutorService fixedPool = Executors.newFixedThreadPool(5);
        // 可伸缩的,遇强则强
        ExecutorService cachedPool = Executors.newCachedThreadPool();

        try {
            for (int i = 1; i <= 10; i++) {
                int temp = i;  // 会被隐式转为 final int temp = i;
                // 注意线程池提交任务的方式
                cachedPool.execute(()->{
                    System.out.println(Thread.currentThread().getName() +  " 执行第" +
                            temp + "个任务");
                });
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            // 线程池用完需要关闭
            cachedPool.shutdown();
        }
    }
}

运行结果:

# singlePool
pool-1-thread-1 执行第1个任务
pool-1-thread-1 执行第2个任务
pool-1-thread-1 执行第3个任务
pool-1-thread-1 执行第4个任务
pool-1-thread-1 执行第5个任务
pool-1-thread-1 执行第6个任务
pool-1-thread-1 执行第7个任务
pool-1-thread-1 执行第8个任务
pool-1-thread-1 执行第9个任务
pool-1-thread-1 执行第10个任务

# fixedPool
pool-2-thread-1 执行第1个任务
pool-2-thread-4 执行第4个任务
pool-2-thread-3 执行第3个任务
pool-2-thread-2 执行第2个任务
pool-2-thread-3 执行第8个任务
pool-2-thread-4 执行第7个任务
pool-2-thread-5 执行第5个任务
pool-2-thread-1 执行第6个任务
pool-2-thread-3 执行第10个任务
pool-2-thread-2 执行第9个任务

# cachedPool 目前只有10个任务,理论上可以有无穷多个线程
pool-3-thread-1 执行第1个任务
pool-3-thread-5 执行第5个任务
pool-3-thread-4 执行第4个任务
pool-3-thread-6 执行第6个任务
pool-3-thread-3 执行第3个任务
pool-3-thread-8 执行第8个任务
pool-3-thread-9 执行第9个任务
pool-3-thread-2 执行第2个任务
pool-3-thread-10 执行第10个任务
pool-3-thread-7 执行第7个任务

如何理解 Lambda 表达式与循环变量的绑定

for (int i = 0; i < 10; i++) {
    executor.execute(() -> System.out.println(i)); // 所有任务输出 9
}

Lambda 表达式可能在定义后很久才执行(例如在异步任务或定时任务中)。如果捕获的是变量的当前值,当 Lambda 执行时,变量的值可能已经改变。

// 最佳实践
for (int i = 0; i < 10; i++) {
    final int temp = i;
    executor.execute(() -> System.out.println(temp)); // 从0输出到9
}

二、七大参数

public static ExecutorService newSingleThreadExecutor() {
    return new Executors.FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                    0L, TimeUnit.MILLISECONDS,
                    new LinkedBlockingQueue<Runnable>()));
}

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
            0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>());
}

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
            60L, TimeUnit.SECONDS,
            new SynchronousQueue<Runnable>());
}

本质:ThreadPoolExecutor()

public ThreadPoolExecutor(int corePoolSize,  // 核心线程池大小
                          int maximumPoolSize,  // 最大线程池大小
                          long keepAliveTime,  // 超时了没被调用则释放
                          TimeUnit unit,  // 超时单位
                          BlockingQueue<Runnable> workQueue,  // 阻塞队列
                          ThreadFactory threadFactory,  // 线程工厂:创建线程的,不用动
                          RejectedExecutionHandler handler) {  // 拒绝策略
    if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.acc = System.getSecurityManager() == null ?
            null :
            AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

手动创建线程池

package com.chstack.learning.concurrent.pool;

import java.util.concurrent.*;

public class Demo02 {
    public static void main(String[] args) {
        // 手动创建线程池
        ExecutorService threadPool = new ThreadPoolExecutor(
                2,
                5,
                3,
                TimeUnit.SECONDS,
                new LinkedBlockingDeque<>(3),
                Executors.defaultThreadFactory(),
                // 线程池满了,还有任务,不处理这个任务,抛出异常
                new ThreadPoolExecutor.AbortPolicy()
        );

        try {
            // 最大承载量:Deque + max
            for (int i = 1; i <= 9; i++) {
                int temp = i;
                // 注意线程池提交任务的方式
                threadPool.execute(()->{
                    System.out.println(Thread.currentThread().getName() + " 执行第" + 
                                                                                                         temp + "个任务");
                });
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            // 线程池用完需要关闭
            threadPool.shutdown();
        }
    }
}

三、四种拒绝策略

// 线程池满了,还有任务,不处理这个任务,抛出异常
new ThreadPoolExecutor.AbortPolicy()
pool-1-thread-5 执行第8个任务
pool-1-thread-4 执行第7个任务
pool-1-thread-3 执行第6个任务
pool-1-thread-2 执行第2个任务
pool-1-thread-1 执行第1个任务
pool-1-thread-3 执行第5个任务
pool-1-thread-4 执行第4个任务
pool-1-thread-5 执行第3个任务
Exception in thread "main" java.lang.RuntimeException: java.util.concurrent.RejectedExecutionException: Task org.example.pool.Demo01$$Lambda$15/0x0000020a61001200@4f3f5b24 rejected from java.util.concurrent.ThreadPoolExecutor@15aeb7ab[Running, pool size = 5, active threads = 5, queued tasks = 3, completed tasks = 0]
	at org.example.pool.Demo01.main(Demo01.java:21)
Caused by: java.util.concurrent.RejectedExecutionException: Task org.example.pool.Demo01$$Lambda$15/0x0000020a61001200@4f3f5b24 rejected from java.util.concurrent.ThreadPoolExecutor@15aeb7ab[Running, pool size = 5, active threads = 5, queued tasks = 3, completed tasks = 0]
	at java.base/java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2065)
	at java.base/java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:833)
	at java.base/java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1365)
	at org.example.pool.Demo01.main(Demo01.java:16)

// 线程池满了,还有任务,由调用者执行
new ThreadPoolExecutor.CallerRunsPolicy()
pool-1-thread-5 执行第8个任务
pool-1-thread-1 执行第1个任务
pool-1-thread-5 执行第3个任务
pool-1-thread-1 执行第4个任务
pool-1-thread-5 执行第5个任务
pool-1-thread-2 执行第2个任务
main 执行第9个任务
pool-1-thread-4 执行第7个任务
pool-1-thread-3 执行第6个任务

// 线程池满了,还有任务,不处理这个任务,不抛出异常
new ThreadPoolExecutor.DiscardPolicy()
pool-1-thread-3 执行第6个任务
pool-1-thread-5 执行第8个任务
pool-1-thread-2 执行第2个任务
pool-1-thread-1 执行第1个任务
pool-1-thread-4 执行第7个任务
pool-1-thread-2 执行第5个任务
pool-1-thread-5 执行第4个任务
pool-1-thread-3 执行第3个任务


// 线程池满了,尝试丢掉任务队列中的最早的任务
new ThreadPoolExecutor.DiscardOldestPolicy()
pool-1-thread-3 执行第6个任务
pool-1-thread-1 执行第1个任务
pool-1-thread-1 执行第5个任务
pool-1-thread-1 执行第9个任务
pool-1-thread-2 执行第2个任务
pool-1-thread-4 执行第7个任务
pool-1-thread-3 执行第4个任务
pool-1-thread-5 执行第8个任务

第四种拒绝策略:

A handler for rejected tasks that discards the oldest unhandled request and then retries execute, unless the executor is shut down, in which case the task is discarded.

AI翻译:当执行器(executor)未关闭时,该拒绝任务处理器(RejectedExecutionHandler)会丢弃任务队列中最旧的未处理任务,并重新尝试执行被拒绝的任务;若执行器已关闭,则直接丢弃该任务。

四种拒绝策略总结:

策略行为适用场景
AbortPolicy抛出异常,拒绝任务关键任务(如支付、交易),需立即感知失败并处理
CallerRunsPolicy调用者线程执行任务非核心但需保证执行的任务(如日志记录),且调用者线程可承担额外负载
DiscardPolicy静默丢弃任务非关键任务(如统计、监控数据),任务丢失可接受
DiscardOldestPolicy丢弃队列中最旧任务,为新任务腾出空间。实时性要求高的场景(如消息推送),新任务优先级更高
This article was updated on