有java开发经验的朋友肯定知道Lock,不管是培训班还是自学,大家知道Lock代替的是synchronized关键字。那么Condition又是在哪里使用呢?

synchronized和Lock的简单使用

假设有一个对象同一时间只能被一个线程操作。

那么synchronized的实现方式是:在这个对象里设置一个任意属性并且赋值,例如Object类型的值。然后在所有方法的方法代码外加上锁,即可实现。

例如:

public class A {
    private Object o = new Object();
    public void check(){
        synchronized(o){
            // do something
        }
   } 
}

Lock的实现方式是:在对象里创建一个Lock属性并且赋值,在所有方法的方法代码外加一个lock.lock(),即可实现。

例如:

public class A {
    private Lock lock = new ReentrantLock();
    public void check(){
        try{
            lock.lock();
            // do something
        }finally{
            lock.unlock();
        }
   } 
}

在以上用法中,Lock使用了CAS技术实现了和synchronized关键字同样的功能。其作用是避免了线程进入BLOCKED状态(Thread.State类),在操作系统中避免了因线程自身阻塞引起的线程切换,减少了上下文切换引起的操作系统内核CPU消耗。

生产者-消费者问题

生产者-消费者(producer-consumer)问题,也称作有界缓冲区(bounded-buffer)问题,两个进程共享一个公共的固定大小的缓冲区。
其中一个是生产者,用于将消息放入缓冲区;另外一个是消费者,用于从缓冲区中取出消息。

问题出现在当缓冲区已经满了,而此时生产者还想向其中放入一个新的数据项的情形,其解决方法是让生产者此时进行休眠,等待消费者从缓冲区中取走了一个或者多个数据后再去唤醒它。
同样地,当缓冲区已经空了,而消费者还想去取消息,此时也可以让消费者进行休眠,等待生产者放入一个或者多个数据时再唤醒它。

对于以上问题,在这里我们把共享的缓冲区叫做pool。

synchronized和wait/notify

使用synchronized关键字和Object.wait()/Object.notifyAll()/Object.notify()就可以解决该问题。

在生产端判断pool中是否还有数据,如果有数据,则调用pool.wait(),没有数据就加入数据后调用pool.notifyAll(),循环以上操作。

在消费端判断pool中是否还有数据,如果有数据则消耗数据后调用pool.notifyAll(),如果没有数据,则调用pool.wait(),循环以上操作。

在以上方法中,首先有一个问题就是生产者和消费者需要在需要对pool加synchronized关键字,这个上面说过了,会带来上下文切换。其次调用wait方法会让线程进入WAITING状态(Thread.State类),此状态也会带来线程上下文切换,操作系统内核消耗CPU增加。

我们再考虑一下多消费者或多生产者的情况,当一个(生产者/消费者)调用了notifyAll方法。其余的生产者消费者均会被唤醒,然而等待他们的是synchronized,其中只有一个能够真正使用pool,其余的继续被压制,万一一个生产者调用notifyAll方法唤起的是另一个生产者,完了,还得在来一次notifyAll。上下文切换更加频繁了。

Lock 和 Condition

既然上一小节synchronized的await方法都会带来上下文切换,那Lock的解决方法是什么呢?

Lock接口提供了newCondition()方法,不同的Condition可以区分不同类型的调用者。以下举一个例子来说明一下:

import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * @author gavin
 */
public class LockMain {
    // 锁
    private static final Lock lock = new ReentrantLock();

    // pool中有数据时的条件
    private static Condition full = lock.newCondition();

    // pool中没有数据时的条件
    private static Condition empty = lock.newCondition();

    // 模拟一个缓冲区,可以看到缓冲区是不可并发的
    private static List<String> pool = new LinkedList<String>();

    // 生成add对象的唯一编号
    private static AtomicInteger addThreadNumber = new AtomicInteger(0);

    // 生成remove对象的唯一编号
    private static AtomicInteger removeThreadNumber = new AtomicInteger(0);

    // 生产的数据编号
    private static AtomicLong atomicLong = new AtomicLong(0);

    static class Add implements Runnable {

        public void run() {
            int number = addThreadNumber.addAndGet(1);
            int i = 10;
            while (i-- != 0) {
                try {
                    lock.lock();
                    //System.out.println(String.format("%s%s 获取了锁", "Add", number));
                    // 这里可以减缓一下运行速度,方便debug
//                    try {
//                        Thread.sleep((long) (Math.random() * 1000));
//                    } catch (InterruptedException e) {
//                        e.printStackTrace();
//                    }
                    try {
                        while (pool.size() != 0) {
                            //System.out.println(String.format("%s%s 等待池子变空", "Add", number));
                            empty.await();
                        }
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }

                    pool.add(String.format("Add线程%s 创建了 %s 号物品", number, atomicLong.addAndGet(1)));

                    full.signal();
                    //System.out.println(String.format("%s%s 发送已经变满", "Add", number));
                } finally {
                    lock.unlock();
                    //System.out.println(String.format("%s%s 释放了锁", "Add", number));
                }

            }
        }
    }

    static class Remove implements Runnable {

        public void run() {
            int number = removeThreadNumber.addAndGet(1);
            while (true) {
                try {
                    lock.lock();
                    //System.out.println(String.format("%s%s 获取了锁", "Remove", number));
                    // 这里可以减缓一下运行速度,方便debug
//                    try {
//                        Thread.sleep((long) (Math.random() * 1000));
//                    } catch (InterruptedException e) {
//                        e.printStackTrace();
//                    }

                    try {
                        while (pool.size() == 0) {
                            //System.out.println(String.format("%s%s 等待池子变满", "Remove", number));
                            full.await();
                        }

                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }

                    System.out.println((String.format("Remove线程%s 获取了 %s", number, pool.remove(0))));

                    empty.signal();
                    //System.out.println(String.format("%s%s 发送可能变空信号", "Remove", number));
                } finally {
                    lock.unlock();
                    //System.out.println(String.format("%s%s 释放了锁", "Remove", number));
                }

            }
        }
    }

    public static void main(String[] args) {
        ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(100);
        // 这里可以模拟各种情况
        // 例如1:1/1:N/N:1/N:N 的生产者消费者数量
        for (int i = 0; i < 1; i++) {
            executor.execute(new Add());
        }
        for (int i = 0; i < 10; i++) {
            executor.execute(new Remove());
        }

    }
}

执行以上方法Console中打印的结果是:

Remove线程1 获取了 Add线程1 创建了 1 号物品
Remove线程1 获取了 Add线程1 创建了 2 号物品
Remove线程2 获取了 Add线程1 创建了 3 号物品
Remove线程3 获取了 Add线程1 创建了 4 号物品
Remove线程4 获取了 Add线程1 创建了 5 号物品
Remove线程5 获取了 Add线程1 创建了 6 号物品
Remove线程6 获取了 Add线程1 创建了 7 号物品
Remove线程7 获取了 Add线程1 创建了 8 号物品
Remove线程8 获取了 Add线程1 创建了 9 号物品
Remove线程9 获取了 Add线程1 创建了 10 号物品

Lock中的Condition能带来什么好处呢?如果我们去阅读Condition实现的源码,会发现底层也是采用了CAS技术,也就是说不会带来因线程自身wait或block引起的上下文切换。其次,我们区分了两种条件,一个是有数据的条件full,一个是没有数据的条件empty,full的signal方法只会唤起调用full.await()的线程(也就是Remove类线程),empty的signal方法只会唤起调用empty.await()的线程(也就是Add类线程),保证了消费者只会唤起生产者,生产者只会唤起消费者。

还有没有别的办法?

回头看看前两节,我们发现有些东西是相同的。为什么要在生产者消费者上下那么大功夫?不能把pool做成可并发的吗?对的,是可以的,我们给pool做一个Proxy代理,在执行pool方法前,均调用synchronized或lock方法,这样就解决了以上的问题。

实际上,java的BlockingQueue接口的实现类中都采用了这种方式,在集合内部针对put和take有不同的条件Condition,保证了put完成后,只有调用take的线程被唤醒,take完成后,只有调用put的线程被唤醒。

标签: java, 并发, 集合, 阻塞容器, BlockingQueue, Lock, Condition

添加新评论