
How to Build a Data Processing Pipeline Using Luigi in Python on Ubuntu 24.04
Building robust data processing pipelines is essential for modern applications that handle large volumes of data, and Luigi stands out as a powerful Python framework for creating complex workflows with dependency management, error handling, and monitoring capabilities. While tools like Apache Airflow often dominate discussions about pipeline orchestration, Luigi offers a more lightweight, Python-centric approach that’s particularly well-suited for data scientists and developers who prefer writing pipeline logic directly in code rather than managing DAGs through web interfaces. In this guide, you’ll learn how to set up Luigi on Ubuntu 24.04, create your first data processing pipeline, handle common deployment challenges, and leverage advanced features like parameter handling and task monitoring.
What is Luigi and How Does It Work
Luigi, developed by Spotify, is a Python package that helps you build complex pipelines of batch jobs with dependency resolution, workflow management, visualization, and failure handling. Unlike some pipeline tools that require external schedulers, Luigi includes its own scheduler daemon that manages task execution based on dependency graphs.
The core concept revolves around Tasks, which are Python classes that define units of work. Each task declares its requirements (dependencies), parameters, and outputs. Luigi automatically builds a dependency graph and executes tasks in the correct order, skipping tasks whose outputs already exist unless explicitly told otherwise.
Key components include:
- Tasks: Individual units of work that inherit from luigi.Task
- Targets: Represent task outputs (files, database records, etc.)
- Parameters: Type-safe configuration for tasks
- Scheduler: Central coordinator that manages task execution
- Workers: Processes that actually execute tasks
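To see how these pieces fit together before building the full pipeline, here is a minimal, self-contained sketch (the HelloTask name and output file are purely illustrative):

import luigi

class HelloTask(luigi.Task):
    # Parameter: type-safe configuration, settable from the command line
    name = luigi.Parameter(default='world')

    def output(self):
        # Target: represents this task's output
        return luigi.LocalTarget(f'hello_{self.name}.txt')

    def run(self):
        # The actual unit of work; only runs if the target does not exist yet
        with self.output().open('w') as f:
            f.write(f'Hello, {self.name}!')

if __name__ == '__main__':
    luigi.build([HelloTask()], local_scheduler=True)

Running the file twice demonstrates the skipping behaviour: the second run finds the target already present and does nothing.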
Setting Up Luigi on Ubuntu 24.04
First, ensure your Ubuntu 24.04 system is up to date and has Python 3.10 or newer installed (24.04 ships Python 3.12):
sudo apt update && sudo apt upgrade -y
python3 --version
sudo apt install python3-pip python3-venv python3-dev -y
Create a dedicated project directory and virtual environment:
mkdir luigi-pipeline && cd luigi-pipeline
python3 -m venv luigi-env
source luigi-env/bin/activate
Install Luigi and common dependencies:
pip install luigi pandas requests sqlalchemy psycopg2-binary boto3
For production deployments on VPS or dedicated servers, you’ll want to install additional monitoring tools:
pip install prometheus-client structlog
Verify the installation by starting the Luigi daemon:
luigid --background --pidfile /tmp/luigi.pid --logdir /tmp/luigi-logs --state-path /tmp/luigi-state.pickle
The Luigi web interface should now be accessible at http://localhost:8082. You can check if the daemon is running:
ps aux | grep luigid
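You can also confirm that the web interface is responding:
curl -s -o /dev/null -w '%{http_code}\n' http://localhost:8082   # should print 200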
Building Your First Data Processing Pipeline
Let’s create a practical example that demonstrates a common data processing workflow: downloading data, cleaning it, and generating a summary report.
Create the main pipeline file data_pipeline.py:
import luigi
import pandas as pd
import requests
import json
from datetime import datetime, date
import os
class DownloadData(luigi.Task):
    """Download raw data from an API or external source"""
    date_param = luigi.DateParameter(default=date.today())

    def output(self):
        return luigi.LocalTarget(f'data/raw/data_{self.date_param}.json')

    def run(self):
        # Simulate an API call - replace with your actual data source
        sample_data = [
            {'id': i, 'value': i * 2.5, 'category': 'A' if i % 2 == 0 else 'B',
             'timestamp': datetime.now().isoformat()}
            for i in range(1000)
        ]
        os.makedirs('data/raw', exist_ok=True)
        with self.output().open('w') as f:
            json.dump(sample_data, f, indent=2)


class CleanData(luigi.Task):
    """Clean and validate the downloaded data"""
    date_param = luigi.DateParameter(default=date.today())

    def requires(self):
        return DownloadData(date_param=self.date_param)

    def output(self):
        return luigi.LocalTarget(f'data/processed/cleaned_{self.date_param}.csv')

    def run(self):
        # Read the raw data
        with self.input().open('r') as f:
            raw_data = json.load(f)

        # Convert to DataFrame and clean
        df = pd.DataFrame(raw_data)

        # Example cleaning operations
        df = df[df['value'] > 0]  # Remove invalid records
        df['timestamp'] = pd.to_datetime(df['timestamp'])
        df['date'] = df['timestamp'].dt.date

        os.makedirs('data/processed', exist_ok=True)

        # Save cleaned data
        with self.output().open('w') as f:
            df.to_csv(f, index=False)


class GenerateReport(luigi.Task):
    """Generate summary statistics and reports"""
    date_param = luigi.DateParameter(default=date.today())

    def requires(self):
        return CleanData(date_param=self.date_param)

    def output(self):
        return luigi.LocalTarget(f'data/reports/report_{self.date_param}.json')

    def run(self):
        # Read cleaned data
        with self.input().open('r') as f:
            df = pd.read_csv(f)

        # Generate summary statistics (cast NumPy types to plain Python
        # types so json.dump can serialize them)
        report = {
            'date': str(self.date_param),
            'total_records': len(df),
            'avg_value': float(df['value'].mean()),
            'max_value': float(df['value'].max()),
            'category_distribution': {k: int(v) for k, v in df['category'].value_counts().items()},
            'generated_at': datetime.now().isoformat()
        }

        os.makedirs('data/reports', exist_ok=True)
        with self.output().open('w') as f:
            json.dump(report, f, indent=2)


# Main pipeline task that orchestrates everything
class DataPipeline(luigi.Task):
    """Main pipeline that coordinates all tasks"""
    date_param = luigi.DateParameter(default=date.today())

    def requires(self):
        return GenerateReport(date_param=self.date_param)

    def output(self):
        return luigi.LocalTarget(f'data/pipeline_complete_{self.date_param}.flag')

    def run(self):
        # Create completion flag
        with self.output().open('w') as f:
            f.write(f'Pipeline completed at {datetime.now().isoformat()}')


if __name__ == '__main__':
    luigi.run()
Run the pipeline:
python data_pipeline.py DataPipeline --local-scheduler
For production use, run with the central scheduler:
python data_pipeline.py DataPipeline
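Task parameters become command-line flags, and the --workers option controls how many tasks run in parallel. For example, to run the pipeline for a specific date with two workers (the date is just an illustration):
python data_pipeline.py DataPipeline --date-param 2024-01-15 --workers 2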
Advanced Configuration and Production Setup
Create a Luigi configuration file luigi.cfg for production settings:
[core]
log_level=INFO
no_configure_logging=False

[scheduler]
record_task_history=True
state_path=/var/lib/luigi/luigi-state.pickle
retry_delay=60

[task_history]
db_connection=sqlite:////var/lib/luigi/luigi-task-history.db

[worker]
keep_alive=False
max_reschedules=3

[email]
force_send=False

[smtp]
# Configure SMTP settings for notifications
# host=smtp.gmail.com
# port=587
# username=your-email@gmail.com
# password=your-app-password
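Luigi picks up luigi.cfg from the current working directory (or /etc/luigi/luigi.cfg). In production it is usually safer to point at the file explicitly via the LUIGI_CONFIG_PATH environment variable; the path below assumes the /opt/luigi layout used later in this guide:
export LUIGI_CONFIG_PATH=/opt/luigi/luigi.cfg
python data_pipeline.py DataPipeline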
For database integration, create a task that works with PostgreSQL:
import luigi
import luigi.contrib.postgres
import pandas as pd


class DatabaseTask(luigi.contrib.postgres.CopyToTable):
    """Example task that copies cleaned data into PostgreSQL"""
    date_param = luigi.DateParameter()

    host = 'localhost'
    database = 'datawarehouse'
    user = 'luigi_user'
    password = 'luigi_password'
    table = 'processed_data'

    # Column definitions; CopyToTable uses them to create the table if needed
    columns = [
        ('id', 'INT'),
        ('value', 'FLOAT'),
        ('category', 'TEXT'),
        ('date', 'DATE'),
    ]

    def requires(self):
        return CleanData(date_param=self.date_param)

    def rows(self):
        # Read cleaned data and yield rows for database insertion
        with self.input().open('r') as f:
            df = pd.read_csv(f)
        for _, row in df.iterrows():
            yield (row['id'], row['value'], row['category'], self.date_param)
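The connection settings above assume a datawarehouse database and a luigi_user role already exist; on a fresh PostgreSQL install you could create them roughly like this (choose a stronger password in practice):
sudo -u postgres psql -c "CREATE USER luigi_user WITH PASSWORD 'luigi_password';"
sudo -u postgres psql -c "CREATE DATABASE datawarehouse OWNER luigi_user;"

Because the task defines columns with types, CopyToTable will create the processed_data table on its first run if it does not exist.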
Task Monitoring and Error Handling
Implement robust error handling and monitoring:
import time

import luigi
import requests
import structlog

logger = structlog.get_logger()


class MonitoredTask(luigi.Task):
    """Base task with monitoring and error handling"""
    retry_count = luigi.IntParameter(default=3)

    def on_failure(self, exception):
        logger.error("Task failed",
                     task=self.__class__.__name__,
                     error=str(exception))
        # Send an alert to your monitoring system here
        return super().on_failure(exception)

    def on_success(self):
        logger.info("Task completed successfully",
                    task=self.__class__.__name__)
        return super().on_success()


class ResilientDataDownload(MonitoredTask):
    """Data download with retry logic and exponential backoff"""
    url = luigi.Parameter()
    date_param = luigi.DateParameter()

    def output(self):
        return luigi.LocalTarget(f'data/downloads/{self.date_param}.json')

    def run(self):
        for attempt in range(self.retry_count):
            try:
                response = requests.get(self.url, timeout=30)
                response.raise_for_status()
                with self.output().open('w') as f:
                    f.write(response.text)
                return
            except requests.RequestException as e:
                if attempt == self.retry_count - 1:
                    raise
                wait_time = 2 ** attempt
                logger.warning(f"Download failed, retrying in {wait_time}s",
                               attempt=attempt + 1, error=str(e))
                time.sleep(wait_time)
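Since prometheus-client was installed earlier, you can also have the central scheduler export its own task metrics. Recent Luigi releases ship a Prometheus metrics collector that is enabled through the scheduler configuration; check that your installed version supports it before relying on this:
[scheduler]
metrics_collector=prometheus
The collected metrics are then exposed through the scheduler's HTTP interface (port 8082 by default) for a Prometheus server to scrape.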
Luigi vs Alternatives Comparison
| Feature | Luigi | Apache Airflow | Prefect | Dagster |
|---|---|---|---|---|
| Learning Curve | Low – pure Python | Medium – DAG concepts | Low – Pythonic | Medium – type system |
| Web UI | Basic monitoring | Rich dashboard | Modern interface | Advanced debugging |
| Scheduling | External cron needed (see example below) | Built-in scheduler | Hybrid approach | External scheduler |
| Resource Usage | Lightweight | Resource intensive | Medium | Medium |
| Community | Stable, smaller | Large, active | Growing fast | Emerging |
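Because Luigi does not trigger runs by itself, the usual pattern is a cron entry that starts the pipeline and lets the scheduler resolve dependencies. A sketch of such an entry (paths are illustrative and match the /opt/luigi layout used later in this guide):
# /etc/cron.d/luigi-pipeline - run the pipeline every day at 02:00 as the luigi user
0 2 * * * luigi cd /opt/luigi && ./luigi-env/bin/python data_pipeline.py DataPipeline >> /var/log/luigi/pipeline.log 2>&1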
Real-World Use Cases and Best Practices
Luigi excels in several scenarios:
- ETL Pipelines: Daily data extraction from APIs, transformation, and loading into data warehouses
- ML Model Training: Multi-stage machine learning workflows with feature engineering and model validation
- Report Generation: Automated business intelligence reports with dependency management
- Data Quality Monitoring: Continuous data validation and anomaly detection workflows
Here’s a production-ready example of an ML training pipeline:
import json
import pickle

import luigi
import luigi.contrib.s3
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import classification_report
from sklearn.model_selection import train_test_split


class ModelTrainingPipeline(luigi.Task):
    """Complete ML training pipeline"""
    model_version = luigi.Parameter()
    s3_bucket = luigi.Parameter(default='ml-models')

    def requires(self):
        # PrepareFeatures and ValidateData are assumed to be defined elsewhere in the module
        return [
            PrepareFeatures(model_version=self.model_version),
            ValidateData(model_version=self.model_version)
        ]

    def output(self):
        return {
            # format=luigi.format.Nop gives a raw binary stream suitable for pickle
            'model': luigi.contrib.s3.S3Target(
                f's3://{self.s3_bucket}/models/model_{self.model_version}.pkl',
                format=luigi.format.Nop),
            'metrics': luigi.LocalTarget(f'models/metrics_{self.model_version}.json')
        }

    def run(self):
        # Load prepared features
        with self.input()[0].open('r') as f:
            df = pd.read_csv(f)

        # Split data
        X = df.drop('target', axis=1)
        y = df['target']
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=0.2, random_state=42)

        # Train model
        model = RandomForestClassifier(n_estimators=100, random_state=42)
        model.fit(X_train, y_train)

        # Evaluate
        y_pred = model.predict(X_test)
        metrics = classification_report(y_test, y_pred, output_dict=True)

        # Save model to S3
        with self.output()['model'].open('w') as f:
            pickle.dump(model, f)

        # Save metrics locally (default=float handles NumPy scalar values)
        with self.output()['metrics'].open('w') as f:
            json.dump(metrics, f, indent=2, default=float)
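Assuming this lives in a file called ml_pipeline.py (the name is illustrative), the upstream PrepareFeatures and ValidateData tasks are defined, and your AWS credentials are configured for boto3, you would launch it like any other Luigi task:
python -m luigi --module ml_pipeline ModelTrainingPipeline --model-version v1 --local-scheduler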
Production Deployment and Scaling
For production deployment, create a systemd service file /etc/systemd/system/luigid.service:
[Unit]
Description=Luigi Scheduler
After=network.target
[Service]
Type=simple
User=luigi
Group=luigi
WorkingDirectory=/opt/luigi
ExecStart=/opt/luigi/luigi-env/bin/luigid --port=8082 --pidfile=/var/run/luigi/luigi.pid --logdir=/var/log/luigi --state-path=/var/lib/luigi/luigi-state.pickle
Restart=always
RestartSec=10
[Install]
WantedBy=multi-user.target
Enable and start the service:
sudo systemctl daemon-reload
sudo systemctl enable luigid
sudo systemctl start luigid
sudo systemctl status luigid
For horizontal scaling, you can run multiple workers:
# Run workers on different machines pointing to central scheduler
python -m luigi --module data_pipeline DataPipeline --scheduler-host=your-scheduler-host
Common Issues and Troubleshooting
The most frequent issues you’ll encounter:
- Task not running: Check if output targets already exist. Luigi skips tasks with existing outputs
- Import errors: Ensure your Python path includes your task modules
- Scheduler connection issues: Verify luigid is running and accessible on the correct port
- Permission errors: Check file system permissions for output directories
- Memory issues: Monitor worker memory usage, especially with large datasets
Debug common problems:
# Run even if another invocation holds the process lock
python data_pipeline.py DataPipeline --local-scheduler --no-lock
# Inspect the dependency tree and each task's completion status
luigi-deps-tree --module data_pipeline DataPipeline
# Force re-execution of a task by deleting its output target first
rm -f data/pipeline_complete_*.flag
# Enable debug logging
python data_pipeline.py DataPipeline --local-scheduler --log-level=DEBUG
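When a task refuses to run, it is often quickest to inspect its state from a Python shell; a minimal sketch using the earlier data_pipeline.py:
from data_pipeline import DataPipeline

task = DataPipeline()
print(task.complete())       # True means Luigi considers it done and will skip it
print(task.output().path)    # the target file whose existence drives that decision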
Monitor Luigi performance and task execution through the web interface at http://localhost:8082, where you can view task dependency graphs, execution times, and failure reasons. For comprehensive documentation and advanced features, refer to the official Luigi documentation.
Luigi provides a solid foundation for building maintainable data pipelines, especially when you need fine-grained control over task execution and prefer writing pipeline logic in pure Python rather than configuration files or domain-specific languages.
