java · advanced

Parallel Stream Processing

Use parallel streams for CPU-intensive tasks: when to use them, performance tips, and thread-safety considerations.

java
import java.util.*;
import java.util.concurrent.*;
import java.util.stream.*;

/**
 * Demonstrates parallel stream usage: a CPU-bound prime count, running a
 * stream inside a dedicated {@link ForkJoinPool}, concurrent grouping, and an
 * order-preserving parallel map.
 *
 * <p>All methods are stateless; the lambdas passed to the streams do not
 * mutate shared state.
 */
public class ParallelStreamDemo {

    /**
     * Matches one or more non-word characters. Compiled once so that
     * {@link #wordFrequency(List)} does not recompile the regex per document.
     */
    private static final java.util.regex.Pattern NON_WORD_RUN =
        java.util.regex.Pattern.compile("\\W+");

    /**
     * Counts the primes in {@code [2, limit]} using a parallel stream.
     * CPU-intensive and stateless, so it is a good candidate for parallelism.
     *
     * @param limit inclusive upper bound; values below 2 yield 0
     * @return the number of primes not greater than {@code limit}
     */
    static long countPrimes(int limit) {
        return IntStream.rangeClosed(2, limit)
            .parallel()
            .filter(ParallelStreamDemo::isPrime)
            .count();
    }

    /**
     * Trial-division primality test.
     *
     * @param n candidate value
     * @return {@code true} if {@code n} is prime
     */
    private static boolean isPrime(int n) {
        if (n < 2) {
            return false;
        }
        if (n < 4) {
            return true; // 2 and 3
        }
        if (n % 2 == 0) {
            return false;
        }
        // "i <= n / i" is equivalent to "i * i <= n" but cannot overflow int,
        // which matters for n close to Integer.MAX_VALUE.
        for (int i = 3; i <= n / i; i += 2) {
            if (n % i == 0) {
                return false;
            }
        }
        return true;
    }

    /**
     * Runs {@code task} on a dedicated {@link ForkJoinPool} and returns its result.
     *
     * <p>NOTE: parallel streams started from inside the task are expected to use
     * this pool instead of the common pool. That is long-standing JDK behaviour
     * but is not part of the documented stream API contract.
     *
     * @param threads parallelism level of the temporary pool
     * @param task    computation to run inside the pool; must not be null
     * @param <T>     result type
     * @return the value produced by {@code task}
     * @throws NullPointerException if {@code task} is null
     * @throws Exception            an {@code ExecutionException} wrapping any failure
     *                              of {@code task}, or an {@code InterruptedException}
     *                              if the caller is interrupted while waiting
     */
    static <T> T parallelWithPool(int threads, java.util.function.Supplier<T> task)
            throws Exception {
        // Fail fast before allocating pool threads.
        Objects.requireNonNull(task, "task");
        ForkJoinPool pool = new ForkJoinPool(threads);
        try {
            return pool.submit(task::get).get();
        } finally {
            pool.shutdown();
        }
    }

    /**
     * Counts word occurrences across all documents in parallel.
     *
     * <p>Documents are lower-cased with {@link Locale#ROOT} so the result does not
     * depend on the JVM default locale, split on runs of non-word characters,
     * and words of one or two characters are dropped.
     *
     * @param documents texts to analyse
     * @return a map from lower-cased word to its number of occurrences
     */
    static Map<String, Long> wordFrequency(List<String> documents) {
        return documents.parallelStream()
            .flatMap(doc -> NON_WORD_RUN.splitAsStream(doc.toLowerCase(Locale.ROOT)))
            .filter(w -> w.length() > 2)
            .collect(Collectors.groupingByConcurrent(
                w -> w,
                Collectors.counting()
            ));
    }

    /**
     * Upper-cases every item in parallel while keeping the input order.
     *
     * @param items values to transform
     * @return the upper-cased values, in the encounter order of {@code items}
     */
    static List<String> processOrdered(List<String> items) {
        return items.parallelStream()
            .map(ParallelStreamDemo::expensiveUpperCase)
            .collect(Collectors.toList()); // maintains encounter order
    }

    /** Simulates an expensive per-item operation and returns the upper-cased item. */
    private static String expensiveUpperCase(String item) {
        try {
            Thread.sleep(10);
        } catch (InterruptedException e) {
            // Restore the flag so callers further up can still observe the interrupt.
            Thread.currentThread().interrupt();
        }
        return item.toUpperCase(Locale.ROOT);
    }

    /**
     * Runs a rough sequential-vs-parallel comparison and the custom-pool example.
     * The timings are single-shot with no JIT warm-up, so treat them as indicative only.
     */
    public static void main(String[] args) throws Exception {
        // Sequential vs Parallel benchmark
        int n = 1_000_000;

        long start = System.nanoTime();
        long seqCount = IntStream.rangeClosed(2, n).filter(ParallelStreamDemo::isPrime).count();
        long seqTime = (System.nanoTime() - start) / 1_000_000;

        start = System.nanoTime();
        long parCount = countPrimes(n);
        long parTime = (System.nanoTime() - start) / 1_000_000;

        System.out.printf("Sequential: %d primes in %d ms%n", seqCount, seqTime);
        System.out.printf("Parallel:   %d primes in %d ms%n", parCount, parTime);

        // Custom pool with 4 threads
        long result = parallelWithPool(4, () -> countPrimes(n));
        System.out.println("With custom pool: " + result);
    }
}

Use Cases

  • CPU-intensive data processing with multi-core utilization
  • Bulk document/text analysis
  • Parallel computation with custom thread pools

Tags

Related Snippets

Similar patterns you can reuse in the same workflow.