继前面的几次线程池的概括,作者又双叕的再次整理一遍线程池,篇幅可能有点长,请搬好小凳子,细细观赏:
前面的文章提到过:
多线程快速处理List集合(结合线程池的使用)
多线程杀手锏—countDownLatch&&CyclicBarrier
上面的问题其实也可以作为拓展去看一下,因为既然用到了线程池,那么就涉及到多线程,随之而来的就是并发问题。好了开始正文,参考文章附文末。
在平时的业务场景,如果只是一个简单的管理系统可能体会不到多线程的优势,甚至连为什么用多线程都不知道,更别谈线程池,那么什么是线程池:
什么是线程池
线程池其实就是一种多线程处理形式,处理过程中可以将任务添加到队列中,然后在创建线程后自动启动这些任务。这里的线程就是我们前面学过的线程,这里的任务就是我们前面学过的实现了Runnable或Callable接口的实例对象;
为什么使用线程池
使用线程池最大的原因就是可以根据系统的需求和硬件环境灵活的控制线程的数量,且可以对所有线程进行统一的管理和控制,从而提高系统的运行效率,降低系统运行运行压力;当然了,使用线程池的原因不仅仅只有这些,我们可以从线程池自身的优点上来进一步了解线程池的好处;
其实我们应该了解到创建线程执行任务和任务执行完成销毁线程都是需要一定的时间的,所以就可以体现出线程池的优势
使用线程池有哪些优势
线程和任务分离,提升线程重用性;
控制线程并发数量,降低服务器压力,统一管理所有线程;
提升系统响应速度,假如创建线程用的时间为T1,执行任务用的时间为T2,销毁线程用的时间为T3,那么使用线程池就免去了T1和T3的时间;
线程池应用场景介绍
网购商品秒杀
云盘文件上传和下载
12306网上购票系统等
总之,只要有并发的地方、任务数量大或小、每个任务执行时间长或短的都可以使用线程池;
只不过在使用线程池的时候,注意一下设置合理的线程池大小即可;(关于如何合理设置线程池大小在后面的章节中讲解)
主要讲下面三种
java内置线程池
自定义线程池
异步计算结果(Future)
Java内置线程池原理剖析
我们要想自定义线程池,必须先了解线程池的工作原理,才能自己定义线程池;
这里我们通过观察java中ThreadPoolExecutor的源码来学习线程池的原理;
ThreadPoolExecutor部分源码
构造方法:
public ThreadPoolExecutor(
int corePoolSize, //核心线程数量
int maximumPoolSize,// 最大线程数
long keepAliveTime, // 最大空闲时间
TimeUnit unit, // 时间单位
BlockingQueue<Runnable> workQueue, // 任务队列
ThreadFactory threadFactory, // 线程工厂
RejectedExecutionHandler handler // 饱和处理机制
)
{ ... }
ThreadPoolExecutor参数详解
我们可以通过下面的场景理解ThreadPoolExecutor中的各个参数:
a客户(任务)去银行(线程池)办理业务,但银行刚开始营业,窗口服务员还未就位(相当于线程池中初始线程数量为0),于是经理(线程池管理者)就安排1号工作人员(创建1号线程执行任务)接待a客户(创建线程);
在a客户业务还没办完时,b客户(任务)又来了,于是经理(线程池管理者)就安排2号工作人员(创建2号线程执行任务)接待b客户(又创建了一个新的线程);假设该银行总共就2个窗口(核心线程数量是2);
紧接着在a,b客户都没有结束的情况下c客户来了,于是经理(线程池管理者)就安排c客户先坐到银行大厅的座位上(空位相当于是任务队列)等候,并告知他: 如果1、2号工作人员空出,c就可以前去办理业务;
此时d客户又到了银行,(工作人员都在忙,大厅座位也满了)于是经理赶紧安排临时工(新创建的线程)在大堂站着,手持pad设备给d客户办理业务;
假如前面的业务都没有结束的时候e客户又来了,此时正式工作人员都上了,临时工也上了,座位也满了(临时工加正式员工的总数量就是最大线程数),于是经理只能按《超出银行最大接待能力处理办法》(饱和处理机制)拒接接待e客户;
最后,进来办业务的人少了,大厅的临时工空闲时间也超过了1个小时(最大空闲时间),经理就会让这部分空闲的员工人下班.(销毁线程)但是为了保证银行银行正常工作(有一个allowCoreThreadTimeout变量控制是否允许销毁核心线程,默认false),即使正式工闲着,也不得提前下班,所以1、2号工作人员继续待着(池内保持核心线程数量);
线程池工作流程总结示意图
自定义线程池-参数设计分析
通过观察Java中的内置线程池参数讲解和线程池工作流程总结,我们不难发现,要设计一个好的线程池,就必须合理的设置线程池的4个参数;那到底该如何合理的设计4个参数的值呢?我们一起往下看.
4个参数的设计:
1.核心线程数(corePoolSize)
核心线程数的设计需要依据任务的处理时间和每秒产生的任务数量来确定,例如:执行一个任务需要0.1秒,系统百分之80的时间每秒都会产生100个任务,那么要想在1秒内处理完这100个任务,就需要10个线程,此时我们就可以设计核心线程数为10;
当然实际情况不可能这么平均,所以我们一般按照8020原则设计即可,既按照百分之80的情况设计核心线程数,剩下的百分之20可以利用最大线程数处理;
2.任务队列长度(workQueue)
任务队列长度一般设计为:核心线程数/单个任务执行时间*2即可;例如上面的场景中,核心线程数设计为10,单个任务执行时间为0.1秒,则队列长度可以设计为200;
3.最大线程数(maximumPoolSize)
最大线程数的设计除了需要参照核心线程数的条件外,还需要参照系统每秒产生的最大任务数决定:例如:上述环境中,如果系统每秒最大产生的任务是1000个,那么,最大线程数=(最大任务数-任务队列长度)*单个任务执行时间;既: 最大线程数=(1000-200)*0.1=80个;
4.最大空闲时间(keepAliveTime)
这个参数的设计完全参考系统运行环境和硬件压力设定,没有固定的参考值,用户可以根据经验和系统产生任务的时间间隔合理设置一个值即可;
自定义线程池-实现步骤
编写任务类(MyTask),实现Runnable接口;
编写线程类(MyWorker),用于执行任务,需要持有所有任务;
编写线程池类(MyThreadPool),包含提交任务,执行任务的能力;
编写测试类(MyTest),创建线程池对象,提交多个任务测试;
MyTask
package com.itheima.demo01;
/*
需求:
自定义线程池练习,这是任务类,需要实现Runnable;
包含任务编号,每一个任务执行时间设计为0.2秒
*/
public class MyTask implements Runnable{
private int id;
//由于run方法是重写接口中的方法,因此id这个属性初始化可以利用构造方法完成
public MyTask(int id) {
this.id = id;
}
@Override
public void run() {
String name = Thread.currentThread().getName();
System.out.println("线程:"+name+" 即将执行任务:"+id);
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("线程:"+name+" 完成了任务:"+id);
}
@Override
public String toString() {
return "MyTask{" +
"id=" + id +
'}';
}
}
MyWorker
package com.itheima.demo01;
import java.util.List;
/*
需求:
编写一个线程类,需要继承Thread类,设计一个属性,用于保存线程的名字;
设计一个集合,用于保存所有的任务;
*/
public class MyWorker extends Thread{
private String name;//保存线程的名字
private List<Runnable> tasks;
//利用构造方法,给成员变量赋值
public MyWorker(String name, List<Runnable> tasks) {
super(name);
this.tasks = tasks;
}
@Override
public void run() {
//判断集合中是否有任务,只要有,就一直执行任务
while (tasks.size()>0){
Runnable r = tasks.remove(0);
r.run();
}
}
}
MyThreadPool
package com.itheima.demo01;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
/*
这是自定义的线程池类;
成员变量:
1:任务队列 集合 需要控制线程安全问题
2:当前线程数量
3:核心线程数量
4:最大线程数量
5:任务队列的长度
成员方法
1:提交任务;
将任务添加到集合中,需要判断是否超出了任务总长度
2:执行任务;
判断当前线程的数量,决定创建核心线程还是非核心线程
*/
public class MyThreadPool {
// 1:任务队列 集合 需要控制线程安全问题
private List<Runnable> tasks = Collections.synchronizedList(new LinkedList<>());
//2:当前线程数量
private int num;
//3:核心线程数量
private int corePoolSize;
//4:最大线程数量
private int maxSize;
//5:任务队列的长度
private int workSize;
public MyThreadPool(int corePoolSize, int maxSize, int workSize) {
this.corePoolSize = corePoolSize;
this.maxSize = maxSize;
this.workSize = workSize;
}
//1:提交任务;
public void submit(Runnable r){
//判断当前集合中任务的数量,是否超出了最大任务数量
if(tasks.size()>=workSize){
System.out.println("任务:"+r+"被丢弃了...");
}else {
tasks.add(r);
//执行任务
execTask(r);
}
}
//2:执行任务;
private void execTask(Runnable r) {
//判断当前线程池中的线程总数量,是否超出了核心数,
if(num < corePoolSize){
new MyWorker("核心线程:"+num,tasks).start();
num++;
}else if(num < maxSize){
new MyWorker("非核心线程:"+num,tasks).start();
num++;
}else {
System.out.println("任务:"+r+" 被缓存了...");
}
}
}
MyTest
package com.itheima.demo01;
/*
测试类:
1: 创建线程池类对象;
2: 提交多个任务
*/
public class MyTest {
public static void main(String[] args) {
//1:创建线程池类对象;
MyThreadPool pool = new MyThreadPool(2,4,20);
//2: 提交多个任务
for (int i = 0; i <30 ; i++) {
//3:创建任务对象,并提交给线程池
MyTask my = new MyTask(i);
pool.submit(my);
}
}
}
线程池的继承方式
java.uitl.concurrent.ThreadPoolExecutor类是线程池中最核心的一个类,因此如果要透彻地了解Java中的线程池,必须先了解这个类。下面我们来看一下ThreadPoolExecutor类的具体实现源码。
在ThreadPoolExecutor类中提供了四个构造方法:
从JDK的API中可以查阅以下资料:
第一种构造方法
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue workQueue)使用给定的初始参数创建新的ThreadPoolExecutor ,默认线程工厂和默认拒绝执行处理程序。
使用Executors工厂方法之一而不是此通用构造函数可能更方便。
参数
corePoolSize – 池中保留的线程数,即使它们处于空闲状态,除非 allowCoreThreadTimeOut为 allowCoreThreadTimeOut
maximumPoolSize – 池中允许的最大线程数
keepAliveTime – 当线程数大于核心数时,这是多余空闲线程在终止之前等待新任务的最长时间。
unit – keepAliveTime参数的时间单位
workQueue – 在执行任务之前用于保存任务的队列。 此队列仅Runnable execute方法提交的Runnable任务。
异常
IllegalArgumentException – 如果满足以下条件之一:
corePoolSize < 0
keepAliveTime < 0
maximumPoolSize <= 0
maximumPoolSize < corePoolSize
NullPointerException – 如果 workQueue为空
第二种构造方法
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue workQueue,
ThreadFactory threadFactory)创建一个新的ThreadPoolExecutor给定的初始参数和default rejected execution handler 。
参数
corePoolSize – 池中保留的线程数,即使它们处于空闲状态,除非设置了 allowCoreThreadTimeOut
maximumPoolSize – 池中允许的最大线程数
keepAliveTime – 当线程数大于核心数时,这是多余空闲线程在终止之前等待新任务的最长时间。
unit – keepAliveTime参数的时间单位
workQueue – 在执行任务之前用于保存任务的队列。 此队列仅Runnable execute方法提交的Runnable任务。
threadFactory – 执行程序创建新线程时使用的工厂
异常
IllegalArgumentException – 如果满足以下条件之一:
corePoolSize < 0
keepAliveTime < 0
maximumPoolSize <= 0
maximumPoolSize < corePoolSize
NullPointerException – 如果 workQueue或 threadFactory为空
第三个构造方法
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue workQueue,
RejectedExecutionHandler handler)创建一个新的 ThreadPoolExecutor给定的初始参数和 default thread factory 。
参数
corePoolSize – 池中保留的线程数,即使它们处于空闲状态,除非设置了 allowCoreThreadTimeOut
maximumPoolSize – 池中允许的最大线程数
keepAliveTime – 当线程数大于核心数时,这是多余空闲线程在终止之前等待新任务的最长时间。
unit – keepAliveTime参数的时间单位
workQueue – 在执行任务之前用于保存任务的队列。 此队列仅Runnable execute方法提交的Runnable任务。
handler – 由于达到线程边界和队列容量而阻止执行时使用的处理程序
异常
IllegalArgumentException – 如果满足以下条件之一:
corePoolSize < 0
keepAliveTime < 0
maximumPoolSize <= 0
maximumPoolSize < corePoolSize
NullPointerException – 如果 workQueue或 handler为空
第四种构造方法
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)使用给定的初始参数创建新的 ThreadPoolExecutor 。
参数
corePoolSize – 池中保留的线程数,即使它们处于空闲状态,除非设置了 allowCoreThreadTimeOut
maximumPoolSize – 池中允许的最大线程数
keepAliveTime – 当线程数大于核心数时,这是多余空闲线程在终止之前等待新任务的最长时间。
unit – keepAliveTime参数的时间单位
workQueue – 在执行任务之前用于保存任务的队列。 此队列仅Runnable execute方法提交的Runnable任务。
threadFactory – 执行程序创建新线程时使用的工厂
handler – 由于达到线程边界和队列容量而阻止执行时使用的处理程序
异常
IllegalArgumentException – 如果满足以下条件之一:
corePoolSize < 0
keepAliveTime < 0
maximumPoolSize <= 0
maximumPoolSize < corePoolSize
NullPointerException – 如果 workQueue或 threadFactory或 handler为空
从上面的代码可以得知,ThreadPoolExecutor继承了AbstractExecutorService类,并提供了四个构造器,事实上,通过观察每个构造器的源码具体实现,发现前面三个构造器都是调用的第四个构造器进行的初始化工作。
下面解释下一下构造器中各个参数的含义:
corePoolSize:核心池的大小,这个参数跟后面讲述的线程池的实现原理有非常大的关系。在创建了线程池后,默认情况下,线程池中并没有任何线程,而是等待有任务到来才创建线程去执行任务,除非调用了prestartAllCoreThreads()或者prestartCoreThread()方法,从这2个方法的名字就可以看出,是预创建线程的意思,即在没有任务到来之前就创建corePoolSize个线程或者一个线程。默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中;
maximumPoolSize:线程池最大线程数,这个参数也是一个非常重要的参数,它表示在线程池中最多能创建多少个线程;
keepAliveTime:表示线程没有任务执行时最多保持多久时间会终止。默认情况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用,直到线程池中的线程数不大于corePoolSize,即当线程池中的线程数大于corePoolSize时,如果一个线程空闲的时间达到keepAliveTime,则会终止,直到线程池中的线程数不超过corePoolSize。但是如果调用了allowCoreThreadTimeOut(boolean)方法,在线程池中的线程数不大于corePoolSize时,keepAliveTime参数也会起作用,直到线程池中的线程数为0;
unit:参数keepAliveTime的时间单位,有7种取值,在TimeUnit类中有7种静态属性:
TimeUnit.DAYS; //天
TimeUnit.HOURS; //小时
TimeUnit.MINUTES; //分钟
TimeUnit.SECONDS; //秒
TimeUnit.MILLISECONDS; //毫秒
TimeUnit.MICROSECONDS; //微妙
TimeUnit.NANOSECONDS; //纳秒
workQueue:一个阻塞队列,用来存储等待执行的任务,这个参数的选择也很重要,会对线程池的运行过程产生重大影响,一般来说,这里的阻塞队列有以下几种选择:
ArrayBlockingQueue;
LinkedBlockingQueue;
SynchronousQueue;
其实对阻塞队列了解的话,可以了解到有其中阻塞队列,我在之前的文章也谈过阻塞队列,分别有不同的策略方法
java多线程之——-阻塞队列—–线程池
线程池主要用的就是上面那三个阻塞队列
ArrayBlockingQueue和PriorityBlockingQueue使用较少,一般使用LinkedBlockingQueue和Synchronous。
线程池的排队策略与BlockingQueue有关。
threadFactory:线程工厂,主要用来创建线程;
handler:表示当拒绝处理任务时的策略,有以下四种取值:
ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务
到这里,大家应该明白了ThreadPoolExecutor、AbstractExecutorService、ExecutorService和Executor几个之间的关系了。
1 Executor是一个顶层接口在它里面只声明了一个方法execute(Runnable),返回值为void,参数为Runnable类型,从字面意思可以理解,就是用来执行传进去的任务的;
2 然后ExecutorService接口继承了Executor接口,并声明了一些方法:submit、invokeAll、invokeAny以及shutDown等;
3 抽象类AbstractExecutorService实现了ExecutorService接口,基本实现了ExecutorService中声明的所有方法;
4 然后ThreadPoolExecutor继承了类AbstractExecutorService。
在ThreadPoolExecutor类中有几个非常重要的方法:
Java内置线程池-ExecutorService介绍
ExecutorService接口是java内置的线程池接口,通过学习接口中的方法,可以快速的掌握java内置线程池的基本使用 常用方法:
void shutdown() 启动一次顺序关闭,执行以前提交的任务,但不接受新任务。
List shutdownNow() 停止所有正在执行的任务,暂停处理正在等待的任务,并返回等待执行的任务列表。
Future submit(Callable task) 执行带返回值的任务,返回一个Future对象。
Future<?> submit(Runnable task) 执行 Runnable 任务,并返回一个表示该任务的 Future。
Future submit(Runnable task, T result) 执行 Runnable 任务,并返回一个表示该任务的 Future。
Java内置线程池-ExecutorService获取
获取ExecutorService可以利用JDK中的Executors 类中的静态方法,常用获取方式如下:
static ExecutorService newCachedThreadPool() 创建一个默认的线程池对象,里面的线程可重用,且在第一次使用时才创建
static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) 线程池中的所有线程都使用ThreadFactory来创建,这样的线程无需手动启动,自动执行;
static ExecutorService newFixedThreadPool(int nThreads) 创建一个可重用固定线程数的线程池
static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) 创建一个可重用固定线程数的线程池且线程池中的所有线程都使用ThreadFactory来创建。
static ExecutorService newSingleThreadExecutor() 创建一个使用单个 worker 线程的 Executor,以无界队列方式来运行该线程。
static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) 创建一个使用单个 worker 线程的 Executor,且线程池中的所有线程都使用ThreadFactory来创建。
newCachedThreadPool
package com.test;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
/*
练习Executors获取ExecutorService,然后调用方法,提交任务;
*/
public class MyTest01 {
public static void main(String[] args) {
// test1();
test2();
}
//练习newCachedThreadPool方法
private static void test1() {
//1:使用工厂类获取线程池对象
ExecutorService es = Executors.newCachedThreadPool();
//2:提交任务;
for (int i = 1; i <=10 ; i++) {
es.submit(new MyRunnable(i));
}
}
private static void test2() {
//1:使用工厂类获取线程池对象
ExecutorService es = Executors.newCachedThreadPool(new ThreadFactory() {
int n=1;
@Override
public Thread newThread(Runnable r) {
return new Thread(r,"自定义的线程名称"+n++);
}
});
//2:提交任务;
for (int i = 1; i <=10 ; i++) {
es.submit(new MyRunnable(i));
}
}
}
/*
任务类,包含一个任务编号,在任务中,打印出是哪一个线程正在执行任务
*/
class MyRunnable implements Runnable{
private int id;
public MyRunnable(int id) {
this.id = id;
}
@Override
public void run() {
//获取线程的名称,打印一句话
String name = Thread.currentThread().getName();
System.out.println(name+"执行了任务..."+id);
}
}
newFixedThreadPool
package com.test;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
/*
练习Executors获取ExecutorService,然后调用方法,提交任务;
*/
public class MyTest02 {
public static void main(String[] args) {
//test1();
test2();
}
//练习方法newFixedThreadPool
private static void test1() {
//1:使用工厂类获取线程池对象
ExecutorService es = Executors.newFixedThreadPool(3);
//2:提交任务;
for (int i = 1; i <=10 ; i++) {
es.submit(new MyRunnable2(i));
}
}
private static void test2() {
//1:使用工厂类获取线程池对象
ExecutorService es = Executors.newFixedThreadPool(3,new ThreadFactory() {
int n=1;
@Override
public Thread newThread(Runnable r) {
return new Thread(r,"自定义的线程名称"+n++);
}
});
//2:提交任务;
for (int i = 1; i <=10 ; i++) {
es.submit(new MyRunnable2(i));
}
}
}
/*
任务类,包含一个任务编号,在任务中,打印出是哪一个线程正在执行任务
*/
class MyRunnable2 implements Runnable{
private int id;
public MyRunnable2(int id) {
this.id = id;
}
@Override
public void run() {
//获取线程的名称,打印一句话
String name = Thread.currentThread().getName();
System.out.println(name+"执行了任务..."+id);
}
}
newSingleThreadExecutor
package com.test;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
/*
练习Executors获取ExecutorService,然后调用方法,提交任务;
*/
public class MyTest03 {
public static void main(String[] args) {
//test1();
test2();
}
//练习方法newFixedThreadPool
private static void test1() {
//1:使用工厂类获取线程池对象
ExecutorService es = Executors.newSingleThreadExecutor();
//2:提交任务;
for (int i = 1; i <=10 ; i++) {
es.submit(new MyRunnable3(i));
}
}
private static void test2() {
//1:使用工厂类获取线程池对象
ExecutorService es = Executors.newSingleThreadExecutor(new ThreadFactory() {
int n=1;
@Override
public Thread newThread(Runnable r) {
return new Thread(r,"自定义的线程名称"+n++);
}
});
//2:提交任务;
for (int i = 1; i <=10 ; i++) {
es.submit(new MyRunnable3(i));
}
}
}
/*
任务类,包含一个任务编号,在任务中,打印出是哪一个线程正在执行任务
*/
class MyRunnable3 implements Runnable{
private int id;
public MyRunnable3(int id) {
this.id = id;
}
@Override
public void run() {
//获取线程的名称,打印一句话
String name = Thread.currentThread().getName();
System.out.println(name+"执行了任务..."+id);
}
}
练习Executors获取ExecutorService,测试关闭线程池的方法;
package com.test;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
/*
练习Executors获取ExecutorService,测试关闭线程池的方法;
*/
public class MyTest04 {
public static void main(String[] args) {
test1();
// test2();
}
//练习方法newFixedThreadPool
private static void test1() {
//1:使用工厂类获取线程池对象
ExecutorService es = Executors.newSingleThreadExecutor();
//2:提交任务;
for (int i = 1; i <=10 ; i++) {
es.submit(new MyRunnable4(i));
}
//3:关闭线程池,仅仅是不再接受新的任务,以前的任务还会继续执行
es.shutdown();
//es.submit(new MyRunnable4(888));//不能再提交新的任务了
}
private static void test2() {
//1:使用工厂类获取线程池对象
ExecutorService es = Executors.newSingleThreadExecutor(new ThreadFactory() {
int n=1;
@Override
public Thread newThread(Runnable r) {
return new Thread(r,"自定义的线程名称"+n++);
}
});
//2:提交任务;
for (int i = 1; i <=10 ; i++) {
es.submit(new MyRunnable4(i));
}
//3:立刻关闭线程池,如果线程池中还有缓存的任务,没有执行,则取消执行,并返回这些任务
List<Runnable> list = es.shutdownNow();
System.out.println(list);
}
}
/*
任务类,包含一个任务编号,在任务中,打印出是哪一个线程正在执行任务
*/
class MyRunnable4 implements Runnable{
private int id;
public MyRunnable4(int id) {
this.id = id;
}
@Override
public void run() {
//获取线程的名称,打印一句话
String name = Thread.currentThread().getName();
System.out.println(name+"执行了任务..."+id);
}
@Override
public String toString() {
return "MyRunnable4{" +
"id=" + id +
'}';
}
}
Java内置线程池-ScheduledExecutorService
ScheduledExecutorService是ExecutorService的子接口,具备了延迟运行或定期执行任务的能力,
常用获取方式如下:
static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) 创建一个可重用固定线程数的线程池且允许延迟运行或定期执行任务;
static ScheduledExecutorService newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory) 创建一个可重用固定线程数的线程池且线程池中的所有线程都使用ThreadFactory来创建,且允许延迟运行或定期执行任务;
static ScheduledExecutorService newSingleThreadScheduledExecutor() 创建一个单线程执行程序,它允许在给定延迟后运行命令或者定期地执行。
static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) 创建一个单线程执行程序,它可安排在给定延迟后运行命令或者定期地执行。
ScheduledExecutorService常用方法如下
ScheduledFuture schedule(Callable callable, long delay, TimeUnit unit) 延迟时间单位是unit,数量是delay的时间后执行callable。
ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) 延迟时间单位是unit,数量是delay的时间后执行command。
ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) 延迟时间单位是unit,数量是initialDelay的时间后,每间隔period时间重复执行一次command。
ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)创建并执行一个在给定初始延迟后首次启用的定期操作,随后,在每一次执行终止和下一次执行开始之间都存在给定的延迟
newScheduledThreadPool的schedule
package com.test.demo3;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/*
测试ScheduleExecutorService接口中延迟执行任务和重复执行任务的功能
*/
public class ScheduleExecutorServiceDemo01 {
public static void main(String[] args) {
//1:获取一个具备延迟执行任务的线程池对象
ScheduledExecutorService es = Executors.newScheduledThreadPool(3);
//2:创建多个任务对象,提交任务,每个任务延迟2秒执行
for (int i=1;i<=10;i++){
es.schedule(new MyRunnable(i),2, TimeUnit.SECONDS);
}
System.out.println("over");
}
}
class MyRunnable implements Runnable{
private int id;
public MyRunnable(int id) {
this.id = id;
}
@Override
public void run() {
String name = Thread.currentThread().getName();
System.out.println(name+"执行了任务:"+id);
}
}
scheduleAtFixedRate方法
package com.test.demo3;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
/*
测试ScheduleExecutorService接口中延迟执行任务和重复执行任务的功能
*/
public class ScheduleExecutorServiceDemo02 {
public static void main(String[] args) {
//1:获取一个具备延迟执行任务的线程池对象
ScheduledExecutorService es = Executors.newScheduledThreadPool(3, new ThreadFactory() {
int n = 1;
@Override
public Thread newThread(Runnable r) {
return new Thread(r,"自定义线程名:"+n++);
}
});
//2:创建多个任务对象,提交任务,每个任务延迟2秒执行
es.scheduleAtFixedRate(new MyRunnable2(1),1,2,TimeUnit.SECONDS);
System.out.println("over");
}
}
class MyRunnable2 implements Runnable{
private int id;
public MyRunnable2(int id) {
this.id = id;
}
@Override
public void run() {
String name = Thread.currentThread().getName();
try {
Thread.sleep(1500);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(name+"执行了任务:"+id);
}
}
Java内置线程池-异步计算结果(Future)
我们刚刚在学习java内置线程池使用时,没有考虑线程计算的结果,但开发中,我们有时需要利用线程进行一些计算,然后获取这些计算的结果,而java中的Future接口就是专门用于描述异步计算结果的,我们可以通过Future 对象获取线程计算的结果;
Future 的常用方法如下:
boolean cancel(boolean mayInterruptIfRunning) 试图取消对此任务的执行。
V get() 如有必要,等待计算完成,然后获取其结果。
V get(long timeout, TimeUnit unit) 如有必要,最多等待为使计算完成所给定的时间之后,获取其结果(如果结果可用)。
boolean isCancelled() 如果在任务正常完成前将其取消,则返回 true。
boolean isDone() 如果任务已完成,则返回 true。
package com.itheima.demo04;
import java.util.concurrent.*;
/*
练习异步计算结果
*/
public class FutureDemo {
public static void main(String[] args) throws Exception {
//1:获取线程池对象
ExecutorService es = Executors.newCachedThreadPool();
//2:创建Callable类型的任务对象
Future<Integer> f = es.submit(new MyCall(1, 1));
//3:判断任务是否已经完成
//test1(f);
boolean b = f.cancel(true);
//System.out.println("取消任务执行的结果:"+b);
//Integer v = f.get(1, TimeUnit.SECONDS);//由于等待时间过短,任务来不及执行完成,会报异常
//System.out.println("任务执行的结果是:"+v);
}
//正常测试流程
private static void test1(Future<Integer> f) throws InterruptedException, ExecutionException {
boolean done = f.isDone();
System.out.println("第一次判断任务是否完成:"+done);
boolean cancelled = f.isCancelled();
System.out.println("第一次判断任务是否取消:"+cancelled);
Integer v = f.get();//一直等待任务的执行,直到完成为止
System.out.println("任务执行的结果是:"+v);
boolean done2 = f.isDone();
System.out.println("第二次判断任务是否完成:"+done2);
boolean cancelled2 = f.isCancelled();
System.out.println("第二次判断任务是否取消:"+cancelled2);
}
}
class MyCall implements Callable<Integer>{
private int a;
private int b;
//通过构造方法传递两个参数
public MyCall(int a, int b) {
this.a = a;
this.b = b;
}
@Override
public Integer call() throws Exception {
String name = Thread.currentThread().getName();
System.out.println(name+"准备开始计算...");
Thread.sleep(2000);
System.out.println(name+"计算完成...");
return a+b;
}
}
综合案例-秒杀商品
案例介绍:
假如某网上商城推出活动,新上架10部新手机免费送客户体验,要求所有参与活动的人员在规定的时间同时参与秒杀挣抢,假如有20人同时参与了该活动,请使用线程池模拟这个场景,保证前10人秒杀成功,后10人秒杀失败;
要求:
使用线程池创建线程
解决线程安全问题
思路提示:
既然商品总数量是10个,那么我们可以在创建线程池的时候初始化线程数是10个及以下,设计线程池最大数量为10个;
当某个线程执行完任务之后,可以让其他秒杀的人继续使用该线程参与秒杀;
使用synchronized控制线程安全,防止出现错误数据;
代码步骤:
编写任务类,主要是送出手机给秒杀成功的客户;
编写主程序类,创建20个任务(模拟20个客户);
创建线程池对象并接收20个任务,开始执行任务;
package com.itheima.demo05;
/*
任务类:
包含了商品数量,客户名称,送手机的行为;
*/
public class MyTask implements Runnable {
//设计一个变量,用于表示商品的数量
private static int id = 10;
//表示客户名称的变量
private String userName;
public MyTask(String userName) {
this.userName = userName;
}
@Override
public void run() {
String name = Thread.currentThread().getName();
System.out.println(userName+"正在使用"+name+"参与秒杀任务...");
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (MyTask.class){
if(id>0){
System.out.println(userName+"使用"+name+"秒杀:"+id-- +"号商品成功啦!");
}else {
System.out.println(userName+"使用"+name+"秒杀失败啦!");
}
}
}
}
package com.itheima.demo05;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/*
主程序类,测试任务类
*/
public class MyTest {
public static void main(String[] args) {
//1:创建一个线程池对象
ThreadPoolExecutor pool = new ThreadPoolExecutor(3,5,1, TimeUnit.MINUTES,new LinkedBlockingQueue<>(15));
//2:循环创建任务对象
for (int i = 1; i <=20 ; i++) {
MyTask myTask = new MyTask("客户"+i);
pool.submit(myTask);
}
//3:关闭线程池
pool.shutdown();
}
}
案例二介绍–取钱:
设计一个程序,使用两个线程模拟在两个地点同时从一个账号中取钱,假如卡中一共有1000元,每个线程取800元,要求演示结果一个线程取款成功,剩余200元,另一个线程取款失败,余额不足;
要求:
使用线程池创建线程
解决线程安全问题
思路提示:
线程池可以利用Executors工厂类的静态方法,创建线程池对象;
解决线程安全问题可以使用synchronized方法控制取钱的操作
在取款前,先判断余额是否足够,且保证余额判断和取钱行为的原子性;
package com.itheima.demo06;
public class MyTask implements Runnable {
//用户姓名
private String userName;
//取款金额
private double money;
//总金额
private static double total = 1000;
public MyTask(String userName, double money) {
this.userName = userName;
this.money = money;
}
@Override
public void run() {
String name = Thread.currentThread().getName();
System.out.println(userName+"正在准备使用"+name+"取款:"+money+"元");
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (MyTask.class){
if(total-money>0){
System.out.println(userName+"使用"+name+"取款:"+money+"元成功,余额:"+(total-money));
total-=money;
}else {
System.out.println(userName+"使用"+name+"取款:"+money+"元失败,余额:"+total);
}
}
}
}
package com.itheima.demo06;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
public class MyTest {
public static void main(String[] args) {
//1:创建线程池对象
ExecutorService pool = Executors.newFixedThreadPool(2, new ThreadFactory() {
int id = 1;
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "ATM" + id++);
}
});
//2:创建两个任务并提交
for (int i = 1; i <=2 ; i++) {
MyTask myTask = new MyTask("客户" + i, 800);
pool.submit(myTask);
}
//3:关闭线程池
pool.shutdown();
}
}
**
线程池总结
**
线程池的使用步骤可以归纳总结为五步 :
利用Executors工厂类的静态方法,创建线程池对象;
编写Runnable或Callable实现类的实例对象;
利用ExecutorService的submit方法或ScheduledExecutorService的schedule方 法提交并执行线程任务
如果有执行结果,则处理异步执行结果(Future)
调用shutdown()方法,关闭线程池
参考文章:
线程池很难么?带你从头到尾捋一遍,不信你听不懂!