java · advanced
Java Flow API — Reactive Streams
Implement reactive publishers and subscribers with Java Flow API: backpressure, buffering, and transformation.
java — Press ⌘/Ctrl + Shift + C to copy
import java.util.concurrent.*;
import java.util.concurrent.Flow.*;
import java.util.List;
/**
 * Demonstrates the {@code java.util.concurrent.Flow} API with a hand-written
 * publisher, a mapping stage, a collecting subscriber, and the built-in
 * {@link SubmissionPublisher}.
 *
 * <p>The hand-written classes are single-threaded teaching examples: they do
 * no synchronization of their own and expect all signals for one subscription
 * to arrive on one thread (or be externally serialized by the publisher).
 */
public class ReactiveFlowDemo {

    /**
     * Synchronous publisher that emits the elements of a list, in order, on
     * the thread that calls {@link Subscription#request(long)}.
     *
     * <p>Each call to {@link #subscribe} replays the list from the start for
     * that subscriber.
     */
    static class NumberPublisher implements Publisher<Integer> {
        private final List<Integer> data;

        /**
         * @param data elements to emit; the list is read lazily as demand
         *             arrives and is not copied
         */
        NumberPublisher(List<Integer> data) {
            this.data = data;
        }

        /**
         * Hands the subscriber a fresh subscription. Nothing is emitted until
         * the subscriber requests items.
         *
         * @param subscriber receiver of the items and the terminal signal
         */
        @Override
        public void subscribe(Subscriber<? super Integer> subscriber) {
            subscriber.onSubscribe(new Subscription() {
                private int index = 0;
                // Outstanding demand that has been requested but not yet satisfied.
                private long demand = 0;
                // True while the emit loop is running; makes request() re-entrant safe.
                private boolean emitting = false;
                // Set after cancel(), onComplete() or onError(): no further signals.
                private boolean terminated = false;

                /**
                 * Adds {@code n} to the outstanding demand and emits as many
                 * items as demand allows. Signals {@code onComplete} exactly
                 * once, when the last element has been delivered.
                 *
                 * @param n number of additional items wanted; a non-positive
                 *          value terminates the subscription with
                 *          {@code onError(IllegalArgumentException)}
                 */
                @Override
                public void request(long n) {
                    if (terminated) {
                        return;
                    }
                    if (n <= 0) {
                        terminated = true;
                        subscriber.onError(new IllegalArgumentException(
                                "non-positive subscription request: " + n));
                        return;
                    }
                    // Saturate instead of overflowing: Long.MAX_VALUE means "unbounded".
                    demand = (Long.MAX_VALUE - demand < n) ? Long.MAX_VALUE : demand + n;
                    if (emitting) {
                        // Called from inside onNext: the outer loop picks up the new
                        // demand, so we neither recurse nor complete twice.
                        return;
                    }
                    emitting = true;
                    try {
                        while (demand > 0 && index < data.size() && !terminated) {
                            demand--;
                            subscriber.onNext(data.get(index++));
                        }
                    } finally {
                        emitting = false;
                    }
                    if (index >= data.size() && !terminated) {
                        terminated = true;
                        subscriber.onComplete();
                    }
                }

                /** Stops emission; later {@code request} calls are ignored. */
                @Override
                public void cancel() {
                    terminated = true;
                }
            });
        }
    }

    /**
     * Transform stage (map): applies a function to every upstream item and
     * forwards the result downstream.
     *
     * <p>The upstream subscription is passed to the downstream subscriber
     * unchanged, so downstream demand drives the source directly (valid
     * because the mapping is one-to-one).
     *
     * @param <T> upstream item type
     * @param <R> downstream item type
     */
    static class MapSubscriber<T, R> implements Subscriber<T> {
        private final Subscriber<? super R> downstream;
        private final java.util.function.Function<? super T, ? extends R> mapper;
        private Subscription subscription;
        // Set once a terminal signal has been forwarded; later signals are dropped.
        private boolean done;

        /**
         * @param downstream receiver of the mapped items
         * @param mapper     function applied to each upstream item
         */
        MapSubscriber(Subscriber<? super R> downstream,
                      java.util.function.Function<? super T, ? extends R> mapper) {
            this.downstream = downstream;
            this.mapper = mapper;
        }

        @Override
        public void onSubscribe(Subscription s) {
            this.subscription = s;
            downstream.onSubscribe(s);
        }

        /**
         * Maps the item and forwards it. If the mapper throws, the upstream
         * subscription is cancelled and the exception is delivered downstream
         * via {@code onError} instead of escaping into the publisher.
         */
        @Override
        public void onNext(T item) {
            if (done) {
                return;
            }
            R mapped;
            try {
                mapped = mapper.apply(item);
            } catch (RuntimeException e) {
                done = true;
                if (subscription != null) {
                    subscription.cancel();
                }
                downstream.onError(e);
                return;
            }
            downstream.onNext(mapped);
        }

        @Override
        public void onError(Throwable t) {
            if (done) {
                return;
            }
            done = true;
            downstream.onError(t);
        }

        @Override
        public void onComplete() {
            if (done) {
                return;
            }
            done = true;
            downstream.onComplete();
        }
    }

    /**
     * Terminal subscriber that requests unbounded demand, gathers every item
     * into a list, and exposes the outcome as a {@link CompletableFuture}.
     *
     * @param <T> item type
     */
    static class CollectSubscriber<T> implements Subscriber<T> {
        private final java.util.List<T> results = new java.util.ArrayList<>();
        private Subscription subscription;
        private final CompletableFuture<List<T>> future = new CompletableFuture<>();

        /**
         * Accepts the first subscription and requests everything. A second
         * subscription is cancelled so two sources cannot interleave into the
         * same result list.
         */
        @Override
        public void onSubscribe(Subscription s) {
            if (this.subscription != null) {
                s.cancel();
                return;
            }
            this.subscription = s;
            s.request(Long.MAX_VALUE); // unbounded demand
        }

        @Override
        public void onNext(T item) {
            results.add(item);
        }

        @Override
        public void onError(Throwable t) {
            future.completeExceptionally(t);
        }

        @Override
        public void onComplete() {
            future.complete(results);
        }

        /**
         * @return future completed with the collected items on
         *         {@code onComplete}, or exceptionally on {@code onError}
         */
        CompletableFuture<List<T>> result() {
            return future;
        }
    }

    /**
     * Publishes two strings through the built-in {@link SubmissionPublisher}
     * and prints the collected list.
     *
     * @throws Exception if the result is not available within one second or
     *                   the stream terminated with an error
     */
    static void submissionPublisherExample() throws Exception {
        var collector = new CollectSubscriber<String>();
        try (var publisher = new SubmissionPublisher<String>()) {
            publisher.subscribe(collector);
            publisher.submit("Hello");
            publisher.submit("World");
        }
        // Leaving the try block closes the publisher, which is what triggers
        // onComplete; only then can the collected result become available.
        System.out.println(collector.result().get(1, TimeUnit.SECONDS));
    }

    /**
     * Runs both demos: squares 1..5 through the hand-written pipeline, then
     * the {@link SubmissionPublisher} example.
     */
    public static void main(String[] args) throws Exception {
        var pub = new NumberPublisher(List.of(1, 2, 3, 4, 5));
        var collector = new CollectSubscriber<Integer>();
        // Explicit type arguments: with `var` and an implicitly typed lambda the
        // diamond would infer T as Object and `n * n` would not compile.
        var mapper = new MapSubscriber<Integer, Integer>(collector, n -> n * n);
        pub.subscribe(mapper);
        System.out.println(collector.result().get()); // [1, 4, 9, 16, 25]
        submissionPublisherExample(); // [Hello, World]
    }
}
// Use Cases
- Reactive data processing with backpressure
- Event stream processing pipelines
- Integrating with reactive libraries
Tags
Related Snippets
Similar patterns you can reuse in the same workflow.
java · beginner
Read File Line by Line in Java
Read files using BufferedReader, Files.readAllLines, and Stream API with proper resource management.
Best for: Processing log files line by line
#java #file-io
java · beginner
Java Streams — Filter, Map, Collect
Process collections with Java Streams: filter, map, flatMap, reduce, and collect to lists or maps.
Best for: Transforming and filtering collections
#java #streams
java · advanced
Advanced Streams — Custom Collectors
Advanced Stream operations: custom collectors, flatMap, reduce, teeing, and parallel streams.
Best for: Complex data aggregation and reporting
#java #streams
java · advanced
Spring WebFlux — Reactive REST API
Build reactive REST APIs with Spring WebFlux using Mono, Flux, and non-blocking operations.
Best for: High-throughput non-blocking APIs
#spring-boot #webflux