BlockingQueue — Producer Consumer Pattern
Implement producer-consumer with BlockingQueue: bounded buffers, multiple producers, and graceful shutdown.
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
public class ProducerConsumer {
private final BlockingQueue<Task> queue;
private final AtomicBoolean running = new AtomicBoolean(true);
record Task(String id, String payload) {}
public ProducerConsumer(int capacity) {
this.queue = new ArrayBlockingQueue<>(capacity);
}
// Producer
class Producer implements Runnable {
private final String name;
Producer(String name) { this.name = name; }
@Override
public void run() {
int count = 0;
while (running.get()) {
try {
Task task = new Task(
name + "-" + count++,
"data-" + System.currentTimeMillis()
);
boolean added = queue.offer(task, 1, TimeUnit.SECONDS);
if (added) {
System.out.printf("[%s] Produced: %s (queue: %d)%n",
name, task.id(), queue.size());
}
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
System.out.println(name + " stopped");
}
}
// Consumer
class Consumer implements Runnable {
private final String name;
Consumer(String name) { this.name = name; }
@Override
public void run() {
while (running.get() || !queue.isEmpty()) {
try {
Task task = queue.poll(1, TimeUnit.SECONDS);
if (task != null) {
System.out.printf("[%s] Consumed: %s%n", name, task.id());
Thread.sleep(200); // simulate processing
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
System.out.println(name + " stopped");
}
}
public void shutdown() {
running.set(false);
}
public static void main(String[] args) throws Exception {
var pc = new ProducerConsumer(10);
ExecutorService pool = Executors.newFixedThreadPool(5);
// 2 producers, 3 consumers
pool.submit(pc.new Producer("P1"));
pool.submit(pc.new Producer("P2"));
pool.submit(pc.new Consumer("C1"));
pool.submit(pc.new Consumer("C2"));
pool.submit(pc.new Consumer("C3"));
Thread.sleep(2000);
pc.shutdown();
pool.shutdown();
pool.awaitTermination(5, TimeUnit.SECONDS);
System.out.println("All done. Remaining: " + pc.queue.size());
}
}Use Cases
- Task queue processing systems
- Bounded buffer for flow control
- Multi-threaded data pipelines
Tags
Related Snippets
Similar patterns you can reuse in the same workflow.
CompletableFuture — Async Programming
Chain async operations with CompletableFuture: thenApply, thenCompose, allOf, and exception handling.
Best for: Parallel API calls to multiple services
Virtual Threads — Lightweight Concurrency
Use Java 21 virtual threads for massive concurrency without thread pool tuning or reactive frameworks.
Best for: High-concurrency web servers handling thousands of requests
ExecutorService — Thread Pool Management
Create and manage thread pools with ExecutorService: fixed, cached, scheduled, and custom pools.
Best for: Managing concurrent task execution
Concurrent Collections — Thread-Safe Maps
Use ConcurrentHashMap, CopyOnWriteArrayList, and BlockingQueue for thread-safe data structures.
Best for: Thread-safe caching in multi-threaded applications