
K Nearest Neighbors (KNN) Algorithm in Python
The K Nearest Neighbors (KNN) algorithm is a fundamental machine learning technique that’s surprisingly simple yet powerful for classification and regression tasks. Unlike complex algorithms that require extensive training phases, KNN makes predictions by finding the ‘K’ most similar data points to a query and using their values to make decisions. This lazy learning approach makes it perfect for developers who need quick implementation without deep mathematical backgrounds, and it’s particularly valuable when you’re working with datasets where local patterns matter more than global trends. In this guide, you’ll learn how to implement KNN from scratch in Python, optimize its performance for production environments, and avoid the common pitfalls that can tank your application’s response times.
How KNN Works Under the Hood
KNN operates on a deceptively simple principle: similar things exist in close proximity. When you need to classify a new data point, the algorithm calculates distances between this point and all existing points in your training dataset, then picks the K closest neighbors to make a prediction.
The distance calculation typically uses Euclidean distance, but you can also use Manhattan, Minkowski, or even custom distance metrics depending on your data type. For classification tasks, KNN performs a majority vote among the K neighbors, while regression tasks average the target values of those neighbors.
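To make the metric choice concrete, here is a minimal sketch (illustrative values only, plain NumPy) comparing Euclidean, Manhattan, and Minkowski distances for the same pair of points:

import numpy as np

# Two made-up feature vectors, e.g. [age, income in $1000s]
a = np.array([25.0, 48.0])
b = np.array([32.0, 61.0])

euclidean = np.sqrt(np.sum((a - b) ** 2))            # straight-line distance
manhattan = np.sum(np.abs(a - b))                    # sum of absolute differences
minkowski_p3 = np.sum(np.abs(a - b) ** 3) ** (1 / 3)  # generalizes both (p=1 Manhattan, p=2 Euclidean)
print(euclidean, manhattan, minkowski_p3)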
Here’s the basic workflow:
- Load and normalize your training data
- Choose an appropriate value for K
- For each prediction, calculate distances to all training points
- Sort distances and select K nearest neighbors
- Aggregate neighbor labels/values for final prediction
The algorithm’s simplicity is both its strength and weakness. While it requires no training time and adapts well to new data, it can become computationally expensive with large datasets since every prediction requires distance calculations against the entire training set.
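To see where that per-prediction cost comes from, here is a small illustrative sketch (synthetic data, not part of the implementation below) that computes all n distances for a single query with NumPy broadcasting; classifying one point means one full O(n*d) pass over the stored training set:

import numpy as np

rng = np.random.default_rng(0)
stored_points = rng.normal(size=(100_000, 20))   # n = 100,000 training points, d = 20 features
query = rng.normal(size=20)                      # one new point to classify

# One prediction = a distance to every stored training point
distances = np.linalg.norm(stored_points - query, axis=1)
nearest_5 = np.argsort(distances)[:5]            # indices of the 5 closest neighbors
print(nearest_5, distances[nearest_5])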
Step-by-Step KNN Implementation in Python
Let’s build a KNN classifier from scratch before diving into scikit-learn’s optimized version. This approach helps you understand the mechanics and gives you flexibility for custom distance metrics or specialized use cases.
Basic KNN Implementation
import numpy as np
from collections import Counter
import matplotlib.pyplot as plt
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split

class KNNClassifier:
    def __init__(self, k=3):
        self.k = k

    def fit(self, X, y):
        """Store training data - KNN is lazy learning"""
        self.X_train = X
        self.y_train = y

    def euclidean_distance(self, x1, x2):
        """Calculate Euclidean distance between two points"""
        return np.sqrt(np.sum((x1 - x2) ** 2))

    def predict(self, X):
        """Make predictions for test data"""
        predictions = []
        for x in X:
            # Calculate distances to all training points
            distances = [self.euclidean_distance(x, x_train)
                         for x_train in self.X_train]
            # Get indices of k nearest neighbors
            k_indices = np.argsort(distances)[:self.k]
            # Get labels of k nearest neighbors
            k_nearest_labels = [self.y_train[i] for i in k_indices]
            # Perform majority vote
            most_common = Counter(k_nearest_labels).most_common(1)
            predictions.append(most_common[0][0])
        return np.array(predictions)

# Generate sample dataset
X, y = make_classification(n_samples=1000, n_features=2, n_redundant=0,
                           n_informative=2, random_state=42, n_clusters_per_class=1)

# Split the data
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Train and test our KNN
knn = KNNClassifier(k=5)
knn.fit(X_train, y_train)
predictions = knn.predict(X_test)

# Calculate accuracy
accuracy = np.mean(predictions == y_test)
print(f"Custom KNN Accuracy: {accuracy:.3f}")
Using Scikit-learn’s Optimized KNN
For production applications, scikit-learn’s implementation offers significant performance improvements through optimized data structures and algorithms:
from sklearn.neighbors import KNeighborsClassifier
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import classification_report, confusion_matrix
import seaborn as sns

# Normalize features for better distance calculations
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)

# Create and train KNN classifier
knn_sklearn = KNeighborsClassifier(
    n_neighbors=5,
    weights='uniform',   # or 'distance' for weighted voting
    algorithm='auto',    # lets sklearn choose best algorithm
    metric='euclidean'
)

knn_sklearn.fit(X_train_scaled, y_train)
sk_predictions = knn_sklearn.predict(X_test_scaled)

# Evaluate performance
print("Scikit-learn KNN Results:")
print(classification_report(y_test, sk_predictions))

# Visualize confusion matrix
cm = confusion_matrix(y_test, sk_predictions)
plt.figure(figsize=(8, 6))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues')
plt.title('KNN Confusion Matrix')
plt.ylabel('True Label')
plt.xlabel('Predicted Label')
plt.show()
Optimizing K Value and Performance Tuning
Choosing the right K value is crucial for KNN performance. Too small and you’ll get noisy predictions; too large and you’ll lose local patterns. Here’s how to find the optimal K systematically:
from sklearn.model_selection import cross_val_score
import time

def find_optimal_k(X_train, y_train, max_k=20):
    """Find optimal K value using cross-validation"""
    k_values = range(1, max_k + 1)
    cv_scores = []
    training_times = []

    for k in k_values:
        start_time = time.time()
        knn = KNeighborsClassifier(n_neighbors=k)
        # Use 5-fold cross-validation
        scores = cross_val_score(knn, X_train, y_train, cv=5, scoring='accuracy')
        cv_scores.append(scores.mean())
        training_times.append(time.time() - start_time)
        print(f"K={k}: CV Score = {scores.mean():.3f} (+/- {scores.std()*2:.3f})")

    # Find best K
    best_k = k_values[np.argmax(cv_scores)]

    # Plot results
    plt.figure(figsize=(12, 4))

    plt.subplot(1, 2, 1)
    plt.plot(k_values, cv_scores, 'bo-')
    plt.axvline(x=best_k, color='red', linestyle='--', label=f'Best K={best_k}')
    plt.xlabel('K Value')
    plt.ylabel('Cross-Validation Accuracy')
    plt.title('K Value vs Accuracy')
    plt.legend()
    plt.grid(True)

    plt.subplot(1, 2, 2)
    plt.plot(k_values, training_times, 'ro-')
    plt.xlabel('K Value')
    plt.ylabel('Training Time (seconds)')
    plt.title('K Value vs Training Time')
    plt.grid(True)

    plt.tight_layout()
    plt.show()

    return best_k, max(cv_scores)

# Find optimal K
optimal_k, best_score = find_optimal_k(X_train_scaled, y_train)
print(f"\nOptimal K: {optimal_k} with CV score: {best_score:.3f}")
Real-World Use Cases and Examples
KNN shines in several practical scenarios where you need quick implementation and interpretable results. Here are some production-ready examples:
Recommendation System
import pandas as pd
from sklearn.metrics.pairwise import cosine_similarity

class MovieRecommender:
    def __init__(self, k=5):
        self.k = k
        self.movies_df = None
        self.user_movie_matrix = None

    def fit(self, ratings_data):
        """ratings_data should have columns: user_id, movie_id, rating"""
        self.movies_df = ratings_data
        # Create user-movie matrix
        self.user_movie_matrix = ratings_data.pivot_table(
            index='user_id',
            columns='movie_id',
            values='rating'
        ).fillna(0)

    def recommend_movies(self, user_id, n_recommendations=5):
        """Recommend movies for a specific user"""
        if user_id not in self.user_movie_matrix.index:
            return "User not found"

        # Get user's ratings
        user_ratings = self.user_movie_matrix.loc[user_id].values.reshape(1, -1)

        # Calculate similarity with all users
        similarities = cosine_similarity(user_ratings, self.user_movie_matrix.values)[0]

        # Get K most similar users
        similar_users_indices = similarities.argsort()[-self.k-1:-1][::-1]

        # Get movies liked by similar users but not watched by target user
        user_watched = set(self.user_movie_matrix.columns[
            self.user_movie_matrix.loc[user_id] > 0
        ])

        recommendations = {}
        for idx in similar_users_indices:
            similar_user_id = self.user_movie_matrix.index[idx]
            similar_user_movies = self.user_movie_matrix.loc[similar_user_id]
            for movie_id, rating in similar_user_movies.items():
                if rating > 3.5 and movie_id not in user_watched:
                    if movie_id not in recommendations:
                        recommendations[movie_id] = []
                    recommendations[movie_id].append(rating * similarities[idx])

        # Average scores and sort
        final_recommendations = {
            movie: np.mean(scores)
            for movie, scores in recommendations.items()
        }

        return sorted(final_recommendations.items(),
                      key=lambda x: x[1], reverse=True)[:n_recommendations]

# Example usage with sample data
sample_ratings = pd.DataFrame({
    'user_id': [1, 1, 1, 2, 2, 2, 3, 3, 3] * 10,
    'movie_id': list(range(90)),
    'rating': np.random.uniform(1, 5, 90)
})

recommender = MovieRecommender(k=3)
recommender.fit(sample_ratings)
recommendations = recommender.recommend_movies(user_id=1)
print("Recommended movies:", recommendations)
Anomaly Detection for Server Monitoring
This example is particularly relevant for VPS and dedicated server administrators who need to monitor system health:
from sklearn.neighbors import LocalOutlierFactor
import numpy as np
import pandas as pd

class ServerAnomalyDetector:
    def __init__(self, n_neighbors=20, contamination=0.1):
        self.detector = LocalOutlierFactor(
            n_neighbors=n_neighbors,
            contamination=contamination
        )
        self.feature_names = None

    def prepare_features(self, server_metrics):
        """Prepare server metrics for anomaly detection"""
        features = []
        self.feature_names = ['cpu_usage', 'memory_usage', 'disk_io',
                              'network_io', 'connection_count']
        for metric in server_metrics:
            features.append([
                metric['cpu_usage'],
                metric['memory_usage'],
                metric['disk_io'],
                metric['network_io'],
                metric['connection_count']
            ])
        return np.array(features)

    def detect_anomalies(self, server_metrics):
        """Detect anomalous server behavior"""
        X = self.prepare_features(server_metrics)

        # Fit and predict (-1 for anomalies, 1 for normal)
        anomaly_labels = self.detector.fit_predict(X)

        # Get anomaly scores (more negative = more anomalous)
        anomaly_scores = self.detector.negative_outlier_factor_

        results = []
        for i, (label, score) in enumerate(zip(anomaly_labels, anomaly_scores)):
            if label == -1:  # Anomaly detected
                results.append({
                    'timestamp': server_metrics[i].get('timestamp', i),
                    'anomaly_score': score,
                    'metrics': server_metrics[i],
                    'severity': 'HIGH' if score < -2 else 'MEDIUM'
                })
        return results

# Example usage with simulated server data
def generate_server_metrics(n_samples=1000):
    """Generate realistic server metrics with some anomalies"""
    np.random.seed(42)

    # Normal server behavior
    metrics = []
    for i in range(n_samples):
        if i % 100 == 0 and i > 0:  # Inject anomalies every 100 samples
            # Simulate server spike
            metric = {
                'timestamp': i,
                'cpu_usage': np.random.uniform(85, 98),
                'memory_usage': np.random.uniform(90, 95),
                'disk_io': np.random.uniform(80, 100),
                'network_io': np.random.uniform(70, 90),
                'connection_count': np.random.uniform(800, 1000)
            }
        else:
            # Normal behavior
            metric = {
                'timestamp': i,
                'cpu_usage': np.random.uniform(20, 60),
                'memory_usage': np.random.uniform(30, 70),
                'disk_io': np.random.uniform(10, 40),
                'network_io': np.random.uniform(15, 35),
                'connection_count': np.random.uniform(50, 200)
            }
        metrics.append(metric)
    return metrics

# Test the anomaly detector
server_data = generate_server_metrics(500)
detector = ServerAnomalyDetector(n_neighbors=15, contamination=0.05)
anomalies = detector.detect_anomalies(server_data)

print(f"Detected {len(anomalies)} anomalies:")
for anomaly in anomalies[:5]:  # Show first 5
    print(f"Timestamp: {anomaly['timestamp']}, "
          f"Severity: {anomaly['severity']}, "
          f"Score: {anomaly['anomaly_score']:.3f}")
Algorithm Comparison and Performance Analysis
Understanding when to use KNN versus other algorithms helps you make informed decisions. Here’s a comprehensive comparison:
| Algorithm | Training Time | Prediction Time | Memory Usage | Interpretability | Best Use Cases |
|---|---|---|---|---|---|
| KNN | O(1) | O(n*d) | High | High | Small datasets, irregular decision boundaries |
| Decision Tree | O(n*log(n)*d) | O(log(n)) | Low | High | Categorical features, rule-based decisions |
| Random Forest | O(n*log(n)*d*k) | O(log(n)*k) | Medium | Medium | Mixed data types, robust predictions |
| SVM | O(n²) to O(n³) | O(s*d) | Medium | Low | High-dimensional data, clear margins |
| Naive Bayes | O(n*d) | O(d) | Low | Medium | Text classification, independent features |
Let’s benchmark these algorithms on a real dataset:
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn.svm import SVC
from sklearn.naive_bayes import GaussianNB
from sklearn.datasets import load_digits
import time

def benchmark_algorithms(X_train, X_test, y_train, y_test):
    """Compare multiple algorithms on the same dataset"""
    algorithms = {
        'KNN': KNeighborsClassifier(n_neighbors=5),
        'Decision Tree': DecisionTreeClassifier(random_state=42),
        'Random Forest': RandomForestClassifier(n_estimators=100, random_state=42),
        'SVM': SVC(random_state=42),
        'Naive Bayes': GaussianNB()
    }

    results = []
    for name, algo in algorithms.items():
        # Training time
        start_time = time.time()
        algo.fit(X_train, y_train)
        train_time = time.time() - start_time

        # Prediction time
        start_time = time.time()
        predictions = algo.predict(X_test)
        predict_time = time.time() - start_time

        # Accuracy
        accuracy = np.mean(predictions == y_test)

        results.append({
            'Algorithm': name,
            'Training Time (s)': f"{train_time:.4f}",
            'Prediction Time (s)': f"{predict_time:.4f}",
            'Accuracy': f"{accuracy:.3f}"
        })

    return pd.DataFrame(results)

# Load a more complex dataset
digits = load_digits()
X_train, X_test, y_train, y_test = train_test_split(
    digits.data, digits.target, test_size=0.2, random_state=42
)

# Run benchmark
benchmark_results = benchmark_algorithms(X_train, X_test, y_train, y_test)
print("Algorithm Benchmark Results:")
print(benchmark_results.to_string(index=False))
Best Practices and Common Pitfalls
After implementing KNN in production environments, here are the critical practices that separate reliable systems from problematic ones:
Data Preprocessing and Feature Scaling
This is where most KNN implementations fail. Different feature scales can completely dominate distance calculations:
from sklearn.preprocessing import StandardScaler, MinMaxScaler, RobustScaler
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline

class RobustKNNPipeline:
    def __init__(self, k=5):
        self.k = k
        self.pipeline = None

    def create_preprocessing_pipeline(self, numeric_features, categorical_features=None):
        """Create robust preprocessing pipeline"""
        preprocessors = []

        # Handle numeric features
        if numeric_features:
            # Use RobustScaler for outlier-resistant scaling
            numeric_transformer = Pipeline([
                ('scaler', RobustScaler()),
                # Could add feature selection here
            ])
            preprocessors.append(('num', numeric_transformer, numeric_features))

        # Handle categorical features if present
        if categorical_features:
            from sklearn.preprocessing import OneHotEncoder
            categorical_transformer = Pipeline([
                # sparse_output replaces the old sparse argument (use sparse=False on scikit-learn < 1.2)
                ('onehot', OneHotEncoder(drop='first', sparse_output=False))
            ])
            preprocessors.append(('cat', categorical_transformer, categorical_features))

        return ColumnTransformer(preprocessors)

    def build_pipeline(self, numeric_features, categorical_features=None):
        """Build complete ML pipeline"""
        preprocessor = self.create_preprocessing_pipeline(
            numeric_features, categorical_features
        )
        self.pipeline = Pipeline([
            ('preprocessor', preprocessor),
            ('classifier', KNeighborsClassifier(
                n_neighbors=self.k,
                weights='distance',     # Weight by distance
                algorithm='ball_tree',  # Better for high dimensions
                metric='euclidean'
            ))
        ])
        return self.pipeline

    def fit(self, X, y, numeric_features, categorical_features=None):
        """Fit the complete pipeline"""
        if self.pipeline is None:
            self.build_pipeline(numeric_features, categorical_features)
        self.pipeline.fit(X, y)
        return self

    def predict(self, X):
        """Make predictions"""
        return self.pipeline.predict(X)

    def predict_proba(self, X):
        """Get prediction probabilities"""
        return self.pipeline.predict_proba(X)

# Example with mixed data types
sample_data = pd.DataFrame({
    'age': np.random.randint(18, 80, 1000),
    'income': np.random.normal(50000, 20000, 1000),
    'score': np.random.uniform(0, 100, 1000),
    'category': np.random.choice(['A', 'B', 'C'], 1000),
    'target': np.random.choice([0, 1], 1000)
})

# Identify feature types
numeric_features = ['age', 'income', 'score']
categorical_features = ['category']

# Use robust pipeline
robust_knn = RobustKNNPipeline(k=7)
X = sample_data[numeric_features + categorical_features]
y = sample_data['target']

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

robust_knn.fit(X_train, y_train, numeric_features, categorical_features)
predictions = robust_knn.predict(X_test)
probabilities = robust_knn.predict_proba(X_test)

print(f"Robust KNN Accuracy: {np.mean(predictions == y_test):.3f}")
Memory and Performance Optimization
For large datasets, naive KNN becomes unusable. Here’s how to optimize for production:
from sklearn.neighbors import NearestNeighbors
from sklearn.decomposition import PCA
import joblib
# from memory_profiler import profile  # optional: only needed if you profile memory usage

class OptimizedKNN:
    def __init__(self, k=5, use_pca=True, n_components=10):
        self.k = k
        self.use_pca = use_pca
        self.n_components = n_components
        self.pca = None
        self.nn_model = None
        self.y_train = None

    def fit(self, X, y):
        """Optimized fitting with dimensionality reduction"""
        X_processed = X.copy()

        # Apply PCA for high-dimensional data
        if self.use_pca and X.shape[1] > self.n_components:
            self.pca = PCA(n_components=self.n_components)
            X_processed = self.pca.fit_transform(X)
            print(f"Reduced dimensions from {X.shape[1]} to {X_processed.shape[1]}")

        # Use optimized nearest neighbors search
        self.nn_model = NearestNeighbors(
            n_neighbors=self.k,
            algorithm='ball_tree',  # or 'kd_tree' for low dimensions
            metric='euclidean',
            n_jobs=-1               # Use all CPU cores
        )
        self.nn_model.fit(X_processed)
        self.y_train = np.asarray(y)  # store labels as an array so neighbor indexing works
        return self

    def predict(self, X):
        """Fast prediction with optimized search"""
        X_processed = X.copy()
        if self.pca is not None:
            X_processed = self.pca.transform(X)

        # Find nearest neighbors
        distances, indices = self.nn_model.kneighbors(X_processed)

        predictions = []
        for neighbor_indices in indices:
            # Get labels of neighbors
            neighbor_labels = self.y_train[neighbor_indices]
            # Majority vote
            prediction = Counter(neighbor_labels).most_common(1)[0][0]
            predictions.append(prediction)
        return np.array(predictions)

    def save_model(self, filepath):
        """Save trained model"""
        model_data = {
            'nn_model': self.nn_model,
            'pca': self.pca,
            'y_train': self.y_train,
            'k': self.k
        }
        joblib.dump(model_data, filepath)

    def load_model(self, filepath):
        """Load trained model"""
        model_data = joblib.load(filepath)
        self.nn_model = model_data['nn_model']
        self.pca = model_data['pca']
        self.y_train = model_data['y_train']
        self.k = model_data['k']
        return self

# Performance comparison
def compare_knn_implementations(X_train, X_test, y_train, y_test):
    """Compare standard vs optimized KNN"""
    print("Testing Standard KNN...")
    start_time = time.time()
    standard_knn = KNeighborsClassifier(n_neighbors=5)
    standard_knn.fit(X_train, y_train)
    standard_pred = standard_knn.predict(X_test)
    standard_time = time.time() - start_time
    standard_acc = np.mean(standard_pred == y_test)

    print("Testing Optimized KNN...")
    start_time = time.time()
    optimized_knn = OptimizedKNN(k=5, use_pca=True, n_components=10)
    optimized_knn.fit(X_train, y_train)
    optimized_pred = optimized_knn.predict(X_test)
    optimized_time = time.time() - start_time
    optimized_acc = np.mean(optimized_pred == y_test)

    print(f"\nResults:")
    print(f"Standard KNN - Time: {standard_time:.3f}s, Accuracy: {standard_acc:.3f}")
    print(f"Optimized KNN - Time: {optimized_time:.3f}s, Accuracy: {optimized_acc:.3f}")
    print(f"Speedup: {standard_time/optimized_time:.2f}x")

# Test with larger dataset
from sklearn.datasets import make_classification

X_large, y_large = make_classification(
    n_samples=5000, n_features=50, n_informative=30,
    n_redundant=10, random_state=42
)
X_train_large, X_test_large, y_train_large, y_test_large = train_test_split(
    X_large, y_large, test_size=0.2, random_state=42
)

compare_knn_implementations(X_train_large, X_test_large, y_train_large, y_test_large)
Common Pitfalls and Debugging
Here are the most frequent issues you’ll encounter and how to solve them:
- Curse of Dimensionality: In high dimensions, all points become equidistant. Use PCA or feature selection to reduce dimensions below 20-30 features when possible.
- Imbalanced Classes: KNN can be biased toward majority classes. Use stratified sampling or adjust class weights in your evaluation.
- Choosing Odd vs Even K: Use odd K values for binary classification to avoid ties, but this matters less with weighted voting.
- Memory Issues: KNN stores all training data. For datasets >100MB, consider approximate nearest neighbor libraries like Annoy or Faiss.
- Distance Metric Selection: Euclidean works for continuous features, but consider Manhattan for high dimensions or Hamming for categorical data.
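To illustrate that last point, here is a short, illustrative sketch (synthetic data, not tied to the earlier examples) showing how the distance metric is swapped in scikit-learn: Manhattan for higher-dimensional continuous features, and Hamming for binary or one-hot encoded categorical data.

from sklearn.datasets import make_classification
from sklearn.model_selection import cross_val_score
from sklearn.neighbors import KNeighborsClassifier

# Synthetic, moderately high-dimensional continuous data
X_demo, y_demo = make_classification(n_samples=500, n_features=40, random_state=0)

for metric in ['euclidean', 'manhattan']:
    knn = KNeighborsClassifier(n_neighbors=5, metric=metric)
    score = cross_val_score(knn, X_demo, y_demo, cv=5).mean()
    print(f"{metric}: mean CV accuracy = {score:.3f}")

# For purely categorical data encoded as 0/1 columns, Hamming distance counts
# mismatched positions instead of comparing magnitudes:
knn_hamming = KNeighborsClassifier(n_neighbors=5, metric='hamming')

Which metric wins is dataset-dependent, so treat this as a template for your own comparison rather than a rule. The following function pulls several of these checks (feature scaling, class balance, K sensitivity, and memory footprint) into a single debugging report: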
def debug_knn_performance(X_train, X_test, y_train, y_test):
    """Comprehensive KNN debugging function"""
    print("=== KNN Performance Debug Report ===\n")

    # Check data characteristics
    print(f"Training set shape: {X_train.shape}")
    print(f"Test set shape: {X_test.shape}")
    print(f"Feature ranges:")
    feature_stats = pd.DataFrame({
        'min': X_train.min(axis=0),
        'max': X_train.max(axis=0),
        'std': X_train.std(axis=0)
    })
    print(feature_stats.head())

    # Check for scaling issues
    range_ratios = (X_train.max(axis=0) - X_train.min(axis=0))
    max_ratio = range_ratios.max() / range_ratios.min()
    print(f"\nMax/Min feature range ratio: {max_ratio:.2f}")
    if max_ratio > 100:
        print("⚠️ WARNING: Large feature scale differences detected!")
        print("   Consider using StandardScaler or RobustScaler")

    # Check class balance
    class_counts = Counter(y_train)
    print(f"\nClass distribution: {dict(class_counts)}")
    imbalance_ratio = max(class_counts.values()) / min(class_counts.values())
    if imbalance_ratio > 3:
        print(f"⚠️ WARNING: Class imbalance detected (ratio: {imbalance_ratio:.2f})")
        print("   Consider stratified sampling or weighted KNN")

    # Test different K values
    print(f"\nTesting different K values:")
    k_values = [1, 3, 5, 7, 9, 15]
    for k in k_values:
        knn = KNeighborsClassifier(n_neighbors=k)
        knn.fit(X_train, y_train)
        pred = knn.predict(X_test)
        acc = np.mean(pred == y_test)
        print(f"K={k}: Accuracy = {acc:.3f}")

    # Memory usage estimation
    memory_mb = (X_train.nbytes + y_train.nbytes) / (1024 * 1024)
    print(f"\nTraining data memory usage: {memory_mb:.2f} MB")
    if memory_mb > 500:
        print("⚠️ WARNING: Large dataset detected")
        print("   Consider using approximate methods or dimensionality reduction")

# Example usage
debug_knn_performance(X_train_large, X_test_large, y_train_large, y_test_large)
Integration with Production Systems
Deploying KNN in production requires careful consideration of latency, memory usage, and scalability. Here’s a production-ready implementation with caching and monitoring:
import redis
import pickle
import hashlib
from functools import lru_cache
import logging

class ProductionKNN:
    def __init__(self, k=5, cache_size=1000, redis_host='localhost', redis_port=6379):
        self.k = k
        self.model = None
        self.scaler = None
        self.cache_size = cache_size

        # Setup Redis for distributed caching
        try:
            self.redis_client = redis.Redis(host=redis_host, port=redis_port, decode_responses=False)
            self.redis_client.ping()
            self.use_redis = True
        except Exception:
            self.use_redis = False
            logging.warning("Redis not available, using local cache only")

        # Setup logging
        logging.basicConfig(level=logging.INFO)
        self.logger = logging.getLogger(__name__)

    def fit(self, X, y):
        """Train the model with preprocessing"""
        # Scale features
        self.scaler = StandardScaler()
        X_scaled = self.scaler.fit_transform(X)

        # Train KNN
        self.model = KNeighborsClassifier(
            n_neighbors=self.k,
            weights='distance',
            algorithm='ball_tree',
            n_jobs=-1
        )
        self.model.fit(X_scaled, y)
        self.logger.info(f"Model trained with {X.shape[0]} samples, {X.shape[1]} features")
        return self

    def _get_cache_key(self, X):
        """Generate cache key for input"""
        return hashlib.md5(X.tobytes()).hexdigest()

    def _get_from_cache(self, cache_key):
        """Get prediction from cache"""
        if self.use_redis:
            try:
                cached = self.redis_client.get(f"knn_pred:{cache_key}")
                if cached:
                    return pickle.loads(cached)
            except Exception as e:
                self.logger.warning(f"Redis cache error: {e}")
        return None

    def _save_to_cache(self, cache_key, prediction):
        """Save prediction to cache"""
        if self.use_redis:
            try:
                self.redis_client.setex(
                    f"knn_pred:{cache_key}",
                    3600,  # 1 hour TTL
                    pickle.dumps(prediction)
                )
            except Exception as e:
                self.logger.warning(f"Redis cache save error: {e}")

    @lru_cache(maxsize=1000)
    def _cached_predict_single(self, x_tuple):
        """Cached prediction for single sample"""
        x_array = np.array(x_tuple).reshape(1, -1)
        x_scaled = self.scaler.transform(x_array)
        return self.model.predict(x_scaled)[0]

    def predict(self, X, use_cache=True):
        """Make predictions with caching"""
        if self.model is None:
            raise ValueError("Model not trained. Call fit() first.")

        predictions = []
        cache_hits = 0

        for i, x in enumerate(X):
            if use_cache:
                # Try cache first
                cache_key = self._get_cache_key(x)
                cached_pred = self._get_from_cache(cache_key)
                if cached_pred is not None:
                    predictions.append(cached_pred)
                    cache_hits += 1
                    continue

            # Make prediction
            x_scaled = self.scaler.transform(x.reshape(1, -1))
            pred = self.model.predict(x_scaled)[0]
            predictions.append(pred)

            # Cache result
            if use_cache:
                self._save_to_cache(cache_key, pred)

        if use_cache:
            cache_hit_rate = cache_hits / len(X) * 100
            self.logger.info(f"Cache hit rate: {cache_hit_rate:.1f}%")

        return np.array(predictions)

    def predict_with_confidence(self, X):
        """Predict with confidence scores"""
        X_scaled = self.scaler.transform(X)

        # Get probabilities
        probabilities = self.model.predict_proba(X_scaled)
        predictions = self.model.predict(X_scaled)

        # Calculate confidence as max probability
        confidence_scores = np.max(probabilities, axis=1)

        results = []
        for pred, conf, probs in zip(predictions, confidence_scores, probabilities):
            results.append({
                'prediction': pred,
                'confidence': conf,
                'probabilities': dict(zip(self.model.classes_, probs))
            })
        return results

    def get_model_stats(self):
        """Get model performance statistics"""
        if self.model is None:
            return {"error": "Model not trained"}
        return {
            "n_samples": self.model.n_samples_fit_,
            "n_features": self.model.n_features_in_,
            "k_neighbors": self.k,
            "algorithm": self.model.algorithm,
            "cache_info": self._cached_predict_single.cache_info()._asdict() if hasattr(self._cached_predict_single, 'cache_info') else None
        }

# Example deployment
if __name__ == "__main__":
    # Create sample data
    X, y = make_classification(n_samples=1000, n_features=10, random_state=42)
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

    # Train production model
    prod_knn = ProductionKNN(k=5, cache_size=500)
    prod_knn.fit(X_train, y_train)

    # Make predictions
    predictions = prod_knn.predict(X_test, use_cache=True)
    confidence_results = prod_knn.predict_with_confidence(X_test[:5])

    print(f"Accuracy: {np.mean(predictions == y_test):.3f}")
    print(f"Model stats: {prod_knn.get_model_stats()}")
    print(f"Sample confidence results: {confidence_results[0]}")
The K Nearest Neighbors algorithm provides an excellent entry point into machine learning while remaining powerful enough for production use cases. Its intuitive nature makes it perfect for rapid prototyping, but successful deployment requires attention to preprocessing, performance optimization, and proper system integration. Whether you’re building recommendation systems, anomaly detection for your servers, or classification systems, KNN’s simplicity and interpretability make it a valuable tool in your machine learning toolkit.
For more advanced implementations, consider exploring approximate nearest neighbor libraries like Annoy or Faiss for large-scale applications, and always benchmark your specific use case against alternatives to ensure you’re choosing the right algorithm for your problem domain.
