Java并发工具
一 Executor 并发框架介绍
1 整体结构介绍
executor [ɪɡˈzekjətə(r)] 执行者 execute [ˈeksɪkjuːt] 执行
从 JDK 1.5 开始,java 中将工作单元和执行机制做了分离,于是 Executor 并行框架出现。
什么是工作单元(或称为任务)呢?其实就是我们需要运行的一段逻辑代码。不管什么逻辑的工作单元,最终都需要通过线程运行。
Executor 并行框架对工作单元、以及工作单元的执行做了高度抽象,形成了一整套完整模型。这个模型包括 3 大部分:
- 对工作单元的抽象,即任务。
- 任务的执行机制,即如何组织任务的提交、如何管理提交的任务、如何组织多个线程执行。
- 对任务执行结果的抽象,即如何跟踪任务执行状态,如何获取任务执行结果。
2 核心接口和实现类
整个 Executor 框架的核心接口和实现类型如下:
- 工作单元:Runnable,Callable
- 工作单元执行:Executor,ExecutorService
- 工作单元执行结果:Future,FutureTask
Executor 框架核心接口的使用逻辑如下图:
2.1 Runnable & Callable
当不需要关注任务执行结果时,使用 Runnable 很合适,反之使用 Callable。代码举例:
Runnable task = new Runnable() {public void run() {// 任何想要执行的逻辑}
}Callable<String> task = new Callable<String>() {public String call() throws Exception {// 任何想要执行的逻辑return "任务执行后任何想返回的信息";}
};
2.2 Executor & ExecutorService
Executor 接口定义了一个用于执行任务的 execute 方法。ExecutorService 是 Executor 的子接口,其职责是对一堆用于执行任务的线程做管理,即定义了线程池的基本操作接口,有很多具体的实现子类,其核心操作有:
ExecutorService的创建
1.newCachedThreadPool 创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
2.newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
3.newScheduledThreadPool 创建一个定长线程池,支持定时及周期性任务执行。
4.newSingleThreadExecutor 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。
ExecutorService的使用
- execute (Runnable):提交 Runnable 任务。
- submit (Callable 或 Runnable):提交 Callable 或 Runnable 任务,并返回代表此任务的 Future 对象。
- invokeAny(…)方法接收的是一个Callable的集合,执行这个方法不会返回Future,但是会返回所有Callable任务中其中一个任务的执行结果。这个方法也无法保证返回的是哪个任务的执行结果,反正是其中的某一个。
- invokeAll(…)与 invokeAny(…)类似也是接收一个Callable集合,但是前者执行之后会返回一个Future的List,其中对应着每个Callable任务执行后的Future对象。
- shutdown ():关闭新的外部任务提交。
- shutdownNow ():尝试中断正在执行的所有任务,清空并返回待执行的任务列表。
- isTerminated ():测试是否所有任务都执行完毕了。
- isShutdown ():测试是否该 ExecutorService 已被关闭。
2.3 Future & FutureTask
Future 接口定义了对任务执行结果的取消、状态查询、结果获取方法。FutureTask 是 Future 的唯一实现类,其职责是提供方便地构建带有返回结果的任务。Future 接口的核心操作有:
- cancel (boolean):尝试取消已经提交的任务。
- isCancelled ():查询任务是否在完成之前被取消了。
- isDone ():查询任务是否已经完成。
- get ():获取异步任务的执行结果(如果任务没执行完将等待)。
二 Executor 应用示例
1 案例描述
我们可以通过手工创建线程做逻辑单元的执行,但是当存在大量的需要执行的逻辑单元也是这样处理,就会出现很多麻烦的事情,且效率非常低下。手工创建线程并做线程管理,需要我们实现很多与业务无关的控制代码,另外手工不停的创建线程并做线程销毁,会浪费很多系统资源。
我们在实际项目中,常常通过使用 java 提供好的非常好用的线程框架 Executor 进行任务执行操作。
有这样一个场景:需要对某个目录下的所有文件(成百上千)进行加密并用文件的 MD5 串修改文件名称。
在开始动手实现之前,我们先做一个简单的分析。在这个案例中,我们将 “对文件进行加密、生成 MD5 串、修改文件名称” 作为待执行任务的内容。所有文件形成的列表就是我们待处理的数据范围。为了校验整个处理过程是否有文件遗漏,我们最终需要核对处理结果。为了方便演示,下面编码中部分数据采用了模拟的方式生成。
2 编码实现
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;public class ExecutorTest {// 模拟待处理的文件列表private static int fileListSize = new Random().nextInt(6);private static String[] fileList = new String[fileListSize];static {for(int i=0; i<fileListSize; i++) {fileList[i] = "fileName" + i;}}// 主线程public static void main(String[] args) throws Exception {// 创建用于处理任务的线程池ExecutorService executorService = Executors.newFixedThreadPool(10);// 任务提交,每一个任务处理一个文件List<FileDealTask> tasks = new ArrayList<>();for(int i=0; i<fileListSize; i++) {tasks.add(new FileDealTask(0, fileListSize, fileList[i]));}// 等待异步处理结果返回List<Future<Integer>> results = executorService.invokeAll(tasks);// 获取任务执行结果Integer total = 0;for (Future<Integer> result : results) {total = total + result.get();}System.out.println("预备处理的文件个数" + fileListSize + ",总共处理的文件个数:" + total);// 关闭线程池executorService.shutdown();}
}
上面代码注释已经很清楚了,我们观察下面的代码,看看任务代码
import java.util.Random;
import java.util.concurrent.Callable;public class FileDealTask implements Callable<Integer> {private String fileName;public FileDealTask(int first, int last, String fileName) {this.fileName = fileName;}@Overridepublic Integer call() throws Exception {try {Thread.sleep(new Random().nextInt(2000));System.out.println(Thread.currentThread().getName() + ":文件" + fileName + "已处理完毕");} catch (Exception e) {return 0;}return 1;}
}
上面代码逻辑中有随机内容,每次运行结果会有差异,运行上面的代码,我们观察运行结果:
pool-1-thread-2:文件fileName1已处理完毕
pool-1-thread-3:文件fileName2已处理完毕
pool-1-thread-1:文件fileName0已处理完毕
预备处理的文件个数3,总共处理的文件个数:3
3 注意事项
- Executors 是 Executor 框架体系中的一个独立的工具类,用于快速创建各类线程池,在实际应用中,如果需要对线程池的各类参数做更多的自定义,可以参考此类的实现。
- 做好评估权衡,当需要处理的数据量不是特别大时,没有必要使用 Executor。其底层使用多线程的方式处理任务,涉及到线程上下文的切换,当数据量不大的时候使用串行会比使用多线程快。
- 在使用时,如果主线程不关心子任务的执行结果,请使用 Runnable 接口封装任务的执行逻辑。
三 ForkJoin 并发框架
1 分治法
基本思想
把一个规模大的问题划分为规模较小的子问题,然后分而治之,最后合并子问题的解得到原问题的解。
步骤
①分割原问题;
②求解子问题;
③合并子问题的解为原问题的解。
我们可以使用如下伪代码来表示这个步骤。
if(任务很小){直接计算得到结果
}else{分拆成N个子任务调用子任务的fork()进行计算调用子任务的join()合并计算结果
}
在分治法中,子问题一般是相互独立的,因此,经常通过递归调用算法来求解子问题。
2 ForkJoin框架概述
Java 1.7 引入了一种新的并发框架—— Fork/Join Framework,主要用于实现“分而治之”的算法,特别是分治之后递归调用的函数。
ForkJoin框架的本质是一个用于并行执行任务的框架, 能够把一个大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务的计算结果。在Java中,ForkJoin框架与ThreadPool共存,并不是要替换ThreadPool
其实,在Java 8中引入的并行流计算,内部就是采用的ForkJoinPool来实现的。例如,下面使用并行流实现打印数组元组的程序。
public class SumArray {public static void main(String[] args){List<Integer> numberList = Arrays.asList(1,2,3,4,5,6,7,8,9);numberList.parallelStream().forEach(System.out::println);}
}
这段代码的背后就使用到了ForkJoinPool。
说到这里,可能有读者会问:可以使用线程池的ThreadPoolExecutor来实现啊?为什么要使用ForkJoinPool啊?ForkJoinPool是个什么鬼啊?! 接下来,我们就来回答这个问题。
3 ForkJoin框架原理
ForkJoin框架是从jdk1.7中引入的新特性,它同ThreadPoolExecutor一样,也实现了Executor和ExecutorService接口。它使用了一个无限队列来保存需要执行的任务,而线程的数量则是通过构造函数传入,如果没有向构造函数中传入指定的线程数量,那么当前计算机可用的CPU数量会被设置为线程数量作为默认值。
ForkJoinPool主要使用**分治法(Divide-and-Conquer Algorithm)**来解决问题。典型的应用比如快速排序算法。这里的要点在于,ForkJoinPool能够使用相对较少的线程来处理大量的任务。
比如要对1000万个数据进行排序,那么会将这个任务分割成两个500万的排序任务和一个针对这两组500万数据的合并任务。以此类推,对于500万的数据也会做出同样的分割处理,到最后会设置一个阈值来规定当数据规模到多少时,停止这样的分割处理。
比如,当元素的数量小于10时,会停止分割,转而使用插入排序对它们进行排序。那么到最后,所有的任务加起来会有大概200万+个。问题的关键在于,对于一个任务而言,只有当它所有的子任务完成之后,它才能够被执行。
所以当使用ThreadPoolExecutor时,使用分治法会存在问题,因为ThreadPoolExecutor中的线程无法向任务队列中再添加一个任务并在等待该任务完成之后再继续执行。而使用ForkJoinPool就能够解决这个问题,它就能够让其中的线程创建新的任务,并挂起当前的任务,此时线程就能够从队列中选择子任务执行。
那么使用ThreadPoolExecutor或者ForkJoinPool,性能上会有什么差异呢?
首先,使用ForkJoinPool能够使用数量有限的线程来完成非常多的具有父子关系的任务,比如使用4个线程来完成超过200万个任务。但是,使用ThreadPoolExecutor时,是不可能完成的,因为ThreadPoolExecutor中的Thread无法选择优先执行子任务,需要完成200万个具有父子关系的任务时,也需要200万个线程,很显然这是不可行的,也是很不合理的!!
4 ForkJoin框架的实现
ForkJoin框架中一些重要的类如下所示。
ForkJoinPool 框架中涉及的主要类如下所示。
4.1 ForkJoinPool类
实现了ForkJoin框架中的线程池,由类图可以看出,ForkJoinPool类实现了线程池的Executor接口。
我们也可以从下图中看出ForkJoinPool的类图关系。
其中,可以使用Executors.newWorkStealPool()方法创建ForkJoinPool。
ForkJoinPool中提供了如下提交任务的方法。
public void execute(ForkJoinTask<?> task)
public void execute(Runnable task)
public <T> T invoke(ForkJoinTask<T> task)
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task)
public <T> ForkJoinTask<T> submit(Callable<T> task)
public <T> ForkJoinTask<T> submit(Runnable task, T result)
public ForkJoinTask<?> submit(Runnable task)
4.2 ForkJoinWorkerThread类
实现ForkJoin框架中的线程。
4.3 ForkJoinTask<V>类
ForkJoinTask封装了数据及其相应的计算,并且支持细粒度的数据并行。ForkJoinTask比线程要轻量,ForkJoinPool中少量工作线程能够运行大量的ForkJoinTask。
ForkJoinTask类中主要包括两个方法fork()和join(),分别实现任务的分拆与合并。
fork()方法类似于Thread.start(),但是它并不立即执行任务,而是将任务放入工作队列中。跟Thread.join()方法不同,ForkJoinTask的join()方法并不简单的阻塞线程,而是利用工作线程运行其他任务,当一个工作线程中调用join(),它将处理其他任务,直到注意到目标子任务已经完成。
我们可以使用下图来表示这个过程。
ForkJoinTask有3个子类:
- RecursiveAction:无返回值的任务。
- RecursiveTask:有返回值的任务。
- CountedCompleter:完成任务后将触发其他任务。
4.4 RecursiveTask<V> 类
有返回结果的ForkJoinTask实现Callable。
4.5 RecursiveAction类
无返回结果的ForkJoinTask实现Runnable。
4.6 CountedCompleter<T> 类
在任务完成执行后会触发执行一个自定义的钩子函数。
5 ForkJoin示例程序
import lombok.extern.slf4j.Slf4j;import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;@Slf4j
public class ForkJoinTaskExample extends RecursiveTask<Integer> {public static final int threshold = 2;private int start;private int end;public ForkJoinTaskExample(int start, int end) {this.start = start;this.end = end;}@Overrideprotected Integer compute() {int sum = 0;//如果任务足够小就计算任务boolean canCompute = (end - start) <= threshold;if (canCompute) {for (int i = start; i <= end; i++) {sum += i;}} else {// 如果任务大于阈值,就分裂成两个子任务计算int middle = (start + end) / 2;ForkJoinTaskExample leftTask = new ForkJoinTaskExample(start, middle);ForkJoinTaskExample rightTask = new ForkJoinTaskExample(middle + 1, end);// 执行子任务leftTask.fork();rightTask.fork();// 等待任务执行结束合并其结果int leftResult = leftTask.join();int rightResult = rightTask.join();// 合并子任务sum = leftResult + rightResult;}return sum;}public static void main(String[] args) {ForkJoinPool forkjoinPool = new ForkJoinPool();//生成一个计算任务,计算1+2+3+4+...+100ForkJoinTaskExample task = new ForkJoinTaskExample(1, 100);//执行一个任务Future<Integer> result = forkjoinPool.submit(task);try {log.info("result:{}", result.get());} catch (Exception e) {log.error("exception", e);}}
}
16:39:08.917 [main] INFO ForkJoinTaskExample - result:5050
四 补充 - 创建线程池的方法 - ThreadPoolExecutor和Executors
1 线程池介绍
什么是线程池?
线程池就是一个可以复用线程的技术。
不使用线程池的问题:
如果用户每发起一个请求,后台就创建一个新线程来处理,下次新任务来了又要创建新线程,而创建新线程的开销是很大的,这样会严重影响系统的性能。
线程池工作原理:
例如线程池中最多可以允许创建三个工作线程, 也叫核心线程, 前面三个任务来的时候会给前面三个任务单独创建三个线程; 但是后面任务再来的时候, 因为创建的工作线程已达到最大数, 那么后面的任务就会进入任务队列中排队等待; 等前面的任务执行完成, 有空闲的线程的时候使用空闲的线程依次执行任务队列中的任务
2 实现线程池的方式
谁代表线程池?
JDK 5.0起提供了代表线程池的接口:ExecutorService
如何得到线程池对象?
- 方式一:使用ExecutorService的实现类ThreadPoolExecutor自创建一个线程池对象
- 方式二:使用Executors(线程池的工具类)调用方法返回不同特点的线程池对象
方式一: 实现类ThreadPoolExecutor
ThreadPoolExecutor构造器的参数
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler)
参数介绍:
参数一:指定线程池的线程数量(核心线程): corePoolSize ----> 不能小于0
参数二:指定线程池可支持的最大线程数: maximumPoolSize ----> 最大数量 >= 核心线程数量
参数三:指定临时线程的最大存活时间: keepAliveTime ----> 不能小于0
参数四:指定存活时间的单位(秒、分、时、天): unit ----> 时间单位
参数五:指定任务队列: workQueue ----> 不能为null
参数六:指定用哪个线程工厂创建线程: threadFactory ----> 不能为null
参数七:指定线程忙,任务满的时候,新任务拒绝策略: handler ----> 不能为null
新任务拒绝策略:
策略 | 详解 |
---|---|
ThreadPoolExecutor.AbortPolicy | 丢弃任务并抛出RejectedExecutionException异常。是默认的策略 |
ThreadPoolExecutor.DiscardPolicy | 丢弃任务,但是不抛出异常 这是不推荐的做法 |
ThreadPoolExecutor.DiscardOldestPolicy | 抛弃队列中等待最久的任务 然后把当前任务加入队列中 |
ThreadPoolExecutor.CallerRunsPolicy | 由主线程负责调用任务的run()方法从而绕过线程池直接执行 |
ThreadPoolExecutor创建线程池对象示例:
ExecutorService pools = new ThreadPoolExecutor(3, 5, 8, TimeUnit.SECONDS, new ArrayBlockingQueue<>(6), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
思考:
临时线程什么时候创建啊?
- 新任务提交时发现核心线程都在忙,任务队列也满了,并且还可以创建临时线程,此时才会创建临时线程。
什么时候会开始拒绝任务?
- 核心线程和临时线程都在忙,任务队列也满了,新的任务过来的时候才会开始任务拒绝。
线程池处理Runnable任务
ExecutorService的常用方法:
方法名称 | 说明 |
---|---|
execute(Runnable command) | 执行任务/命令,没有返回值,一般用来执行 Runnable 任务 |
shutdown() | 等任务执行完毕后关闭线程池(一般不会关闭线程池) |
shutdownNow() | 立刻关闭,停止正在执行的任务,并返回队列中未执行的任务(一般不会关闭线程池) |
演示代码:
创建一个Runnable任务线程类
public class MyRunnable implements Runnable{@Overridepublic void run() {for (int i = 0; i < 5; i++) {System.out.println(Thread.currentThread().getName() + "线程执行输出: " + i);}// 为了测试, 我们让每个线程睡眠3秒try {System.out.println(Thread.currentThread().getName() + "线程进入休眠");Thread.sleep(3000);System.out.println(Thread.currentThread().getName() + "线程执行完成");} catch (Exception e) {e.printStackTrace();}}
}
在主类中, 创建一个线程池对象并创建Runnable任务交给线程池处理
- 如果只有三个任务, 那么会被三个核心线程同时执行
public static void main(String[] args) {// 1. 创建一个线程池对象 核心线程为3, 最大线程数5, 临时线程存活6秒, 任务队列最大为5ExecutorService es = new ThreadPoolExecutor(3, 5, 6,TimeUnit.SECONDS, new ArrayBlockingQueue<>(5), Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());// 2. Runnable任务交给任务线程池处理Runnable target = new MyRunnable();// 三个Runnable任务会被核心线程执行es.execute(target);es.execute(target);es.execute(target);
}
- 打印结果如下
pool-1-thread-1线程执行输出: 0
pool-1-thread-2线程执行输出: 0
pool-1-thread-3线程执行输出: 0
pool-1-thread-2线程执行输出: 1
pool-1-thread-2线程执行输出: 2
pool-1-thread-2线程执行输出: 3
pool-1-thread-1线程执行输出: 1
pool-1-thread-2线程执行输出: 4
pool-1-thread-3线程执行输出: 1
pool-1-thread-3线程执行输出: 2
pool-1-thread-3线程执行输出: 3
pool-1-thread-1线程执行输出: 2
pool-1-thread-3线程执行输出: 4
pool-1-thread-1线程执行输出: 3
pool-1-thread-1线程执行输出: 4
pool-1-thread-2线程进入休眠
pool-1-thread-3线程进入休眠
pool-1-thread-1线程进入休眠
pool-1-thread-3线程执行完成
pool-1-thread-1线程执行完成
pool-1-thread-2线程执行完成
- 如果核心线程全部被占用, 那么后面的任务会进入任务队列中排队等待有空闲的核心线程; 由于我们设置的任务队列数是5, 所以进入任务队列的任务数量小于等于5时, 不会创建临时线程
public static void main(String[] args) {// 1. 创建一个线程池对象 核心线程为3, 最大线程数5, 临时线程存活6秒, 任务队列最大为5ExecutorService es = new ThreadPoolExecutor(3, 5, 6,TimeUnit.SECONDS, new ArrayBlockingQueue<>(5), Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());// 2. Runnable任务交给任务线程池处理Runnable target = new MyRunnable();// 三个Runnable任务会被核心线程执行es.execute(target);es.execute(target);es.execute(target);// 三个核心线程被占满, 会进入任务队列排队等待有空闲的核心线程es.execute(target);es.execute(target);es.execute(target);es.execute(target);es.execute(target);
}
- 由于核心线程都在忙, 任务队列也满了, 这时候我们再继续添加任务时, 就会创建临时线程; 因为我们设置的核心线程数是3个, 最大线程数是5个, 所以临时线程最多只会创建两个
public static void main(String[] args) {// 1. 创建一个线程池对象 核心线程为3, 最大线程数5, 临时线程存活6秒, 任务队列最大为5ExecutorService es = new ThreadPoolExecutor(3, 5, 6,TimeUnit.SECONDS, new ArrayBlockingQueue<>(5), Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());// 2. Runnable任务交给任务线程池处理Runnable target = new MyRunnable();// 三个Runnable任务会被核心线程执行es.execute(target);es.execute(target);es.execute(target);// 三个核心线程被占满, 会进入任务队列排队等待有空闲的核心线程(我们设置的任务队列最大允许排队5个任务)es.execute(target);es.execute(target);es.execute(target);es.execute(target);es.execute(target);// 三个核心线程都在忙, 任务队列满, 创建临时线程es.execute(target);es.execute(target);
}
- 核心线程都忙, 任务队列满, 临时线程忙, 此时再继续添加任务就会触发拒绝任务策略
public static void main(String[] args) {// 1. 创建一个线程池对象 核心线程为3, 最大线程数5, 临时线程存活6秒, 任务队列最大为5ExecutorService es = new ThreadPoolExecutor(3, 5, 6,TimeUnit.SECONDS, new ArrayBlockingQueue<>(5), Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());// 2. Runnable任务交给任务线程池处理Runnable target = new MyRunnable();// 三个Runnable任务会被核心线程执行es.execute(target);es.execute(target);es.execute(target);// 三个核心线程被占满, 会进入任务队列排队等待有空闲的核心线程(我们设置的任务队列最大允许排队5个任务)es.execute(target);es.execute(target);es.execute(target);es.execute(target);es.execute(target);// 三个核心线程都在忙, 任务队列满, 创建临时线程es.execute(target);es.execute(target);// 后续任务拒绝任务会被拒绝es.execute(target);
}
线程池处理Callable任务
ExecutorService的常用方法:
方法名称 | 说明 |
---|---|
submit(Callable<T> task) | 执行Callable任务,返回未来任务对象(FutureTask)获取线程结果 |
shutdown() | 等任务执行完毕后关闭线程池 |
shutdownNow() | 立刻关闭,停止正在执行的任务,并返回队列中未执行的任务 |
注意: submit方法返回的是Future对象, Future对象是FutureTask对象继承的父类
演示代码:
定义一个Callable任务类, 用于计算1到n的和返回
import java.util.concurrent.Callable;public class MyCallable implements Callable<String> {private int n;public MyCallable(int n) {this.n = n;}@Overridepublic String call() throws Exception {int sum = 0;for (int i = 1; i <= n ; i++) {sum += i;}// 为了测试, 让每一个线程任务睡眠3秒System.out.println(Thread.currentThread().getName() + "线程进入睡眠");Thread.sleep(3000);System.out.println(Thread.currentThread().getName() + "线程执行完成");return Thread.currentThread().getName() + "执行1-" + n + "结果是: " + sum;}
}
线程池处理Callable任务时的细节和处理Callable的一样, 这里不再一一赘述, 代码如下
public static void main(String[] args) {// 1. 创建一个线程池对象 核心线程为5, 最大线程数5, 临时线程存活6秒, 任务队列最大为5ExecutorService pool = new ThreadPoolExecutor(3, 5, 6,TimeUnit.SECONDS, new ArrayBlockingQueue<>(5), Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());// 2. 创建Callable任务, 调用submit方法, 交给任务线程池处理, 返回未来线程对象// 三个Runnable任务会被核心线程执行Future<String> f1 = pool.submit(new MyCallable(10));Future<String> f2 = pool.submit(new MyCallable(20));Future<String> f3 = pool.submit(new MyCallable(30));// 三个核心线程被占满, 会进入任务队列排队等待有空闲的核心线程(我们设置的任务队列最大允许排队5个任务)Future<String> f4 = pool.submit(new MyCallable(30));Future<String> f5 = pool.submit(new MyCallable(40));Future<String> f6 = pool.submit(new MyCallable(50));Future<String> f7 = pool.submit(new MyCallable(40));Future<String> f8 = pool.submit(new MyCallable(50));// 三个核心线程都在忙, 任务队列满, 创建临时线程Future<String> f9 = pool.submit(new MyCallable(80));Future<String> f10 = pool.submit(new MyCallable(60));// 后续任务拒绝任务会被拒绝Future<String> f11 = pool.submit(new MyCallable(30));// 3. 互获取未来线程的返回结果try { // 正常的结果System.out.println(f1.get());System.out.println(f2.get());System.out.println(f3.get());System.out.println(f4.get());System.out.println(f5.get());System.out.println(f6.get());System.out.println(f7.get());System.out.println(f8.get());System.out.println(f9.get());System.out.println(f10.get());System.out.println(f11.get());} catch (Exception e) { // 异常的结果e.printStackTrace();}
}
方式二: Executors工具类创建线程池
文本一二个章节已阐述
Executors使用可能存在的陷阱
大型并发系统环境中使用Executors如果不注意可能会出现系统风险。
方法名称 | 存在问题 |
---|---|
ExecutorService newCachedThreadPool() | 创建的线程数量最大上限是无穷大(Integer.MAX_VALUE), 线程数可能会随着任务1:1增长,也可能出现OOM内存溢出错误 ( java.lang.OutOfMemoryError ) |
ExecutorService newFixedThreadPool(int nThreads) | 允许创建的线程是固定的, 但是线程允许请求的任务队列长度是无穷大的(Integer.MAX_VALUE),可能出现OOM内存溢出错误(java.lang.OutOfMemoryError ) |
ScheduledExecutorService newScheduledThreadPool(int corePoolSize) | 创建的线程数量最大上限是无穷大(Integer.MAX_VALUE), 线程数可能会随着任务1:1增长,也可能出现OOM内存溢出错误 ( java.lang.OutOfMemoryError ) |
ExecutorService newSingleThreadExecutor() | 允许创建的线程是固定的, 允许请求的任务队列长度是无穷大的(Integer.MAX_VALUE),可能出现OOM内存溢出错误(java.lang.OutOfMemoryError ) |
阿里巴巴开发手册中明确说明, 不允许Executors创建线程池
Java并发工具
一 Executor 并发框架介绍
1 整体结构介绍
executor [ɪɡˈzekjətə(r)] 执行者 execute [ˈeksɪkjuːt] 执行
从 JDK 1.5 开始,java 中将工作单元和执行机制做了分离,于是 Executor 并行框架出现。
什么是工作单元(或称为任务)呢?其实就是我们需要运行的一段逻辑代码。不管什么逻辑的工作单元,最终都需要通过线程运行。
Executor 并行框架对工作单元、以及工作单元的执行做了高度抽象,形成了一整套完整模型。这个模型包括 3 大部分:
- 对工作单元的抽象,即任务。
- 任务的执行机制,即如何组织任务的提交、如何管理提交的任务、如何组织多个线程执行。
- 对任务执行结果的抽象,即如何跟踪任务执行状态,如何获取任务执行结果。
2 核心接口和实现类
整个 Executor 框架的核心接口和实现类型如下:
- 工作单元:Runnable,Callable
- 工作单元执行:Executor,ExecutorService
- 工作单元执行结果:Future,FutureTask
Executor 框架核心接口的使用逻辑如下图:
2.1 Runnable & Callable
当不需要关注任务执行结果时,使用 Runnable 很合适,反之使用 Callable。代码举例:
Runnable task = new Runnable() {public void run() {// 任何想要执行的逻辑}
}Callable<String> task = new Callable<String>() {public String call() throws Exception {// 任何想要执行的逻辑return "任务执行后任何想返回的信息";}
};
2.2 Executor & ExecutorService
Executor 接口定义了一个用于执行任务的 execute 方法。ExecutorService 是 Executor 的子接口,其职责是对一堆用于执行任务的线程做管理,即定义了线程池的基本操作接口,有很多具体的实现子类,其核心操作有:
ExecutorService的创建
1.newCachedThreadPool 创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
2.newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
3.newScheduledThreadPool 创建一个定长线程池,支持定时及周期性任务执行。
4.newSingleThreadExecutor 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。
ExecutorService的使用
- execute (Runnable):提交 Runnable 任务。
- submit (Callable 或 Runnable):提交 Callable 或 Runnable 任务,并返回代表此任务的 Future 对象。
- invokeAny(…)方法接收的是一个Callable的集合,执行这个方法不会返回Future,但是会返回所有Callable任务中其中一个任务的执行结果。这个方法也无法保证返回的是哪个任务的执行结果,反正是其中的某一个。
- invokeAll(…)与 invokeAny(…)类似也是接收一个Callable集合,但是前者执行之后会返回一个Future的List,其中对应着每个Callable任务执行后的Future对象。
- shutdown ():关闭新的外部任务提交。
- shutdownNow ():尝试中断正在执行的所有任务,清空并返回待执行的任务列表。
- isTerminated ():测试是否所有任务都执行完毕了。
- isShutdown ():测试是否该 ExecutorService 已被关闭。
2.3 Future & FutureTask
Future 接口定义了对任务执行结果的取消、状态查询、结果获取方法。FutureTask 是 Future 的唯一实现类,其职责是提供方便地构建带有返回结果的任务。Future 接口的核心操作有:
- cancel (boolean):尝试取消已经提交的任务。
- isCancelled ():查询任务是否在完成之前被取消了。
- isDone ():查询任务是否已经完成。
- get ():获取异步任务的执行结果(如果任务没执行完将等待)。
二 Executor 应用示例
1 案例描述
我们可以通过手工创建线程做逻辑单元的执行,但是当存在大量的需要执行的逻辑单元也是这样处理,就会出现很多麻烦的事情,且效率非常低下。手工创建线程并做线程管理,需要我们实现很多与业务无关的控制代码,另外手工不停的创建线程并做线程销毁,会浪费很多系统资源。
我们在实际项目中,常常通过使用 java 提供好的非常好用的线程框架 Executor 进行任务执行操作。
有这样一个场景:需要对某个目录下的所有文件(成百上千)进行加密并用文件的 MD5 串修改文件名称。
在开始动手实现之前,我们先做一个简单的分析。在这个案例中,我们将 “对文件进行加密、生成 MD5 串、修改文件名称” 作为待执行任务的内容。所有文件形成的列表就是我们待处理的数据范围。为了校验整个处理过程是否有文件遗漏,我们最终需要核对处理结果。为了方便演示,下面编码中部分数据采用了模拟的方式生成。
2 编码实现
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;public class ExecutorTest {// 模拟待处理的文件列表private static int fileListSize = new Random().nextInt(6);private static String[] fileList = new String[fileListSize];static {for(int i=0; i<fileListSize; i++) {fileList[i] = "fileName" + i;}}// 主线程public static void main(String[] args) throws Exception {// 创建用于处理任务的线程池ExecutorService executorService = Executors.newFixedThreadPool(10);// 任务提交,每一个任务处理一个文件List<FileDealTask> tasks = new ArrayList<>();for(int i=0; i<fileListSize; i++) {tasks.add(new FileDealTask(0, fileListSize, fileList[i]));}// 等待异步处理结果返回List<Future<Integer>> results = executorService.invokeAll(tasks);// 获取任务执行结果Integer total = 0;for (Future<Integer> result : results) {total = total + result.get();}System.out.println("预备处理的文件个数" + fileListSize + ",总共处理的文件个数:" + total);// 关闭线程池executorService.shutdown();}
}
上面代码注释已经很清楚了,我们观察下面的代码,看看任务代码
import java.util.Random;
import java.util.concurrent.Callable;public class FileDealTask implements Callable<Integer> {private String fileName;public FileDealTask(int first, int last, String fileName) {this.fileName = fileName;}@Overridepublic Integer call() throws Exception {try {Thread.sleep(new Random().nextInt(2000));System.out.println(Thread.currentThread().getName() + ":文件" + fileName + "已处理完毕");} catch (Exception e) {return 0;}return 1;}
}
上面代码逻辑中有随机内容,每次运行结果会有差异,运行上面的代码,我们观察运行结果:
pool-1-thread-2:文件fileName1已处理完毕
pool-1-thread-3:文件fileName2已处理完毕
pool-1-thread-1:文件fileName0已处理完毕
预备处理的文件个数3,总共处理的文件个数:3
3 注意事项
- Executors 是 Executor 框架体系中的一个独立的工具类,用于快速创建各类线程池,在实际应用中,如果需要对线程池的各类参数做更多的自定义,可以参考此类的实现。
- 做好评估权衡,当需要处理的数据量不是特别大时,没有必要使用 Executor。其底层使用多线程的方式处理任务,涉及到线程上下文的切换,当数据量不大的时候使用串行会比使用多线程快。
- 在使用时,如果主线程不关心子任务的执行结果,请使用 Runnable 接口封装任务的执行逻辑。
三 ForkJoin 并发框架
1 分治法
基本思想
把一个规模大的问题划分为规模较小的子问题,然后分而治之,最后合并子问题的解得到原问题的解。
步骤
①分割原问题;
②求解子问题;
③合并子问题的解为原问题的解。
我们可以使用如下伪代码来表示这个步骤。
if(任务很小){直接计算得到结果
}else{分拆成N个子任务调用子任务的fork()进行计算调用子任务的join()合并计算结果
}
在分治法中,子问题一般是相互独立的,因此,经常通过递归调用算法来求解子问题。
2 ForkJoin框架概述
Java 1.7 引入了一种新的并发框架—— Fork/Join Framework,主要用于实现“分而治之”的算法,特别是分治之后递归调用的函数。
ForkJoin框架的本质是一个用于并行执行任务的框架, 能够把一个大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务的计算结果。在Java中,ForkJoin框架与ThreadPool共存,并不是要替换ThreadPool
其实,在Java 8中引入的并行流计算,内部就是采用的ForkJoinPool来实现的。例如,下面使用并行流实现打印数组元组的程序。
public class SumArray {public static void main(String[] args){List<Integer> numberList = Arrays.asList(1,2,3,4,5,6,7,8,9);numberList.parallelStream().forEach(System.out::println);}
}
这段代码的背后就使用到了ForkJoinPool。
说到这里,可能有读者会问:可以使用线程池的ThreadPoolExecutor来实现啊?为什么要使用ForkJoinPool啊?ForkJoinPool是个什么鬼啊?! 接下来,我们就来回答这个问题。
3 ForkJoin框架原理
ForkJoin框架是从jdk1.7中引入的新特性,它同ThreadPoolExecutor一样,也实现了Executor和ExecutorService接口。它使用了一个无限队列来保存需要执行的任务,而线程的数量则是通过构造函数传入,如果没有向构造函数中传入指定的线程数量,那么当前计算机可用的CPU数量会被设置为线程数量作为默认值。
ForkJoinPool主要使用**分治法(Divide-and-Conquer Algorithm)**来解决问题。典型的应用比如快速排序算法。这里的要点在于,ForkJoinPool能够使用相对较少的线程来处理大量的任务。
比如要对1000万个数据进行排序,那么会将这个任务分割成两个500万的排序任务和一个针对这两组500万数据的合并任务。以此类推,对于500万的数据也会做出同样的分割处理,到最后会设置一个阈值来规定当数据规模到多少时,停止这样的分割处理。
比如,当元素的数量小于10时,会停止分割,转而使用插入排序对它们进行排序。那么到最后,所有的任务加起来会有大概200万+个。问题的关键在于,对于一个任务而言,只有当它所有的子任务完成之后,它才能够被执行。
所以当使用ThreadPoolExecutor时,使用分治法会存在问题,因为ThreadPoolExecutor中的线程无法向任务队列中再添加一个任务并在等待该任务完成之后再继续执行。而使用ForkJoinPool就能够解决这个问题,它就能够让其中的线程创建新的任务,并挂起当前的任务,此时线程就能够从队列中选择子任务执行。
那么使用ThreadPoolExecutor或者ForkJoinPool,性能上会有什么差异呢?
首先,使用ForkJoinPool能够使用数量有限的线程来完成非常多的具有父子关系的任务,比如使用4个线程来完成超过200万个任务。但是,使用ThreadPoolExecutor时,是不可能完成的,因为ThreadPoolExecutor中的Thread无法选择优先执行子任务,需要完成200万个具有父子关系的任务时,也需要200万个线程,很显然这是不可行的,也是很不合理的!!
4 ForkJoin框架的实现
ForkJoin框架中一些重要的类如下所示。
ForkJoinPool 框架中涉及的主要类如下所示。
4.1 ForkJoinPool类
实现了ForkJoin框架中的线程池,由类图可以看出,ForkJoinPool类实现了线程池的Executor接口。
我们也可以从下图中看出ForkJoinPool的类图关系。
其中,可以使用Executors.newWorkStealPool()方法创建ForkJoinPool。
ForkJoinPool中提供了如下提交任务的方法。
public void execute(ForkJoinTask<?> task)
public void execute(Runnable task)
public <T> T invoke(ForkJoinTask<T> task)
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task)
public <T> ForkJoinTask<T> submit(Callable<T> task)
public <T> ForkJoinTask<T> submit(Runnable task, T result)
public ForkJoinTask<?> submit(Runnable task)
4.2 ForkJoinWorkerThread类
实现ForkJoin框架中的线程。
4.3 ForkJoinTask<V>类
ForkJoinTask封装了数据及其相应的计算,并且支持细粒度的数据并行。ForkJoinTask比线程要轻量,ForkJoinPool中少量工作线程能够运行大量的ForkJoinTask。
ForkJoinTask类中主要包括两个方法fork()和join(),分别实现任务的分拆与合并。
fork()方法类似于Thread.start(),但是它并不立即执行任务,而是将任务放入工作队列中。跟Thread.join()方法不同,ForkJoinTask的join()方法并不简单的阻塞线程,而是利用工作线程运行其他任务,当一个工作线程中调用join(),它将处理其他任务,直到注意到目标子任务已经完成。
我们可以使用下图来表示这个过程。
ForkJoinTask有3个子类:
- RecursiveAction:无返回值的任务。
- RecursiveTask:有返回值的任务。
- CountedCompleter:完成任务后将触发其他任务。
4.4 RecursiveTask<V> 类
有返回结果的ForkJoinTask实现Callable。
4.5 RecursiveAction类
无返回结果的ForkJoinTask实现Runnable。
4.6 CountedCompleter<T> 类
在任务完成执行后会触发执行一个自定义的钩子函数。
5 ForkJoin示例程序
import lombok.extern.slf4j.Slf4j;import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;@Slf4j
public class ForkJoinTaskExample extends RecursiveTask<Integer> {public static final int threshold = 2;private int start;private int end;public ForkJoinTaskExample(int start, int end) {this.start = start;this.end = end;}@Overrideprotected Integer compute() {int sum = 0;//如果任务足够小就计算任务boolean canCompute = (end - start) <= threshold;if (canCompute) {for (int i = start; i <= end; i++) {sum += i;}} else {// 如果任务大于阈值,就分裂成两个子任务计算int middle = (start + end) / 2;ForkJoinTaskExample leftTask = new ForkJoinTaskExample(start, middle);ForkJoinTaskExample rightTask = new ForkJoinTaskExample(middle + 1, end);// 执行子任务leftTask.fork();rightTask.fork();// 等待任务执行结束合并其结果int leftResult = leftTask.join();int rightResult = rightTask.join();// 合并子任务sum = leftResult + rightResult;}return sum;}public static void main(String[] args) {ForkJoinPool forkjoinPool = new ForkJoinPool();//生成一个计算任务,计算1+2+3+4+...+100ForkJoinTaskExample task = new ForkJoinTaskExample(1, 100);//执行一个任务Future<Integer> result = forkjoinPool.submit(task);try {log.info("result:{}", result.get());} catch (Exception e) {log.error("exception", e);}}
}
16:39:08.917 [main] INFO ForkJoinTaskExample - result:5050
四 补充 - 创建线程池的方法 - ThreadPoolExecutor和Executors
1 线程池介绍
什么是线程池?
线程池就是一个可以复用线程的技术。
不使用线程池的问题:
如果用户每发起一个请求,后台就创建一个新线程来处理,下次新任务来了又要创建新线程,而创建新线程的开销是很大的,这样会严重影响系统的性能。
线程池工作原理:
例如线程池中最多可以允许创建三个工作线程, 也叫核心线程, 前面三个任务来的时候会给前面三个任务单独创建三个线程; 但是后面任务再来的时候, 因为创建的工作线程已达到最大数, 那么后面的任务就会进入任务队列中排队等待; 等前面的任务执行完成, 有空闲的线程的时候使用空闲的线程依次执行任务队列中的任务
2 实现线程池的方式
谁代表线程池?
JDK 5.0起提供了代表线程池的接口:ExecutorService
如何得到线程池对象?
- 方式一:使用ExecutorService的实现类ThreadPoolExecutor自创建一个线程池对象
- 方式二:使用Executors(线程池的工具类)调用方法返回不同特点的线程池对象
方式一: 实现类ThreadPoolExecutor
ThreadPoolExecutor构造器的参数
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler)
参数介绍:
参数一:指定线程池的线程数量(核心线程): corePoolSize ----> 不能小于0
参数二:指定线程池可支持的最大线程数: maximumPoolSize ----> 最大数量 >= 核心线程数量
参数三:指定临时线程的最大存活时间: keepAliveTime ----> 不能小于0
参数四:指定存活时间的单位(秒、分、时、天): unit ----> 时间单位
参数五:指定任务队列: workQueue ----> 不能为null
参数六:指定用哪个线程工厂创建线程: threadFactory ----> 不能为null
参数七:指定线程忙,任务满的时候,新任务拒绝策略: handler ----> 不能为null
新任务拒绝策略:
策略 | 详解 |
---|---|
ThreadPoolExecutor.AbortPolicy | 丢弃任务并抛出RejectedExecutionException异常。是默认的策略 |
ThreadPoolExecutor.DiscardPolicy | 丢弃任务,但是不抛出异常 这是不推荐的做法 |
ThreadPoolExecutor.DiscardOldestPolicy | 抛弃队列中等待最久的任务 然后把当前任务加入队列中 |
ThreadPoolExecutor.CallerRunsPolicy | 由主线程负责调用任务的run()方法从而绕过线程池直接执行 |
ThreadPoolExecutor创建线程池对象示例:
ExecutorService pools = new ThreadPoolExecutor(3, 5, 8, TimeUnit.SECONDS, new ArrayBlockingQueue<>(6), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
思考:
临时线程什么时候创建啊?
- 新任务提交时发现核心线程都在忙,任务队列也满了,并且还可以创建临时线程,此时才会创建临时线程。
什么时候会开始拒绝任务?
- 核心线程和临时线程都在忙,任务队列也满了,新的任务过来的时候才会开始任务拒绝。
线程池处理Runnable任务
ExecutorService的常用方法:
方法名称 | 说明 |
---|---|
execute(Runnable command) | 执行任务/命令,没有返回值,一般用来执行 Runnable 任务 |
shutdown() | 等任务执行完毕后关闭线程池(一般不会关闭线程池) |
shutdownNow() | 立刻关闭,停止正在执行的任务,并返回队列中未执行的任务(一般不会关闭线程池) |
演示代码:
创建一个Runnable任务线程类
public class MyRunnable implements Runnable{@Overridepublic void run() {for (int i = 0; i < 5; i++) {System.out.println(Thread.currentThread().getName() + "线程执行输出: " + i);}// 为了测试, 我们让每个线程睡眠3秒try {System.out.println(Thread.currentThread().getName() + "线程进入休眠");Thread.sleep(3000);System.out.println(Thread.currentThread().getName() + "线程执行完成");} catch (Exception e) {e.printStackTrace();}}
}
在主类中, 创建一个线程池对象并创建Runnable任务交给线程池处理
- 如果只有三个任务, 那么会被三个核心线程同时执行
public static void main(String[] args) {// 1. 创建一个线程池对象 核心线程为3, 最大线程数5, 临时线程存活6秒, 任务队列最大为5ExecutorService es = new ThreadPoolExecutor(3, 5, 6,TimeUnit.SECONDS, new ArrayBlockingQueue<>(5), Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());// 2. Runnable任务交给任务线程池处理Runnable target = new MyRunnable();// 三个Runnable任务会被核心线程执行es.execute(target);es.execute(target);es.execute(target);
}
- 打印结果如下
pool-1-thread-1线程执行输出: 0
pool-1-thread-2线程执行输出: 0
pool-1-thread-3线程执行输出: 0
pool-1-thread-2线程执行输出: 1
pool-1-thread-2线程执行输出: 2
pool-1-thread-2线程执行输出: 3
pool-1-thread-1线程执行输出: 1
pool-1-thread-2线程执行输出: 4
pool-1-thread-3线程执行输出: 1
pool-1-thread-3线程执行输出: 2
pool-1-thread-3线程执行输出: 3
pool-1-thread-1线程执行输出: 2
pool-1-thread-3线程执行输出: 4
pool-1-thread-1线程执行输出: 3
pool-1-thread-1线程执行输出: 4
pool-1-thread-2线程进入休眠
pool-1-thread-3线程进入休眠
pool-1-thread-1线程进入休眠
pool-1-thread-3线程执行完成
pool-1-thread-1线程执行完成
pool-1-thread-2线程执行完成
- 如果核心线程全部被占用, 那么后面的任务会进入任务队列中排队等待有空闲的核心线程; 由于我们设置的任务队列数是5, 所以进入任务队列的任务数量小于等于5时, 不会创建临时线程
public static void main(String[] args) {// 1. 创建一个线程池对象 核心线程为3, 最大线程数5, 临时线程存活6秒, 任务队列最大为5ExecutorService es = new ThreadPoolExecutor(3, 5, 6,TimeUnit.SECONDS, new ArrayBlockingQueue<>(5), Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());// 2. Runnable任务交给任务线程池处理Runnable target = new MyRunnable();// 三个Runnable任务会被核心线程执行es.execute(target);es.execute(target);es.execute(target);// 三个核心线程被占满, 会进入任务队列排队等待有空闲的核心线程es.execute(target);es.execute(target);es.execute(target);es.execute(target);es.execute(target);
}
- 由于核心线程都在忙, 任务队列也满了, 这时候我们再继续添加任务时, 就会创建临时线程; 因为我们设置的核心线程数是3个, 最大线程数是5个, 所以临时线程最多只会创建两个
public static void main(String[] args) {// 1. 创建一个线程池对象 核心线程为3, 最大线程数5, 临时线程存活6秒, 任务队列最大为5ExecutorService es = new ThreadPoolExecutor(3, 5, 6,TimeUnit.SECONDS, new ArrayBlockingQueue<>(5), Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());// 2. Runnable任务交给任务线程池处理Runnable target = new MyRunnable();// 三个Runnable任务会被核心线程执行es.execute(target);es.execute(target);es.execute(target);// 三个核心线程被占满, 会进入任务队列排队等待有空闲的核心线程(我们设置的任务队列最大允许排队5个任务)es.execute(target);es.execute(target);es.execute(target);es.execute(target);es.execute(target);// 三个核心线程都在忙, 任务队列满, 创建临时线程es.execute(target);es.execute(target);
}
- 核心线程都忙, 任务队列满, 临时线程忙, 此时再继续添加任务就会触发拒绝任务策略
public static void main(String[] args) {// 1. 创建一个线程池对象 核心线程为3, 最大线程数5, 临时线程存活6秒, 任务队列最大为5ExecutorService es = new ThreadPoolExecutor(3, 5, 6,TimeUnit.SECONDS, new ArrayBlockingQueue<>(5), Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());// 2. Runnable任务交给任务线程池处理Runnable target = new MyRunnable();// 三个Runnable任务会被核心线程执行es.execute(target);es.execute(target);es.execute(target);// 三个核心线程被占满, 会进入任务队列排队等待有空闲的核心线程(我们设置的任务队列最大允许排队5个任务)es.execute(target);es.execute(target);es.execute(target);es.execute(target);es.execute(target);// 三个核心线程都在忙, 任务队列满, 创建临时线程es.execute(target);es.execute(target);// 后续任务拒绝任务会被拒绝es.execute(target);
}
线程池处理Callable任务
ExecutorService的常用方法:
方法名称 | 说明 |
---|---|
submit(Callable<T> task) | 执行Callable任务,返回未来任务对象(FutureTask)获取线程结果 |
shutdown() | 等任务执行完毕后关闭线程池 |
shutdownNow() | 立刻关闭,停止正在执行的任务,并返回队列中未执行的任务 |
注意: submit方法返回的是Future对象, Future对象是FutureTask对象继承的父类
演示代码:
定义一个Callable任务类, 用于计算1到n的和返回
import java.util.concurrent.Callable;public class MyCallable implements Callable<String> {private int n;public MyCallable(int n) {this.n = n;}@Overridepublic String call() throws Exception {int sum = 0;for (int i = 1; i <= n ; i++) {sum += i;}// 为了测试, 让每一个线程任务睡眠3秒System.out.println(Thread.currentThread().getName() + "线程进入睡眠");Thread.sleep(3000);System.out.println(Thread.currentThread().getName() + "线程执行完成");return Thread.currentThread().getName() + "执行1-" + n + "结果是: " + sum;}
}
线程池处理Callable任务时的细节和处理Callable的一样, 这里不再一一赘述, 代码如下
public static void main(String[] args) {// 1. 创建一个线程池对象 核心线程为5, 最大线程数5, 临时线程存活6秒, 任务队列最大为5ExecutorService pool = new ThreadPoolExecutor(3, 5, 6,TimeUnit.SECONDS, new ArrayBlockingQueue<>(5), Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());// 2. 创建Callable任务, 调用submit方法, 交给任务线程池处理, 返回未来线程对象// 三个Runnable任务会被核心线程执行Future<String> f1 = pool.submit(new MyCallable(10));Future<String> f2 = pool.submit(new MyCallable(20));Future<String> f3 = pool.submit(new MyCallable(30));// 三个核心线程被占满, 会进入任务队列排队等待有空闲的核心线程(我们设置的任务队列最大允许排队5个任务)Future<String> f4 = pool.submit(new MyCallable(30));Future<String> f5 = pool.submit(new MyCallable(40));Future<String> f6 = pool.submit(new MyCallable(50));Future<String> f7 = pool.submit(new MyCallable(40));Future<String> f8 = pool.submit(new MyCallable(50));// 三个核心线程都在忙, 任务队列满, 创建临时线程Future<String> f9 = pool.submit(new MyCallable(80));Future<String> f10 = pool.submit(new MyCallable(60));// 后续任务拒绝任务会被拒绝Future<String> f11 = pool.submit(new MyCallable(30));// 3. 互获取未来线程的返回结果try { // 正常的结果System.out.println(f1.get());System.out.println(f2.get());System.out.println(f3.get());System.out.println(f4.get());System.out.println(f5.get());System.out.println(f6.get());System.out.println(f7.get());System.out.println(f8.get());System.out.println(f9.get());System.out.println(f10.get());System.out.println(f11.get());} catch (Exception e) { // 异常的结果e.printStackTrace();}
}
方式二: Executors工具类创建线程池
文本一二个章节已阐述
Executors使用可能存在的陷阱
大型并发系统环境中使用Executors如果不注意可能会出现系统风险。
方法名称 | 存在问题 |
---|---|
ExecutorService newCachedThreadPool() | 创建的线程数量最大上限是无穷大(Integer.MAX_VALUE), 线程数可能会随着任务1:1增长,也可能出现OOM内存溢出错误 ( java.lang.OutOfMemoryError ) |
ExecutorService newFixedThreadPool(int nThreads) | 允许创建的线程是固定的, 但是线程允许请求的任务队列长度是无穷大的(Integer.MAX_VALUE),可能出现OOM内存溢出错误(java.lang.OutOfMemoryError ) |
ScheduledExecutorService newScheduledThreadPool(int corePoolSize) | 创建的线程数量最大上限是无穷大(Integer.MAX_VALUE), 线程数可能会随着任务1:1增长,也可能出现OOM内存溢出错误 ( java.lang.OutOfMemoryError ) |
ExecutorService newSingleThreadExecutor() | 允许创建的线程是固定的, 允许请求的任务队列长度是无穷大的(Integer.MAX_VALUE),可能出现OOM内存溢出错误(java.lang.OutOfMemoryError ) |
阿里巴巴开发手册中明确说明, 不允许Executors创建线程池