There is no requirement that a thread that releases a permit must have acquired that permit by calling acquire.Correct usage of a semaphore is established by programming convention in the application.
Copy class Producer extends Thread {
private String threadName;
private Queue < Goods > queue;
private Semaphore queueSizeSemaphore;
private Semaphore concurrentWriteSemaphore;
private Semaphore notEmptySemaphore;
public Producer(String threadName, Queue<Goods> queue, Semaphore concurrentWriteSemaphore, Semaphore queueSizeSemaphore, Semaphore notEmptySemaphore) {
this . threadName = threadName;
this . queue = queue;
this . concurrentWriteSemaphore = concurrentWriteSemaphore;
this . queueSizeSemaphore = queueSizeSemaphore;
this . notEmptySemaphore = notEmptySemaphore;
}
@ Override
public void run() {
while ( true ) {
//模拟生产过程中的耗时操作
Goods goods = new Goods() ;
try {
Thread . sleep ( new Random() . nextInt ( 100 ));
} catch ( InterruptedException e) {
e . printStackTrace ();
}
try {
queueSizeSemaphore . acquire (); //获取队列未满的信号量
concurrentWriteSemaphore . acquire (); //获取读写的信号量
queue . add (goods);
System . out . println ( "【" + threadName + "】生产了一个商品:【" + goods . toString () + "】,目前商品数量:" + queue . size ());
} catch ( InterruptedException e) {
e . printStackTrace ();
} finally {
concurrentWriteSemaphore . release ();
notEmptySemaphore . release ();
}
}
}
}
Copy class Consumer extends Thread {
private String threadName;
private Queue < Goods > queue;
private Semaphore queueSizeSemaphore;
private Semaphore concurrentWriteSemaphore;
private Semaphore notEmptySemaphore;
public Consumer(String threadName, Queue<Goods> queue, Semaphore concurrentWriteSemaphore, Semaphore queueSizeSemaphore, Semaphore notEmptySemaphore) {
this . threadName = threadName;
this . queue = queue;
this . concurrentWriteSemaphore = concurrentWriteSemaphore;
this . queueSizeSemaphore = queueSizeSemaphore;
this . notEmptySemaphore = notEmptySemaphore;
}
@ Override
public void run() {
while ( true ) {
Goods goods;
try {
notEmptySemaphore . acquire ();
concurrentWriteSemaphore . acquire ();
goods = queue . remove ();
System . out . println ( "【" + threadName + "】生产了一个商品:【" + goods . toString () + "】,目前商品数量:" + queue . size ());
} catch ( InterruptedException e) {
e . printStackTrace ();
} finally {
concurrentWriteSemaphore . release ();
queueSizeSemaphore . release ();
}
//模拟消费过程中的耗时操作
try {
Thread . sleep ( new Random() . nextInt ( 100 ));
} catch ( InterruptedException e) {
e . printStackTrace ();
}
}
}
}
Copy public class ProducerConsumer {
@ Test
public void test () {
int maxSize = 5 ;
Queue < Goods > queue = new LinkedList <>();
Semaphore concurrentWriteSemaphore = new Semaphore( 1 ) ;
Semaphore notEmptySemaphore = new Semaphore( 0 ) ;
Semaphore queueSizeSemaphore = new Semaphore(maxSize) ;
Thread producer1 = new ProducerConsumer4.Producer("生产者1", queue, concurrentWriteSemaphore, queueSizeSemaphore, notEmptySemaphore);
Thread producer2 = new ProducerConsumer4.Producer("生产者2", queue, concurrentWriteSemaphore, queueSizeSemaphore, notEmptySemaphore);
Thread producer3 = new ProducerConsumer4.Producer("生产者3", queue, concurrentWriteSemaphore, queueSizeSemaphore, notEmptySemaphore);
Thread consumer1 = new ProducerConsumer4.Consumer("消费者1", queue, concurrentWriteSemaphore, queueSizeSemaphore, notEmptySemaphore);
Thread consumer2 = new ProducerConsumer4.Consumer("消费者2", queue, concurrentWriteSemaphore, queueSizeSemaphore, notEmptySemaphore);
Thread consumer3 = new ProducerConsumer4.Consumer("消费者3", queue, concurrentWriteSemaphore, queueSizeSemaphore, notEmptySemaphore);
producer1 . start ();
producer2 . start ();
producer3 . start ();
consumer1 . start ();
consumer2 . start ();
consumer3 . start ();
while ( true ) {
}
}
}