Python · Intermediate

Multiprocessing Pool for ETL

Parallelise CPU-bound ETL transformations across CPU cores with multiprocessing.Pool: each CSV file is transformed in its own worker process, and the parent process concatenates the results.

python
import multiprocessing as mp
import pandas as pd
from pathlib import Path

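def process_file(path: Path) -> pd.DataFrame:
    # Runs in a worker process; the returned frame is pickled back to the parent.
    df = pd.read_csv(path)
    df['revenue'] = df['price'] * df['qty']
    df['source'] = path.stem  # file name without extension, kept for lineage
    return df

if __name__ == '__main__':
    # The guard is required where workers are spawned (Windows, macOS),
    # because each worker re-imports this module.
    files = sorted(Path('data/').glob('*.csv'))  # sorted for a reproducible row order
    if not files:
        # pd.concat raises ValueError on an empty list, so stop early.
        raise SystemExit('No CSV files found in data/')
    # Never start more workers than there are files.
    with mp.Pool(processes=min(mp.cpu_count(), len(files))) as pool:
        dfs = pool.map(process_file, files)  # results come back in input order
    combined = pd.concat(dfs, ignore_index=True)
    combined.to_parquet('combined.parquet')  # needs pyarrow or fastparquet installed
    print('Processed', len(files), 'files')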
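
With pool.map, an exception raised while processing any one file propagates to the parent and the whole batch is lost. One possible variation, sketched below, wraps the worker so that failures are collected rather than raised, and uses imap_unordered so results arrive as soon as each worker finishes. To stay self-contained it uses the standard csv module instead of pandas and returns a per-file revenue total rather than a DataFrame. The names summarise_file, safe_summarise, and run are hypothetical helpers, not part of the snippet above.

```python
import csv
import multiprocessing as mp
from pathlib import Path


def summarise_file(path):
    """Return (file stem, total revenue) for one CSV with price and qty columns."""
    total = 0.0
    with open(path, newline="") as handle:
        for row in csv.DictReader(handle):
            total += float(row["price"]) * float(row["qty"])
    return Path(path).stem, total


def safe_summarise(path):
    """Wrap the worker so one bad file is reported instead of aborting the batch."""
    try:
        return path, summarise_file(path), None
    except (OSError, KeyError, ValueError, TypeError) as exc:
        # Missing file, missing column, non-numeric value, or short row.
        return path, None, repr(exc)


def run(files, processes=None):
    """Process files in parallel and return (results, failures)."""
    results, failures = [], []
    with mp.Pool(processes=processes) as pool:
        # imap_unordered yields each result as soon as it is ready,
        # so the output order is not the input order.
        for path, result, error in pool.imap_unordered(safe_summarise, files):
            if error is None:
                results.append(result)
            else:
                failures.append((path, error))
    return results, failures


if __name__ == "__main__":
    files = sorted(Path("data/").glob("*.csv"))
    if files:
        results, failures = run(files)
        print("Processed", len(results), "files;", len(failures), "failed")
```

Because completion order is not guaranteed here, each result carries its source file stem. If input order matters, pool.map or pool.imap with the same wrapper would preserve it.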

Use Cases

  • parallel file processing
  • large-scale ETL
  • CPU-bound data tasks
