javaintermediate

BlockingQueue — Producer Consumer Pattern

Implement producer-consumer with BlockingQueue: bounded buffers, multiple producers, and graceful shutdown.

java
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;

public class ProducerConsumer {
    private final BlockingQueue<Task> queue;
    private final AtomicBoolean running = new AtomicBoolean(true);

    record Task(String id, String payload) {}

    public ProducerConsumer(int capacity) {
        this.queue = new ArrayBlockingQueue<>(capacity);
    }

    // Producer
    class Producer implements Runnable {
        private final String name;

        Producer(String name) { this.name = name; }

        @Override
        public void run() {
            int count = 0;
            while (running.get()) {
                try {
                    Task task = new Task(
                        name + "-" + count++,
                        "data-" + System.currentTimeMillis()
                    );
                    boolean added = queue.offer(task, 1, TimeUnit.SECONDS);
                    if (added) {
                        System.out.printf("[%s] Produced: %s (queue: %d)%n",
                            name, task.id(), queue.size());
                    }
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
            System.out.println(name + " stopped");
        }
    }

    // Consumer
    class Consumer implements Runnable {
        private final String name;

        Consumer(String name) { this.name = name; }

        @Override
        public void run() {
            while (running.get() || !queue.isEmpty()) {
                try {
                    Task task = queue.poll(1, TimeUnit.SECONDS);
                    if (task != null) {
                        System.out.printf("[%s] Consumed: %s%n", name, task.id());
                        Thread.sleep(200); // simulate processing
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
            System.out.println(name + " stopped");
        }
    }

    public void shutdown() {
        running.set(false);
    }

    public static void main(String[] args) throws Exception {
        var pc = new ProducerConsumer(10);
        ExecutorService pool = Executors.newFixedThreadPool(5);

        // 2 producers, 3 consumers
        pool.submit(pc.new Producer("P1"));
        pool.submit(pc.new Producer("P2"));
        pool.submit(pc.new Consumer("C1"));
        pool.submit(pc.new Consumer("C2"));
        pool.submit(pc.new Consumer("C3"));

        Thread.sleep(2000);
        pc.shutdown();
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        System.out.println("All done. Remaining: " + pc.queue.size());
    }
}

Use Cases

  • Task queue processing systems
  • Bounded buffer for flow control
  • Multi-threaded data pipelines

Tags

Related Snippets

Similar patterns you can reuse in the same workflow.