How to Build a Data Processing Pipeline Using Luigi in Python on Ubuntu 24

Building robust data processing pipelines is essential for applications that handle large volumes of data, and Luigi is a powerful Python framework for creating complex workflows with dependency management, error handling, and monitoring. While Apache Airflow often dominates discussions about pipeline orchestration, Luigi offers a lighter, Python-centric approach that is particularly well suited to data scientists and developers who prefer writing pipeline logic directly in code rather than managing DAGs through web interfaces. In this guide, you’ll learn how to set up Luigi on Ubuntu 24, create your first data processing pipeline, handle common deployment challenges, and use advanced features such as parameter handling and task monitoring.

What is Luigi and How Does It Work

Luigi, developed by Spotify, is a Python package that helps you build complex pipelines of batch jobs with dependency resolution, workflow management, visualization, and failure handling. Unlike some pipeline tools that require external schedulers, Luigi includes its own scheduler daemon that manages task execution based on dependency graphs.

The core concept revolves around Tasks, which are Python classes that define units of work. Each task declares its requirements (dependencies), parameters, and outputs. Luigi automatically builds a dependency graph and executes tasks in the correct order, skipping tasks whose outputs already exist unless explicitly told otherwise.

Key components include:

  • Tasks: Individual units of work that inherit from luigi.Task
  • Targets: Represent task outputs (files, database records, etc.)
  • Parameters: Type-safe configuration for tasks
  • Scheduler: Central coordinator that manages task execution
  • Workers: Processes that actually execute tasks
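
To make these components concrete, here is a minimal sketch of a single task that declares one Parameter and writes one Target (the file and class names are illustrative):

import luigi

class HelloTask(luigi.Task):
    """Minimal task: one parameter, one file target"""

    name = luigi.Parameter(default='world')

    def output(self):
        # The target tells Luigi whether this task has already run
        return luigi.LocalTarget(f'hello_{self.name}.txt')

    def run(self):
        with self.output().open('w') as f:
            f.write(f'Hello, {self.name}!')

if __name__ == '__main__':
    luigi.run()

Running python hello.py HelloTask --name luigi --local-scheduler executes the task once and creates hello_luigi.txt; running it again does nothing, because the target already exists.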

Setting Up Luigi on Ubuntu 24

First, ensure your Ubuntu 24 system is updated and has Python 3.10+ installed:

sudo apt update && sudo apt upgrade -y
python3 --version
sudo apt install python3-pip python3-venv python3-dev -y

Create a dedicated project directory and virtual environment:

mkdir luigi-pipeline && cd luigi-pipeline
python3 -m venv luigi-env
source luigi-env/bin/activate

Install Luigi and common dependencies:

pip install luigi pandas requests sqlalchemy psycopg2-binary boto3

For production deployments on VPS or dedicated servers, you’ll want to install additional monitoring tools:

pip install prometheus-client structlog

Verify the installation by starting the Luigi daemon:

luigid --background --pidfile /tmp/luigi.pid --logdir /tmp/luigi-logs --state-path /tmp/luigi-state.pickle

The Luigi web interface should now be accessible at http://localhost:8082. You can check if the daemon is running:

ps aux | grep luigid
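
You can also confirm that the scheduler is answering HTTP requests (this assumes it is listening on the default port 8082):

curl -I http://localhost:8082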

Building Your First Data Processing Pipeline

Let’s create a practical example that demonstrates a common data processing workflow: downloading data, cleaning it, and generating a summary report.

Create the main pipeline file data_pipeline.py:

import luigi
import pandas as pd
import requests
import json
from datetime import datetime, date
import os

class DownloadData(luigi.Task):
    """Download raw data from an API or external source"""
    
    date_param = luigi.DateParameter(default=date.today())
    
    def output(self):
        return luigi.LocalTarget(f'data/raw/data_{self.date_param}.json')
    
    def run(self):
        # Simulate API call - replace with your actual data source
        sample_data = [
            {'id': i, 'value': i * 2.5, 'category': 'A' if i % 2 == 0 else 'B', 'timestamp': datetime.now().isoformat()}
            for i in range(1000)
        ]
        
        os.makedirs('data/raw', exist_ok=True)
        
        with self.output().open('w') as f:
            json.dump(sample_data, f, indent=2)

class CleanData(luigi.Task):
    """Clean and validate the downloaded data"""
    
    date_param = luigi.DateParameter(default=date.today())
    
    def requires(self):
        return DownloadData(date_param=self.date_param)
    
    def output(self):
        return luigi.LocalTarget(f'data/processed/cleaned_{self.date_param}.csv')
    
    def run(self):
        # Read the raw data
        with self.input().open('r') as f:
            raw_data = json.load(f)
        
        # Convert to DataFrame and clean
        df = pd.DataFrame(raw_data)
        
        # Example cleaning operations
        df = df[df['value'] > 0]  # Remove invalid records
        df['timestamp'] = pd.to_datetime(df['timestamp'])
        df['date'] = df['timestamp'].dt.date
        
        os.makedirs('data/processed', exist_ok=True)
        
        # Save cleaned data
        with self.output().open('w') as f:
            df.to_csv(f, index=False)

class GenerateReport(luigi.Task):
    """Generate summary statistics and reports"""
    
    date_param = luigi.DateParameter(default=date.today())
    
    def requires(self):
        return CleanData(date_param=self.date_param)
    
    def output(self):
        return luigi.LocalTarget(f'data/reports/report_{self.date_param}.json')
    
    def run(self):
        # Read cleaned data
        with self.input().open('r') as f:
            df = pd.read_csv(f)
        
        # Generate summary statistics
        report = {
            'date': str(self.date_param),
            'total_records': len(df),
            'avg_value': df['value'].mean(),
            'max_value': df['value'].max(),
            'category_distribution': df['category'].value_counts().to_dict(),
            'generated_at': datetime.now().isoformat()
        }
        
        os.makedirs('data/reports', exist_ok=True)
        
        with self.output().open('w') as f:
            json.dump(report, f, indent=2)

# Main pipeline task that orchestrates everything
class DataPipeline(luigi.Task):
    """Main pipeline that coordinates all tasks"""
    
    date_param = luigi.DateParameter(default=date.today())
    
    def requires(self):
        return GenerateReport(date_param=self.date_param)
    
    def output(self):
        return luigi.LocalTarget(f'data/pipeline_complete_{self.date_param}.flag')
    
    def run(self):
        # Create completion flag
        with self.output().open('w') as f:
            f.write(f'Pipeline completed at {datetime.now().isoformat()}')

if __name__ == '__main__':
    luigi.run()

Run the pipeline:

python data_pipeline.py DataPipeline --local-scheduler

For production use, run with the central scheduler:

python data_pipeline.py DataPipeline
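
This assumes luigid is already running and reachable on localhost:8082. You can also launch the pipeline through the luigi entry point, which is handy once your tasks live in an importable module; the worker count below is illustrative:

python -m luigi --module data_pipeline DataPipeline --workers 2 --scheduler-host localhost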

Advanced Configuration and Production Setup

Create a Luigi configuration file luigi.cfg for production settings:

[core]
log_level=INFO
no_configure_logging=False

[scheduler]
record_task_history=True
state_path=/var/lib/luigi/luigi-state.pickle
retry_delay=60

[task_history]
db_connection=sqlite:////var/lib/luigi/luigi-task-history.db

[worker]
keep_alive=False
max_reschedules=3

[email]
force_send=False
# method=smtp

[smtp]
# Configure SMTP settings for notifications
# host=smtp.gmail.com
# port=587
# username=your-email@gmail.com
# password=your-app-password
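
Luigi reads luigi.cfg from the current working directory by default. For daemons and cron jobs it is more reliable to point at an explicit path via the LUIGI_CONFIG_PATH environment variable (the path below is illustrative):

export LUIGI_CONFIG_PATH=/opt/luigi/luigi.cfg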

For database integration, create a task built on luigi.contrib.postgres.CopyToTable that loads the cleaned data into PostgreSQL:

import luigi
import luigi.contrib.postgres
import pandas as pd

from data_pipeline import CleanData  # task defined earlier in data_pipeline.py

class DatabaseTask(luigi.contrib.postgres.CopyToTable):
    """Example task that copies cleaned data into PostgreSQL"""

    date_param = luigi.DateParameter()
    host = 'localhost'
    database = 'datawarehouse'
    user = 'luigi_user'
    password = 'luigi_password'
    table = 'processed_data'

    # Column definitions let Luigi create the table if it does not exist yet
    columns = [
        ('id', 'INT'),
        ('value', 'FLOAT'),
        ('category', 'TEXT'),
        ('date', 'DATE'),
    ]

    def requires(self):
        return CleanData(date_param=self.date_param)

    def rows(self):
        # Read cleaned data and yield rows for database insertion
        with self.input().open('r') as f:
            df = pd.read_csv(f)

        for _, row in df.iterrows():
            yield (row['id'], row['value'], row['category'], self.date_param)
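
The host, database, and credentials above are placeholders, and the target database and role must exist before the task runs. A minimal sketch of creating them on the same Ubuntu host, assuming PostgreSQL is installed from the standard repositories:

sudo apt install postgresql -y
sudo -u postgres psql -c "CREATE USER luigi_user WITH PASSWORD 'luigi_password';"
sudo -u postgres psql -c "CREATE DATABASE datawarehouse OWNER luigi_user;"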

Task Monitoring and Error Handling

Implement robust error handling and monitoring:

import time

import luigi
import requests
import structlog

logger = structlog.get_logger()

class MonitoredTask(luigi.Task):
    """Base task with monitoring and error handling"""
    
    retry_count = luigi.IntParameter(default=3)
    
    def on_failure(self, exception):
        logger.error("Task failed", 
                    task=self.__class__.__name__, 
                    error=str(exception))
        # Send alert to monitoring system
        return super().on_failure(exception)
    
    def on_success(self):
        logger.info("Task completed successfully", 
                   task=self.__class__.__name__)
        return super().on_success()

class ResilientDataDownload(MonitoredTask):
    """Data download with retry logic and exponential backoff"""
    
    url = luigi.Parameter()
    date_param = luigi.DateParameter()
    
    def output(self):
        return luigi.LocalTarget(f'data/downloads/{self.date_param}.json')
    
    def run(self):
        for attempt in range(self.retry_count):
            try:
                response = requests.get(self.url, timeout=30)
                response.raise_for_status()
                
                with self.output().open('w') as f:
                    f.write(response.text)
                return
                
            except requests.RequestException as e:
                if attempt == self.retry_count - 1:
                    raise
                
                wait_time = 2 ** attempt
                logger.warning(f"Download failed, retrying in {wait_time}s", 
                             attempt=attempt + 1, error=str(e))
                time.sleep(wait_time)
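
Because url and date_param are Luigi parameters, they can be supplied on the command line at run time; a sample invocation (the module name, URL, and date are illustrative):

python -m luigi --module monitored_tasks ResilientDataDownload \
    --url https://example.com/export.json \
    --date-param 2024-06-01 \
    --local-scheduler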

Luigi vs Alternatives Comparison

| Feature        | Luigi                | Apache Airflow        | Prefect          | Dagster              |
|----------------|----------------------|-----------------------|------------------|----------------------|
| Learning Curve | Low – Pure Python    | Medium – DAG concepts | Low – Pythonic   | Medium – Type system |
| Web UI         | Basic monitoring     | Rich dashboard        | Modern interface | Advanced debugging   |
| Scheduling     | External cron needed | Built-in scheduler    | Hybrid approach  | External scheduler   |
| Resource Usage | Lightweight          | Resource intensive    | Medium           | Medium               |
| Community      | Stable, smaller      | Large, active         | Growing fast     | Emerging             |

Real-World Use Cases and Best Practices

Luigi excels in several scenarios:

  • ETL Pipelines: Daily data extraction from APIs, transformation, and loading into data warehouses
  • ML Model Training: Multi-stage machine learning workflows with feature engineering and model validation
  • Report Generation: Automated business intelligence reports with dependency management
  • Data Quality Monitoring: Continuous data validation and anomaly detection workflows

Here’s a production-oriented example of an ML training pipeline:

import json
import pickle

import luigi
import luigi.contrib.s3
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import classification_report
from sklearn.model_selection import train_test_split

class ModelTrainingPipeline(luigi.Task):
    """Complete ML training pipeline"""
    
    model_version = luigi.Parameter()
    s3_bucket = luigi.Parameter(default='ml-models')
    
    def requires(self):
        # PrepareFeatures and ValidateData are assumed to be defined elsewhere in the project
        return [
            PrepareFeatures(model_version=self.model_version),
            ValidateData(model_version=self.model_version)
        ]
    
    def output(self):
        return {
            # Nop format so the pickled model can be streamed as raw bytes
            'model': luigi.contrib.s3.S3Target(
                f's3://{self.s3_bucket}/models/model_{self.model_version}.pkl',
                format=luigi.format.Nop
            ),
            'metrics': luigi.LocalTarget(f'models/metrics_{self.model_version}.json')
        }
    
    def run(self):
        # Load prepared features
        with self.input()[0].open('r') as f:
            df = pd.read_csv(f)
        
        # Split data
        X = df.drop('target', axis=1)
        y = df['target']
        X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
        
        # Train model
        model = RandomForestClassifier(n_estimators=100, random_state=42)
        model.fit(X_train, y_train)
        
        # Evaluate
        y_pred = model.predict(X_test)
        metrics = classification_report(y_test, y_pred, output_dict=True)
        
        # Save model to S3 (the Nop format gives a binary-capable stream)
        with self.output()['model'].open('w') as f:
            pickle.dump(model, f)
        
        # Save metrics locally
        with self.output()['metrics'].open('w') as f:
            json.dump(metrics, f, indent=2)

Production Deployment and Scaling

For production deployment, create a systemd service file /etc/systemd/system/luigid.service:

[Unit]
Description=Luigi Scheduler
After=network.target

[Service]
Type=simple
User=luigi
Group=luigi
WorkingDirectory=/opt/luigi
ExecStart=/opt/luigi/luigi-env/bin/luigid --port=8082 --pidfile=/var/run/luigi/luigi.pid --logdir=/var/log/luigi --state-path=/var/lib/luigi/luigi-state.pickle
Restart=always
RestartSec=10

[Install]
WantedBy=multi-user.target
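
The unit file assumes a dedicated luigi system user and the state, log, and PID directories it references; a sketch of creating them before enabling the service (paths match the unit above):

sudo useradd --system --home /opt/luigi --shell /usr/sbin/nologin luigi
sudo mkdir -p /opt/luigi /var/lib/luigi /var/log/luigi /var/run/luigi
sudo chown -R luigi:luigi /opt/luigi /var/lib/luigi /var/log/luigi /var/run/luigi
sudo systemctl daemon-reload

Because /var/run is cleared at boot, adding RuntimeDirectory=luigi to the [Service] section is a cleaner way to provide the PID directory on modern systemd.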

Enable and start the service:

sudo systemctl enable luigid
sudo systemctl start luigid
sudo systemctl status luigid

For horizontal scaling, you can run multiple workers:

# Run workers on different machines pointing to central scheduler
python -m luigi --module data_pipeline DataPipeline --scheduler-host=your-scheduler-host

Common Issues and Troubleshooting

The most frequent issues you’ll encounter:

  • Task not running: Check if output targets already exist. Luigi skips tasks with existing outputs
  • Import errors: Ensure your Python path includes your task modules
  • Scheduler connection issues: Verify luigid is running and accessible on the correct port
  • Permission errors: Check file system permissions for output directories
  • Memory issues: Monitor worker memory usage, especially with large datasets

Debug common problems:

# Force re-execution: Luigi only re-runs a task whose output is missing,
# so delete the existing outputs (or use a new date parameter) and run again
rm -rf data/
python data_pipeline.py DataPipeline --local-scheduler

# Check task status and dependencies without running anything
luigi-deps-tree --module data_pipeline DataPipeline

# Enable debug logging
python data_pipeline.py DataPipeline --local-scheduler --log-level=DEBUG

Monitor Luigi performance and task execution through the web interface at http://localhost:8082, where you can view task dependency graphs, execution times, and failure reasons. For comprehensive documentation and advanced features, refer to the official Luigi documentation.

Luigi provides a solid foundation for building maintainable data pipelines, especially when you need fine-grained control over task execution and prefer writing pipeline logic in pure Python rather than configuration files or domain-specific languages.


