java advanced

Java Flow API — Reactive Streams

Implement reactive publishers and subscribers with the Java Flow API (`java.util.concurrent.Flow`): demand-based backpressure, buffering via `SubmissionPublisher`, and item transformation.

java
import java.util.concurrent.*;
import java.util.concurrent.Flow.*;
import java.util.List;

/**
 * Small, self-contained tour of the {@code java.util.concurrent.Flow} API: a hand-written
 * publisher, a mapping stage, a collecting subscriber, and the built-in
 * {@link SubmissionPublisher}.
 *
 * <p>The hand-written components contain no synchronization; they are meant to be driven
 * synchronously from a single thread (as {@link #main} does) or by a publisher that
 * serializes its signals (as {@link SubmissionPublisher} does).
 */
public class ReactiveFlowDemo {

    /**
     * Publisher that replays a fixed list of integers to each subscriber, honouring the
     * demand signalled through {@link Subscription#request(long)}.
     */
    static class NumberPublisher implements Publisher<Integer> {
        private final List<Integer> data;

        /**
         * @param data items to emit, in order; copied so later changes to the caller's list
         *             cannot affect emission
         * @throws NullPointerException if {@code data} or any of its elements is {@code null}
         */
        NumberPublisher(List<Integer> data) {
            this.data = List.copyOf(data);
        }

        /**
         * Hands the subscriber a fresh subscription that starts at the first item.
         * Items are emitted on the thread that calls {@code request}.
         */
        @Override
        public void subscribe(Subscriber<? super Integer> subscriber) {
            subscriber.onSubscribe(new Subscription() {
                private int index = 0;
                private long demand = 0;
                // True once a terminal signal was sent or the subscriber cancelled.
                private boolean done = false;
                // True while the emission loop is on the stack; guards against re-entry.
                private boolean draining = false;

                @Override
                public void request(long n) {
                    if (done) {
                        return; // requests after termination/cancellation are no-ops
                    }
                    if (n <= 0) {
                        // A non-positive request is a protocol violation: fail the stream.
                        done = true;
                        subscriber.onError(new IllegalArgumentException(
                                "request amount must be positive but was " + n));
                        return;
                    }
                    // Saturate at Long.MAX_VALUE instead of overflowing into a negative demand.
                    long total = demand + n;
                    demand = total < 0 ? Long.MAX_VALUE : total;
                    if (draining) {
                        // Called from inside onNext: the loop below picks up the new demand,
                        // so we neither recurse nor emit out of order.
                        return;
                    }
                    draining = true;
                    try {
                        while (!done && demand > 0 && index < data.size()) {
                            demand--;
                            subscriber.onNext(data.get(index++));
                        }
                        if (!done && index >= data.size()) {
                            done = true; // ensures onComplete is sent exactly once
                            subscriber.onComplete();
                        }
                    } finally {
                        draining = false;
                    }
                }

                @Override
                public void cancel() {
                    done = true;
                }
            });
        }
    }

    /**
     * Intermediate stage that applies a function to every item and forwards the result
     * downstream. Demand is passed through unchanged because the upstream subscription is
     * handed directly to the downstream subscriber.
     *
     * @param <T> upstream item type
     * @param <R> downstream item type
     */
    static class MapSubscriber<T, R> implements Subscriber<T> {
        private final Subscriber<? super R> downstream;
        private final java.util.function.Function<? super T, ? extends R> mapper;
        private Subscription subscription;
        // True once a terminal signal has been forwarded; later signals are dropped.
        private boolean done;

        /**
         * @param downstream subscriber receiving the mapped items
         * @param mapper     transformation applied to each upstream item
         * @throws NullPointerException if either argument is {@code null}
         */
        MapSubscriber(Subscriber<? super R> downstream,
                      java.util.function.Function<? super T, ? extends R> mapper) {
            this.downstream = java.util.Objects.requireNonNull(downstream, "downstream");
            this.mapper = java.util.Objects.requireNonNull(mapper, "mapper");
        }

        @Override
        public void onSubscribe(Subscription s) {
            if (subscription != null) {
                s.cancel(); // already subscribed: reject the extra subscription
                return;
            }
            subscription = s;
            downstream.onSubscribe(s);
        }

        @Override
        public void onNext(T item) {
            if (done) {
                return;
            }
            R mapped;
            try {
                mapped = mapper.apply(item);
            } catch (RuntimeException failure) {
                // A failing mapper must not blow up inside the publisher's emission loop:
                // stop upstream and report the failure as a regular error signal.
                if (subscription != null) {
                    subscription.cancel();
                }
                onError(failure);
                return;
            }
            downstream.onNext(mapped);
        }

        @Override
        public void onError(Throwable t) {
            if (done) {
                return;
            }
            done = true;
            downstream.onError(t);
        }

        @Override
        public void onComplete() {
            if (done) {
                return;
            }
            done = true;
            downstream.onComplete();
        }
    }

    /**
     * Terminal subscriber that gathers every item into a list and exposes the outcome as a
     * {@link CompletableFuture}: completed with the list on {@code onComplete}, completed
     * exceptionally on {@code onError}.
     *
     * @param <T> item type
     */
    static class CollectSubscriber<T> implements Subscriber<T> {
        private final List<T> results = new java.util.ArrayList<>();
        private final CompletableFuture<List<T>> future = new CompletableFuture<>();
        private Subscription subscription;

        @Override
        public void onSubscribe(Subscription s) {
            if (subscription != null) {
                s.cancel(); // already subscribed: reject the extra subscription
                return;
            }
            subscription = s;
            s.request(Long.MAX_VALUE); // unbounded demand: this subscriber applies no backpressure
        }

        @Override
        public void onNext(T item) {
            results.add(item);
        }

        @Override
        public void onError(Throwable t) {
            future.completeExceptionally(t);
        }

        @Override
        public void onComplete() {
            future.complete(results);
        }

        /** @return future that yields the collected items once the stream terminates */
        CompletableFuture<List<T>> result() {
            return future;
        }
    }

    /**
     * Publishes two strings through the built-in {@link SubmissionPublisher} and prints the
     * collected list.
     *
     * @throws Exception if the collected result is not available within one second or the
     *                   stream terminated with an error
     */
    static void submissionPublisherExample() throws Exception {
        var collector = new CollectSubscriber<String>();
        try (var publisher = new SubmissionPublisher<String>()) {
            publisher.subscribe(collector);

            publisher.submit("Hello");
            publisher.submit("World");
        } // leaving the block closes the publisher, which is what signals onComplete

        System.out.println(collector.result().get(1, TimeUnit.SECONDS));
    }

    /**
     * Runs both demos: a synchronous publisher -> map -> collect pipeline, then the
     * {@link SubmissionPublisher} example.
     */
    public static void main(String[] args) throws Exception {
        var pub = new NumberPublisher(List.of(1, 2, 3, 4, 5));
        var collector = new CollectSubscriber<Integer>();
        // Explicit type arguments: with `var` and the diamond there is no target type, so the
        // upstream type of an implicitly typed lambda would be inferred as Object and
        // `n * n` would not compile.
        var mapper = new MapSubscriber<Integer, Integer>(collector, n -> n * n);
        pub.subscribe(mapper);

        System.out.println(collector.result().get()); // [1, 4, 9, 16, 25]

        submissionPublisherExample(); // [Hello, World]
    }
}

Use Cases

  • Reactive data processing with backpressure
  • Event stream processing pipelines
  • Integrating with reactive libraries

Tags

Related Snippets

Similar patterns you can reuse in the same workflow.