
Python Multiprocessing Example – Parallelize Your Code
Python multiprocessing allows you to run multiple processes in parallel, making full use of your CPU cores to dramatically speed up computation-heavy tasks. Unlike threading, which is limited by the Global Interpreter Lock (GIL), multiprocessing creates separate Python interpreter processes that can truly execute simultaneously. This guide will show you how to implement multiprocessing in your Python applications, compare different approaches, troubleshoot common issues, and optimize performance for real-world scenarios.
How Python Multiprocessing Works
The multiprocessing module creates separate Python processes, each with its own memory space and Python interpreter. This bypasses the GIL limitation that prevents true parallelism with threading. When you spawn processes, the operating system schedules them across available CPU cores, allowing genuine parallel execution.
The main components include:
- Process – Individual worker processes that execute your functions
- Pool – Manages a group of worker processes for distributing tasks
- Queue – Process- and thread-safe FIFO for passing data between processes
- Pipe – Two-way communication channel between two processes (see the short sketch after this list)
- Lock – Synchronization primitive to prevent race conditions
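Process, Pool, Queue, and Lock all appear in the examples below; Pipe does not, so here is a minimal sketch of two processes exchanging messages over a duplex pipe (the worker function and messages are illustrative, not from any particular codebase):

import multiprocessing

def pipe_worker(conn):
    """Receive one message on this end of the pipe and send a reply back."""
    message = conn.recv()            # blocks until the parent sends something
    conn.send(f"echo: {message}")    # the pipe is duplex, so we can answer
    conn.close()

if __name__ == "__main__":
    parent_conn, child_conn = multiprocessing.Pipe()  # duplex by default
    p = multiprocessing.Process(target=pipe_worker, args=(child_conn,))
    p.start()
    parent_conn.send("hello from the parent")
    print(parent_conn.recv())        # prints: echo: hello from the parent
    p.join()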
Step-by-Step Implementation Guide
Let’s start with a basic example comparing sequential vs parallel processing:
import multiprocessing
import time
import math

def cpu_intensive_task(n):
    """Simulate CPU-intensive work"""
    result = 0
    for i in range(n):
        result += math.sqrt(i)
    return result

def sequential_processing():
    start_time = time.time()
    numbers = [1000000] * 4
    results = []
    for num in numbers:
        results.append(cpu_intensive_task(num))
    end_time = time.time()
    print(f"Sequential time: {end_time - start_time:.2f} seconds")
    return results

def parallel_processing():
    start_time = time.time()
    numbers = [1000000] * 4
    with multiprocessing.Pool() as pool:
        results = pool.map(cpu_intensive_task, numbers)
    end_time = time.time()
    print(f"Parallel time: {end_time - start_time:.2f} seconds")
    return results

if __name__ == "__main__":
    sequential_processing()
    parallel_processing()
Here’s a more advanced example that uses the Process class with a JoinableQueue and a Lock for fine-grained control:
import multiprocessing
import os
import time
from queue import Empty

def worker_function(name, shared_queue, lock):
    """Worker function that processes items from a shared queue"""
    process_id = os.getpid()
    while True:
        try:
            # Get an item from the queue, waiting up to one second
            item = shared_queue.get(timeout=1)
        except Empty:
            break
        if item is None:  # Poison pill to stop the worker
            shared_queue.task_done()
            break
        # Simulate processing
        time.sleep(0.1)
        # Serialize printing across processes
        with lock:
            print(f"Process {name} (PID: {process_id}) processed: {item}")
        shared_queue.task_done()

def main():
    # Create shared resources; JoinableQueue supports join()/task_done()
    task_queue = multiprocessing.JoinableQueue()
    print_lock = multiprocessing.Lock()
    # Add tasks to the queue
    for i in range(20):
        task_queue.put(f"Task-{i}")
    # Create and start worker processes
    processes = []
    num_workers = multiprocessing.cpu_count()
    for i in range(num_workers):
        p = multiprocessing.Process(
            target=worker_function,
            args=(f"Worker-{i}", task_queue, print_lock)
        )
        p.start()
        processes.append(p)
    # Wait for all queued tasks to be marked done
    task_queue.join()
    # Send poison pills to stop the workers
    for _ in range(num_workers):
        task_queue.put(None)
    # Wait for all processes to finish
    for p in processes:
        p.join()
    print("All tasks completed!")

if __name__ == "__main__":
    main()
Real-World Examples and Use Cases
Here are practical applications where multiprocessing provides significant benefits:
Web Scraping with Multiprocessing
import multiprocessing
import requests
import time

def scrape_url(url):
    """Scrape a single URL and return basic info"""
    try:
        response = requests.get(url, timeout=5)
        return {
            'url': url,
            'status_code': response.status_code,
            'content_length': len(response.content),
            'response_time': response.elapsed.total_seconds()
        }
    except Exception as e:
        return {'url': url, 'error': str(e)}

def parallel_scraping(urls, num_workers=4):
    """Scrape multiple URLs in parallel"""
    start_time = time.time()
    with multiprocessing.Pool(processes=num_workers) as pool:
        results = pool.map(scrape_url, urls)
    end_time = time.time()
    print(f"Scraped {len(urls)} URLs in {end_time - start_time:.2f} seconds")
    return results

# Example usage
if __name__ == "__main__":
    urls = [
        'https://httpbin.org/delay/1',
        'https://httpbin.org/delay/2',
        'https://httpbin.org/status/200',
        'https://httpbin.org/json',
    ] * 5  # 20 URLs total
    results = parallel_scraping(urls)
    # Process results
    successful = [r for r in results if 'error' not in r]
    errors = [r for r in results if 'error' in r]
    print(f"Successful: {len(successful)}, Errors: {len(errors)}")
Data Processing Pipeline
import multiprocessing
import json
import time
from pathlib import Path

def process_json_file(file_path):
    """Process a single JSON file"""
    try:
        with open(file_path, 'r') as f:
            data = json.load(f)
        # Simulate processing (e.g., data transformation)
        processed_data = {
            'file': file_path.name,
            'record_count': len(data) if isinstance(data, list) else 1,
            'size_kb': file_path.stat().st_size / 1024,
            'processed_at': time.time()
        }
        return processed_data
    except Exception as e:
        return {'file': file_path.name, 'error': str(e)}

def batch_process_files(directory_path, max_workers=None):
    """Process all JSON files in a directory using multiprocessing"""
    json_files = list(Path(directory_path).glob('*.json'))
    if not json_files:
        print("No JSON files found")
        return []
    if max_workers is None:
        max_workers = min(len(json_files), multiprocessing.cpu_count())
    print(f"Processing {len(json_files)} files with {max_workers} workers")
    start_time = time.time()
    with multiprocessing.Pool(processes=max_workers) as pool:
        results = pool.map(process_json_file, json_files)
    end_time = time.time()
    print(f"Processing completed in {end_time - start_time:.2f} seconds")
    return results
Performance Comparisons
Here’s a benchmark comparing different approaches:
| Approach | CPU Cores Used | Memory Usage | Best For | Limitations |
|---|---|---|---|---|
| Sequential | 1 | Low | I/O-bound tasks, simple scripts | Doesn’t utilize multiple cores |
| Threading | 1 (due to GIL) | Medium | I/O-bound, concurrent operations | GIL prevents true parallelism |
| Multiprocessing | All available | High | CPU-intensive tasks | Higher memory usage, IPC overhead |
| AsyncIO | 1 | Low | High-concurrency I/O | Single-threaded, requires async/await |
The following benchmark compares the three approaches on a CPU-intensive workload; on a 4-core system, expect the multiprocessing speedup to approach 4x:
import multiprocessing
import threading
import time

def cpu_task(n):
    # Defined at module level so Pool workers can pickle it
    return sum(i * i for i in range(n))

def benchmark_approaches():
    numbers = [100000] * 8

    # Sequential
    start = time.time()
    seq_results = [cpu_task(n) for n in numbers]
    seq_time = time.time() - start

    # Multiprocessing
    start = time.time()
    with multiprocessing.Pool() as pool:
        mp_results = pool.map(cpu_task, numbers)
    mp_time = time.time() - start

    # Threading (for comparison)
    start = time.time()
    threads = []
    thread_results = []

    def thread_worker(n):
        thread_results.append(cpu_task(n))

    for n in numbers:
        t = threading.Thread(target=thread_worker, args=(n,))
        threads.append(t)
        t.start()
    for t in threads:
        t.join()
    thread_time = time.time() - start

    print(f"Sequential: {seq_time:.2f}s")
    print(f"Threading: {thread_time:.2f}s")
    print(f"Multiprocessing: {mp_time:.2f}s")
    print(f"Speedup: {seq_time/mp_time:.1f}x")

if __name__ == "__main__":
    benchmark_approaches()
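The table above lists AsyncIO for high-concurrency I/O, but none of the examples here use it, so for contrast here is a minimal, illustrative sketch of that pattern: many simulated I/O waits overlapping on a single thread (the task function and delays are placeholders, not a real benchmark):

import asyncio
import time

async def fake_io_task(task_id, delay):
    """Pretend to wait on I/O (network, disk) without blocking the event loop."""
    await asyncio.sleep(delay)
    return f"task {task_id} finished after {delay}s"

async def main():
    start = time.time()
    # Ten half-second waits overlap, so the total is roughly 0.5s, not 5s
    results = await asyncio.gather(*(fake_io_task(i, 0.5) for i in range(10)))
    print(f"{len(results)} tasks in {time.time() - start:.2f}s on a single thread")

if __name__ == "__main__":
    asyncio.run(main())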
Best Practices and Common Pitfalls
Best Practices
- Use context managers – Always use with multiprocessing.Pool() as pool: so that worker processes are cleaned up automatically
- Guard main execution – Always wrap startup code in if __name__ == "__main__": to prevent recursive process creation
- Choose an optimal worker count – Start with multiprocessing.cpu_count() but adjust based on your specific workload
- Minimize data transfer – Large objects passed between processes incur serialization (pickling) overhead; batching items helps, as shown in the sketch after this list
- Use appropriate data structures – Prefer multiprocessing.Queue (or JoinableQueue) over the standard library's queue.Queue for inter-process communication
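To illustrate the data-transfer point, here is a small sketch (the task function and item counts are made up) showing how the chunksize argument to Pool.imap batches items so far fewer pickled messages cross process boundaries:

import multiprocessing
import time

def square(n):
    """Trivial task; the cost here is dominated by inter-process messaging."""
    return n * n

if __name__ == "__main__":
    numbers = list(range(200_000))
    with multiprocessing.Pool() as pool:
        # imap's default chunksize of 1 sends one pickled message per item
        start = time.time()
        list(pool.imap(square, numbers))
        per_item_time = time.time() - start

        # A larger chunksize batches items into far fewer messages
        start = time.time()
        list(pool.imap(square, numbers, chunksize=5000))
        batched_time = time.time() - start
    print(f"chunksize=1: {per_item_time:.2f}s, chunksize=5000: {batched_time:.2f}s")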
Common Issues and Solutions
import multiprocessing
from functools import partial

# Issue: Shared state problems
# WRONG - This won't work as expected
counter = 0

def increment_counter():
    global counter
    counter += 1  # Each worker process gets its own copy of this global

# CORRECT - Use shared memory
def increment_shared(shared_counter, lock):
    # Defined at module level so it can be pickled under the spawn start method
    with lock:
        shared_counter.value += 1

def safe_counter_example():
    shared_counter = multiprocessing.Value('i', 0)
    lock = multiprocessing.Lock()
    processes = []
    for _ in range(4):
        p = multiprocessing.Process(target=increment_shared,
                                    args=(shared_counter, lock))
        p.start()
        processes.append(p)
    for p in processes:
        p.join()
    print(f"Final counter value: {shared_counter.value}")

# Issue: Pickle errors with complex objects
# WRONG - Lambda functions can't be pickled
# pool.map(lambda x: x*2, numbers)  # This will fail

# CORRECT - Use regular functions or functools.partial
def multiply(x, factor):
    return x * factor

def pickle_safe_example():
    numbers = [1, 2, 3, 4, 5]
    multiply_by_3 = partial(multiply, factor=3)
    with multiprocessing.Pool() as pool:
        results = pool.map(multiply_by_3, numbers)
    return results
Memory Management and Process Monitoring
import multiprocessing
import os
import psutil  # third-party: pip install psutil

def memory_intensive_task(size):
    """Create a large list and report this worker's memory usage."""
    data = list(range(size))
    process = psutil.Process(os.getpid())
    memory_mb = process.memory_info().rss / 1024 / 1024
    return f"PID {os.getpid()}: Processed {len(data)} items, Memory: {memory_mb:.1f}MB"

def monitor_process_resources():
    """Monitor memory usage of multiprocessing tasks"""
    sizes = [1000000, 2000000, 1500000, 3000000]
    print("Starting multiprocessing with resource monitoring...")
    with multiprocessing.Pool(processes=2) as pool:
        results = pool.map(memory_intensive_task, sizes)
    for result in results:
        print(result)

# Process cleanup and error handling
def worker_with_error_handling(item):
    """Return the result, or an error string if the item fails."""
    try:
        # Simulate work that might fail
        if item % 7 == 0:
            raise ValueError(f"Item {item} caused an error")
        return item * 2
    except Exception as e:
        return f"Error processing {item}: {str(e)}"

def robust_multiprocessing_example():
    items = list(range(20))
    try:
        with multiprocessing.Pool(processes=4) as pool:
            results = pool.map(worker_with_error_handling, items)
        # Separate successful results from errors
        successful = [r for r in results if not (isinstance(r, str) and r.startswith("Error"))]
        errors = [r for r in results if isinstance(r, str) and r.startswith("Error")]
        print(f"Successful: {len(successful)}, Errors: {len(errors)}")
    except KeyboardInterrupt:
        print("Process interrupted by user")
    except Exception as e:
        print(f"Unexpected error: {e}")

if __name__ == "__main__":
    monitor_process_resources()
    robust_multiprocessing_example()
Advanced Techniques and Optimization
Custom Process Pool with Progress Tracking
import multiprocessing
from multiprocessing import Pool
import time
import psutil          # third-party: pip install psutil
from tqdm import tqdm  # third-party: pip install tqdm

def trackable_worker(args):
    """Worker function that can be tracked"""
    item, delay = args
    time.sleep(delay)  # Simulate work
    return f"Processed {item}"

def parallel_with_progress(items, num_workers=4):
    """Run multiprocessing with a progress bar"""
    # Prepare arguments (item, processing_time)
    work_items = [(item, 0.1) for item in items]
    with Pool(processes=num_workers) as pool:
        # Use imap for lazy evaluation and progress tracking
        results = []
        with tqdm(total=len(work_items), desc="Processing") as pbar:
            for result in pool.imap(trackable_worker, work_items):
                results.append(result)
                pbar.update(1)
    return results

# Dynamic worker adjustment based on system load
def adaptive_multiprocessing(tasks, max_workers=None):
    """Adjust worker count based on system resources"""
    if max_workers is None:
        # Start with the CPU count but scale down if memory is tight
        available_memory_gb = psutil.virtual_memory().available / (1024**3)
        max_workers = min(
            multiprocessing.cpu_count(),
            max(1, int(available_memory_gb / 0.5))  # assume ~0.5GB per worker
        )
    print(f"Using {max_workers} workers based on available resources")
    with multiprocessing.Pool(processes=max_workers) as pool:
        # cpu_intensive_task is the function defined in the first example above
        results = pool.map(cpu_intensive_task, tasks)
    return results
Integration with Other Technologies
Multiprocessing works well with popular Python libraries:
# Integration with pandas for data processing
import pandas as pd
import numpy as np
from multiprocessing import Pool

def process_dataframe_chunk(chunk):
    """Process a chunk of DataFrame"""
    # Example: Calculate rolling statistics
    # Note: the rolling window restarts at each chunk boundary, so the first
    # few rows of every chunk will be NaN
    chunk = chunk.copy()
    chunk['rolling_mean'] = chunk['value'].rolling(window=5).mean()
    chunk['rolling_std'] = chunk['value'].rolling(window=5).std()
    return chunk

def parallel_dataframe_processing(df, num_chunks=4):
    """Process a large DataFrame in parallel chunks"""
    # Split DataFrame into chunks
    chunks = np.array_split(df, num_chunks)
    with Pool(processes=num_chunks) as pool:
        processed_chunks = pool.map(process_dataframe_chunk, chunks)
    # Combine results
    result_df = pd.concat(processed_chunks, ignore_index=True)
    return result_df

# Example usage
if __name__ == "__main__":
    # Create sample DataFrame
    data = pd.DataFrame({
        'value': np.random.randn(10000),
        'category': np.random.choice(['A', 'B', 'C'], 10000)
    })
    result = parallel_dataframe_processing(data)
    print(f"Processed DataFrame shape: {result.shape}")
For more detailed information about Python multiprocessing, refer to the official Python multiprocessing documentation. The concurrent.futures module provides a higher-level interface that’s often easier to use for simple parallel processing tasks.
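As a quick point of comparison, here is a minimal sketch of the same kind of CPU-bound fan-out written with concurrent.futures.ProcessPoolExecutor; the task function is a stand-in for your own workload:

import concurrent.futures
import math

def cpu_task(n):
    """Placeholder CPU-bound task."""
    return sum(math.sqrt(i) for i in range(n))

if __name__ == "__main__":
    numbers = [1_000_000] * 4
    # ProcessPoolExecutor wraps multiprocessing behind a simpler, futures-based API
    with concurrent.futures.ProcessPoolExecutor() as executor:
        results = list(executor.map(cpu_task, numbers))
    print(f"Computed {len(results)} results")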
Multiprocessing is particularly effective for CPU-bound tasks like mathematical computations, data processing, image manipulation, and scientific computing. For I/O-bound tasks, consider using asyncio or threading instead. The key is understanding your workload characteristics and choosing the right tool for optimal performance.
