JUC工具类
- JUC工具类: CountDownLatch详解
- JUC工具类: CyclicBarrier详解
- JUC工具类: Semaphore详解
- JUC工具类: Phaser详解
- JUC工具类: Exchanger详解
- Java 并发 - ThreadLocal详解
# CountDownLatch
# 1、什么是CountDownLatch
CountDownLatch底层也是由AQS,用来同步一个或多个任务的常用并发工具类,强制它们等待由其他任务执行的一组操作完成。
# 2、CountDownLatch底层实现原理?
其底层是由AQS提供支持,所以其数据结构可以参考AQS的数据结构,而AQS的数据结构核心就是两个虚拟队列: 同步队列sync queue 和条件队列condition queue,不同的条件会有不同的条件队列。CountDownLatch典型的用法是将一个程序分为n个互相独立的可解决任务,并创建值为n的CountDownLatch。当每一个任务完成时,都会在这个锁存器上调用countDown,等待问题被解决的任务调用这个锁存器的await,将他们自己拦住,直至锁存器计数结束。
# 3、CountDownLatch一次可以唤醒几个任务?
多个
# 4、CountDownLatch有哪些主要方法?
await(), 此函数将会使当前线程在锁存器倒计数至零之前一直等待,除非线程被中断。
countDown(), 此函数将递减锁存器的计数,如果计数到达零,则释放所有等待的线程
# 5、题目
实现一个容器,提供两个方法,add,size 写两个线程,线程1添加10个元素到容器中,线程2实现监控元素的个数,当个数到5个时,线程2给出提示并结束?
说出使用CountDownLatch 代替wait notify 好处?
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
/**
* 使用CountDownLatch 代替wait notify 好处是通讯方式简单,不涉及锁定 Count 值为0时当前线程继续执行,
*/
public class T3 {
volatile List list = new ArrayList();
public void add(int i){
list.add(i);
}
public int getSize(){
return list.size();
}
public static void main(String[] args) {
T3 t = new T3();
CountDownLatch countDownLatch = new CountDownLatch(1);
new Thread(() -> {
System.out.println("t2 start");
if(t.getSize() != 5){
try {
countDownLatch.await();
System.out.println("t2 end");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"t2").start();
new Thread(()->{
System.out.println("t1 start");
for (int i = 0;i<9;i++){
t.add(i);
System.out.println("add"+ i);
if(t.getSize() == 5){
System.out.println("countdown is open");
countDownLatch.countDown();
}
}
System.out.println("t1 end");
},"t1").start();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
# CyclicBarrier
# 1、什么是CyclicBarrier?
对于CountDownLatch,其他线程为游戏玩家,比如英雄联盟,主线程为控制游戏开始的线程。在所有的玩家都准备好之前,主线程是处于等待状态的,也就是游戏不能开始。当所有的玩家准备好之后,下一步的动作实施者为主线程,即开始游戏。
对于CyclicBarrier,假设有一家公司要全体员工进行团建活动,活动内容为翻越三个障碍物,每一个人翻越障碍物所用的时间是不一样的。但是公司要求所有人在翻越当前障碍物之后再开始翻越下一个障碍物,也就是所有人翻越第一个障碍物之后,才开始翻越第二个,以此类推。类比地,每一个员工都是一个“其他线程”。当所有人都翻越的所有的障碍物之后,程序才结束。而主线程可能早就结束了,这里我们不用管主线程。
# 2、CountDownLatch和CyclicBarrier对比?
CountDownLatch减计数,CyclicBarrier加计数。
CountDownLatch是一次性的,CyclicBarrier可以重用。
CountDownLatch和CyclicBarrier都有让多个线程等待同步然后再开始下一步动作的意思,但是CountDownLatch的下一步的动作实施者是主线程,具有不可重复性;而CyclicBarrier的下一步动作实施者还是“其他线程”本身,具有往复多次实施动作的特点。
# Semaphore
# 1、什么是Semaphore?
Semaphore底层是基于AbstractQueuedSynchronizer来实现的。Semaphore称为计数信号量,它允许n个任务同时访问某个资源,可以将信号量看做是在向外分发使用资源的许可证,只有成功获取许可证,才能使用资源
# 2、Semaphore内部原理?
Semaphore总共有三个内部类,并且三个内部类是紧密相关的,下面先看三个类的关系。
说明: Semaphore与ReentrantLock的内部类的结构相同,类内部总共存在Sync、NonfairSync、FairSync三个类,NonfairSync与FairSync类继承自Sync类,Sync类继承自AbstractQueuedSynchronizer抽象类。
# 3、单独使用Semaphore是不会使用到AQS的条件队列?
不同于CyclicBarrier和ReentrantLock,单独使用Semaphore是不会使用到AQS的条件队列的,其实,只有进行await操作才会进入条件队列,其他的都是在同步队列中,只是当前线程会被park。
# 4、问题:
# a、Semaphore初始化有10个令牌,11个线程同时各调用1次acquire方法,会发生什么?
拿不到令牌的线程阻塞,不会继续往下运行。
# b、Semaphore初始化有10个令牌,一个线程重复调用11次acquire方法,会发生什么?
线程阻塞,不会继续往下运行。可能你会考虑类似于锁的重入的问题,很好,但是,令牌没有重入的概念。你只要调用一次acquire方法,就需要有一个令牌才能继续运行。
# c、Semaphore初始化有1个令牌,1个线程调用一次acquire方法,然后调用两次release方法,之后另外一个线程调用acquire(2)方法,此线程能够获取到足够的令牌并继续运行吗?
能,原因是release方法会添加令牌,并不会以初始化的大小为准。
# d、Semaphore初始化有2个令牌,一个线程调用1次release方法,然后一次性获取3个令牌,会获取到吗?
能,原因是release会添加令牌,并不会以初始化的大小为准。Semaphore中release方法的调用并没有限制要在acquire后调用。代码如下:
public class TestSemaphore2 {
public static void main(String[] args) {
int permitsNum = 2;
final Semaphore semaphore = new Semaphore(permitsNum);
try {
System.out.println("availablePermits:"+semaphore.availablePermits()+",semaphore.tryAcquire(3,1, TimeUnit.SECONDS):"+semaphore.tryAcquire(3,1, TimeUnit.SECONDS));
semaphore.release();
System.out.println("availablePermits:"+semaphore.availablePermits()+",semaphore.tryAcquire(3,1, TimeUnit.SECONDS):"+semaphore.tryAcquire(3,1, TimeUnit.SECONDS));
}catch (Exception e) {
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
# Phaser
# 1、Phaser主要用来解决什么问题?
Phaser是JDK 7新增的一个同步辅助类,它可以实现CyclicBarrier和CountDownLatch类似的功能,而且它支持对任务的动态调整,并支持分层结构来达到更高的吞吐量。
# 2、Phaser与CyclicBarrier和CountDownLatch的区别是什么?
Phaser 和 CountDownLatch、CyclicBarrier 都有很相似的地方。
Phaser 顾名思义,就是可以分阶段的进行线程同步。
- CountDownLatch 只能在创建实例时,通过构造方法指定同步数量;
- Phaser 支持线程动态地向它注册。
利用这个动态注册的特性,可以达到分阶段同步控制的目的:
注册一批操作,等待它们执行结束;再注册一批操作,等它们结束...
# 3、Phaser运行机制是什么样的?
- Registration(注册)
跟其他barrier不同,在phaser上注册的parties会随着时间的变化而变化。任务可以随时注册(使用方法register,bulkRegister注册,或者由构造器确定初始parties),并且在任何抵达点可以随意地撤销注册(方法arriveAndDeregister)。就像大多数基本的同步结构一样,注册和撤销只影响内部count;不会创建更深的内部记录,所以任务不能查询他们是否已经注册。(不过,可以通过继承来实现类似的记录)
- Synchronization(同步机制)
和CyclicBarrier一样,Phaser也可以重复await。方法arriveAndAwaitAdvance的效果类似CyclicBarrier.await。phaser的每一代都有一个相关的phase number,初始值为0,当所有注册的任务都到达phaser时phase+1,到达最大值(Integer.MAX_VALUE)之后清零。使用phase number可以独立控制 到达phaser 和 等待其他线程 的动作,通过下面两种类型的方法:
Arrival(到达机制) arrive和arriveAndDeregister方法记录到达状态。这些方法不会阻塞,但是会返回一个相关的arrival phase number;也就是说,phase number用来确定到达状态。当所有任务都到达给定phase时,可以执行一个可选的函数,这个函数通过重写onAdvance方法实现,通常可以用来控制终止状态。重写此方法类似于为CyclicBarrier提供一个barrierAction,但比它更灵活。
Waiting(等待机制) awaitAdvance方法需要一个表示arrival phase number的参数,并且在phaser前进到与给定phase不同的phase时返回。和CyclicBarrier不同,即使等待线程已经被中断,awaitAdvance方法也会一直等待。中断状态和超时时间同样可用,但是当任务等待中断或超时后未改变phaser的状态时会遭遇异常。如果有必要,在方法forceTermination之后可以执行这些异常的相关的handler进行恢复操作,Phaser也可能被ForkJoinPool中的任务使用,这样在其他任务阻塞等待一个phase时可以保证足够的并行度来执行任务。
- Termination(终止机制) :
可以用isTerminated方法检查phaser的终止状态。在终止时,所有同步方法立刻返回一个负值。在终止时尝试注册也没有效果。当调用onAdvance返回true时Termination被触发。当deregistration操作使已注册的parties变为0时,onAdvance的默认实现就会返回true。也可以重写onAdvance方法来定义终止动作。forceTermination方法也可以释放等待线程并且允许它们终止。
- Tiering(分层结构) :
Phaser支持分层结构(树状构造)来减少竞争。注册了大量parties的Phaser可能会因为同步竞争消耗很高的成本, 因此可以设置一些子Phaser来共享一个通用的parent。这样的话即使每个操作消耗了更多的开销,但是会提高整体吞吐量。 在一个分层结构的phaser里,子节点phaser的注册和取消注册都通过父节点管理。子节点phaser通过构造或方法register、bulkRegister进行首次注册时,在其父节点上注册。子节点phaser通过调用arriveAndDeregister进行最后一次取消注册时,也在其父节点上取消注册。
- Monitoring(状态监控) :
由于同步方法可能只被已注册的parties调用,所以phaser的当前状态也可能被任何调用者监控。在任何时候,可以通过getRegisteredParties获取parties数,其中getArrivedParties方法返回已经到达当前phase的parties数。当剩余的parties(通过方法getUnarrivedParties获取)到达时,phase进入下一代。这些方法返回的值可能只表示短暂的状态,所以一般来说在同步结构里并没有啥卵用
# 4、给一个Phaser使用的示例?
模拟了100米赛跑,10名选手,只等裁判一声令下。当所有人都到达终点时,比赛结束。
public class Match {
// 模拟了100米赛跑,10名选手,只等裁判一声令下。当所有人都到达终点时,比赛结束。
public static void main(String[] args) throws InterruptedException {
final Phaser phaser=new Phaser(1) ;
// 十名选手
for (int index = 0; index < 10; index++) {
phaser.register();
new Thread(new player(phaser),"player"+index).start();
}
System.out.println("Game Start");
//注销当前线程,比赛开始
phaser.arriveAndDeregister();
//是否非终止态一直等待
while(!phaser.isTerminated()){
}
System.out.println("Game Over");
}
}
class player implements Runnable{
private final Phaser phaser ;
player(Phaser phaser){
this.phaser=phaser;
}
@Override
public void run() {
try {
// 第一阶段——等待创建好所有线程再开始
phaser.arriveAndAwaitAdvance();
// 第二阶段——等待所有选手准备好再开始
Thread.sleep((long) (Math.random() * 10000));
System.out.println(Thread.currentThread().getName() + " ready");
phaser.arriveAndAwaitAdvance();
// 第三阶段——等待所有选手准备好到达,到达后,该线程从phaser中注销,不在进行下面的阶段。
Thread.sleep((long) (Math.random() * 10000));
System.out.println(Thread.currentThread().getName() + " arrived");
phaser.arriveAndDeregister();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
# Exchanger
# 1、Exchanger主要解决什么问题?
Exchanger用于进行两个线程之间的数据交换。它提供一个同步点,在这个同步点,两个线程可以交换彼此的数据。这两个线程通过exchange()方法交换数据,当一个线程先执行exchange()方法后,它会一直等待第二个线程也执行exchange()方法,当这两个线程到达同步点时,这两个线程就可以交换数据了。
# 2、对比SynchronousQueue,为什么说Exchanger可被视为 SynchronousQueue 的双向形式?
Exchanger是一种线程间安全交换数据的机制。可以和之前分析过的SynchronousQueue对比一下:线程A通过SynchronousQueue将数据a交给线程B;线程A通过Exchanger和线程B交换数据,线程A把数据a交给线程B,同时线程B把数据b交给线程A。可见,SynchronousQueue是交给一个数据,Exchanger是交换两个数据。
# 3、Exchanger在不同的JDK版本中实现有什么差别?
在JDK5中Exchanger被设计成一个容量为1的容器,存放一个等待线程,直到有另外线程到来就会发生数据交换,然后清空容器,等到下一个到来的线程。
从JDK6开始,Exchanger用了类似ConcurrentMap的分段思想,提供了多个slot,增加了并发执行时的吞吐量。
# 4、Exchanger实现举例
来一个非常经典的并发问题:你有相同的数据buffer,一个或多个数据生产者,和一个或多个数据消费者。只是Exchange类只能同步2个线程,所以你只能在你的生产者和消费者问题中只有一个生产者和一个消费者时使用这个类。
public class Test {
static class Producer extends Thread {
private Exchanger<Integer> exchanger;
private static int data = 0;
Producer(String name, Exchanger<Integer> exchanger) {
super("Producer-" + name);
this.exchanger = exchanger;
}
@Override
public void run() {
for (int i=1; i<5; i++) {
try {
TimeUnit.SECONDS.sleep(1);
data = i;
System.out.println(getName()+" 交换前:" + data);
data = exchanger.exchange(data);
System.out.println(getName()+" 交换后:" + data);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
static class Consumer extends Thread {
private Exchanger<Integer> exchanger;
private static int data = 0;
Consumer(String name, Exchanger<Integer> exchanger) {
super("Consumer-" + name);
this.exchanger = exchanger;
}
@Override
public void run() {
while (true) {
data = 0;
System.out.println(getName()+" 交换前:" + data);
try {
TimeUnit.SECONDS.sleep(1);
data = exchanger.exchange(data);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(getName()+" 交换后:" + data);
}
}
}
public static void main(String[] args) throws InterruptedException {
Exchanger<Integer> exchanger = new Exchanger<Integer>();
new Producer("", exchanger).start();
new Consumer("", exchanger).start();
TimeUnit.SECONDS.sleep(7);
System.exit(-1);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
可以看到,其结果可能如下:
Consumer- 交换前:0
Producer- 交换前:1
Consumer- 交换后:1
Consumer- 交换前:0
Producer- 交换后:0
Producer- 交换前:2
Producer- 交换后:0
Consumer- 交换后:2
Consumer- 交换前:0
Producer- 交换前:3
Producer- 交换后:0
Consumer- 交换后:3
Consumer- 交换前:0
Producer- 交换前:4
Producer- 交换后:0
Consumer- 交换后:4
Consumer- 交换前:0
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# ThreadLocal
# 1、什么是ThreadLocal? 用来解决什么问题的?
我们在Java 并发 - 并发理论基础总结过线程安全(是指广义上的共享资源访问安全性,因为线程隔离是通过副本保证本线程访问资源安全性,它不保证线程之间还存在共享关系的狭义上的安全性)的解决思路:
- 互斥同步: synchronized 和 ReentrantLock
- 非阻塞同步: CAS, AtomicXXXX
- 无同步方案: 栈封闭,本地存储(Thread Local),可重入代码
ThreadLocal是通过线程隔离的方式防止任务在共享资源上产生冲突, 线程本地存储是一种自动化机制,可以为使用相同变量的每个不同线程都创建不同的存储。
ThreadLocal是一个将在多线程中为每一个线程创建单独的变量副本的类; 当使用ThreadLocal来维护变量时, ThreadLocal会为每个线程创建单独的变量副本, 避免因多线程操作共享变量而导致的数据不一致的情况。
# 2、说说你对ThreadLocal的理解
提到ThreadLocal被提到应用最多的是session管理和数据库链接管理,这里以数据访问为例帮助你理解ThreadLocal:
- 如下数据库管理类在单线程使用是没有任何问题的
class ConnectionManager {
private static Connection connect = null;
public static Connection openConnection() {
if (connect == null) {
connect = DriverManager.getConnection();
}
return connect;
}
public static void closeConnection() {
if (connect != null)
connect.close();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
很显然,在多线程中使用会存在线程安全问题:第一,这里面的2个方法都没有进行同步,很可能在openConnection方法中会多次创建connect;第二,由于connect是共享变量,那么必然在调用connect的地方需要使用到同步来保障线程安全,因为很可能一个线程在使用connect进行数据库操作,而另外一个线程调用closeConnection关闭链接。
- 为了解决上述线程安全的问题,第一考虑:互斥同步
你可能会说,将这段代码的两个方法进行同步处理,并且在调用connect的地方需要进行同步处理,比如用Synchronized或者ReentrantLock互斥锁。
- 这里再抛出一个问题:这地方到底需不需要将connect变量进行共享?
事实上,是不需要的。假如每个线程中都有一个connect变量,各个线程之间对connect变量的访问实际上是没有依赖关系的,即一个线程不需要关心其他线程是否对这个connect进行了修改的。即改后的代码可以这样:
class ConnectionManager {
private Connection connect = null;
public Connection openConnection() {
if (connect == null) {
connect = DriverManager.getConnection();
}
return connect;
}
public void closeConnection() {
if (connect != null)
connect.close();
}
}
class Dao {
public void insert() {
ConnectionManager connectionManager = new ConnectionManager();
Connection connection = connectionManager.openConnection();
// 使用connection进行操作
connectionManager.closeConnection();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
这样处理确实也没有任何问题,由于每次都是在方法内部创建的连接,那么线程之间自然不存在线程安全问题。但是这样会有一个致命的影响:导致服务器压力非常大,并且严重影响程序执行性能。由于在方法中需要频繁地开启和关闭数据库连接,这样不仅严重影响程序执行效率,还可能导致服务器压力巨大。
- 这时候ThreadLocal登场了
那么这种情况下使用ThreadLocal是再适合不过的了,因为ThreadLocal在每个线程中对该变量会创建一个副本,即每个线程内部都会有一个该变量,且在线程内部任何地方都可以使用,线程之间互不影响,这样一来就不存在线程安全问题,也不会严重影响程序执行性能。下面就是网上出现最多的例子:
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
public class ConnectionManager {
private static final ThreadLocal<Connection> dbConnectionLocal = new ThreadLocal<Connection>() {
@Override
protected Connection initialValue() {
try {
return DriverManager.getConnection("", "", "");
} catch (SQLException e) {
e.printStackTrace();
}
return null;
}
};
public Connection getConnection() {
return dbConnectionLocal.get();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# 3、ThreadLocal是如何实现线程隔离的?
ThreadLocalMap
# 4、为什么ThreadLocal会造成内存泄露? 如何解决
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class ThreadLocalDemo {
static class LocalVariable {
private Long[] a = new Long[1024 * 1024];
}
// (1)
final static ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(5, 5, 1, TimeUnit.MINUTES,
new LinkedBlockingQueue<>());
// (2)
final static ThreadLocal<LocalVariable> localVariable = new ThreadLocal<LocalVariable>();
public static void main(String[] args) throws InterruptedException {
// (3)
Thread.sleep(5000 * 4);
for (int i = 0; i < 50; ++i) {
poolExecutor.execute(new Runnable() {
public void run() {
// (4)
localVariable.set(new LocalVariable());
// (5)
System.out.println("use local varaible" + localVariable.get());
localVariable.remove();
}
});
}
// (6)
System.out.println("pool execute over");
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
如果用线程池来操作ThreadLocal 对象确实会造成内存泄露, 因为对于线程池里面不会销毁的线程, 里面总会存在着<ThreadLocal, LocalVariable>的强引用, 因为final static 修饰的 ThreadLocal 并不会释放, 而ThreadLocalMap 对于 Key 虽然是弱引用, 但是强引用不会释放, 弱引用当然也会一直有值, 同时创建的LocalVariable对象也不会释放, 就造成了内存泄露; 如果LocalVariable对象不是一个大对象的话, 其实泄露的并不严重, 泄露的内存 = 核心线程数 * LocalVariable对象的大小;
所以, 为了避免出现内存泄露的情况, ThreadLocal提供了一个清除线程中对象的方法, 即 remove, 其实内部实现就是调用 ThreadLocalMap 的remove方法:
private void remove(ThreadLocal<?> key) {
Entry[] tab = table;
int len = tab.length;
int i = key.threadLocalHashCode & (len-1);
for (Entry e = tab[i];
e != null;
e = tab[i = nextIndex(i, len)]) {
if (e.get() == key) {
e.clear();
expungeStaleEntry(i);
return;
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
找到Key对应的Entry, 并且清除Entry的Key(ThreadLocal)置空, 随后清除过期的Entry即可避免内存泄露。
# 5、还有哪些使用ThreadLocal的应用场景?
- 每个线程维护了一个“序列号”
public class SerialNum {
// The next serial number to be assigned
private static int nextSerialNum = 0;
private static ThreadLocal serialNum = new ThreadLocal() {
protected synchronized Object initialValue() {
return new Integer(nextSerialNum++);
}
};
public static int get() {
return ((Integer) (serialNum.get())).intValue();
}
}
+ 经典的另外一个例子:
```java
private static final ThreadLocal threadSession = new ThreadLocal();
public static Session getSession() throws InfrastructureException {
Session s = (Session) threadSession.get();
try {
if (s == null) {
s = getSessionFactory().openSession();
threadSession.set(s);
}
} catch (HibernateException ex) {
throw new InfrastructureException(ex);
}
return s;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
- 看看阿里巴巴 java 开发手册中推荐的 ThreadLocal 的用法:
import java.text.DateFormat;
import java.text.SimpleDateFormat;
public class DateUtils {
public static final ThreadLocal<DateFormat> df = new ThreadLocal<DateFormat>(){
@Override
protected DateFormat initialValue() {
return new SimpleDateFormat("yyyy-MM-dd");
}
};
}
2
3
4
5
6
7
8
9
10
11
然后我们再要用到 DateFormat 对象的地方,这样调用:
DateUtils.df.get().format(new Date());