使用信号量实现

前提是熟悉信号量Semaphore的使用方式,尤其是release()方法,Semaphorerelease之前不必一定要先acquire(如果不熟悉Semaphore,可以参考阅读【多线程与并发】Java并发工具类)。

There is no requirement that a thread that releases a permit must have acquired that permit by calling acquire.Correct usage of a semaphore is established by programming convention in the application.

参考代码

生产者

class Producer extends Thread {
    private String threadName;
    private Queue<Goods> queue;
    private Semaphore queueSizeSemaphore;
    private Semaphore concurrentWriteSemaphore;
    private Semaphore notEmptySemaphore;

    public Producer(String threadName, Queue<Goods> queue, Semaphore concurrentWriteSemaphore, Semaphore queueSizeSemaphore, Semaphore notEmptySemaphore) {
        this.threadName = threadName;
        this.queue = queue;
        this.concurrentWriteSemaphore = concurrentWriteSemaphore;
        this.queueSizeSemaphore = queueSizeSemaphore;
        this.notEmptySemaphore = notEmptySemaphore;
    }

    @Override
    public void run() {
        while (true) {
            //模拟生产过程中的耗时操作
            Goods goods = new Goods();
            try {
                Thread.sleep(new Random().nextInt(100));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            try {
                queueSizeSemaphore.acquire();//获取队列未满的信号量
                concurrentWriteSemaphore.acquire();//获取读写的信号量
                queue.add(goods);
                System.out.println("【" + threadName + "】生产了一个商品:【" + goods.toString() + "】,目前商品数量:" + queue.size());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }finally {
                concurrentWriteSemaphore.release();
                notEmptySemaphore.release();
            }
        }
    }
}

消费者

class Consumer extends Thread {
    private String threadName;
    private Queue<Goods> queue;
    private Semaphore queueSizeSemaphore;
    private Semaphore concurrentWriteSemaphore;
    private Semaphore notEmptySemaphore;

    public Consumer(String threadName, Queue<Goods> queue, Semaphore concurrentWriteSemaphore, Semaphore queueSizeSemaphore, Semaphore notEmptySemaphore) {
        this.threadName = threadName;
        this.queue = queue;
        this.concurrentWriteSemaphore = concurrentWriteSemaphore;
        this.queueSizeSemaphore = queueSizeSemaphore;
        this.notEmptySemaphore = notEmptySemaphore;
    }

    @Override
    public void run() {
        while (true) {
            Goods goods;
            try {
                notEmptySemaphore.acquire();
                concurrentWriteSemaphore.acquire();
                goods = queue.remove();
                System.out.println("【" + threadName + "】生产了一个商品:【" + goods.toString() + "】,目前商品数量:" + queue.size());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }finally {
                concurrentWriteSemaphore.release();
                queueSizeSemaphore.release();
            }

            //模拟消费过程中的耗时操作
            try {
                Thread.sleep(new Random().nextInt(100));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

测试

public class ProducerConsumer {
    @Test
    public void test() {

        int maxSize = 5;
        Queue<Goods> queue = new LinkedList<>();
        Semaphore concurrentWriteSemaphore = new Semaphore(1);
        Semaphore notEmptySemaphore = new Semaphore(0);
        Semaphore queueSizeSemaphore = new Semaphore(maxSize);


        Thread producer1 = new ProducerConsumer4.Producer("生产者1", queue, concurrentWriteSemaphore, queueSizeSemaphore, notEmptySemaphore);
        Thread producer2 = new ProducerConsumer4.Producer("生产者2", queue, concurrentWriteSemaphore, queueSizeSemaphore, notEmptySemaphore);
        Thread producer3 = new ProducerConsumer4.Producer("生产者3", queue, concurrentWriteSemaphore, queueSizeSemaphore, notEmptySemaphore);

        Thread consumer1 = new ProducerConsumer4.Consumer("消费者1", queue, concurrentWriteSemaphore, queueSizeSemaphore, notEmptySemaphore);
        Thread consumer2 = new ProducerConsumer4.Consumer("消费者2", queue, concurrentWriteSemaphore, queueSizeSemaphore, notEmptySemaphore);
        Thread consumer3 = new ProducerConsumer4.Consumer("消费者3", queue, concurrentWriteSemaphore, queueSizeSemaphore, notEmptySemaphore);


        producer1.start();
        producer2.start();
        producer3.start();
        consumer1.start();
        consumer2.start();
        consumer3.start();
        while (true) {
        }
    }
}

要注意的地方

①理解代码中的三个信号量的含义 queueSizeSemaphore:(其中的许可证数量,可以理解为队列中可以再放入多少个元素),该信号量的许可证初始数量为仓库大小,即maxSize;生产者每放置一个商品,则该信号量-1,即执行acquire(),表示队列中已经添加了一个元素,要减少一个许可证;消费者每取出一个商品,该信号量+1,即执行release(),表示队列中已经少了一个元素,再给你一个许可证。 notEmptySemaphore:(其中的许可证数量,可以理解为队列中可以取出多少个元素),该信号量的许可证初始数量为0;生产者每放置一个商品,则该信号量+1,即执行release(),表示队列中添加了一个元素;消费者每取出一个商品,该信号量-1,即执行acquire(),表示队列中已经少了一个元素,要减少一个许可证; concurrentWriteSemaphore,相当于一个写锁,在放入或取出商品的时候,都需要先获取再释放许可证。

②由于实现中,使用了concurrentWriteSemaphore实现了对队列并发写的控制,在同一时刻,只能对队列进行一种操作:放入或取出。假如把concurrentWriteSemaphore中的信号量初始化为2或者2以上的值,就会出现多个生产者同时放入或多个消费者同时消费的情况,而使用的LinkedList是不允许并发进行这种修改的,否则会出现溢出或取空的情况。所以,concurrentWriteSemaphore只能设置为1,也就导致性能与使用wait() / notify()方式类似,性能不高。

Last updated