Python scikit-learn Tutorial – Machine Learning Basics

Scikit-learn is one of the most widely used machine learning libraries in Python, providing a comprehensive toolkit for data scientists and developers who need to implement ML solutions efficiently. Unlike deep learning frameworks that focus on neural networks, scikit-learn excels at traditional machine learning tasks such as classification, regression, clustering, and dimensionality reduction. This tutorial walks you through the essential concepts, practical implementations, and real-world applications of scikit-learn, covering everything from basic setup to advanced techniques and the common troubleshooting scenarios you'll encounter in production environments.

How Scikit-learn Works – Technical Foundation

Scikit-learn follows a consistent API design pattern that makes it intuitive once you understand the core concepts. Every machine learning algorithm in scikit-learn is implemented as an estimator object with standardized methods:

  • fit() – Trains the model on your data
  • predict() – Makes predictions on new data
  • score() – Evaluates model performance
  • transform() – Applies data transformations (for preprocessors)

The library is built on top of NumPy, SciPy, and matplotlib, leveraging optimized C and Fortran libraries under the hood for performance. This architecture allows scikit-learn to handle datasets with millions of samples while maintaining a Python-friendly interface.

Here’s the fundamental workflow pattern you’ll use repeatedly:

from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score

# Load and split data (X = your feature matrix, y = your target labels)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Initialize and train model
model = RandomForestClassifier(n_estimators=100, random_state=42)
model.fit(X_train, y_train)

# Make predictions and evaluate
predictions = model.predict(X_test)
accuracy = accuracy_score(y_test, predictions)
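
Because the estimator API above also includes score(), and a classifier's default metric is accuracy, the last two lines are roughly equivalent to:

# score() applies the estimator's default metric (accuracy for classifiers)
print(model.score(X_test, y_test))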

Installation and Setup Guide

Getting scikit-learn running properly requires attention to dependencies and virtual environment management. Here’s the recommended setup process:

Step 1: Create a Virtual Environment

# Using venv (Python 3.3+)
python -m venv sklearn_env
source sklearn_env/bin/activate  # Linux/Mac
# or
sklearn_env\Scripts\activate  # Windows

# Using conda (recommended for scientific computing)
conda create -n sklearn_env python=3.9
conda activate sklearn_env

Step 2: Install Scikit-learn and Dependencies

# Basic installation
pip install scikit-learn

# Full data science stack
pip install scikit-learn pandas numpy matplotlib seaborn jupyter

# Or using conda (often more stable for scientific packages)
conda install scikit-learn pandas numpy matplotlib seaborn jupyter

Step 3: Verify Installation

import sklearn
print(f"Scikit-learn version: {sklearn.__version__}")

# Check available modules
from sklearn import datasets, model_selection, ensemble
print("Installation successful!")

For production deployments on VPS or dedicated servers, you’ll want to consider using Docker containers to ensure consistent environments across development and production systems.
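
As a minimal sketch (the image tag, mount path, and train.py are placeholders, not part of any standard setup), you can pin your dependencies and run a training script inside an official Python container:

# Pin the exact package versions from your environment
pip freeze > requirements.txt

# Run a training script in a throwaway container (train.py is a placeholder)
docker run --rm -v "$PWD":/app -w /app python:3.11-slim \
    sh -c "pip install -r requirements.txt && python train.py"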

Essential Machine Learning Workflows

Let’s dive into practical implementations of the most common machine learning tasks. I’ll show you complete examples that you can run immediately.

Classification Example – Predicting Customer Churn

import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.ensemble import RandomForestClassifier
from sklearn.svm import SVC
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import classification_report, confusion_matrix, accuracy_score
from sklearn.datasets import make_classification

# Generate sample dataset (replace with your actual data)
X, y = make_classification(n_samples=1000, n_features=20, n_informative=10, 
                          n_redundant=10, n_clusters_per_class=1, random_state=42)

# Split the data
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, 
                                                    stratify=y, random_state=42)

# Scale features (important for many algorithms)
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)

# Train multiple models for comparison
models = {
    'Random Forest': RandomForestClassifier(n_estimators=100, random_state=42),
    'SVM': SVC(kernel='rbf', random_state=42),
    'Logistic Regression': LogisticRegression(random_state=42)
}

results = {}
for name, model in models.items():
    # Use scaled data for SVM and Logistic Regression
    if name in ['SVM', 'Logistic Regression']:
        model.fit(X_train_scaled, y_train)
        predictions = model.predict(X_test_scaled)
    else:
        model.fit(X_train, y_train)
        predictions = model.predict(X_test)
    
    results[name] = accuracy_score(y_test, predictions)
    print(f"{name} Accuracy: {results[name]:.4f}")

Regression Example – Price Prediction

import numpy as np
from sklearn.datasets import fetch_california_housing
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestRegressor
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_squared_error, r2_score
import matplotlib.pyplot as plt

# Load housing dataset
housing = fetch_california_housing()
X, y = housing.data, housing.target

# Split and train
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Compare regression models
rf_model = RandomForestRegressor(n_estimators=100, random_state=42)
lr_model = LinearRegression()

rf_model.fit(X_train, y_train)
lr_model.fit(X_train, y_train)

# Evaluate both models
rf_pred = rf_model.predict(X_test)
lr_pred = lr_model.predict(X_test)

print(f"Random Forest RMSE: {np.sqrt(mean_squared_error(y_test, rf_pred)):.4f}")
print(f"Linear Regression RMSE: {np.sqrt(mean_squared_error(y_test, lr_pred)):.4f}")
print(f"Random Forest R²: {r2_score(y_test, rf_pred):.4f}")
print(f"Linear Regression R²: {r2_score(y_test, lr_pred):.4f}")

Algorithm Comparison and Selection

Choosing the right algorithm depends on your data characteristics, performance requirements, and interpretability needs. Here’s a practical comparison of popular scikit-learn algorithms:

Algorithm | Best For | Pros | Cons | Training Time | Prediction Time
Random Forest | Tabular data, feature importance | Little preprocessing needed, resists overfitting | Can be slow on large datasets | Medium | Fast
SVM | High-dimensional data, text classification | Effective in high dimensions | Slow on large datasets, needs feature scaling | Slow | Fast
Logistic Regression | Linear relationships, probability estimates | Fast, interpretable, probabilistic | Assumes linear relationships | Fast | Very Fast
XGBoost (separate library, sklearn-compatible API) | Competitions, complex patterns | Often highest accuracy | Requires hyperparameter tuning | Medium | Fast
K-Means | Customer segmentation, clustering | Simple, fast, scalable | Needs a predefined cluster count | Fast | Very Fast
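
Clustering does not appear in the classification and regression workflows above, so here is a minimal K-Means sketch for the customer-segmentation row in the table (the numbers are purely illustrative):

from sklearn.cluster import KMeans
from sklearn.preprocessing import StandardScaler
import numpy as np

# Toy customer data: monthly spend and number of visits (illustrative values)
customers = np.array([[20, 1], [25, 2], [200, 15], [220, 18], [90, 7], [95, 8]])
X_scaled = StandardScaler().fit_transform(customers)

kmeans = KMeans(n_clusters=3, n_init=10, random_state=42)
labels = kmeans.fit_predict(X_scaled)
print(labels)  # one cluster label per customer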

Here’s a practical algorithm selection helper:

def recommend_algorithm(n_samples, n_features, problem_type, need_probability=False):
    """
    Simple algorithm recommendation based on data characteristics
    """
    recommendations = []
    
    if problem_type == 'classification':
        if n_samples < 100000:
            if need_probability:
                recommendations.append('Logistic Regression')
            recommendations.extend(['Random Forest', 'SVM'])
        else:
            recommendations.extend(['Logistic Regression', 'SGD Classifier'])
            
    elif problem_type == 'regression':
        if n_samples < 100000:
            recommendations.extend(['Random Forest', 'SVR'])
        else:
            recommendations.extend(['Linear Regression', 'SGD Regressor'])
    
    return recommendations

# Example usage
print(recommend_algorithm(10000, 50, 'classification', need_probability=True))

Real-World Use Cases and Applications

Let me show you some practical applications I've implemented in production environments, including the challenges you'll face and how to solve them.

Use Case 1: Log Analysis and Anomaly Detection

This is particularly relevant for system administrators managing servers. Here's how to detect unusual patterns in server logs:

from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.cluster import DBSCAN
from sklearn.preprocessing import StandardScaler
import re
from datetime import datetime

class LogAnomalyDetector:
    def __init__(self):
        self.vectorizer = TfidfVectorizer(max_features=1000, stop_words='english')
        self.scaler = StandardScaler()
        self.clusterer = DBSCAN(eps=0.5, min_samples=5)
        
    def preprocess_logs(self, log_lines):
        """Extract features from log lines"""
        features = []
        for line in log_lines:
            # Extract timestamp, IP, response code, etc.
            timestamp = re.search(r'\[(.*?)\]', line)
            ip = re.search(r'(\d+\.\d+\.\d+\.\d+)', line)
            response_code = re.search(r'" (\d{3}) ', line)
            
            feature_dict = {
                'hour': datetime.strptime(timestamp.group(1), '%d/%b/%Y:%H:%M:%S %z').hour if timestamp else 0,
                'response_code': int(response_code.group(1)) if response_code else 200,
                'ip': ip.group(1) if ip else 'unknown'
            }
            features.append(feature_dict)
        return features
    
    def fit_predict(self, log_lines):
        """Detect anomalies in log data"""
        # Convert logs to TF-IDF text feature vectors
        text_features = self.vectorizer.fit_transform([line.lower() for line in log_lines])
        
        # Numerical features are extracted here as well; combining them with the
        # text features (e.g. via scipy.sparse.hstack) is left as an extension
        numerical_features = self.preprocess_logs(log_lines)
        
        # Cluster the text features; DBSCAN labels noise points (outliers) as -1
        clusters = self.clusterer.fit_predict(text_features.toarray())
        anomalies = [i for i, cluster in enumerate(clusters) if cluster == -1]
        
        return anomalies

# Usage example (in practice, feed many more lines; with only a handful of
# samples and min_samples=5, DBSCAN treats every line as noise)
detector = LogAnomalyDetector()
sample_logs = [
    '192.168.1.1 - - [25/Dec/2023:10:00:00 +0000] "GET /index.html HTTP/1.1" 200 1234',
    '192.168.1.1 - - [25/Dec/2023:10:00:01 +0000] "GET /about.html HTTP/1.1" 200 567',
    '10.0.0.1 - - [25/Dec/2023:10:00:02 +0000] "POST /admin/shell.php HTTP/1.1" 404 0'  # Suspicious
]
anomalies = detector.fit_predict(sample_logs)
print(f"Found {len(anomalies)} potential anomalies")

Use Case 2: Resource Usage Prediction for Auto-scaling

Perfect for VPS and dedicated server management:

import time
import numpy as np
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_absolute_error
from datetime import datetime, timedelta

class ResourcePredictor:
    def __init__(self):
        self.cpu_model = RandomForestRegressor(n_estimators=50, random_state=42)
        self.memory_model = RandomForestRegressor(n_estimators=50, random_state=42)
        
    def create_features(self, timestamps, metrics):
        """Create time-based features for prediction"""
        features = []
        for i, ts in enumerate(timestamps):
            dt = datetime.fromtimestamp(ts)
            feature_row = [
                dt.hour,
                dt.weekday(),
                dt.day,
                metrics['cpu'][max(0, i-1)],  # Previous CPU
                metrics['memory'][max(0, i-1)],  # Previous Memory
                np.mean(metrics['cpu'][max(0, i-5):i+1]),  # CPU average last 5 points
                np.mean(metrics['memory'][max(0, i-5):i+1])  # Memory average last 5 points
            ]
            features.append(feature_row)
        return np.array(features)
    
    def train(self, historical_data):
        """Train models on historical server metrics"""
        timestamps = historical_data['timestamps']
        metrics = historical_data['metrics']
        
        X = self.create_features(timestamps, metrics)
        
        # Skip first few rows due to lookback features
        X = X[5:]
        cpu_y = metrics['cpu'][5:]
        memory_y = metrics['memory'][5:]
        
        self.cpu_model.fit(X, cpu_y)
        self.memory_model.fit(X, memory_y)
        
    def predict_next_hour(self, recent_data):
        """Predict resource usage for next hour"""
        X = self.create_features(recent_data['timestamps'], recent_data['metrics'])
        
        cpu_pred = self.cpu_model.predict(X[-1:])
        memory_pred = self.memory_model.predict(X[-1:])
        
        return {
            'cpu_prediction': cpu_pred[0],
            'memory_prediction': memory_pred[0],
            'scale_recommendation': 'up' if cpu_pred[0] > 80 or memory_pred[0] > 85 else 'maintain'
        }

# Example usage with simulated data
predictor = ResourcePredictor()

# Simulate training data
timestamps = [time.time() - (3600 * i) for i in range(168, 0, -1)]  # Last week
cpu_usage = np.random.normal(50, 15, 168)  # Simulate CPU usage
memory_usage = np.random.normal(60, 20, 168)  # Simulate memory usage

training_data = {
    'timestamps': timestamps,
    'metrics': {'cpu': cpu_usage, 'memory': memory_usage}
}

predictor.train(training_data)

# Make prediction
recent_data = {
    'timestamps': timestamps[-24:],  # Last 24 hours
    'metrics': {'cpu': cpu_usage[-24:], 'memory': memory_usage[-24:]}
}

prediction = predictor.predict_next_hour(recent_data)
print(f"Predicted CPU: {prediction['cpu_prediction']:.1f}%")
print(f"Predicted Memory: {prediction['memory_prediction']:.1f}%")
print(f"Scaling recommendation: {prediction['scale_recommendation']}")

Model Evaluation and Hyperparameter Tuning

Getting good results requires proper evaluation and tuning. Here are the techniques that actually work in practice:

from sklearn.model_selection import GridSearchCV, RandomizedSearchCV, cross_val_score
from sklearn.metrics import make_scorer, precision_recall_fscore_support
import time

def comprehensive_evaluation(model, X, y, param_grid, cv_folds=5):
    """
    Perform thorough model evaluation with cross-validation and hyperparameter tuning
    """
    results = {}
    
    # Baseline performance
    baseline_scores = cross_val_score(model, X, y, cv=cv_folds, scoring='accuracy')
    results['baseline_accuracy'] = {
        'mean': baseline_scores.mean(),
        'std': baseline_scores.std()
    }
    
    # Grid search for best parameters
    print("Starting hyperparameter tuning...")
    start_time = time.time()
    
    grid_search = GridSearchCV(
        model, 
        param_grid, 
        cv=cv_folds, 
        scoring='accuracy',
        n_jobs=-1,
        verbose=1
    )
    
    grid_search.fit(X, y)
    tuning_time = time.time() - start_time
    
    results['best_params'] = grid_search.best_params_
    results['best_score'] = grid_search.best_score_
    results['tuning_time'] = tuning_time
    
    # Detailed evaluation of best model
    best_model = grid_search.best_estimator_
    
    # Cross-validation with multiple metrics
    scoring_metrics = ['accuracy', 'precision_macro', 'recall_macro', 'f1_macro']
    for metric in scoring_metrics:
        scores = cross_val_score(best_model, X, y, cv=cv_folds, scoring=metric)
        results[f'{metric}_cv'] = {
            'mean': scores.mean(),
            'std': scores.std()
        }
    
    return results, best_model

# Example usage
from sklearn.ensemble import RandomForestClassifier
from sklearn.datasets import make_classification

# Generate sample data
X, y = make_classification(n_samples=1000, n_features=20, n_informative=10, random_state=42)

# Define parameter grid
param_grid = {
    'n_estimators': [50, 100, 200],
    'max_depth': [None, 10, 20],
    'min_samples_split': [2, 5, 10],
    'min_samples_leaf': [1, 2, 4]
}

model = RandomForestClassifier(random_state=42)
results, best_model = comprehensive_evaluation(model, X, y, param_grid)

print("\nEvaluation Results:")
for key, value in results.items():
    if isinstance(value, dict) and 'mean' in value:
        print(f"{key}: {value['mean']:.4f} (+/- {value['std']*2:.4f})")
    else:
        print(f"{key}: {value}")

Common Pitfalls and Troubleshooting

Here are the issues you'll definitely encounter and how to fix them:

Data Leakage Prevention

from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.feature_selection import SelectKBest, f_classif

# WRONG WAY - leads to data leakage
# scaler = StandardScaler()
# X_scaled = scaler.fit_transform(X)  # Uses entire dataset
# X_train, X_test, y_train, y_test = train_test_split(X_scaled, y, test_size=0.2)

# CORRECT WAY - use pipelines
def create_safe_pipeline(model):
    """Create pipeline that prevents data leakage"""
    pipeline = Pipeline([
        ('scaler', StandardScaler()),
        ('feature_selection', SelectKBest(f_classif, k=10)),
        ('model', model)
    ])
    return pipeline

# Usage
safe_model = create_safe_pipeline(RandomForestClassifier(random_state=42))
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Now scaling and feature selection happen only on training data
safe_model.fit(X_train, y_train)
predictions = safe_model.predict(X_test)
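
Because the scaler and feature selector live inside the pipeline, you can also pass it straight to cross_val_score and they will be refit within each fold, so the leakage protection carries over to cross-validation:

from sklearn.model_selection import cross_val_score

cv_scores = cross_val_score(safe_model, X, y, cv=5, scoring='accuracy')
print(f"CV accuracy: {cv_scores.mean():.4f} (+/- {cv_scores.std() * 2:.4f})")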

Memory Management for Large Datasets

from sklearn.linear_model import SGDClassifier
import pandas as pd
import numpy as np

def train_large_dataset(file_path, chunk_size=10000):
    """
    Handle datasets too large to fit in memory using partial_fit
    """
    model = SGDClassifier(loss='log_loss', random_state=42)  # use loss='log' on scikit-learn < 1.1
    
    # Initialize with first chunk to set up classes
    first_chunk = pd.read_csv(file_path, nrows=chunk_size)
    X_first = first_chunk.drop('target', axis=1).values
    y_first = first_chunk['target'].values
    
    model.partial_fit(X_first, y_first, classes=np.unique(y_first))
    
    # Process remaining chunks
    # Skip the rows already used above, but keep the header row (row 0)
    chunk_reader = pd.read_csv(file_path, chunksize=chunk_size, skiprows=range(1, chunk_size + 1))
    
    for chunk in chunk_reader:
        X_chunk = chunk.drop('target', axis=1).values
        y_chunk = chunk['target'].values
        model.partial_fit(X_chunk, y_chunk)
    
    return model

# Alternative: Use memory mapping for NumPy arrays
def create_memory_mapped_array(data, filename):
    """Create memory-mapped array for large datasets"""
    memmap = np.memmap(filename, dtype='float32', mode='w+', shape=data.shape)
    memmap[:] = data[:]
    return memmap

Handling Categorical Variables Properly

from sklearn.preprocessing import OneHotEncoder, LabelEncoder, StandardScaler
from sklearn.compose import ColumnTransformer

def create_preprocessing_pipeline(numerical_features, categorical_features):
    """
    Create robust preprocessing pipeline for mixed data types
    """
    preprocessor = ColumnTransformer(
        transformers=[
            ('num', StandardScaler(), numerical_features),
            ('cat', OneHotEncoder(drop='first', sparse_output=False), categorical_features)  # use sparse=False on scikit-learn < 1.2
        ],
        remainder='passthrough'
    )
    
    return preprocessor

# Handle high-cardinality categorical variables
from sklearn.feature_extraction import FeatureHasher

def hash_categorical_features(categorical_data, n_features=1000):
    """
    Use feature hashing for high-cardinality categorical variables.
    categorical_data should be an iterable of lists of strings, e.g. [['US'], ['DE'], ...]
    """
    hasher = FeatureHasher(n_features=n_features, input_type='string')
    hashed_features = hasher.transform(categorical_data)
    return hashed_features
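
As a quick usage sketch (the toy DataFrame and column names below are made up for illustration), the ColumnTransformer slots into an ordinary pipeline so the encoding is learned from the training data only:

import pandas as pd
from sklearn.pipeline import Pipeline
from sklearn.linear_model import LogisticRegression

# Hypothetical mixed-type data
df = pd.DataFrame({
    'age': [25, 32, 47, 51, 38, 29],
    'plan': ['basic', 'pro', 'basic', 'pro', 'basic', 'pro'],
    'churn': [0, 1, 0, 1, 0, 1]
})

clf = Pipeline([
    ('prep', create_preprocessing_pipeline(['age'], ['plan'])),
    ('model', LogisticRegression())
])
clf.fit(df[['age', 'plan']], df['churn'])
print(clf.predict(pd.DataFrame({'age': [45], 'plan': ['pro']})))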

Performance Optimization and Production Deployment

When deploying scikit-learn models in production environments, especially on VPS or dedicated servers, performance becomes critical:

import os
import joblib
import pickle
import pandas as pd
import numpy as np
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.preprocessing import StandardScaler, LabelEncoder

class OptimizedPreprocessor(BaseEstimator, TransformerMixin):
    """Custom preprocessor optimized for production"""
    
    def __init__(self, feature_names=None):
        self.feature_names = feature_names
        self.scalers = {}
        self.encoders = {}
    
    def fit(self, X, y=None):
        # Fit scalers and encoders
        for col in X.columns:
            if X[col].dtype in ['int64', 'float64']:
                scaler = StandardScaler()
                self.scalers[col] = scaler.fit(X[[col]])
            else:
                encoder = LabelEncoder()
                self.encoders[col] = encoder.fit(X[col])
        return self
    
    def transform(self, X):
        X_processed = X.copy()
        
        # Apply transformations
        for col in X.columns:
            if col in self.scalers:
                X_processed[col] = self.scalers[col].transform(X[[col]]).flatten()
            elif col in self.encoders:
                # Handle unseen categories gracefully
                try:
                    X_processed[col] = self.encoders[col].transform(X[col])
                except ValueError:
                    # Unseen category encountered: fall back to a default value for
                    # the whole column (coarse, but avoids failing at serving time)
                    X_processed[col] = -1
        
        return X_processed.values

# Model serving class
class ModelServer:
    def __init__(self, model_path, preprocessor_path=None):
        self.model = joblib.load(model_path)
        self.preprocessor = joblib.load(preprocessor_path) if preprocessor_path else None
        
    def predict_single(self, features):
        """Optimized single prediction"""
        if isinstance(features, dict):
            features = pd.DataFrame([features])
        
        if self.preprocessor:
            features = self.preprocessor.transform(features)
        
        prediction = self.model.predict(features)[0]
        probability = None
        
        if hasattr(self.model, 'predict_proba'):
            probability = self.model.predict_proba(features)[0].max()
            
        return {
            # Cast NumPy scalars to plain Python types so the result is JSON-serializable
            'prediction': prediction.item() if hasattr(prediction, 'item') else prediction,
            'confidence': float(probability) if probability is not None else None,
            'model_version': getattr(self.model, 'version', '1.0')
        }
    
    def predict_batch(self, features_list):
        """Optimized batch prediction"""
        if self.preprocessor:
            features_list = self.preprocessor.transform(features_list)
            
        predictions = self.model.predict(features_list)
        
        if hasattr(self.model, 'predict_proba'):
            probabilities = self.model.predict_proba(features_list).max(axis=1)
            # Cast NumPy values to plain Python types so results serialize to JSON cleanly
            return [(pred.item(), float(prob)) for pred, prob in zip(predictions, probabilities)]
        
        return predictions.tolist()

# Save model efficiently
def save_production_model(model, preprocessor, model_path, preprocessor_path):
    """Save model with compression for production"""
    joblib.dump(model, model_path, compress=3)
    if preprocessor:
        joblib.dump(preprocessor, preprocessor_path, compress=3)
    
    # Verify saved model works
    loaded_model = joblib.load(model_path)
    print(f"Model saved successfully. Size: {os.path.getsize(model_path) / 1024 / 1024:.2f} MB")

# Performance monitoring
import time
from functools import wraps

def monitor_performance(func):
    """Decorator to monitor model performance"""
    @wraps(func)
    def wrapper(*args, **kwargs):
        start_time = time.time()
        result = func(*args, **kwargs)
        end_time = time.time()
        
        print(f"{func.__name__} took {end_time - start_time:.4f} seconds")
        return result
    return wrapper

# Usage example
@monitor_performance
def make_prediction(server, features):
    return server.predict_single(features)
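
A minimal save-and-serve round trip to sanity-check the classes above (the file name and demo dataset are placeholders):

from sklearn.datasets import make_classification
from sklearn.ensemble import RandomForestClassifier

# Train a small demo model and persist it
X_demo, y_demo = make_classification(n_samples=200, n_features=5, random_state=42)
demo_model = RandomForestClassifier(n_estimators=20, random_state=42).fit(X_demo, y_demo)
save_production_model(demo_model, None, 'model.joblib', None)

# Load it back through ModelServer and run a small batch prediction
server = ModelServer('model.joblib')
print(server.predict_batch(X_demo[:3]))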

Integration with Web Applications and APIs

Here's how to integrate your scikit-learn models with web applications, particularly useful for developers working with VPS deployments:

from flask import Flask, request, jsonify
import numpy as np
import pandas as pd
import redis
import json
from datetime import datetime

app = Flask(__name__)
redis_client = redis.Redis(host='localhost', port=6379, db=0)

class ModelAPI:
    def __init__(self, model_server, cache_ttl=3600):
        self.model_server = model_server
        self.cache_ttl = cache_ttl
        
    def _get_cache_key(self, features):
        """Generate a cache key from the features. Note: Python's built-in hash()
        is randomized per process; use hashlib for a stable key across restarts."""
        feature_str = json.dumps(features, sort_keys=True)
        return f"prediction:{hash(feature_str)}"
    
    def predict_with_cache(self, features):
        """Make prediction with Redis caching"""
        cache_key = self._get_cache_key(features)
        
        # Check cache first
        cached_result = redis_client.get(cache_key)
        if cached_result:
            return json.loads(cached_result)
        
        # Make prediction
        result = self.model_server.predict_single(features)
        result['timestamp'] = datetime.now().isoformat()
        
        # Cache result
        redis_client.setex(cache_key, self.cache_ttl, json.dumps(result))
        
        return result

# Initialize model API
model_api = ModelAPI(ModelServer('model.joblib', 'preprocessor.joblib'))

@app.route('/predict', methods=['POST'])
def predict():
    try:
        features = request.json
        
        # Validate input
        required_fields = ['feature1', 'feature2', 'feature3']  # Adjust as needed
        if not all(field in features for field in required_fields):
            return jsonify({'error': 'Missing required features'}), 400
        
        # Make prediction
        result = model_api.predict_with_cache(features)
        
        return jsonify(result)
        
    except Exception as e:
        return jsonify({'error': str(e)}), 500

@app.route('/batch_predict', methods=['POST'])
def batch_predict():
    try:
        features_list = request.json['features']
        
        if len(features_list) > 1000:  # Limit batch size
            return jsonify({'error': 'Batch size too large'}), 400
        
        predictions = model_api.model_server.predict_batch(features_list)
        
        return jsonify({
            'predictions': predictions,
            'count': len(predictions),
            'timestamp': datetime.now().isoformat()
        })
        
    except Exception as e:
        return jsonify({'error': str(e)}), 500

@app.route('/model_info', methods=['GET'])
def model_info():
    """Return model metadata"""
    feature_names = getattr(model_api.model_server.model, 'feature_names_in_', None)
    return jsonify({
        'model_type': type(model_api.model_server.model).__name__,
        'features': list(feature_names) if feature_names is not None else None,
        'version': '1.0',
        'last_updated': datetime.now().isoformat()
    })

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000, debug=False)
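
From the client side, a request could look like this (the feature names simply mirror the placeholder required_fields above):

import requests

payload = {'feature1': 1.2, 'feature2': 0.4, 'feature3': 3.0}
response = requests.post('http://localhost:5000/predict', json=payload, timeout=5)
print(response.status_code, response.json())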

Advanced Features and Best Practices

Let me share some advanced techniques that separate production-ready implementations from basic tutorials:

Custom Transformers and Pipeline Components

import numpy as np
import pandas as pd
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler

class OutlierRemover(BaseEstimator, TransformerMixin):
    """Custom transformer to remove outliers using IQR method"""
    
    def __init__(self, columns=None, factor=1.5):
        self.columns = columns
        self.factor = factor
        self.bounds = {}
    
    def fit(self, X, y=None):
        df = pd.DataFrame(X) if not isinstance(X, pd.DataFrame) else X
        columns = self.columns or df.select_dtypes(include=[np.number]).columns
        
        for col in columns:
            Q1 = df[col].quantile(0.25)
            Q3 = df[col].quantile(0.75)
            IQR = Q3 - Q1
            
            self.bounds[col] = {
                'lower': Q1 - self.factor * IQR,
                'upper': Q3 + self.factor * IQR
            }
        
        return self
    
    def transform(self, X):
        df = pd.DataFrame(X) if not isinstance(X, pd.DataFrame) else X.copy()
        
        for col, bounds in self.bounds.items():
            if col in df.columns:
                # Clip outliers instead of removing rows
                df[col] = df[col].clip(lower=bounds['lower'], upper=bounds['upper'])
        
        return df.values if not isinstance(X, pd.DataFrame) else df

class FeatureEngineer(BaseEstimator, TransformerMixin):
    """Custom feature engineering transformer"""
    
    def __init__(self, create_interactions=True, polynomial_degree=2):
        self.create_interactions = create_interactions
        self.polynomial_degree = polynomial_degree
        self.feature_names = None
    
    def fit(self, X, y=None):
        self.feature_names = X.columns if hasattr(X, 'columns') else [f'feature_{i}' for i in range(X.shape[1])]
        return self
    
    def transform(self, X):
        df = pd.DataFrame(X, columns=self.feature_names) if not isinstance(X, pd.DataFrame) else X.copy()
        
        # Create polynomial features for numerical columns
        numerical_cols = df.select_dtypes(include=[np.number]).columns
        
        if self.polynomial_degree > 1:
            for col in numerical_cols:
                for degree in range(2, self.polynomial_degree + 1):
                    df[f'{col}_power_{degree}'] = df[col] ** degree
        
        # Create interaction features
        if self.create_interactions and len(numerical_cols) > 1:
            for i, col1 in enumerate(numerical_cols):
                for col2 in numerical_cols[i+1:]:
                    df[f'{col1}_x_{col2}'] = df[col1] * df[col2]
        
        return df

# Create advanced pipeline
def create_advanced_pipeline(model):
    """Create pipeline with custom transformers"""
    pipeline = Pipeline([
        ('outlier_removal', OutlierRemover()),
        ('feature_engineering', FeatureEngineer()),
        ('scaling', StandardScaler()),
        ('model', model)
    ])
    
    return pipeline
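
A short usage check, assuming the X_train/X_test split from the data-leakage example earlier is still in scope:

advanced_model = create_advanced_pipeline(RandomForestClassifier(n_estimators=50, random_state=42))
advanced_model.fit(X_train, y_train)
print(f"Advanced pipeline accuracy: {advanced_model.score(X_test, y_test):.4f}")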

Model Monitoring and Drift Detection

import numpy as np
from scipy import stats
from sklearn.metrics import accuracy_score
import warnings

class ModelMonitor:
    """Monitor model performance and detect data drift"""
    
    def __init__(self, reference_data, significance_level=0.05):
        self.reference_data = reference_data
        self.significance_level = significance_level
        self.performance_history = []
        
    def detect_data_drift(self, new_data, method='ks_test'):
        """Detect data drift using statistical tests"""
        drift_detected = {}
        
        for col in range(new_data.shape[1]):
            if method == 'ks_test':
                statistic, p_value = stats.ks_2samp(
                    self.reference_data[:, col], 
                    new_data[:, col]
                )
            elif method == 'chi2_test':
                # Rough check for categorical data; histogram2d requires the reference
                # and new samples to contain the same number of rows
                statistic, p_value = stats.chi2_contingency(
                    np.histogram2d(self.reference_data[:, col], new_data[:, col])[0]
                )[:2]
            
            drift_detected[f'feature_{col}'] = {
                'drift_detected': p_value < self.significance_level,
                'p_value': p_value,
                'statistic': statistic
            }
        
        return drift_detected
    
    def monitor_performance(self, model, X_new, y_new):
        """Monitor model performance over time"""
        predictions = model.predict(X_new)
        current_accuracy = accuracy_score(y_new, predictions)
        
        self.performance_history.append(current_accuracy)
        
        # Alert if performance drops significantly
        if len(self.performance_history) > 10:
            recent_avg = np.mean(self.performance_history[-10:])
            historical_avg = np.mean(self.performance_history[:-10])
            
            performance_drop = historical_avg - recent_avg
            
            if performance_drop > 0.05:  # 5% drop threshold
                warnings.warn(f"Model performance dropped by {performance_drop:.3f}")
        
        return {
            'current_accuracy': current_accuracy,
            'performance_trend': np.polyfit(range(len(self.performance_history)), 
                                          self.performance_history, 1)[0]
        }

# Usage example (reuses the earlier train/test split and the tuned best_model)
monitor = ModelMonitor(X_train)

# Check for drift in new data
drift_results = monitor.detect_data_drift(X_test)
performance_results = monitor.monitor_performance(best_model, X_test, y_test)

print("Drift Detection Results:")
for feature, result in drift_results.items():
    if result['drift_detected']:
        print(f"⚠️  Drift detected in {feature} (p-value: {result['p_value']:.4f})")

This comprehensive guide should give you everything you need to implement scikit-learn effectively in production environments. The key is to start with simple implementations and gradually add complexity as your requirements grow. Remember to always validate your models thoroughly and monitor their performance in production.

For more detailed information, check out the official scikit-learn documentation at https://scikit-learn.org/stable/ and the comprehensive user guide at https://scikit-learn.org/stable/user_guide.html.

When deploying these solutions on production servers, consider using robust hosting solutions like MangoHost VPS for development and testing environments, or dedicated servers for high-performance machine learning workloads that require consistent computational resources.



