
Grid Searching Using Python: A Practical Guide
Grid searching is a critical technique for hyperparameter optimization in machine learning that systematically tests different parameter combinations to find the optimal model configuration. Whether you’re deploying models on a VPS or running compute-intensive searches on dedicated servers, understanding grid search implementation can dramatically improve your model performance. This guide walks through practical Python implementations, performance optimization strategies, and real-world deployment scenarios that will help you build more effective machine learning pipelines.
How Grid Search Works Under the Hood
Grid search operates by creating a multidimensional grid of hyperparameter values and evaluating model performance at each intersection point. The algorithm uses cross-validation to assess each parameter combination, ensuring robust performance estimates that generalize well to unseen data.
The computational cost grows exponentially with the number of parameters. For example, testing 10 values for each of 3 parameters yields 10³ = 1,000 parameter combinations, and with 5-fold cross-validation that means 5,000 model fits. This makes grid search computationally expensive but thorough in exploring the parameter space.
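As a quick sanity check before launching a search, the expected workload can be computed directly from the grid. The helper below is a minimal sketch; the grid keys and values are placeholders, not part of the examples that follow.
from math import prod

def count_grid_fits(param_grid, cv_folds=5):
    """Return (parameter combinations, total model fits) for a grid search."""
    combinations = prod(len(values) for values in param_grid.values())
    return combinations, combinations * cv_folds

# Illustrative grid: 10 candidate values for each of 3 hyperparameters
example_grid = {
    'param_a': list(range(10)),
    'param_b': list(range(10)),
    'param_c': list(range(10)),
}
print(count_grid_fits(example_grid, cv_folds=5))  # (1000, 5000)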
from sklearn.model_selection import GridSearchCV
from sklearn.ensemble import RandomForestClassifier
from sklearn.datasets import make_classification
import numpy as np
import time
# Generate sample dataset
X, y = make_classification(n_samples=1000, n_features=20, n_informative=15,
n_redundant=5, random_state=42)
# Define parameter grid
param_grid = {
'n_estimators': [50, 100, 200],
'max_depth': [10, 20, None],
'min_samples_split': [2, 5, 10],
'min_samples_leaf': [1, 2, 4]
}
# Initialize model and grid search
rf = RandomForestClassifier(random_state=42)
grid_search = GridSearchCV(rf, param_grid, cv=5, scoring='accuracy',
n_jobs=-1, verbose=1)
# Execute grid search
start_time = time.time()
grid_search.fit(X, y)
end_time = time.time()
print(f"Best parameters: {grid_search.best_params_}")
print(f"Best score: {grid_search.best_score_:.4f}")
print(f"Search completed in {end_time - start_time:.2f} seconds")
Step-by-Step Implementation Guide
Setting up an effective grid search requires careful parameter selection, proper cross-validation configuration, and performance monitoring. Here’s a comprehensive implementation that covers common scenarios:
import pandas as pd
from sklearn.model_selection import GridSearchCV, train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.svm import SVC
from sklearn.metrics import classification_report, confusion_matrix
import matplotlib.pyplot as plt
import seaborn as sns
class GridSearchPipeline:
def __init__(self, model, param_grid, cv=5, scoring='accuracy'):
self.model = model
self.param_grid = param_grid
self.cv = cv
self.scoring = scoring
self.grid_search = None
self.results_df = None
    def prepare_data(self, X, y, test_size=0.2, scale_features=True, stratify=True):
        """Prepare and split dataset for grid search"""
        # Stratification only applies to classification targets; pass stratify=False for regression
        self.X_train, self.X_test, self.y_train, self.y_test = train_test_split(
            X, y, test_size=test_size, random_state=42,
            stratify=y if stratify else None
        )
if scale_features:
self.scaler = StandardScaler()
self.X_train = self.scaler.fit_transform(self.X_train)
self.X_test = self.scaler.transform(self.X_test)
def execute_search(self, n_jobs=-1, verbose=2):
"""Execute grid search with performance tracking"""
self.grid_search = GridSearchCV(
self.model, self.param_grid, cv=self.cv,
scoring=self.scoring, n_jobs=n_jobs, verbose=verbose,
return_train_score=True
)
print("Starting grid search...")
start_time = time.time()
self.grid_search.fit(self.X_train, self.y_train)
end_time = time.time()
print(f"Grid search completed in {end_time - start_time:.2f} seconds")
self._process_results()
def _process_results(self):
"""Process and store grid search results"""
self.results_df = pd.DataFrame(self.grid_search.cv_results_)
# Sort by mean test score
self.results_df = self.results_df.sort_values(
'mean_test_score', ascending=False
).reset_index(drop=True)
def get_best_model_performance(self):
"""Evaluate best model on test set"""
best_model = self.grid_search.best_estimator_
test_predictions = best_model.predict(self.X_test)
print("Best Parameters:", self.grid_search.best_params_)
print("Best CV Score:", self.grid_search.best_score_)
print("\nTest Set Performance:")
print(classification_report(self.y_test, test_predictions))
return test_predictions
def plot_results(self, param_name, figsize=(10, 6)):
"""Plot grid search results for specific parameter"""
if self.results_df is None:
print("No results to plot. Run execute_search() first.")
return
# Extract parameter values and scores
param_values = []
scores = []
for params, score in zip(self.grid_search.cv_results_['params'],
self.grid_search.cv_results_['mean_test_score']):
if param_name in params:
param_values.append(params[param_name])
scores.append(score)
plt.figure(figsize=figsize)
plt.scatter(param_values, scores, alpha=0.7)
plt.xlabel(param_name)
plt.ylabel('Mean CV Score')
plt.title(f'Grid Search Results: {param_name}')
plt.grid(True)
plt.show()
Using the pipeline with an SVM classifier:
# Example usage with SVM
from sklearn.datasets import load_breast_cancer
# Load dataset
data = load_breast_cancer()
X, y = data.data, data.target
# Define SVM parameter grid
svm_param_grid = {
'C': [0.1, 1, 10, 100],
'gamma': ['scale', 'auto', 0.001, 0.01, 0.1, 1],
'kernel': ['rbf', 'poly', 'sigmoid']
}
# Initialize pipeline
svm_pipeline = GridSearchPipeline(SVC(), svm_param_grid, cv=5, scoring='f1')
# Execute grid search
svm_pipeline.prepare_data(X, y, scale_features=True)
svm_pipeline.execute_search(n_jobs=-1)
# Get results
predictions = svm_pipeline.get_best_model_performance()
svm_pipeline.plot_results('C')
Real-World Examples and Use Cases
Grid search applies across many machine learning domains. Here are practical implementations for common scenarios:
Neural Network Hyperparameter Tuning
from sklearn.neural_network import MLPClassifier
from sklearn.datasets import load_digits
from sklearn.preprocessing import MinMaxScaler
# Load digits dataset
digits = load_digits()
X, y = digits.data, digits.target
# Scale features for neural network
scaler = MinMaxScaler()
X_scaled = scaler.fit_transform(X)
# Neural network parameter grid
nn_param_grid = {
'hidden_layer_sizes': [(50,), (100,), (50, 50), (100, 50), (100, 100)],
'activation': ['relu', 'tanh', 'logistic'],
'alpha': [0.0001, 0.001, 0.01, 0.1],
'learning_rate': ['constant', 'adaptive'],
'max_iter': [200, 500, 1000]
}
# Execute grid search for neural network
nn_pipeline = GridSearchPipeline(MLPClassifier(random_state=42), nn_param_grid,
cv=3, scoring='accuracy')
nn_pipeline.prepare_data(X_scaled, y, scale_features=False) # Already scaled
nn_pipeline.execute_search(n_jobs=4) # Limit jobs for neural networks
# Display top 5 configurations
print("Top 5 Neural Network Configurations:")
top_configs = nn_pipeline.results_df.head(5)[['params', 'mean_test_score', 'std_test_score']]
for idx, row in top_configs.iterrows():
print(f"{idx+1}. Score: {row['mean_test_score']:.4f} (±{row['std_test_score']:.4f})")
print(f" Params: {row['params']}\n")
Time Series Forecasting Parameter Optimization
from sklearn.ensemble import GradientBoostingRegressor
from sklearn.metrics import mean_squared_error, mean_absolute_error
import numpy as np
def create_time_series_features(data, window_size=5):
"""Create lagged features for time series"""
X, y = [], []
for i in range(window_size, len(data)):
X.append(data[i-window_size:i])
y.append(data[i])
return np.array(X), np.array(y)
# Generate synthetic time series data
np.random.seed(42)
time_series = np.cumsum(np.random.randn(1000)) + np.sin(np.arange(1000) * 0.1)
# Create features
X_ts, y_ts = create_time_series_features(time_series, window_size=10)
# Gradient Boosting parameter grid for time series
gb_param_grid = {
'n_estimators': [50, 100, 200],
'learning_rate': [0.01, 0.1, 0.2],
'max_depth': [3, 5, 7],
'subsample': [0.8, 0.9, 1.0],
'min_samples_split': [2, 5, 10]
}
# scikit-learn ships a built-in 'neg_mean_absolute_error' scorer, so it can be
# passed below as a scoring string without defining a custom function
# Execute time series grid search
gb_pipeline = GridSearchPipeline(GradientBoostingRegressor(random_state=42),
gb_param_grid, cv=5, scoring='neg_mean_absolute_error')
gb_pipeline.prepare_data(X_ts, y_ts, scale_features=False, stratify=False)  # continuous target: no stratification
gb_pipeline.execute_search(n_jobs=-1)
# Evaluate on test set
best_model = gb_pipeline.grid_search.best_estimator_
y_pred = best_model.predict(gb_pipeline.X_test)
mse = mean_squared_error(gb_pipeline.y_test, y_pred)
mae = mean_absolute_error(gb_pipeline.y_test, y_pred)
print(f"Time Series Forecasting Results:")
print(f"Best Parameters: {gb_pipeline.grid_search.best_params_}")
print(f"Test MSE: {mse:.4f}")
print(f"Test MAE: {mae:.4f}")
Performance Comparison: Grid Search vs Alternatives
Understanding when to use grid search versus other optimization methods is crucial for efficient model development. Here’s a comprehensive comparison:
| Method | Search Strategy | Computational Cost | Coverage | Best Use Case | Time Complexity |
|---|---|---|---|---|---|
| Grid Search | Exhaustive | High | Complete within bounds | Small parameter spaces | O(n^p), where p = number of parameters |
| Random Search | Random sampling | Medium | Probabilistic | Large parameter spaces | O(n), where n = iterations |
| Bayesian Optimization | Informed sampling | Low-Medium | Focused on promising regions | Expensive model evaluation | O(n²) to O(n³) |
| Halving Grid Search | Progressive elimination | Medium | Complete with early stopping | Budget-constrained searches | O(n log n) |
Performance benchmark comparing different search strategies:
from sklearn.experimental import enable_halving_search_cv  # noqa: F401  (must precede the HalvingGridSearchCV import)
from sklearn.model_selection import RandomizedSearchCV, HalvingGridSearchCV
from sklearn.ensemble import RandomForestClassifier
from sklearn.datasets import make_classification
import time
# Generate larger dataset for meaningful benchmarking
X_bench, y_bench = make_classification(n_samples=5000, n_features=50,
n_informative=30, n_redundant=10,
random_state=42)
# Define comprehensive parameter grid
comprehensive_grid = {
'n_estimators': [10, 50, 100, 200, 300],
'max_depth': [5, 10, 15, 20, None],
'min_samples_split': [2, 5, 10, 15],
'min_samples_leaf': [1, 2, 4, 8],
'max_features': ['sqrt', 'log2', None, 0.5]
}
def benchmark_search_method(search_method, X, y):
"""Benchmark a hyperparameter search method"""
start_time = time.time()
search_method.fit(X, y)
end_time = time.time()
return {
'best_score': search_method.best_score_,
'best_params': search_method.best_params_,
'time_taken': end_time - start_time,
'n_iterations': len(search_method.cv_results_['params'])
}
# Initialize different search methods
rf_base = RandomForestClassifier(random_state=42)
# Grid Search (limited parameters to avoid excessive runtime)
limited_grid = {
'n_estimators': [50, 100, 200],
'max_depth': [10, 20, None],
'min_samples_split': [2, 10]
}
grid_cv = GridSearchCV(rf_base, limited_grid, cv=3, n_jobs=-1, scoring='accuracy')
random_cv = RandomizedSearchCV(rf_base, comprehensive_grid, n_iter=50,
cv=3, n_jobs=-1, scoring='accuracy', random_state=42)
halving_cv = HalvingGridSearchCV(rf_base, limited_grid, cv=3,
n_jobs=-1, scoring='accuracy', random_state=42)
# Benchmark all methods
methods = {
'Grid Search': grid_cv,
'Random Search': random_cv,
'Halving Grid Search': halving_cv
}
benchmark_results = {}
for name, method in methods.items():
print(f"Benchmarking {name}...")
    benchmark_results[name] = benchmark_search_method(method, X_bench, y_bench)
# Display results
print("\nBenchmark Results:")
print("-" * 80)
print(f"{'Method':<20} {'Best Score':<12} {'Time (s)':<10} {'Iterations':<12}")
print("-" * 80)
for method, results in benchmark_results.items():
print(f"{method:<20} {results['best_score']:<12.4f} "
f"{results['time_taken']:<10.2f} {results['n_iterations']:<12}")
Best Practices and Common Pitfalls
Effective grid search implementation requires avoiding several common mistakes and following established best practices. Here are key considerations based on real-world deployment experience:
Memory and Computational Optimization
import psutil
import gc
from sklearn.model_selection import GridSearchCV
from joblib import parallel_backend
class OptimizedGridSearch:
def __init__(self, estimator, param_grid, cv=5, scoring='accuracy'):
self.estimator = estimator
self.param_grid = param_grid
self.cv = cv
self.scoring = scoring
self.memory_usage = []
def monitor_memory(self):
"""Monitor memory usage during grid search"""
process = psutil.Process()
memory_info = process.memory_info()
memory_mb = memory_info.rss / 1024 / 1024
self.memory_usage.append(memory_mb)
return memory_mb
def calculate_search_complexity(self):
"""Calculate expected number of fits"""
total_combinations = 1
for param, values in self.param_grid.items():
total_combinations *= len(values)
total_fits = total_combinations * self.cv
print(f"Expected parameter combinations: {total_combinations}")
print(f"Total model fits required: {total_fits}")
        # Very rough placeholder estimate (~0.1 MB per fit); real usage depends on the model and data size
        estimated_memory = total_fits * 0.1
print(f"Estimated memory usage: {estimated_memory:.1f} MB")
return total_combinations, total_fits
def fit_with_monitoring(self, X, y, n_jobs=-1, backend='threading'):
"""Fit with memory and performance monitoring"""
print("Starting optimized grid search...")
self.calculate_search_complexity()
initial_memory = self.monitor_memory()
print(f"Initial memory usage: {initial_memory:.1f} MB")
# Use appropriate backend for the task
with parallel_backend(backend, n_jobs=n_jobs):
self.grid_search = GridSearchCV(
self.estimator, self.param_grid, cv=self.cv,
scoring=self.scoring, n_jobs=n_jobs, verbose=1
)
start_time = time.time()
self.grid_search.fit(X, y)
end_time = time.time()
final_memory = self.monitor_memory()
peak_memory = max(self.memory_usage)
print(f"Grid search completed in {end_time - start_time:.2f} seconds")
print(f"Peak memory usage: {peak_memory:.1f} MB")
print(f"Memory increase: {final_memory - initial_memory:.1f} MB")
# Clean up memory
gc.collect()
return self.grid_search
# Example with memory optimization
def optimize_parameter_grid(param_grid, max_combinations=1000):
"""Optimize parameter grid to stay within computational limits"""
total_combinations = 1
for param, values in param_grid.items():
total_combinations *= len(values)
if total_combinations <= max_combinations:
return param_grid, total_combinations
# Reduce grid size proportionally
reduction_factor = (max_combinations / total_combinations) ** (1 / len(param_grid))
optimized_grid = {}
for param, values in param_grid.items():
if isinstance(values, list) and len(values) > 2:
new_size = max(2, int(len(values) * reduction_factor))
# Keep first, last, and evenly spaced middle values
if new_size >= len(values):
optimized_grid[param] = values
else:
indices = np.linspace(0, len(values)-1, new_size, dtype=int)
optimized_grid[param] = [values[i] for i in indices]
else:
optimized_grid[param] = values
new_combinations = 1
for param, values in optimized_grid.items():
new_combinations *= len(values)
print(f"Reduced parameter combinations from {total_combinations} to {new_combinations}")
return optimized_grid, new_combinations
# Usage example with optimization
large_param_grid = {
'n_estimators': list(range(50, 501, 50)), # 10 values
'max_depth': list(range(5, 31, 2)), # 13 values
'min_samples_split': list(range(2, 21)), # 19 values
'min_samples_leaf': list(range(1, 11)) # 10 values
} # Total: 24,700 combinations!
# Optimize the grid
optimized_grid, combinations = optimize_parameter_grid(large_param_grid, max_combinations=500)
# Run optimized search
optimized_search = OptimizedGridSearch(
RandomForestClassifier(random_state=42),
optimized_grid, cv=3
)
# Execute with monitoring
X_opt, y_opt = make_classification(n_samples=2000, n_features=20, random_state=42)
grid_result = optimized_search.fit_with_monitoring(X_opt, y_opt, n_jobs=4, backend='threading')
Cross-Validation Strategy Selection
from sklearn.model_selection import (StratifiedKFold, TimeSeriesSplit,
GroupKFold, LeaveOneGroupOut)
from sklearn.datasets import make_classification
import numpy as np
def select_cv_strategy(X, y, data_type='standard', groups=None, time_column=None):
"""Select appropriate cross-validation strategy based on data characteristics"""
n_samples, n_features = X.shape
# Check class balance
unique_classes, class_counts = np.unique(y, return_counts=True)
min_class_count = min(class_counts)
class_balance_ratio = min_class_count / max(class_counts)
print(f"Dataset characteristics:")
print(f"Samples: {n_samples}, Features: {n_features}")
print(f"Classes: {len(unique_classes)}, Min class count: {min_class_count}")
print(f"Class balance ratio: {class_balance_ratio:.3f}")
# Recommend CV strategy
if data_type == 'time_series':
        cv_folds = max(2, min(5, n_samples // 100))  # Conservative for time series; TimeSeriesSplit needs at least 2 splits
cv_strategy = TimeSeriesSplit(n_splits=cv_folds)
strategy_name = f"TimeSeriesSplit (n_splits={cv_folds})"
elif groups is not None:
unique_groups = len(np.unique(groups))
if unique_groups > 10:
cv_strategy = GroupKFold(n_splits=min(5, unique_groups // 2))
strategy_name = f"GroupKFold (n_splits={min(5, unique_groups // 2)})"
else:
cv_strategy = LeaveOneGroupOut()
strategy_name = "LeaveOneGroupOut"
elif class_balance_ratio < 0.1 or min_class_count < 10:
# Imbalanced dataset
        cv_folds = max(2, min(3, min_class_count))  # At least 2 folds, kept small so each fold can contain the minority class
cv_strategy = StratifiedKFold(n_splits=cv_folds, shuffle=True, random_state=42)
strategy_name = f"StratifiedKFold (n_splits={cv_folds}) - Imbalanced data"
else:
# Standard case
cv_folds = 5 if n_samples > 1000 else 3
cv_strategy = StratifiedKFold(n_splits=cv_folds, shuffle=True, random_state=42)
strategy_name = f"StratifiedKFold (n_splits={cv_folds})"
print(f"Recommended CV strategy: {strategy_name}")
return cv_strategy, strategy_name
# Example usage with different data types
def demonstrate_cv_selection():
"""Demonstrate CV strategy selection for different scenarios"""
scenarios = {
'Balanced Dataset': make_classification(n_samples=1000, n_classes=3,
n_informative=10, weights=None,
random_state=42),
'Imbalanced Dataset': make_classification(n_samples=1000, n_classes=3,
n_informative=10, weights=[0.8, 0.15, 0.05],
random_state=42),
'Small Dataset': make_classification(n_samples=100, n_classes=2,
n_informative=5, random_state=42)
}
for scenario_name, (X, y) in scenarios.items():
print(f"\n{scenario_name}:")
print("-" * 50)
cv_strategy, strategy_name = select_cv_strategy(X, y)
# Test the strategy with a simple grid search
param_grid = {'n_estimators': [50, 100], 'max_depth': [5, 10]}
grid_search = GridSearchCV(
RandomForestClassifier(random_state=42), param_grid,
cv=cv_strategy, scoring='f1_weighted', n_jobs=-1
)
start_time = time.time()
grid_search.fit(X, y)
end_time = time.time()
print(f"Best score with {strategy_name}: {grid_search.best_score_:.4f}")
print(f"Time taken: {end_time - start_time:.2f} seconds")
demonstrate_cv_selection()
Common Pitfalls and Solutions
- Data Leakage: Always perform preprocessing inside cross-validation folds or use Pipeline objects to prevent information leaking from the test data (see the Pipeline sketch after this list).
- Overfitting to Validation Set: Use nested cross-validation for unbiased performance estimation when doing extensive hyperparameter tuning.
- Ignoring Computational Constraints: Calculate expected runtime before starting extensive searches, especially on VPS instances with limited resources.
- Inappropriate Scoring Metrics: Choose scoring metrics that align with business objectives and handle class imbalance appropriately.
- Static Parameter Ranges: Start with wide ranges and iteratively narrow down based on initial results for more efficient searches.
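For the data leakage point above, a common pattern is to wrap preprocessing and the estimator in a Pipeline so the scaler is re-fit inside every cross-validation fold. A minimal sketch; the parameter values are illustrative.
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.svm import SVC
from sklearn.model_selection import GridSearchCV
from sklearn.datasets import load_breast_cancer

X_leak, y_leak = load_breast_cancer(return_X_y=True)

# The scaler is fit only on each fold's training portion, never on validation data
pipe = Pipeline([
    ('scaler', StandardScaler()),
    ('svc', SVC())
])

# Parameters of pipeline steps are addressed as <step>__<parameter>
pipe_param_grid = {
    'svc__C': [0.1, 1, 10],
    'svc__gamma': ['scale', 0.01]
}

pipe_search = GridSearchCV(pipe, pipe_param_grid, cv=5, scoring='f1', n_jobs=-1)
pipe_search.fit(X_leak, y_leak)
print(pipe_search.best_params_, pipe_search.best_score_)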
# Example of nested CV for unbiased evaluation
from sklearn.model_selection import cross_val_score
def nested_cross_validation(X, y, estimator, param_grid, outer_cv=5, inner_cv=3):
"""Perform nested cross-validation for unbiased performance estimation"""
outer_scores = []
best_params_list = []
# Create outer CV splits
outer_cv_splitter = StratifiedKFold(n_splits=outer_cv, shuffle=True, random_state=42)
for fold, (train_idx, test_idx) in enumerate(outer_cv_splitter.split(X, y)):
print(f"Processing outer fold {fold + 1}/{outer_cv}")
X_train_outer, X_test_outer = X[train_idx], X[test_idx]
y_train_outer, y_test_outer = y[train_idx], y[test_idx]
# Inner CV for hyperparameter tuning
inner_cv_splitter = StratifiedKFold(n_splits=inner_cv, shuffle=True, random_state=42)
grid_search = GridSearchCV(
estimator, param_grid, cv=inner_cv_splitter,
scoring='f1_weighted', n_jobs=-1
)
# Fit on outer training set
grid_search.fit(X_train_outer, y_train_outer)
# Evaluate best model on outer test set
best_model = grid_search.best_estimator_
outer_score = best_model.score(X_test_outer, y_test_outer)
outer_scores.append(outer_score)
best_params_list.append(grid_search.best_params_)
print(f"Fold {fold + 1} score: {outer_score:.4f}")
print(f"Best params: {grid_search.best_params_}")
# Calculate final statistics
mean_score = np.mean(outer_scores)
std_score = np.std(outer_scores)
print(f"\nNested CV Results:")
print(f"Mean score: {mean_score:.4f} ± {std_score:.4f}")
print(f"Score range: [{min(outer_scores):.4f}, {max(outer_scores):.4f}]")
return outer_scores, best_params_list
# Example usage
X_nested, y_nested = make_classification(n_samples=1000, n_features=20,
n_informative=15, random_state=42)
param_grid_nested = {
'n_estimators': [50, 100, 200],
'max_depth': [5, 10, 15],
'min_samples_split': [2, 5, 10]
}
nested_scores, nested_params = nested_cross_validation(
X_nested, y_nested, RandomForestClassifier(random_state=42),
param_grid_nested, outer_cv=5, inner_cv=3
)
Advanced Grid Search Techniques
For production environments and complex scenarios, several advanced techniques can significantly improve grid search efficiency and effectiveness:
Parallel Processing and Distributed Computing
from joblib import Parallel, delayed
from sklearn.base import clone
import multiprocessing as mp
class DistributedGridSearch:
def __init__(self, estimator, param_grid, cv=5, scoring='accuracy'):
self.estimator = estimator
self.param_grid = param_grid
self.cv = cv
self.scoring = scoring
self.results = []
def generate_param_combinations(self):
"""Generate all parameter combinations"""
from itertools import product
param_names = list(self.param_grid.keys())
param_values = list(self.param_grid.values())
combinations = []
for values in product(*param_values):
param_dict = dict(zip(param_names, values))
combinations.append(param_dict)
return combinations
def evaluate_single_combination(self, params, X, y):
"""Evaluate a single parameter combination"""
try:
# Clone estimator and set parameters
estimator = clone(self.estimator)
estimator.set_params(**params)
# Perform cross-validation
cv_scores = cross_val_score(estimator, X, y, cv=self.cv,
scoring=self.scoring, n_jobs=1)
result = {
'params': params,
'mean_score': np.mean(cv_scores),
'std_score': np.std(cv_scores),
'cv_scores': cv_scores.tolist()
}
return result
except Exception as e:
return {
'params': params,
'mean_score': -np.inf,
'std_score': np.inf,
'error': str(e)
}
def fit_parallel(self, X, y, n_jobs=-1, batch_size=None):
"""Fit using parallel processing with batching"""
param_combinations = self.generate_param_combinations()
if n_jobs == -1:
n_jobs = mp.cpu_count()
if batch_size is None:
batch_size = max(1, len(param_combinations) // (n_jobs * 4))
print(f"Processing {len(param_combinations)} combinations using {n_jobs} workers")
print(f"Batch size: {batch_size}")
# Process in batches to manage memory
all_results = []
for i in range(0, len(param_combinations), batch_size * n_jobs):
batch_combinations = param_combinations[i:i + batch_size * n_jobs]
print(f"Processing batch {i//batch_size//n_jobs + 1}")
# Parallel processing for current batch
batch_results = Parallel(n_jobs=n_jobs, verbose=1)(
delayed(self.evaluate_single_combination)(params, X, y)
for params in batch_combinations
)
all_results.extend(batch_results)
# Optional: Force garbage collection between batches
gc.collect()
# Sort results by mean score
self.results = sorted(all_results, key=lambda x: x['mean_score'], reverse=True)
# Find best result
self.best_params_ = self.results[0]['params']
self.best_score_ = self.results[0]['mean_score']
return self
def get_top_results(self, n=10):
"""Get top n results"""
return self.results[:n]
# Example usage with distributed processing
distributed_param_grid = {
'n_estimators': [50, 100, 150, 200, 250],
'max_depth': [5, 10, 15, 20, None],
'min_samples_split': [2, 5, 10, 15],
'min_samples_leaf': [1, 2, 4, 6],
'max_features': ['sqrt', 'log2', None]
}
# Generate test data
X_dist, y_dist = make_classification(n_samples=2000, n_features=30,
n_informative=20, random_state=42)
# Run distributed grid search
distributed_search = DistributedGridSearch(
RandomForestClassifier(random_state=42),
distributed_param_grid, cv=3, scoring='f1_weighted'
)
start_time = time.time()
distributed_search.fit_parallel(X_dist, y_dist, n_jobs=4, batch_size=50)
end_time = time.time()
print(f"\nDistributed grid search completed in {end_time - start_time:.2f} seconds")
print(f"Best parameters: {distributed_search.best_params_}")
print(f"Best score: {distributed_search.best_score_:.4f}")
# Display top 5 results
print(f"\nTop 5 Results:")
for i, result in enumerate(distributed_search.get_top_results(5)):
print(f"{i+1}. Score: {result['mean_score']:.4f} ± {result['std_score']:.4f}")
print(f" Params: {result['params']}")
Integration with MLOps and Production Systems
For deployment on dedicated servers or cloud infrastructure, grid search needs integration with monitoring, logging, and model management systems:
import json
import logging
from datetime import datetime
import pickle
import os
class ProductionGridSearch:
def __init__(self, estimator, param_grid, cv=5, scoring='accuracy',
experiment_name=None, model_registry_path='./models'):
self.estimator = estimator
self.param_grid = param_grid
self.cv = cv
self.scoring = scoring
self.experiment_name = experiment_name or f"grid_search_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
self.model_registry_path = model_registry_path
# Setup logging
self.setup_logging()
# Create model registry directory
os.makedirs(model_registry_path, exist_ok=True)
def setup_logging(self):
"""Setup comprehensive logging"""
log_format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
logging.basicConfig(level=logging.INFO, format=log_format)
# Create file handler
log_file = f"grid_search_{self.experiment_name}.log"
file_handler = logging.FileHandler(log_file)
file_handler.setLevel(logging.INFO)
file_handler.setFormatter(logging.Formatter(log_format))
self.logger = logging.getLogger(f"GridSearch_{self.experiment_name}")
self.logger.addHandler(file_handler)
def log_system_info(self):
"""Log system information"""
import platform
import psutil
system_info = {
'platform': platform.platform(),
'processor': platform.processor(),
'cpu_count': psutil.cpu_count(),
'memory_total_gb': psutil.virtual_memory().total / (1024**3),
'python_version': platform.python_version()
}
self.logger.info(f"System Info: {json.dumps(system_info, indent=2)}")
return system_info
def save_experiment_metadata(self, results, system_info, execution_time):
"""Save experiment metadata"""
metadata = {
'experiment_name': self.experiment_name,
'timestamp': datetime.now().isoformat(),
'estimator': str(self.estimator),
'param_grid': self.param_grid,
'cv_folds': self.cv,
'scoring': self.scoring,
'execution_time_seconds': execution_time,
'system_info': system_info,
'best_params': results.best_params_,
'best_score': results.best_score_,
'total_combinations': len(results.cv_results_['params'])
}
metadata_file = os.path.join(self.model_registry_path,
f"{self.experiment_name}_metadata.json")
with open(metadata_file, 'w') as f:
json.dump(metadata, f, indent=2, default=str)
self.logger.info(f"Experiment metadata saved to {metadata_file}")
return metadata_file
def save_best_model(self, grid_search_results, X_train, y_train):
"""Save the best model with training data hash for verification"""
import hashlib
        # Create a lightweight hash of the data shapes for basic integrity checking
data_hash = hashlib.md5(
str(X_train.shape).encode() +
str(y_train.shape).encode()
).hexdigest()[:8]
# Train best model on full training set
best_model = grid_search_results.best_estimator_
best_model.fit(X_train, y_train)
# Save model
model_filename = f"{self.experiment_name}_best_model_{data_hash}.pkl"
model_path = os.path.join(self.model_registry_path, model_filename)
with open(model_path, 'wb') as f:
pickle.dump({
'model': best_model,
'best_params': grid_search_results.best_params_,
'best_score': grid_search_results.best_score_,
'data_hash': data_hash,
'training_timestamp': datetime.now().isoformat()
}, f)
self.logger.info(f"Best model saved to {model_path}")
return model_path
def fit_with_monitoring(self, X, y, n_jobs=-1):
"""Fit with comprehensive monitoring and logging"""
self.logger.info(f"Starting grid search experiment: {self.experiment_name}")
# Log system information
system_info = self.log_system_info()
# Log dataset information
dataset_info = {
'n_samples': X.shape[0],
'n_features': X.shape[1],
'target_classes': len(np.unique(y)) if hasattr(y, '__len__') else 'continuous'
}
self.logger.info(f"Dataset Info: {json.dumps(dataset_info)}")
# Calculate expected runtime
total_combinations = 1
for param, values in self.param_grid.items():
total_combinations *= len(values)
expected_fits = total_combinations * self.cv
self.logger.info(f"Expected combinations: {total_combinations}")
self.logger.info(f"Expected total fits: {expected_fits}")
# Start grid search
start_time = time.time()
self.logger.info("Grid search started")
try:
grid_search = GridSearchCV(
self.estimator, self.param_grid, cv=self.cv,
scoring=self.scoring, n_jobs=n_jobs, verbose=1
)
grid_search.fit(X, y)
end_time = time.time()
execution_time = end_time - start_time
self.logger.info(f"Grid search completed successfully in {execution_time:.2f} seconds")
self.logger.info(f"Best score: {grid_search.best_score_:.4f}")
self.logger.info(f"Best parameters: {grid_search.best_params_}")
# Save results
metadata_file = self.save_experiment_metadata(grid_search, system_info, execution_time)
model_path = self.save_best_model(grid_search, X, y)
# Log completion
self.logger.info(f"Experiment completed. Model: {model_path}, Metadata: {metadata_file}")
return grid_search
except Exception as e:
self.logger.error(f"Grid search failed: {str(e)}")
raise
# Example production usage
production_param_grid = {
'n_estimators': [100, 200, 300],
'max_depth': [10, 15, 20],
'min_samples_split': [5, 10],
'min_samples_leaf': [2, 4]
}
# Initialize production grid search
production_search = ProductionGridSearch(
RandomForestClassifier(random_state=42),
production_param_grid,
cv=5,
scoring='f1_weighted',
experiment_name='rf_optimization_v1'
)
# Execute with full monitoring
X_prod, y_prod = make_classification(n_samples=3000, n_features=25,
n_informative=20, random_state=42)
final_results = production_search.fit_with_monitoring(X_prod, y_prod, n_jobs=-1)
print(f"Production grid search completed!")
print(f"Check logs and model registry in './models' directory")
Grid search remains a fundamental technique for hyperparameter optimization, especially when computational resources allow exhaustive search within reasonable parameter bounds. The key to successful implementation lies in understanding your data characteristics, choosing appropriate cross-validation strategies, and implementing proper monitoring and resource management. For production deployments on VPS or dedicated servers, combining grid search with MLOps practices ensures reproducible, monitored, and scalable machine learning workflows.
Additional resources for grid search optimization include the official scikit-learn Grid Search documentation and the Joblib parallel processing guide for advanced parallelization strategies.
