ITEEDU

Java Gossip: BlockingQueue

队列(Queue)是个先前先出(First In First Out, FIFO)的数据结构。在JDK 5.0中新增了Blocking Queue,在多执行绪的情况下,如果Blocking Queue的内容为空,而有个执行绪试图从Queue中取出元素,则该执行绪会被Block,直到Queue有元素时才解除Block,反过来说,如果 Blocking Queue满了,而有个执行绪试图再把数据填入Queue中,则该执行绪会被Block,直到Queue中有元素被取走后解除Block。

BlockingQueue的几个主要操作为下:

方法

说明

add

加入元素,如果队列是满的,则丢出IllegalStateException

remove

传回并从队列移除元素,如果队列是空的,则丢出NoSuchElementException

element

传回元素,如果队列是空的,则丢出NoSuchElementException

offer

加入元素并传回true,如果队列是满的,则传回false

poll

传回并从队列移除元素,如果队列是空的,则传回null

peek

传回元素,如果队列是空的,则传回null

put

加入元素,如果队列是满,就block

take

传回并移除元素,如果队列是空的,就block

java.util.concurrent下提供几种不同的Blocking QueueArrayBlockingQueue要指定容量大小来建构,LinkedBlockingQueue预设没有容量上限,但也可以指定容量上限,PriorityBlockingQueue严格来说不是Queue,因为它是根据优先权(Priority)来移除元素。

在这边以 wait()、notify() 中的生产者、消费者程序为例,使用BlockQueue来加以改写,好处是我们不用亲自处理wait、notify的细节,首先生产者改写如下:

Producer.java
package onlyfun.caterpillar;
import java.util.concurrent.BlockingQueue;
public class Producer implements Runnable {
	private BlockingQueue queue;
	public Producer(BlockingQueue queue) {
		this.queue = queue;
	}
	public void run() {
		for(int product = 1; product <= 10; product++) {
			try {
				// wait for a random time
				Thread.sleep((int) Math.random() * 3000);
				queue.put(product);
			}
			catch(InterruptedException e) {
				e.printStackTrace();
			}
		}
	}
}

消费者类别改写如下:

Consumer.java
package onlyfun.caterpillar;
import java.util.concurrent.BlockingQueue;
public class Consumer implements Runnable {
	private BlockingQueue queue;
	public Consumer(BlockingQueue queue) {
		this.queue = queue;
	}
	public void run() {
		for(int i = 1; i <= 10; i++) {
			try {
				// wait for a random time
				Thread.sleep((int) (Math.random() * 3000));
				queue.take();
			}
			catch(InterruptedException e) {
				e.printStackTrace();
			}
		}
	}
}

可以使用下面这个程序来简单的测试一下:

BlockingQueueDemo.java
package onlyfun.caterpillar;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class BlockingQueueDemo {
	public static void main(String[] args) {
		BlockingQueue queue = new ArrayBlockingQueue(1);
		Thread producerThread = new Thread(
		new Producer(queue));
		Thread consumerThread = new Thread(
		new Consumer(queue));
		producerThread.start();
		consumerThread.start();
	}
}