javaadvanced
Parallel Stream Processing
Use parallel streams for CPU-intensive tasks: when to use, performance tips, and thread-safety.
javaPress ⌘/Ctrl + Shift + C to copy
import java.util.*;
import java.util.concurrent.*;
import java.util.stream.*;
public class ParallelStreamDemo {
// CPU-intensive: good candidate for parallel
static long countPrimes(int limit) {
return IntStream.rangeClosed(2, limit)
.parallel()
.filter(ParallelStreamDemo::isPrime)
.count();
}
private static boolean isPrime(int n) {
if (n < 2) return false;
for (int i = 2; i * i <= n; i++) {
if (n % i == 0) return false;
}
return true;
}
// Custom thread pool for parallel streams
static <T> T parallelWithPool(int threads, java.util.function.Supplier<T> task)
throws Exception {
ForkJoinPool pool = new ForkJoinPool(threads);
try {
return pool.submit(task::get).get();
} finally {
pool.shutdown();
}
}
// Parallel collection processing
static Map<String, Long> wordFrequency(List<String> documents) {
return documents.parallelStream()
.flatMap(doc -> Arrays.stream(doc.toLowerCase().split("\\W+")))
.filter(w -> w.length() > 2)
.collect(Collectors.groupingByConcurrent(
w -> w,
Collectors.counting()
));
}
// Parallel with ordered result
static List<String> processOrdered(List<String> items) {
return items.parallelStream()
.map(item -> {
// Simulate expensive operation
try { Thread.sleep(10); } catch (InterruptedException e) {}
return item.toUpperCase();
})
.collect(Collectors.toList()); // maintains encounter order
}
public static void main(String[] args) throws Exception {
// Sequential vs Parallel benchmark
int n = 1_000_000;
long start = System.nanoTime();
long seqCount = IntStream.rangeClosed(2, n).filter(ParallelStreamDemo::isPrime).count();
long seqTime = (System.nanoTime() - start) / 1_000_000;
start = System.nanoTime();
long parCount = countPrimes(n);
long parTime = (System.nanoTime() - start) / 1_000_000;
System.out.printf("Sequential: %d primes in %d ms%n", seqCount, seqTime);
System.out.printf("Parallel: %d primes in %d ms%n", parCount, parTime);
// Custom pool with 4 threads
long result = parallelWithPool(4, () -> countPrimes(n));
System.out.println("With custom pool: " + result);
}
}Use Cases
- CPU-intensive data processing with multi-core utilization
- Bulk document/text analysis
- Parallel computation with custom thread pools
Tags
Related Snippets
Similar patterns you can reuse in the same workflow.
javabeginner
Read File Line by Line in Java
Read files using BufferedReader, Files.readAllLines, and Stream API with proper resource management.
Best for: Processing log files line by line
#java#file-io
javabeginner
Java Streams — Filter, Map, Collect
Process collections with Java Streams: filter, map, flatMap, reduce, and collect to lists or maps.
Best for: Transforming and filtering collections
#java#streams
javaadvanced
Advanced Streams — Custom Collectors
Advanced Stream operations: custom collectors, flatMap, reduce, teeing, and parallel streams.
Best for: Complex data aggregation and reporting
#java#streams
javaadvanced
Java Flow API — Reactive Streams
Implement reactive publishers and subscribers with Java Flow API: backpressure, buffering, and transformation.
Best for: Reactive data processing with backpressure
#java#reactive