阻塞队列是一种队列,一种可以在多线程环境下使用,并且支持阻塞等待的队列。也就是说,阻塞队列和一般的队列的区别就在于:

  • 多线程环境支持,多个线程可以安全的访问队列
  • 支持生产和消费等待,多个线程之间互相配合,当队列为空的时候,消费线程会阻塞等待队列不为空;当队列满了的时候,生产线 程就会阻塞直到队列不满。

阻塞队列

阻塞队列在Java中的一种典型使用场景是线程池,在线程池中,当提交的任务不能立即

得到执行的时候,线程池就会将提交的任务放到一个阻塞队列中来。比如下面的代码:

1
2
3
4
5
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}

newFixedThreadPool使用可LinkedBlockingQueue这种阻塞队列。

1
2
3
4
5
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}

newCachedThreadPool使用了SynchronousQueue这种队列,这种队列的特点是不缓存数据,而是缓存线程,线程分为生产者线程和消费者线程,一个生产者线程和一个消费者线程是互补的,当一个生产者线程遇到一个消费者线程的时候就会直接进行数据交换,所以这种队列的技术点比较高,理解起来难度较大。一个线程只能缓存一个数据,当一个线程插入数据之后就会被阻塞,直到另外一个线程消费了其中的数据。

阻塞队列还提供了其他类型的队列,包括双端阻塞队列,延时阻塞队列,延时阻塞队列的使用可以在newScheduledThreadPool中找到,newScheduledThreadPool里面使用延时阻塞队列来调度周期性任务执行。

下面展示的是BlockingQueue提供的一些方法:

BlockingQueue
BlockingQueue

BlockingQueue的使用

BlockingQueue通常用于在线程上生成另一个线程消耗的对象。下图展示这个过程:

BlockingQueue
BlockingQueue

生成线程将继续生成新对象并将它们插入队列,直到队列达到它可以包含的内容的某个上限。换句话说,这是限制。如果阻塞队列达到其上限,则在尝试插入新对象时会阻止生成线程。它一直被阻塞,直到消费线程将一个对象从队列中取出。

消费线程不断将对象从阻塞队列中取出并处理它们。如果消费线程试图将对象从空队列中取出,则消耗线程将被阻塞,直到生成线程将对象放入队列。

BlockingQueue的方法

BlockingQueue有4组不同的方法用于插入,删除和检查队列中的元素。如果不能立即执行所请求的操作,则每组方法的行为都不同。

操作类型 Throws Exception Special Value Blocks Times Out
插入 add(o) offer(o) put(o) offer(o, timeout, timeunit)
取出(删除) remove(o) poll() take() poll(timeout, timeunit)
检查 element() peek()

四种操作的含义:

  • Throws Exception: 如果无法立即尝试操作,则抛出异常。
  • Special Value: 如果无法立即尝试操作,则返回特殊值(通常为true / false)。
  • Blocks: 如果无法立即执行尝试的操作,则方法调用将阻塞直到它为止。
  • Times Out: 如果无法立即执行尝试的操作,则方法调用将阻塞直到它,但等待不超过给定的超时。返回一个特殊值,告知操作是否成功(通常为true / false)。

无法在BlockingQueue中插入null。如果您尝试插入null,则BlockingQueue将抛出NullPointerException

可以访问BlockingQueue中的所有元素,而不仅仅是开头和结尾的元素。例如,假设您已排队对象进行处理,但您的应用程序决定取消它。然后可以调用remove(o)删除队列中的特定对象。但是,这不是非常有效,所以除非你真的需要,否则你不应该使用这些Collection方法。

BlockingQueue实现类

BlockingQueue是一个接口,需要通过它的实现类来使用它,BlockingQueue有以下实现类:

BlockingQueue Implementations
BlockingQueue Implementations

ArrayBlockingQueue

ArrayBlockingQueue是一个有界的阻塞队列,它将元素存储在数组内部。它有界意味着它无法存储无限量的元素。它可以同时存储的元素数量有一个上限。在实例化时设置上限,之后无法更改。

ArrayBlockingQueueFIFO(先进先出)顺序在内部存储元素。队列的头部是队列中最长时间的元素,队列的尾部是队列中最短时间的元素。

以下是如何实例化和使用ArrayBlockingQueue

1
2
3
4
5
BlockingQueue queue = new ArrayBlockingQueue(1024);

queue.put("1");

Object object = queue.take();

DelayQueue

DelayQueue在内部阻止元素,直到某个延迟到期。元素必须实现java.util.concurrent.Delayed接口。以下是该接口的定义:

1
2
3
4
5
public interface Delayed extends Comparable<Delayed< {

public long getDelay(TimeUnit timeUnit);

}

只有一个方法,就是要获取消费延时。

延时阻塞队列使用了优先阻塞队列来存储数据,数据的获取是有优先级的,这一点需要注意,在这点上,我们应该和java的线程池的调度线程池的实现联系起来,在java的调度线程池的实现上,也使用了延时队列,而优先级队列可以保证线程池调度的任务都是根据时间优先级被调度的。take方法首先从优先队列中获取第一个元素,然后询问是否需要延时,如果不需要,则直接返回,否则延时设定的时间之后再返回。

Example:

1
2
3
4
5
6
7
8
9
10
11
12
public class DelayQueueExample {

public static void main(String[] args) {
DelayQueue queue = new DelayQueue();

Delayed element1 = new DelayedElement();

queue.put(element1);

Delayed element2 = queue.take();
}
}

LinkedBlockingQueue

LinkedBlockingQueue使用链表来作为队列的数据结构,下面就是链表节点的数据结构:

1
2
3
4
5
6
7
8
9
10
11
12
13
static class Node<E> {
E item;

/**
* One of:
* - the real successor Node
* - this Node, meaning the successor is head.next
* - null, meaning there is no successor (this is the last node)
*/
Node<E> next;

Node(E x) { item = x; }
}

LinkedBlockingQueue将元素保留在链接结构(链接节点)内部。如果需要,该连接结构可任选地具有上限。如果未指定上限,则使用Integer.MAX_VALUE作为上限。

LinkedBlockingQueue在内部以FIFO(先进先出)顺序存储元素。队列的头部是队列中最长时间的元素,队列的尾部是队列中最短时间的元素。

Example:

1
2
3
4
5
6
BlockingQueue<String> unbounded = new LinkedBlockingQueue<String>();
BlockingQueue<String> bounded = new LinkedBlockingQueue<String>(1024);

bounded.put("Value");

String value = bounded.take();

PriorityBlockingQueue

PriorityBlockingQueue是一个无限制的并发队列。 它使用与java.util.PriorityQueue类相同的排序规则。 不能将null插入此队列。

插入PriorityBlockingQueue的所有元素都必须实现java.lang.Comparable接口。 因此,元素根据您在Comparable实现中决定的优先级进行排序。

需要注意的是:PriorityBlockingQueue不会对具有相同优先级的元素强制执行任何特定行为(compare()== 0)。

如果从PriorityBlockingQueue获取IteratorIterator不保证按优先级顺序迭代元素。

Example:

1
2
3
4
5
6
BlockingQueue queue   = new PriorityBlockingQueue();

//String implements java.lang.Comparable
queue.put("Value");

String value = queue.take();

SynchronousQueue

SynchronousQueue是一个内部只能包含单个元素的队列。 将元素插入队列的线程被阻塞,直到另一个线程从队列中获取该元素。 同样,如果线程尝试获取元素并且当前不存在任何元素,则该线程将被阻塞,直到线程将元素插入队列。是最为复杂的阻塞队列。

任何插入操作都需要等待其他线程来消费,否则就会阻塞等待,也就是说,生产线程生产出一条数据之后就要等待消费者线程来将其消费掉,才能继续生产数据,否则就会阻塞等待消费。队列中会把到来的线程缓存起来,当然会进行一些操作,下面是大概的算法:

1
2
3
1、队列初始化为null
2、当一个线程达到之后,如果队列为null,则将该线程放到队列中去,否则,判断队列中的第一个元素是否和当前到达的元素
匹配,如果匹配,那么两个线程的数据交易完成,否则也将新到达的线程数据缓存到队列中。

SynchronousQueue通过使用Transferer类的transfer(E e, boolean timed, long nanos)方法来完成数据交易操作,根据fair模式和non-fair模式有两种类型的Transfererfair模式对应于TransferQueuenon-fair模式对应TransferStack

Java BlockingQueue Example

这是一个Java BlockingQueue示例。该示例使用BlockingQueue接口的ArrayBlockingQueue实现。

实现的流程如下,BlockingQueueExample类在单独的线程中启动ProducerConsumerProducer将字符串插入共享的BlockingQueue,然后Consumer将它们取出。

Producer

编写生产者代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class Producer implements Runnable {

protected BlockingQueue queue = null;

public Producer(BlockingQueue queue) {
this.queue = queue;
}

@Override
public void run() {
try {
queue.put("1");
Thread.sleep(1000);
queue.put("2");
Thread.sleep(1000);
queue.put("3");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

Consumer

消费者代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class Consumer implements Runnable {

protected BlockingQueue queue = null;

public Consumer(BlockingQueue queue) {
this.queue = queue;
}

@Override
public void run() {
try {
System.out.println(queue.take());
System.out.println(queue.take());
System.out.println(queue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

Example

示例代码(main()):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class ArrayBlockingQueueExample {

public static void main(String[] args) throws InterruptedException {
ArrayBlockingQueue queue = new ArrayBlockingQueue(1024);
Producer producer = new Producer(queue);
Consumer consumer = new Consumer(queue);

new Thread(producer).start();
new Thread(consumer).start();

Thread.sleep(4000);
}

}

运行结果:

1
2
3
4
5
1
2
3

Process finished with exit code 0