学习Lock中Condition的使用

有java开发经验的朋友肯定知道Lock,不管是培训班还是自学,大家知道Lock代替的是synchronized关键字。那么Condition又是在哪里使用呢?

synchronized和Lock的简单使用

假设有一个对象同一时间只能被一个线程操作。

那么synchronized的实现方式是:在这个对象里设置一个任意属性并且赋值,例如Object类型的值。然后在所有方法的方法代码外加上锁,即可实现。

例如:

1
2
3
4
5
6
7
8
public class A {
private Object o = new Object();
public void check(){
synchronized(o){
// do something
}
}
}

Lock的实现方式是:在对象里创建一个Lock属性并且赋值,在所有方法的方法代码外加一个lock.lock(),即可实现。

例如:

1
2
3
4
5
6
7
8
9
10
11
public class A {
private Lock lock = new ReentrantLock();
public void check(){
try{
lock.lock();
// do something
}finally{
lock.unlock();
}
}
}

在以上用法中,Lock使用了CAS技术实现了和synchronized关键字同样的功能。其作用是避免了线程进入BLOCKED状态(Thread.State类),在操作系统中避免了因线程自身阻塞引起的线程切换,减少了上下文切换引起的操作系统内核CPU消耗。

生产者-消费者问题

生产者-消费者(producer-consumer)问题,也称作有界缓冲区(bounded-buffer)问题,两个进程共享一个公共的固定大小的缓冲区。
其中一个是生产者,用于将消息放入缓冲区;另外一个是消费者,用于从缓冲区中取出消息。

问题出现在当缓冲区已经满了,而此时生产者还想向其中放入一个新的数据项的情形,其解决方法是让生产者此时进行休眠,等待消费者从缓冲区中取走了一个或者多个数据后再去唤醒它。
同样地,当缓冲区已经空了,而消费者还想去取消息,此时也可以让消费者进行休眠,等待生产者放入一个或者多个数据时再唤醒它。

对于以上问题,在这里我们把共享的缓冲区叫做pool。

synchronized和wait/notify

使用synchronized关键字和Object.wait()/Object.notifyAll()/Object.notify()就可以解决该问题。

在生产端判断pool中是否还有数据,如果有数据,则调用pool.wait(),没有数据就加入数据后调用pool.notifyAll(),循环以上操作。

在消费端判断pool中是否还有数据,如果有数据则消耗数据后调用pool.notifyAll(),如果没有数据,则调用pool.wait(),循环以上操作。

在以上方法中,首先有一个问题就是生产者和消费者需要在需要对pool加synchronized关键字,这个上面说过了,会带来上下文切换。其次调用wait方法会让线程进入WAITING状态(Thread.State类),此状态也会带来线程上下文切换,操作系统内核消耗CPU增加。

我们再考虑一下多消费者或多生产者的情况,当一个(生产者/消费者)调用了notifyAll方法。其余的生产者消费者均会被唤醒,然而等待他们的是synchronized,其中只有一个能够真正使用pool,其余的继续被压制,万一一个生产者调用notifyAll方法唤起的是另一个生产者,完了,还得在来一次notifyAll。上下文切换更加频繁了。

Lock 和 Condition

既然上一小节synchronized的await方法都会带来上下文切换,那Lock的解决方法是什么呢?

Lock接口提供了newCondition()方法,不同的Condition可以区分不同类型的调用者。以下举一个例子来说明一下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
* @author gavin
*/
public class LockMain {
// 锁
private static final Lock lock = new ReentrantLock();

// pool中有数据时的条件
private static Condition full = lock.newCondition();

// pool中没有数据时的条件
private static Condition empty = lock.newCondition();

// 模拟一个缓冲区,可以看到缓冲区是不可并发的
private static List<String> pool = new LinkedList<String>();

// 生成add对象的唯一编号
private static AtomicInteger addThreadNumber = new AtomicInteger(0);

// 生成remove对象的唯一编号
private static AtomicInteger removeThreadNumber = new AtomicInteger(0);

// 生产的数据编号
private static AtomicLong atomicLong = new AtomicLong(0);

static class Add implements Runnable {

public void run() {
int number = addThreadNumber.addAndGet(1);
int i = 10;
while (i-- != 0) {
try {
lock.lock();
//System.out.println(String.format("%s%s 获取了锁", "Add", number));
// 这里可以减缓一下运行速度,方便debug
// try {
// Thread.sleep((long) (Math.random() * 1000));
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
try {
while (pool.size() != 0) {
//System.out.println(String.format("%s%s 等待池子变空", "Add", number));
empty.await();
}
} catch (InterruptedException e) {
e.printStackTrace();
}

pool.add(String.format("Add线程%s 创建了 %s 号物品", number, atomicLong.addAndGet(1)));

full.signal();
//System.out.println(String.format("%s%s 发送已经变满", "Add", number));
} finally {
lock.unlock();
//System.out.println(String.format("%s%s 释放了锁", "Add", number));
}

}
}
}

static class Remove implements Runnable {

public void run() {
int number = removeThreadNumber.addAndGet(1);
while (true) {
try {
lock.lock();
//System.out.println(String.format("%s%s 获取了锁", "Remove", number));
// 这里可以减缓一下运行速度,方便debug
// try {
// Thread.sleep((long) (Math.random() * 1000));
// } catch (InterruptedException e) {
// e.printStackTrace();
// }

try {
while (pool.size() == 0) {
//System.out.println(String.format("%s%s 等待池子变满", "Remove", number));
full.await();
}

} catch (InterruptedException e) {
e.printStackTrace();
}

System.out.println((String.format("Remove线程%s 获取了 %s", number, pool.remove(0))));

empty.signal();
//System.out.println(String.format("%s%s 发送可能变空信号", "Remove", number));
} finally {
lock.unlock();
//System.out.println(String.format("%s%s 释放了锁", "Remove", number));
}

}
}
}

public static void main(String[] args) {
ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(100);
// 这里可以模拟各种情况
// 例如1:1/1:N/N:1/N:N 的生产者消费者数量
for (int i = 0; i < 1; i++) {
executor.execute(new Add());
}
for (int i = 0; i < 10; i++) {
executor.execute(new Remove());
}

}
}

执行以上方法Console中打印的结果是:

1
2
3
4
5
6
7
8
9
10
Remove线程1 获取了 Add线程1 创建了 1 号物品
Remove线程1 获取了 Add线程1 创建了 2 号物品
Remove线程2 获取了 Add线程1 创建了 3 号物品
Remove线程3 获取了 Add线程1 创建了 4 号物品
Remove线程4 获取了 Add线程1 创建了 5 号物品
Remove线程5 获取了 Add线程1 创建了 6 号物品
Remove线程6 获取了 Add线程1 创建了 7 号物品
Remove线程7 获取了 Add线程1 创建了 8 号物品
Remove线程8 获取了 Add线程1 创建了 9 号物品
Remove线程9 获取了 Add线程1 创建了 10 号物品

Lock中的Condition能带来什么好处呢?如果我们去阅读Condition实现的源码,会发现底层也是采用了CAS技术,也就是说不会带来因线程自身wait或block引起的上下文切换。其次,我们区分了两种条件,一个是有数据的条件full,一个是没有数据的条件empty,full的signal方法只会唤起调用full.await()的线程(也就是Remove类线程),empty的signal方法只会唤起调用empty.await()的线程(也就是Add类线程),保证了消费者只会唤起生产者,生产者只会唤起消费者。

还有没有别的办法?

回头看看前两节,我们发现有些东西是相同的。为什么要在生产者消费者上下那么大功夫?不能把pool做成可并发的吗?对的,是可以的,我们给pool做一个Proxy代理,在执行pool方法前,均调用synchronized或lock方法,这样就解决了以上的问题。

实际上,java的BlockingQueue接口的实现类中都采用了这种方式,在集合内部针对put和take有不同的条件Condition,保证了put完成后,只有调用take的线程被唤醒,take完成后,只有调用put的线程被唤醒。