队列(Queue)是个先前先出(First In First Out, FIFO)的数据结构。在JDK 5.0中新增了Blocking Queue,在多执行绪的情况下,如果Blocking Queue的内容为空,而有个执行绪试图从Queue中取出元素,则该执行绪会被Block,直到Queue有元素时才解除Block,反过来说,如果 Blocking Queue满了,而有个执行绪试图再把数据填入Queue中,则该执行绪会被Block,直到Queue中有元素被取走后解除Block。
BlockingQueue的几个主要操作为下:
|
方法 |
说明 |
|
add |
加入元素,如果队列是满的,则丢出IllegalStateException |
|
remove |
传回并从队列移除元素,如果队列是空的,则丢出NoSuchElementException |
|
element |
传回元素,如果队列是空的,则丢出NoSuchElementException |
|
offer |
加入元素并传回true,如果队列是满的,则传回false |
|
poll |
传回并从队列移除元素,如果队列是空的,则传回null |
|
peek |
传回元素,如果队列是空的,则传回null |
|
put |
加入元素,如果队列是满,就block |
|
take |
传回并移除元素,如果队列是空的,就block |
在java.util.concurrent下提供几种不同的Blocking
Queue,ArrayBlockingQueue要指定容量大小来建构,LinkedBlockingQueue预设没有容量上限,但也可以指定容量上限,PriorityBlockingQueue严格来说不是Queue,因为它是根据优先权(Priority)来移除元素。
在这边以 wait()、notify() 中的生产者、消费者程序为例,使用BlockQueue来加以改写,好处是我们不用亲自处理wait、notify的细节,首先生产者改写如下:
package onlyfun.caterpillar;
import java.util.concurrent.BlockingQueue;
public class Producer implements Runnable {
private BlockingQueue queue;
public Producer(BlockingQueue queue) {
this.queue = queue;
}
public void run() {
for(int product = 1; product <= 10; product++) {
try {
// wait for a random time
Thread.sleep((int) Math.random() * 3000);
queue.put(product);
}
catch(InterruptedException e) {
e.printStackTrace();
}
}
}
}
消费者类别改写如下:
package onlyfun.caterpillar;
import java.util.concurrent.BlockingQueue;
public class Consumer implements Runnable {
private BlockingQueue queue;
public Consumer(BlockingQueue queue) {
this.queue = queue;
}
public void run() {
for(int i = 1; i <= 10; i++) {
try {
// wait for a random time
Thread.sleep((int) (Math.random() * 3000));
queue.take();
}
catch(InterruptedException e) {
e.printStackTrace();
}
}
}
}
可以使用下面这个程序来简单的测试一下:
package onlyfun.caterpillar;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class BlockingQueueDemo {
public static void main(String[] args) {
BlockingQueue queue = new ArrayBlockingQueue(1);
Thread producerThread = new Thread(
new Producer(queue));
Thread consumerThread = new Thread(
new Consumer(queue));
producerThread.start();
consumerThread.start();
}
}