java · advanced

Fork/Join Framework — Divide and Conquer

Parallelize recursive tasks with ForkJoinPool: RecursiveTask, RecursiveAction, and work-stealing.

java
import java.util.concurrent.*;
import java.util.Arrays;

/**
 * Demonstrates divide-and-conquer parallelism with the Fork/Join framework.
 *
 * <p>Contains two tasks: {@link ParallelSum}, a {@link RecursiveTask} that returns the sum of a
 * {@code long[]} range, and {@link ParallelMergeSort}, a {@link RecursiveAction} that sorts an
 * {@code int[]} range in place. {@link #main(String[])} runs both on the common pool.
 */
public class ForkJoinDemo {

    /**
     * Sums the elements of {@code array} in the half-open range {@code [start, end)}.
     *
     * <p>Ranges longer than {@link #THRESHOLD} are split at the midpoint; the left half is forked
     * and the right half is computed by the current thread before joining the left result.
     */
    static class ParallelSum extends RecursiveTask<Long> {
        /** Ranges of at most this many elements are summed with a plain loop. */
        private static final int THRESHOLD = 10_000;

        private final long[] array;
        private final int start;
        private final int end;

        /**
         * @param array source values; read only, never modified by this task
         * @param start index of the first element to include
         * @param end index one past the last element to include
         */
        ParallelSum(long[] array, int start, int end) {
            this.array = array;
            this.start = start;
            this.end = end;
        }

        /**
         * Computes the sum of the range covered by this task.
         *
         * @return the sum of {@code array[start] .. array[end - 1]}, or 0 for an empty range
         */
        @Override
        protected Long compute() {
            if (end - start <= THRESHOLD) {
                long sum = 0;
                for (int i = start; i < end; i++) {
                    sum += array[i];
                }
                return sum;
            }
            // Unsigned shift instead of "/ 2": start + end may wrap past Integer.MAX_VALUE for
            // very large arrays, and >>> still yields the correct non-negative midpoint.
            int mid = (start + end) >>> 1;
            ParallelSum left = new ParallelSum(array, start, mid);
            ParallelSum right = new ParallelSum(array, mid, end);
            left.fork(); // schedule the left half asynchronously
            long rightResult = right.compute(); // do the right half on this thread meanwhile
            long leftResult = left.join();
            return leftResult + rightResult;
        }
    }

    /**
     * Sorts {@code array} in ascending order over the half-open range {@code [start, end)}.
     *
     * <p>Ranges longer than {@link #THRESHOLD} are split at the midpoint, both halves are sorted
     * as subtasks, and the two sorted halves are then merged using {@code temp} as scratch space.
     * Each task touches only its own {@code [start, end)} slice of {@code array} and {@code temp},
     * so sibling subtasks operate on disjoint index ranges.
     */
    static class ParallelMergeSort extends RecursiveAction {
        /** Ranges of at most this many elements are handed to {@link Arrays#sort(int[], int, int)}. */
        private static final int THRESHOLD = 4096;

        private final int[] array;
        private final int[] temp;
        private final int start;
        private final int end;

        /**
         * @param array the array to sort in place
         * @param temp scratch buffer; indices {@code [start, end)} must be valid in it as well
         * @param start index of the first element to sort
         * @param end index one past the last element to sort
         */
        ParallelMergeSort(int[] array, int[] temp, int start, int end) {
            this.array = array;
            this.temp = temp;
            this.start = start;
            this.end = end;
        }

        /** Sorts the range covered by this task, splitting it into subtasks when it is large. */
        @Override
        protected void compute() {
            if (end - start <= THRESHOLD) {
                Arrays.sort(array, start, end);
                return;
            }
            // Unsigned shift instead of "/ 2": start + end may wrap past Integer.MAX_VALUE for
            // very large arrays, and >>> still yields the correct non-negative midpoint.
            int mid = (start + end) >>> 1;
            invokeAll(
                new ParallelMergeSort(array, temp, start, mid),
                new ParallelMergeSort(array, temp, mid, end)
            );
            merge(start, mid, end);
        }

        /**
         * Merges the sorted runs {@code [lo, mid)} and {@code [mid, hi)} of {@code array}.
         *
         * @param lo index of the first element of the left run
         * @param mid index one past the left run, which is also the start of the right run
         * @param hi index one past the last element of the right run
         */
        private void merge(int lo, int mid, int hi) {
            System.arraycopy(array, lo, temp, lo, hi - lo);
            int i = lo;
            int j = mid;
            int k = lo;
            while (i < mid && j < hi) {
                // "<=" takes from the left run on ties.
                array[k++] = temp[i] <= temp[j] ? temp[i++] : temp[j++];
            }
            while (i < mid) {
                array[k++] = temp[i++];
            }
            // No loop is needed for leftovers of the right run: once the left run is exhausted
            // k equals j, so array[j .. hi) still holds exactly those elements in place.
        }
    }

    /**
     * Runs both demos on the common pool and prints the results.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        // Parallel sum
        long[] data = new long[1_000_000];
        Arrays.fill(data, 1);
        ForkJoinPool pool = ForkJoinPool.commonPool();

        long sum = pool.invoke(new ParallelSum(data, 0, data.length));
        System.out.println("Sum: " + sum); // 1000000

        // Parallel sort (fixed seed so every run sorts the same input)
        int[] sortData = new java.util.Random(42).ints(100_000, 0, 1_000_000).toArray();
        int[] temp = new int[sortData.length];

        long t = System.nanoTime();
        pool.invoke(new ParallelMergeSort(sortData, temp, 0, sortData.length));
        System.out.printf("Sorted %d items in %d ms%n",
            sortData.length, (System.nanoTime() - t) / 1_000_000);

        // Verify ordering; these checks only execute when assertions are enabled (java -ea).
        for (int i = 1; i < sortData.length; i++) {
            assert sortData[i - 1] <= sortData[i] : "out of order at index " + i;
        }

        System.out.println("Parallelism: " + pool.getParallelism());
        System.out.println("Pool size: " + pool.getPoolSize());
    }
}

Use Cases

  • CPU-intensive divide-and-conquer algorithms
  • Parallel array processing and aggregation
  • Custom parallelism beyond parallel streams

Tags

Related Snippets

Similar patterns you can reuse in the same workflow.