网站开发主要做什么/地推团队去哪里找
本月计划有一项,就是复习一下Java 的多线程包,这是Java自带的,用来实现多线程操作的工具类。
只求使用,暂时不看底层实现原理。
可以用在什么场景呢?
比如,同时对数据库发起数个查询请求的场景,通过多线程可以大大缩小读取时间,但又不能简单粗暴的直接起数据库连接,这是不允许的,会打满连接池,数据库会吃不消。
因此我们要使用多线程机制来控制并发的平衡。
这在工业级系统是非常常见的需求。
在网上找了一些指引,找到一篇不错的,分享给大家Overview of the java.util.concurrent | Baeldungwww.baeldung.com
以下是一些个人私货
其实多线程的使用,在Java本身是很简单的,只是新建一个线程,分配一个任务即可,原生API就很简单,那么这个包核心的功能在于什么呢?
在于提供了更细致的控制,比如线程池,超时时间,比如延迟执行,周期调度。
新建一个线程池
ExecutorService executorService = Executors.newFixedThreadPool(10);
提交一个任务执行
Future future = executorService.submit(() -> {
Thread.sleep(1 * 800);
return 0;
});
提交之后就会执行。
使用CountDownLatch进行线程控制
使用场景为,有一个任务,必须等待前置的N个任务完成,方能执行,那么使用CountDownLatch非常合适。
我们看下面这一个例子
//做线程池ExecutorService executorService = Executors.newFixedThreadPool(2);
//限制条件,初始为1CountDownLatch countDownLatch = new CountDownLatch(1);
//2个任务List> taskList = new ArrayList<>(2);
//线程A,沉睡3秒后,将countDownLatch减1taskList.add(() -> {
log.info("A 线程先执行,3秒后将countDown - 1");
Thread.sleep(3000);
log.info("A 准备将countDownLatch对象-1,当前: {}", countDownLatch.getCount());
countDownLatch.countDown();
log.info("A 将countDownLatch对象置为了:{}", countDownLatch.getCount());
return null;
});
//等待,直到countDownLatch对象为0taskList.add(() -> {
log.info("B 准备等待countDown对象,当前值: {}", countDownLatch.getCount());
countDownLatch.await();
log.info("B 获取到了countDown对象,当前值:{}", countDownLatch.getCount());
return null;
});
//同时唤醒任务executorService.invokeAll(taskList);
executorService.shutdown();
executorService.awaitTermination(1, TimeUnit.MINUTES);
我们新建了一个线程池,使用CountDownLatch来控制线程阻塞(遇到countDownLatch.await()方法即阻塞,直到countDownLatch的值为0)
线程A等待3秒,打印准备结束日志,然后将countDownLatch减1(得0)
线程B先打印一句准备执行,之后等待countDownLatch为0,再打印一句结束。
提交之后,主线程等待10秒钟,等待2个线程完成。
14:12:03.775 [pool-1-thread-2] INFO com.example.demo.CountDownTest - B 准备等待countDown对象,当前值: 1
14:12:03.775 [pool-1-thread-1] INFO com.example.demo.CountDownTest - A 线程先执行,3秒后将countDown - 1
14:12:06.785 [pool-1-thread-1] INFO com.example.demo.CountDownTest - A 准备将countDownLatch对象-1,当前: 1
14:12:06.785 [pool-1-thread-1] INFO com.example.demo.CountDownTest - A 将countDownLatch对象置为了:0
14:12:06.785 [pool-1-thread-2] INFO com.example.demo.CountDownTest - B 获取到了countDown对象,当前值:0
executorService.awaitTermination(1, TimeUnit.MINUTES);
使用该方法可以使得主线程不会永远等待。
值得注意的是,这个对象是一次性的,count数目不能重置。
使用CyclicBarrier进行线程控制
与CountDownLatch类似。但是可以重置
示例代码:
long base = System.currentTimeMillis();
//有三个线程被阻塞时,执行回调//3个任务完成后,执行回调任务CyclicBarrier cyclicBarrier = new CyclicBarrier(3, () -> {
log.info("the Key Task execute now");
try {
Thread.sleep(3000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("the Key Task done");
});
ExecutorService executorService = Executors.newFixedThreadPool(3);
List> taskList = new ArrayList<>(3);
for (int i = 1 ; i <= 3; i ++ ){
final int index = i;
taskList.add(() -> {
Thread.sleep(index * 1000);
log.info("Task {} ready to wait the Key Task, with {} parties, with {} numberWaiting",index,cyclicBarrier.getParties(), cyclicBarrier.getNumberWaiting());
//a来指明到达顺序,数字越大,说明来的越早,最晚来的是0,最早是 (getParties() - 1) int a = cyclicBarrier.await();
Thread.sleep(index * 10);
log.info("Task {} can continue, with {} parties, with {} numberWaiting, {}",index,cyclicBarrier.getParties(), cyclicBarrier.getNumberWaiting(), a);
return null;
});
}
executorService.invokeAll(taskList);
executorService.shutdown();
executorService.awaitTermination(1, TimeUnit.MINUTES);
输出如下:
14:49:44.618 [pool-1-thread-1] - Task 1 ready to wait the Key Task, with 3 parties, with 0 numberWaiting
14:49:45.613 [pool-1-thread-2] - Task 2 ready to wait the Key Task, with 3 parties, with 1 numberWaiting
14:49:46.613 [pool-1-thread-3] - Task 3 ready to wait the Key Task, with 3 parties, with 2 numberWaiting
14:49:46.613 [pool-1-thread-3] - the Key Task execute now
14:49:49.618 [pool-1-thread-3] - the Key Task done
14:49:49.631 [pool-1-thread-1] - Task 1 can continue, with 3 parties, with 0 numberWaiting, 2
14:49:49.642 [pool-1-thread-2] - Task 2 can continue, with 3 parties, with 0 numberWaiting, 1
14:49:49.653 [pool-1-thread-3] - Task 3 can continue, with 3 parties, with 0 numberWaiting, 0
如果这里将线程池的数目设置为2,则会一直等待下去。
可以看出这个类的使用意义在于,如果你有3个任务,他们执行到
int a = cyclicBarrier.await();
这一句之后,会触发cyclicBarrier执行回调任务,该任务完成后,3个线程才会继续往下执行。
Semaphore模拟资源进行控制
其实前面2个方法已经提供了良好的控制了,这里又提供了一个工具,Semaphore,模拟资源,来协调各个线程的运行。
假设我们有一个数据库,连接池只有3,但我们同时需要进行N次查询,那么我们就可以使用这个类来方便的控制。
新建一个线程池用于查询
ExecutorService executorService = Executors.newFixedThreadPool(10);
使用Semaphore来模拟数据库连接池
Semaphore semaphore = new Semaphore(3);
这是非常常见的使用用例,各个线程在进行SQL查询时,先去尝试获取连接池,如果没有资源了,就等待一个别的线程使用完毕,多余一个出来。
//一共5个DB查询任务ExecutorService executorService = Executors.newFixedThreadPool(5);
List> taskList = new ArrayList<>(5);
//同时只允许2个同时查询Semaphore dbLink = new Semaphore(2);
for (int i = 1 ; i <= 5; i ++ ){
final int index = i;
taskList.add(() -> {
//尝试获取DB连接 log.info("Task {} try for DB LINK: {}", index, dbLink.availablePermits());
if (dbLink.tryAcquire(2, TimeUnit.SECONDS)){
log.info("Task {} get DB LINK", index);
Thread.sleep(index * 1000);
log.info("Task {} ready to release DB LINK", index);
dbLink.release();
}else {
log.info("Task {} fail to get DB LINK", index);
}
return null;
});
}
executorService.invokeAll(taskList);
executorService.shutdown();
executorService.awaitTermination(10, TimeUnit.SECONDS);
并发查询是工业级项目常有的需求,使用Semaphore非常方便!
输出如下:
15:00:57.531 - Task 1 try for DB LINK: 2
15:00:57.531 - Task 2 try for DB LINK: 2
15:00:57.531 - Task 5 try for DB LINK: 2
15:00:57.531 - Task 4 try for DB LINK: 2
15:00:57.531 - Task 3 try for DB LINK: 2
15:00:57.537 - Task 1 get DB LINK
15:00:57.537 - Task 2 get DB LINK
15:00:58.541 - Task 1 ready to release DB LINK
15:00:58.541 - Task 5 get DB LINK
15:00:59.540 - Task 2 ready to release DB LINK
15:00:59.540 - Task 3 fail to get DB LINK
15:00:59.540 - Task 4 fail to get DB LINK
15:01:03.546 - Task 5 ready to release DB LINK
我把输出结果优化了一下:
第一个输出块,可以看出,5个线程同时尝试获取limit为2的DB连接
第二个输出块,线程1,2获取到了DB连接,1,2开始执行任务
第三个输出块,线程1完成了任务,准备释放DB连接,于是线程5获取到了DB连接
第四个输出块,线程2也完成了任务,但是来不及了,达到了设定的最大等待时间,线程3,4未能完成任务,最后线程5完成释放,但是没有新的线程可以获取DB连接了。
使用Phaser进行多阶段的并发控制
想象我们现在需要把大象装进冰箱,众所周知需要三个阶段
那么Phaser相当于一个指挥,他指挥3个线程分别将各自的大象装进各自的冰箱,要求:
所有的线程都打开冰箱门后,才能开始装大象
所有的大象都装进冰箱后,才能关冰箱门
资源准备:
ExecutorService executorService = Executors.newFixedThreadPool(10);
List> callableList = new ArrayList<>();
Phaser phaser = new MyPhaser();
这里的Phaser是自己继承的一个类,重写了一个回调的方法
@Slf4j
public class MyPhaser extends Phaser {
@Override
protected boolean onAdvance(int phase, int registeredParties) {
log.info("now is " + phase + " register is " + registeredParties);
return super.onAdvance(phase, registeredParties);
}
}
三个线程准备:
for (int i = 1 ; i <=3 ; i ++){
int finalI = i;
callableList.add(() -> {
//注册一次表示phaser维护的线程个数 phaser.register();
log.info(finalI + "线程 已经打开冰箱门,等待其他线程");
phaser.arriveAndAwaitAdvance();
Thread.sleep(finalI * 1000);
log.info(finalI + "线程 大象装进冰箱了,等待其他线程");
phaser.arriveAndAwaitAdvance();
Thread.sleep(finalI * 1000);
log.info(finalI + "线程 冰箱门已关闭,结束流程");
phaser.arriveAndDeregister();
return finalI;
});
}
开始调度:
log.info(" ----- 开始把大象装进冰箱");
executorService.invokeAll(callableList);
executorService.shutdown();
executorService.awaitTermination(1, TimeUnit.MINUTES);
log.info("final phaser " + phaser.getPhase());
打印如下:
17:23:47.371 [main] ConcurrentTest - ----- 开始把大象装进冰箱
17:23:47.374 [thread-2] ConcurrentTest - 1线程 已经打开冰箱门,等待其他线程
17:23:47.374 [thread-3] ConcurrentTest - 2线程 已经打开冰箱门,等待其他线程
17:23:47.374 [thread-4] ConcurrentTest - 3线程 已经打开冰箱门,等待其他线程
17:23:47.374 [thread-4] demo.MyPhaser - now is 0 register is 3
17:23:48.375 [thread-2] ConcurrentTest - 1线程 大象装进冰箱了,等待其他线程
17:23:49.378 [thread-3] ConcurrentTest - 2线程 大象装进冰箱了,等待其他线程
17:23:50.378 [thread-4] ConcurrentTest - 3线程 大象装进冰箱了,等待其他线程
17:23:50.378 [thread-4] demo.MyPhaser - now is 1 register is 3
17:23:51.383 [thread-2] ConcurrentTest - 1线程 冰箱门已关闭,结束流程
17:23:52.381 [thread-3] ConcurrentTest - 2线程 冰箱门已关闭,结束流程
17:23:53.379 [thread-4] ConcurrentTest - 3线程 冰箱门已关闭,结束流程
17:23:53.379 [thread-4] MyPhaser - now is 2 register is 0
17:23:56.398 [main] ConcurrentTest - final phaser -2147483645
BlockingQueue
阻塞队列是为了满足多线程之间共享数据使用的。
我在此前没有接触过在同一个JVM内,需要使用这种设计的模型,接触到这个比较难懂,放一个中文讲的很好的博客BlockingQueue - 不会就问咯 - 博客园www.cnblogs.com多线程环境中,通过队列可以很容易实现数据共享,比如经典的“生产者”和“消费者”模型中,通过队列可以很便利地实现两者之间的数据共享。假设我们有若干生产者线程,另外又有若干个消费者线程。如果生产者线程需要把准备好的数据共享给消费者线程,利用队列的方式来传递数据,就可以很方便地解决他们之间的数据共享问题。
在新增的Concurrent包中,BlockingQueue很好的解决了多线程中,如何高效安全“传输”数据的问题。通过这些高效并且线程安全的队列类,为我们快速搭建高质量的多线程程序带来极大的便利。
希望对你有帮助。