Python Multiprocessing Example – Parallelize Your Code

Python multiprocessing allows you to run multiple processes in parallel, making full use of your CPU cores to dramatically speed up computation-heavy tasks. Unlike threading, which is limited by the Global Interpreter Lock (GIL), multiprocessing creates separate Python interpreter processes that can truly execute simultaneously. This guide will show you how to implement multiprocessing in your Python applications, compare different approaches, troubleshoot common issues, and optimize performance for real-world scenarios.

How Python Multiprocessing Works

The multiprocessing module creates separate Python processes, each with its own memory space and Python interpreter. This bypasses the GIL limitation that prevents true parallelism with threading. When you spawn processes, the operating system schedules them across available CPU cores, allowing genuine parallel execution.
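
How each child process starts depends on the start method: Linux has traditionally defaulted to fork, while Windows and macOS (since Python 3.8) default to spawn, which re-imports your module inside every child. That difference is why the if __name__ == "__main__": guard used throughout this guide is mandatory. As a minimal sketch, you can inspect or pin the start method explicitly:

import multiprocessing

if __name__ == "__main__":
    # Typically "fork" on Linux and "spawn" on Windows/macOS
    print(multiprocessing.get_start_method())

    # Optionally force a method; call this once, before creating any processes
    # multiprocessing.set_start_method("spawn")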

The main components include:

  • Process – Individual worker processes that execute your functions
  • Pool – Manages a group of worker processes for distributing tasks
  • Queue – Thread-safe communication between processes
  • Pipe – Two-way communication channel between processes (see the short sketch after this list)
  • Lock – Synchronization primitive to prevent race conditions
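
Pipe is the only component in this list that the later examples don't use, so here is a minimal sketch of two-way parent/child communication over a pipe:

import multiprocessing

def pipe_child(conn):
    """Receive a message from the parent, reply, then close our end"""
    message = conn.recv()
    conn.send(f"child got: {message}")
    conn.close()

if __name__ == "__main__":
    parent_conn, child_conn = multiprocessing.Pipe()  # duplex by default
    p = multiprocessing.Process(target=pipe_child, args=(child_conn,))
    p.start()

    parent_conn.send("hello")
    print(parent_conn.recv())  # -> "child got: hello"

    p.join()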

Step-by-Step Implementation Guide

Let’s start with a basic example comparing sequential vs parallel processing:

import multiprocessing
import time
import math

def cpu_intensive_task(n):
    """Simulate CPU-intensive work"""
    result = 0
    for i in range(n):
        result += math.sqrt(i)
    return result

def sequential_processing():
    start_time = time.time()
    numbers = [1000000] * 4
    results = []
    
    for num in numbers:
        results.append(cpu_intensive_task(num))
    
    end_time = time.time()
    print(f"Sequential time: {end_time - start_time:.2f} seconds")
    return results

def parallel_processing():
    start_time = time.time()
    numbers = [1000000] * 4
    
    with multiprocessing.Pool() as pool:
        results = pool.map(cpu_intensive_task, numbers)
    
    end_time = time.time()
    print(f"Parallel time: {end_time - start_time:.2f} seconds")
    return results

if __name__ == "__main__":
    sequential_processing()
    parallel_processing()

Here’s a more advanced example that uses the Process class, a shared Queue, and a Lock for fine-grained control:

import multiprocessing
import os
import queue
import time

def worker_function(name, shared_queue, lock):
    """Worker function that processes items from a shared queue"""
    process_id = os.getpid()
    
    while True:
        try:
            # Get item from queue with timeout
            item = shared_queue.get(timeout=1)
        except queue.Empty:
            # No work arrived within the timeout
            break
        
        if item is None:  # Poison pill to stop worker
            break
        
        # Simulate processing
        time.sleep(0.1)
        
        # Serialize printing so output from different processes doesn't interleave
        with lock:
            print(f"Process {name} (PID: {process_id}) processed: {item}")

def main():
    # Create shared resources
    task_queue = multiprocessing.Queue()
    print_lock = multiprocessing.Lock()
    
    # Add tasks to queue
    for i in range(20):
        task_queue.put(f"Task-{i}")
    
    # Create and start worker processes
    processes = []
    num_workers = multiprocessing.cpu_count()
    
    for i in range(num_workers):
        p = multiprocessing.Process(
            target=worker_function,
            args=(f"Worker-{i}", task_queue, print_lock)
        )
        p.start()
        processes.append(p)
    
    # Send poison pills so each worker exits after the real tasks are drained
    # (a plain multiprocessing.Queue has no join(); see the JoinableQueue sketch below)
    for _ in range(num_workers):
        task_queue.put(None)
    
    # Wait for all processes to finish
    for p in processes:
        p.join()
    
    print("All tasks completed!")

if __name__ == "__main__":
    main()
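
The poison-pill pattern above is usually enough. If you specifically want the producer to block until every queued item has been handled, multiprocessing.JoinableQueue adds task_done() and join(); here is a sketch of the same worker idea:

import multiprocessing

def joinable_worker(task_queue):
    """Consume items until a poison pill arrives, acknowledging each one"""
    while True:
        item = task_queue.get()
        if item is None:  # Poison pill
            task_queue.task_done()
            break
        print(f"Handled {item}")
        task_queue.task_done()  # Mark this item as finished

if __name__ == "__main__":
    task_queue = multiprocessing.JoinableQueue()
    workers = [multiprocessing.Process(target=joinable_worker, args=(task_queue,))
               for _ in range(2)]
    for w in workers:
        w.start()

    for i in range(10):
        task_queue.put(i)
    task_queue.join()  # Blocks until task_done() has been called for every item

    for _ in workers:
        task_queue.put(None)  # Tell each worker to exit
    for w in workers:
        w.join()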

Real-World Examples and Use Cases

Here are practical applications where multiprocessing provides significant benefits:

Web Scraping with Multiprocessing

import multiprocessing
import requests
import time

def scrape_url(url):
    """Scrape a single URL and return basic info"""
    try:
        response = requests.get(url, timeout=5)
        return {
            'url': url,
            'status_code': response.status_code,
            'content_length': len(response.content),
            'response_time': response.elapsed.total_seconds()
        }
    except Exception as e:
        return {'url': url, 'error': str(e)}

def parallel_scraping(urls, num_workers=4):
    """Scrape multiple URLs in parallel"""
    start_time = time.time()
    
    with multiprocessing.Pool(processes=num_workers) as pool:
        results = pool.map(scrape_url, urls)
    
    end_time = time.time()
    print(f"Scraped {len(urls)} URLs in {end_time - start_time:.2f} seconds")
    return results

# Example usage
if __name__ == "__main__":
    urls = [
        'https://httpbin.org/delay/1',
        'https://httpbin.org/delay/2',
        'https://httpbin.org/status/200',
        'https://httpbin.org/json',
    ] * 5  # 20 URLs total
    
    results = parallel_scraping(urls)
    
    # Process results
    successful = [r for r in results if 'error' not in r]
    errors = [r for r in results if 'error' in r]
    
    print(f"Successful: {len(successful)}, Errors: {len(errors)}")

Data Processing Pipeline

import multiprocessing
import json
import time
from pathlib import Path

def process_json_file(file_path):
    """Process a single JSON file"""
    try:
        with open(file_path, 'r') as f:
            data = json.load(f)
        
        # Simulate processing (e.g., data transformation)
        processed_data = {
            'file': file_path.name,
            'record_count': len(data) if isinstance(data, list) else 1,
            'size_kb': file_path.stat().st_size / 1024,
            'processed_at': time.time()
        }
        
        return processed_data
        
    except Exception as e:
        return {'file': file_path.name, 'error': str(e)}

def batch_process_files(directory_path, max_workers=None):
    """Process all JSON files in a directory using multiprocessing"""
    json_files = list(Path(directory_path).glob('*.json'))
    
    if not json_files:
        print("No JSON files found")
        return []
    
    if max_workers is None:
        max_workers = min(len(json_files), multiprocessing.cpu_count())
    
    print(f"Processing {len(json_files)} files with {max_workers} workers")
    
    start_time = time.time()
    
    with multiprocessing.Pool(processes=max_workers) as pool:
        results = pool.map(process_json_file, json_files)
    
    end_time = time.time()
    print(f"Processing completed in {end_time - start_time:.2f} seconds")
    
    return results
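
A minimal usage sketch; the "./data" directory below is just a placeholder for any folder containing JSON files:

if __name__ == "__main__":
    # "./data" is a hypothetical directory of JSON files
    results = batch_process_files("./data")
    for item in results:
        print(item)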

Performance Comparisons

Here’s how the different approaches compare:

Approach          CPU Cores Used    Memory Usage   Best For                              Limitations
Sequential        1                 Low            Simple scripts, small workloads       Doesn't utilize multiple cores
Threading         1 (due to GIL)    Medium         I/O-bound, concurrent operations      GIL prevents true parallelism
Multiprocessing   All available     High           CPU-intensive tasks                   Higher memory usage, IPC overhead
AsyncIO           1                 Low            High-concurrency I/O                  Single-threaded, requires async/await

The following benchmark script compares the approaches on a CPU-intensive workload; run it on a multi-core system to see the difference for yourself:

import multiprocessing
import time
import threading

def cpu_task(n):
    # Defined at module level so Pool workers can pickle it
    return sum(i * i for i in range(n))

def benchmark_approaches():
    numbers = [100000] * 8
    
    # Sequential
    start = time.time()
    seq_results = [cpu_task(n) for n in numbers]
    seq_time = time.time() - start
    
    # Multiprocessing
    start = time.time()
    with multiprocessing.Pool() as pool:
        mp_results = pool.map(cpu_task, numbers)
    mp_time = time.time() - start
    
    # Threading (for comparison)
    start = time.time()
    threads = []
    thread_results = []
    
    def thread_worker(n):
        thread_results.append(cpu_task(n))
    
    for n in numbers:
        t = threading.Thread(target=thread_worker, args=(n,))
        threads.append(t)
        t.start()
    
    for t in threads:
        t.join()
    
    thread_time = time.time() - start
    
    print(f"Sequential: {seq_time:.2f}s")
    print(f"Threading: {thread_time:.2f}s") 
    print(f"Multiprocessing: {mp_time:.2f}s")
    print(f"Speedup: {seq_time/mp_time:.1f}x")

if __name__ == "__main__":
    benchmark_approaches()

Best Practices and Common Pitfalls

Best Practices

  • Use context managers – Always use with multiprocessing.Pool() to ensure proper cleanup
  • Guard main execution – Always use if __name__ == "__main__": to prevent recursive process creation
  • Choose optimal worker count – Start with multiprocessing.cpu_count() but adjust based on your specific workload
  • Minimize data transfer – Large objects passed between processes incur serialization overhead; for many small tasks, batching work helps (see the chunksize sketch after this list)
  • Use appropriate data structures – Prefer multiprocessing.Queue over regular queues for inter-process communication

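A cheap way to cut inter-process overhead when you have many small tasks is the chunksize argument of pool.map, which ships work to the workers in batches instead of one item at a time. A minimal sketch:

import multiprocessing

def square(n):
    return n * n

if __name__ == "__main__":
    numbers = list(range(100000))
    with multiprocessing.Pool() as pool:
        # Each worker receives batches of 1000 tasks, reducing queue round-trips
        results = pool.map(square, numbers, chunksize=1000)
    print(results[:5])
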
Common Issues and Solutions

import multiprocessing

# Issue: Shared state problems
# WRONG - This won't work as expected
counter = 0

def increment_counter():
    global counter
    counter += 1  # Each process gets its own copy of this global

# CORRECT - Use shared memory
def increment_shared(shared_counter, lock):
    # Module-level so it can be pickled under the "spawn" start method
    with lock:
        shared_counter.value += 1

def safe_counter_example():
    shared_counter = multiprocessing.Value('i', 0)
    lock = multiprocessing.Lock()
    
    processes = []
    for _ in range(4):
        p = multiprocessing.Process(target=increment_shared,
                                    args=(shared_counter, lock))
        p.start()
        processes.append(p)
    
    for p in processes:
        p.join()
    
    print(f"Final counter value: {shared_counter.value}")

# Issue: Pickle errors with complex objects
# WRONG - Lambda functions can't be pickled
# pool.map(lambda x: x*2, numbers)  # This will fail

# CORRECT - Use regular functions or functools.partial
from functools import partial

def multiply(x, factor):
    return x * factor

def pickle_safe_example():
    numbers = [1, 2, 3, 4, 5]
    multiply_by_3 = partial(multiply, factor=3)
    
    with multiprocessing.Pool() as pool:
        results = pool.map(multiply_by_3, numbers)
    
    return results
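
When a worker takes several arguments, pool.starmap is an alternative to functools.partial: it unpacks each tuple from the input iterable into positional arguments. A quick sketch reusing the multiply function above:

def starmap_example():
    pairs = [(1, 10), (2, 10), (3, 10)]  # (x, factor) tuples
    
    with multiprocessing.Pool() as pool:
        results = pool.starmap(multiply, pairs)
    
    return results  # [10, 20, 30]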

Memory Management and Process Monitoring

import multiprocessing
import psutil
import os

def memory_intensive_task(size):
    """Create a large list to demonstrate per-process memory usage"""
    # Module-level so Pool workers can pickle it
    data = list(range(size))
    process = psutil.Process(os.getpid())
    memory_mb = process.memory_info().rss / 1024 / 1024
    return f"PID {os.getpid()}: Processed {len(data)} items, Memory: {memory_mb:.1f}MB"

def monitor_process_resources():
    """Monitor memory usage of multiprocessing tasks"""
    sizes = [1000000, 2000000, 1500000, 3000000]
    
    print("Starting multiprocessing with resource monitoring...")
    
    with multiprocessing.Pool(processes=2) as pool:
        results = pool.map(memory_intensive_task, sizes)
    
    for result in results:
        print(result)

# Process cleanup and error handling
def worker_with_error_handling(item):
    """Return the doubled item, or an error string if processing fails"""
    # Module-level so Pool workers can pickle it
    try:
        # Simulate work that might fail
        if item % 7 == 0:
            raise ValueError(f"Item {item} caused an error")
        return item * 2
    except Exception as e:
        return f"Error processing {item}: {str(e)}"

def robust_multiprocessing_example():
    items = list(range(20))
    
    try:
        with multiprocessing.Pool(processes=4) as pool:
            results = pool.map(worker_with_error_handling, items)
        
        # Separate successful results from errors
        successful = [r for r in results if not isinstance(r, str) or not r.startswith("Error")]
        errors = [r for r in results if isinstance(r, str) and r.startswith("Error")]
        
        print(f"Successful: {len(successful)}, Errors: {len(errors)}")
        
    except KeyboardInterrupt:
        print("Process interrupted by user")
    except Exception as e:
        print(f"Unexpected error: {e}")

if __name__ == "__main__":
    monitor_process_resources()
    robust_multiprocessing_example()
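
If workers accumulate memory over a long run, the maxtasksperchild argument to Pool recycles each worker process after it has handled a fixed number of tasks, returning its memory to the operating system. A short sketch reusing the worker above (the limit of 50 is arbitrary):

def recycled_pool_example(items):
    # Each worker process is replaced after completing 50 tasks
    with multiprocessing.Pool(processes=4, maxtasksperchild=50) as pool:
        return pool.map(worker_with_error_handling, items)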

Advanced Techniques and Optimization

Custom Process Pool with Progress Tracking

import multiprocessing
from multiprocessing import Pool
import time

import psutil
from tqdm import tqdm

def trackable_worker(args):
    """Worker function that can be tracked"""
    item, delay = args
    time.sleep(delay)  # Simulate work
    return f"Processed {item}"

def parallel_with_progress(items, num_workers=4):
    """Run multiprocessing with progress bar"""
    # Prepare arguments (item, processing_time)
    work_items = [(item, 0.1) for item in items]
    
    with Pool(processes=num_workers) as pool:
        # Use imap for lazy evaluation and progress tracking
        results = []
        with tqdm(total=len(work_items), desc="Processing") as pbar:
            for result in pool.imap(trackable_worker, work_items):
                results.append(result)
                pbar.update(1)
    
    return results

# Dynamic worker adjustment based on system load
def adaptive_multiprocessing(tasks, max_workers=None):
    """Adjust worker count based on system resources"""
    if max_workers is None:
        # Start with CPU count but adjust based on memory
        available_memory_gb = psutil.virtual_memory().available / (1024**3)
        max_workers = min(
            multiprocessing.cpu_count(),
            max(1, int(available_memory_gb / 0.5))  # 0.5GB per worker
        )
    
    print(f"Using {max_workers} workers based on available resources")
    
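    # cpu_intensive_task here is the module-level worker from the first example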
    with multiprocessing.Pool(processes=max_workers) as pool:
        results = pool.map(cpu_intensive_task, tasks)
    
    return results
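
A small usage sketch tying both helpers together (the item counts are arbitrary, and cpu_intensive_task is assumed to be defined in the same module):

if __name__ == "__main__":
    progress_results = parallel_with_progress(range(50))
    adaptive_results = adaptive_multiprocessing([500000] * 8)
    print(f"Progress-tracked: {len(progress_results)}, Adaptive: {len(adaptive_results)}")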

Integration with Other Technologies

Multiprocessing works well with popular Python libraries:

# Integration with pandas for data processing
import pandas as pd
import numpy as np
from multiprocessing import Pool

def process_dataframe_chunk(chunk):
    """Process a chunk of DataFrame"""
    # Example: Calculate rolling statistics
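    # Note: rolling windows are computed per chunk, so the first few rows of each
    # chunk won't see values from the end of the previous chunk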
    chunk['rolling_mean'] = chunk['value'].rolling(window=5).mean()
    chunk['rolling_std'] = chunk['value'].rolling(window=5).std()
    return chunk

def parallel_dataframe_processing(df, num_chunks=4):
    """Process large DataFrame in parallel chunks"""
    # Split DataFrame into chunks
    chunks = np.array_split(df, num_chunks)
    
    with Pool(processes=num_chunks) as pool:
        processed_chunks = pool.map(process_dataframe_chunk, chunks)
    
    # Combine results
    result_df = pd.concat(processed_chunks, ignore_index=True)
    return result_df

# Example usage
if __name__ == "__main__":
    # Create sample DataFrame
    data = pd.DataFrame({
        'value': np.random.randn(10000),
        'category': np.random.choice(['A', 'B', 'C'], 10000)
    })
    
    result = parallel_dataframe_processing(data)
    print(f"Processed DataFrame shape: {result.shape}")

For more detailed information about Python multiprocessing, refer to the official Python multiprocessing documentation. The concurrent.futures module provides a higher-level interface that’s often easier to use for simple parallel processing tasks.
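
As a quick illustration of that higher-level interface, here is the first example's pool.map call rewritten as a sketch with ProcessPoolExecutor, reusing cpu_intensive_task:

from concurrent.futures import ProcessPoolExecutor

if __name__ == "__main__":
    numbers = [1000000] * 4
    with ProcessPoolExecutor(max_workers=4) as executor:
        # executor.map behaves like pool.map but returns a lazy iterator
        results = list(executor.map(cpu_intensive_task, numbers))
    print(f"Computed {len(results)} results")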

Multiprocessing is particularly effective for CPU-bound tasks like mathematical computations, data processing, image manipulation, and scientific computing. For I/O-bound tasks, consider using asyncio or threading instead. The key is understanding your workload characteristics and choosing the right tool for optimal performance.



