原文链接:https://dzone.com/articles/the-evolution-of-producer-consumer-problem-in-java
作者:Ioan Tinca
译者:Ju
想了解更多关于Java中生产者-消费者问题的演变吗?看看这篇文章,我们看看处理这个问题的旧方法和新方法。
生产者——消费者问题是多进程同步问题的典型示例。对于我们大多数人来说,这个问题可能是我们在学校学习中,首次面对并行算法的第一个同步问题。尽管它很简单,但它仍然是并行计算中最大的挑战——即通过多个进程共享单个资源。
问题陈述
有两个进程,一个生产者进程,一个消费者进程,它们共享一个有限大小的公共缓冲区。生产者“生成”数据并将其存储在缓冲区中,消费者“消耗”数据,将其从缓冲区中删除。对于两个并行运行的进程,我们需要确保在缓冲区满时生产者不会将新数据放入缓冲区,如果缓冲区为空,消费者也不会试图从缓冲区中删除数据。
解决方案
为了解决这个并发问题,生产者和消费者必须相互通信。如果缓冲区已满,则生产者将进入休眠状态并等待通知。在消费者从缓冲区中删除一些数据之后,它将通知生产者,然后生产者会开始重新填充缓冲区。如果缓冲区为空,也会发生相同的过程,但在这种情况下,消费者将等待生产者通知。
如果通信未正确完成,则可能导致死锁,两个进程将彼此等待。
经典方法
让我们看看这个问题的典型Java解决方案。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72package ProducerConsumer;
import java.util.LinkedList;
import java.util.Queue;
public class ClassicProducerConsumerExample {
public static void main(String[] args) throws InterruptedException {
Buffer buffer = new Buffer(2);
Thread producerThread = new Thread(new Runnable() {
public void run() {
try {
buffer.produce();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
Thread consumerThread = new Thread(new Runnable() {
public void run() {
try {
buffer.consume();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
producerThread.start();
consumerThread.start();
producerThread.join();
consumerThread.join();
}
static class Buffer {
private Queue<Integer> list;
private int size;
public Buffer(int size) {
this.list = new LinkedList<>();
this.size = size;
}
public void produce() throws InterruptedException {
int value = 0;
while (true) {
synchronized (this) {
while (list.size() >= size) {
// wait for the consumer
wait();
}
list.add(value);
System.out.println("Produced " + value);
value++;
// notify the consumer
notify();
Thread.sleep(1000);
}
}
}
public void consume() throws InterruptedException {
while (true) {
synchronized (this) {
while (list.size() == 0) {
// wait for the producer
wait();
}
int value = list.poll();
System.out.println("Consume " + value);
// notify the producer
notify();
Thread.sleep(1000);
}
}
}
}
}
这里我们有两个线程,一个生产者线程和一个消费者线程,它们共享一个公共缓冲区。生产者线程开始生产新元素并将它们存储在缓冲区中。如果缓冲区满了,它将进入休眠状态并等待通知。否则,它将在缓冲区中放入一个新元素并通知消费者。正如我之前所说,同样的过程也适用于消费者。如果缓冲区为空,消费者将等待生产者通知。否则,它将从缓冲区中删除一个元素,并通知消费者。
如你所见,在前面的示例中,两个作业都由缓冲区对象管理。线程只是调用了buffer.produce()和buffer. consumer(),所有事情都是由这两个方法完成的。
这是一个值得商榷的问题,但在我看来,缓冲区不应该负责创建或删除元素。当然,这取决于你想要实现的目标,但在这种情况下,缓冲区应该只负责以线程安全的方式存储和汇集元素,而不是生成元素。
那么,让我们将生产逻辑和消费逻辑从缓冲区对象中移出。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63package ProducerConsumer;
import java.util.LinkedList;
import java.util.Queue;
public class ProducerConsumerExample2 {
public static void main(String[] args) throws InterruptedException {
Buffer buffer = new Buffer(2);
Thread producerThread = new Thread(() -> {
try {
int value = 0;
while (true) {
buffer.add(value);
System.out.println("Produced " + value);
value ++;
Thread.sleep(1000);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
});
Thread consumerThread = new Thread(() -> {
try {
while (true) {
int value = buffer.poll();
System.out.println("Consume " + value);
Thread.sleep(1000);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
});
producerThread.start();
consumerThread.start();
producerThread.join();
consumerThread.join();
}
static class Buffer {
private Queue<Integer> list;
private int size;
public Buffer(int size) {
this.list = new LinkedList<>();
this.size = size;
}
public void add(int value) throws InterruptedException {
synchronized (this) {
while (list.size() >= size) {
wait();
}
list.add(value);
notify();
}
}
public int poll() throws InterruptedException {
synchronized (this) {
while (list.size() == 0) {
wait();
}
int value = list.poll();
notify();
return value;
}
}
}
}
这样就好多了。现在,缓冲区负责以线程安全的方式存储和删除元素。
Blocking Queue
但是,我们可以进一步改进这一点。在前面的示例中,我们已经创建了一个缓冲区,当存储一个元素时,这个缓冲区会等待一个插槽变为可用,以防没有更多的空间,在汇集元素时,如果缓冲区为空,它等待一个元素出现,使存储和删除操作是线程安全的。
但是,Java已经为此提供了一个集合。它被称为BlockingQueue,正如这里所描述的,这是一个可以线程安全的放入其中,并从中获取实例的队列。它正是我们想要的。因此,如果在我们的示例中使用BlockingQueue,我们不必实现等待和通知机制。
让我们看看它是什么样子。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36package ProducerConsumer;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
public class ProducerConsumerWithBlockingQueue {
public static void main(String[] args) throws InterruptedException {
BlockingQueue<Integer> blockingQueue = new LinkedBlockingDeque<>(2);
Thread producerThread = new Thread(() -> {
try {
int value = 0;
while (true) {
blockingQueue.put(value);
System.out.println("Produced " + value);
value++;
Thread.sleep(1000);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
});
Thread consumerThread = new Thread(() -> {
try {
while (true) {
int value = blockingQueue.take();
System.out.println("Consume " + value);
Thread.sleep(1000);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
});
producerThread.start();
consumerThread.start();
producerThread.join();
consumerThread.join();
}
}
可运行线程看起来和以前一模一样。它们以同样的方式生产和消费元素。唯一的区别是,这里我们使用blockingQueue而不是缓冲区对象。
关于Blocking Queue的一些细节
BlockingQueue有两种类型:
无界队列
有界队列
一个无界队列几乎可以无限增长,而且添加操作不会阻塞。你可以像这样创建一个无界队列:
1
BlockingQueue<String> blockingQueue = new LinkedBlockingDeque<>();
在这种情况下,由于添加操作没有阻塞,生产者在添加新元素时无需等待。每当生产者想要添加新元素时,队列就会存储它。但是,这里有一个问题。如果消费者删除元素的速度没有快于生产者添加新元素的速度,那么内存就会被填满,我们就会得到一个OutOfMemory的异常。
相反,有界队列具有固定大小。 你可以这样创建一个有界队列:1
BlockingQueue<String> blockingQueue = new LinkedBlockingDeque<>(10);
主要区别在于使用有界队列,如果队列已满并且生产者尝试存储另一个元素,则取决于使用什么方法来添加,队列将阻塞,直到它有足够的空间。
在BlockingQueue中添加元素有四种方法:
add() – 如果插入成功则返回true,否则将抛出IllegalStateException的异常
put() – 将一个元素插入队列,并在必要时等待一个空闲插槽
offer() – 如果插入成功则返回true,否则返回false
offer(E e, long timeout, TimeUnit unit) – 如果队列未满,则将元素插入队列,或者在指定超时内等待可用的插槽
因此,如果你使用put()方法并且队列已满,则生产者必须等到有空闲插槽。这就是我们在前面的示例中使用的内容,这与ProducerConsumerExample2的工作方式相同。
使用线程池
我们在这方面还能改进什么?让我们分析一下我们做了什么。我们已经实例化了两个线程,一个将一些元素放入BlockingQueue中,我们称之为生产者,另一个从队列中取出元素,我们称之为消费者。
但是,良好的软件技术表明手动创建和销毁线程是不好的做法。线程创建是一项昂贵的任务。每个线程的创建需要以下步骤:
它为线程堆栈分配内存
操作系统创建与Java线程对应的本机线程
与线程相关的描述符被添加到JVM内部数据结构中
别误会我的意思。拥有更多线程没有任何问题。这就是并行的工作原理。这里的问题是我们“手动”创建它们。这是不好的做法。如果我们手动创建它们,除了创建成本之外,另一个问题是我们无法控制它们中同时运行的数量。例如,如果有数百万个请求到达服务器应用程序,并且每个请求都会创建一个新线程,那么数百万个线程将并行运行,这可能导致线程饥饿).
因此,我们需要一种战略性管理线程的方法。这就是线程池。
线程池根据选定的策略处理线程的生命周期。 它拥有有限数量的空闲线程,并在需要解决任务时重用它们。 这样,我们不必每次都为新请求创建一个新线程,因此,我们可以避免线程饥饿,
Java线程池的实现包括:
一个任务队列
一个工作线程的集合
一个线程工厂
用于管理线程池状态的元数据。
要同时运行某些任务,必须将它们放在任务队列中。然后,当一个线程可用时,它将接收一个任务并运行它。可用线程越多,并行执行的任务就越多。
除了线程生命周期管理之外,使用线程池的另一个优点是,当你计划如何将需要并发执行的工作进行拆分时,你可以用一种更实用的方式进行思考。并行的单位不再是线程; 而是任务。你可以设计一些并发执行的任务,而不是共享公共内存并且并行运行的线程。以函数的方式思考可以帮助我们避免一些常见的多线程问题,比如死锁或数据争用。没有什么可以阻止我们再次涉及这些问题,但是,因为使用函数范式,我们不会强制同步并发计算(使用锁)。这比直接使用线程和共享内存出现的次数要少得多。在我们的示例中并不是这样,因为任务共享一个阻塞队列,但我只想强调这个优点。
综上所述,让我们看看我们的示例是如何使用线程池的。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38package ProducerConsumer;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
public class ProducerConsumerExecutorService {
public static void main(String[] args) {
BlockingQueue<Integer> blockingQueue = new LinkedBlockingDeque<>(2);
ExecutorService executor = Executors.newFixedThreadPool(2);
Runnable producerTask = () -> {
try {
int value = 0;
while (true) {
blockingQueue.put(value);
System.out.println("Produced " + value);
value++;
Thread.sleep(1000);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
};
Runnable consumerTask = () -> {
try {
while (true) {
int value = blockingQueue.take();
System.out.println("Consume " + value);
Thread.sleep(1000);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
};
executor.execute(producerTask);
executor.execute(consumerTask);
executor.shutdown();
}
}
这里的不同之处在于,我们不是手动创建和运行消费者和生产者线程,而是构建一个线程池,它将接收两个任务,即一个生产者任务和一个消费者任务。生产者和消费者任务实际上是与前一个示例中相同的可运行程序。现在,执行程序(线程池实现)接收任务,其工作线程将执行这些任务。
在我们这个简单的例子中,一切都会像以前一样工作。与前面的示例一样,我们仍然有两个线程,它们仍然以相同的方式生成和消费元素。因此,这里我们没有性能改进,但是代码看起来更整洁。我们不再手动建立线程,但是,相反,我们只是说明我们想要的,我们想要一种并行执行一些任务的方法。
因此,当你使用线程池时,你不必将线程视为并行的单位,而是考虑一些并发执行的任务。这就是你需要知道的,执行者将处理剩下的事情。 它将接收一些任务,然后,它将使用可用的工作线程执行这些任务。
总结
首先,我们看到了生产者-消费者问题的“传统”解决方案。我们试图在没有必要的情况下不重新发明轮子,相反,我们希望重用已经测试过的解决方案。因此,与其写一个等待/通知系统,为什么不使用已经提供这种服务的Java Blocking Queue 呢?而且,当Java为我们提供一个非常有效地管理线程生命周期的线程池时,我们可以摆脱手动创建线程。通过这些改进,生产者-消费者问题的解决方案看起来更加可靠和易懂。