
How to Use subprocess to Run External Programs in Python 3
Python’s subprocess module lets you execute external programs, shell commands, and system utilities directly from your Python scripts. Whether you’re automating deployment pipelines, managing server infrastructure, or integrating legacy applications, subprocess is the bridge between Python and the rest of the system. This guide covers basic command execution, input and output handling, advanced process management with Popen, real-world scenarios, security considerations, and performance optimization.
Understanding How subprocess Works
The subprocess module replaced older methods like os.system() and os.popen() by providing more robust process creation and management capabilities. At its core, subprocess creates new processes, connects to their input/output/error pipes, and obtains their return codes. Unlike shell-based execution methods, subprocess gives you fine-grained control over process execution, environment variables, working directories, and I/O handling.
The module offers several functions, but the most important ones are:
- subprocess.run() – The recommended high-level interface for most use cases
- subprocess.Popen() – Lower-level interface for advanced process management
- subprocess.call() – Legacy function, now superseded by run()
- subprocess.check_output() – Convenience function for capturing output
When you run a command through subprocess, Python asks the operating system to create a new process (fork/exec on Unix-like systems, CreateProcess on Windows). The child runs independently of your script. It is connected to your script through pipes only for the streams you redirect, for example with capture_output=True or stdin=subprocess.PIPE; any stream you do not redirect is inherited from the parent.
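As a minimal sketch of how these entry points relate, the snippet below runs one command through run(), check_output(), and Popen() and reads the result each time. The child command is an assumption chosen for portability: it is simply the current Python interpreter printing one line.

```python
import subprocess
import sys

# Hypothetical child command: the current interpreter printing one line.
cmd = [sys.executable, "-c", "print('hello from child')"]

# run(): waits for completion and returns a CompletedProcess
completed = subprocess.run(cmd, capture_output=True, text=True)
print(completed.returncode, completed.stdout.strip())

# check_output(): returns stdout directly, raises CalledProcessError on failure
output = subprocess.check_output(cmd, text=True)
print(output.strip())

# Popen(): starts the process; you decide when to read and wait
with subprocess.Popen(cmd, stdout=subprocess.PIPE, text=True) as proc:
    popen_out, _ = proc.communicate()
print(proc.returncode, popen_out.strip())
```

In all three cases the return code comes from the same child process mechanism; the functions differ mainly in how much of the waiting and reading they do for you.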
Step-by-Step Implementation Guide
Let’s start with the basics and progressively move to more complex scenarios. The subprocess.run() function is your go-to choice for most applications.
Basic Command Execution
import subprocess
# Simple command execution
result = subprocess.run(['ls', '-la'], capture_output=True, text=True)
print(f"Return code: {result.returncode}")
print(f"Output: {result.stdout}")
print(f"Errors: {result.stderr}")
For system monitoring tasks common in VPS environments, you might need to check disk usage:
import subprocess
import json

def get_disk_usage():
    try:
        result = subprocess.run(['df', '-h'], capture_output=True, text=True, check=True)
        lines = result.stdout.strip().split('\n')[1:]  # Skip header
        disk_info = []
        for line in lines:
            parts = line.split()
            if len(parts) >= 6:
                disk_info.append({
                    'filesystem': parts[0],
                    'size': parts[1],
                    'used': parts[2],
                    'available': parts[3],
                    'use_percent': parts[4],
                    'mounted_on': parts[5]
                })
        return disk_info
    except subprocess.CalledProcessError as e:
        print(f"Command failed with return code {e.returncode}")
        return None

# Usage
disks = get_disk_usage()
print(json.dumps(disks, indent=2))
Handling Different Input/Output Scenarios
Real-world applications often require more sophisticated I/O handling. Here’s how to manage different scenarios:
import os  # needed for os.unlink() below
import subprocess
import tempfile

# Passing input to a command
def compress_data(data):
    process = subprocess.run(
        ['gzip', '-c'],
        input=data,
        capture_output=True,
        text=False  # Important for binary data
    )
    return process.stdout

# Working with files
def process_log_file(log_path, pattern):
    with open(log_path, 'r') as log_file:
        result = subprocess.run(
            ['grep', '-i', pattern],
            stdin=log_file,
            capture_output=True,
            text=True
        )
    # grep exits with 1 when nothing matches, so treat non-zero as "no lines"
    return result.stdout.split('\n') if result.returncode == 0 else []

# Using temporary files for large data
def sort_large_dataset(data_list):
    with tempfile.NamedTemporaryFile(mode='w', delete=False) as temp_file:
        for item in data_list:
            temp_file.write(f"{item}\n")
        temp_path = temp_file.name
    try:
        result = subprocess.run(
            ['sort', '-n', temp_path],
            capture_output=True,
            text=True,
            check=True
        )
        return result.stdout.strip().split('\n')
    finally:
        os.unlink(temp_path)
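To see the input= pattern in isolation, here is a small sketch that feeds text to a child's stdin and reads the result back. The child is a hypothetical one-line Python filter standing in for tools like gzip or grep, so the example does not depend on any particular utility being installed.

```python
import subprocess
import sys

def to_upper_via_child(text):
    # Hypothetical filter: a child Python process that upper-cases stdin.
    child_code = "import sys; sys.stdout.write(sys.stdin.read().upper())"
    result = subprocess.run(
        [sys.executable, "-c", child_code],
        input=text,           # a str because text=True; pass bytes when text=False
        capture_output=True,
        text=True,
        check=True,
    )
    return result.stdout

print(to_upper_via_child("error: disk full\n"))
```

The type of input must match the mode: str with text=True, bytes otherwise, which is why compress_data() above sets text=False.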
Real-World Examples and Use Cases
Let’s explore practical applications you’ll encounter in production environments, especially when managing dedicated servers or complex deployment scenarios.
Database Backup Automation
import subprocess
import datetime
import os
from pathlib import Path

class DatabaseBackupManager:
    def __init__(self, db_config):
        self.db_config = db_config
        self.backup_dir = Path('/var/backups/mysql')
        # parents=True so a missing /var/backups does not raise
        self.backup_dir.mkdir(parents=True, exist_ok=True)

    def create_backup(self):
        timestamp = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
        backup_file = self.backup_dir / f"backup_{timestamp}.sql.gz"
        # Create mysqldump command
        # Note: a password passed on the command line is visible to other
        # users in the process list while the dump runs.
        dump_cmd = [
            'mysqldump',
            f"--host={self.db_config['host']}",
            f"--user={self.db_config['user']}",
            f"--password={self.db_config['password']}",
            '--single-transaction',
            '--routines',
            '--triggers',
            self.db_config['database']
        ]
        # Pipe through gzip for compression
        try:
            with open(backup_file, 'wb') as f:
                dump_process = subprocess.Popen(dump_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
                gzip_process = subprocess.Popen(['gzip'], stdin=dump_process.stdout, stdout=f, stderr=subprocess.PIPE)
                dump_process.stdout.close()  # Allow dump_process to receive SIGPIPE
                _, gzip_error = gzip_process.communicate()
                _, dump_error = dump_process.communicate()
            if dump_process.returncode != 0:
                raise subprocess.SubprocessError(f"mysqldump failed: {dump_error.decode()}")
            if gzip_process.returncode != 0:
                raise subprocess.SubprocessError(f"gzip failed: {gzip_error.decode()}")
            return backup_file
        except Exception:
            # Remove the partial backup, then re-raise with the original traceback
            if backup_file.exists():
                backup_file.unlink()
            raise

    def cleanup_old_backups(self, keep_days=7):
        cutoff_time = datetime.datetime.now() - datetime.timedelta(days=keep_days)
        for backup_file in self.backup_dir.glob('backup_*.sql.gz'):
            if backup_file.stat().st_mtime < cutoff_time.timestamp():
                backup_file.unlink()
                print(f"Removed old backup: {backup_file}")

# Usage
db_config = {
    'host': 'localhost',
    'user': 'backup_user',
    'password': 'secure_password',
    'database': 'production_db'
}
backup_manager = DatabaseBackupManager(db_config)
backup_file = backup_manager.create_backup()
backup_manager.cleanup_old_backups()
print(f"Backup created: {backup_file}")
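Because command-line arguments can be read by other users through the process list, one common alternative is to hand the credentials to mysqldump through an option file. The sketch below is one possible way to do that, not part of the class above: the helper names write_mysql_option_file and build_dump_command are hypothetical, and it assumes the password contains no double quotes. It relies on the standard MySQL client option --defaults-extra-file, which has to come first on the command line.

```python
import os
import tempfile

def write_mysql_option_file(user, password):
    """Write a client option file; mkstemp creates it readable only by the owner."""
    fd, path = tempfile.mkstemp(prefix="mysqldump_", suffix=".cnf")
    with os.fdopen(fd, "w") as f:
        f.write("[client]\n")
        f.write(f"user={user}\n")
        f.write(f'password="{password}"\n')  # assumes no double quotes in the password
    return path

def build_dump_command(option_file, host, database):
    # --defaults-extra-file must be the first option on the command line
    return [
        "mysqldump",
        f"--defaults-extra-file={option_file}",
        f"--host={host}",
        "--single-transaction",
        database,
    ]

option_file = write_mysql_option_file("backup_user", "secure_password")
try:
    dump_cmd = build_dump_command(option_file, "localhost", "production_db")
    print(dump_cmd)  # this list would be passed to subprocess.Popen
finally:
    os.unlink(option_file)  # remove the credentials file when done
```

The command list no longer contains the password, so it is also safe to log.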
System Monitoring and Alerting
import subprocess

class SystemMonitor:
    def __init__(self, thresholds):
        self.thresholds = thresholds

    def check_cpu_usage(self):
        # Get CPU usage over 5 seconds (sar is provided by the sysstat package)
        result = subprocess.run(
            ['sar', '-u', '1', '5'],
            capture_output=True,
            text=True,
            check=True
        )
        lines = result.stdout.strip().split('\n')
        # Parse the "Average" summary line; its last column is %idle
        avg_line = [line for line in lines if 'Average' in line][0]
        cpu_idle = float(avg_line.split()[-1])
        cpu_usage = 100 - cpu_idle
        return cpu_usage

    def check_memory_usage(self):
        result = subprocess.run(['free', '-m'], capture_output=True, text=True, check=True)
        lines = result.stdout.strip().split('\n')
        mem_line = lines[1].split()  # The "Mem:" row
        total_mem = int(mem_line[1])
        used_mem = int(mem_line[2])
        memory_percent = (used_mem / total_mem) * 100
        return memory_percent

    def check_disk_usage(self, mount_point='/'):
        result = subprocess.run(['df', mount_point], capture_output=True, text=True, check=True)
        line = result.stdout.strip().split('\n')[1]
        usage_percent = int(line.split()[4].rstrip('%'))
        return usage_percent

    def run_monitoring_cycle(self):
        alerts = []
        cpu_usage = self.check_cpu_usage()
        if cpu_usage > self.thresholds['cpu']:
            alerts.append(f"High CPU usage: {cpu_usage:.1f}%")
        memory_usage = self.check_memory_usage()
        if memory_usage > self.thresholds['memory']:
            alerts.append(f"High memory usage: {memory_usage:.1f}%")
        disk_usage = self.check_disk_usage()
        if disk_usage > self.thresholds['disk']:
            alerts.append(f"High disk usage: {disk_usage}%")
        return alerts

# Configuration
thresholds = {
    'cpu': 80.0,
    'memory': 85.0,
    'disk': 90
}
monitor = SystemMonitor(thresholds)
alerts = monitor.run_monitoring_cycle()
if alerts:
    print("System alerts:")
    for alert in alerts:
        print(f" - {alert}")
Comparing subprocess with Alternatives
Understanding when to use subprocess versus other approaches is crucial for optimal system integration:
Method | Use Case | Pros | Cons | Performance |
---|---|---|---|---|
subprocess.run() | General command execution | Secure, flexible, cross-platform | More verbose than shell alternatives | Good |
os.system() | Simple shell commands | Very simple syntax | Security risks, limited control | Fair |
os.popen() | Command with output capture | Simple for basic output capture | Thin wrapper around subprocess.Popen, limited error handling | Fair |
Native Python libraries | File operations, HTTP requests | Pure Python, no external dependencies | May not be available for all operations | Excellent |
subprocess.Popen() | Advanced process management | Maximum control and flexibility | Complex API, easy to misuse | Good (run() is built on Popen) |
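To illustrate the "Native Python libraries" row, here is a minimal sketch that gets disk usage from the standard library instead of launching df. The function name disk_usage_percent is hypothetical. The percentage can differ slightly from what df reports, since df typically excludes blocks reserved for root.

```python
import shutil

def disk_usage_percent(path="/"):
    usage = shutil.disk_usage(path)  # named tuple: total, used, free (in bytes)
    return usage.used / usage.total * 100

print(f"{disk_usage_percent('/'):.1f}% used")
```

No process is created and no text output has to be parsed, which is why a native call is usually preferable when one exists.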
Performance Comparison
Here's a practical benchmark comparing different approaches for executing simple commands:
import subprocess
import os
import shlex
import time
import statistics

def benchmark_methods(command, iterations=100):
    results = {}
    # subprocess.run() benchmark
    times = []
    for _ in range(iterations):
        start = time.perf_counter()  # monotonic, high-resolution timer
        subprocess.run(command, capture_output=True, text=True)
        times.append(time.perf_counter() - start)
    results['subprocess.run'] = statistics.mean(times)
    # os.system() benchmark (shlex.join keeps arguments with spaces intact)
    times = []
    for _ in range(iterations):
        start = time.perf_counter()
        os.system(shlex.join(command) + ' > /dev/null 2>&1')
        times.append(time.perf_counter() - start)
    results['os.system'] = statistics.mean(times)
    # subprocess.Popen() benchmark
    times = []
    for _ in range(iterations):
        start = time.perf_counter()
        process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        process.communicate()
        times.append(time.perf_counter() - start)
    results['subprocess.Popen'] = statistics.mean(times)
    return results

# Test with a simple command
results = benchmark_methods(['echo', 'hello world'], 1000)
print("Average execution times (seconds):")
for method, avg_time in results.items():
    print(f" {method}: {avg_time:.6f}")
Advanced Process Management with Popen
For scenarios requiring real-time process interaction or complex I/O handling, subprocess.Popen() provides the necessary control:
import subprocess
import threading
import queue
import time

class InteractiveProcessManager:
    def __init__(self, command):
        self.process = subprocess.Popen(
            command,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,  # text=True already implies universal_newlines
            bufsize=1   # Line buffered
        )
        self.output_queue = queue.Queue()
        self.error_queue = queue.Queue()
        # Start output reading threads
        self.stdout_thread = threading.Thread(target=self._read_output, args=(self.process.stdout, self.output_queue))
        self.stderr_thread = threading.Thread(target=self._read_output, args=(self.process.stderr, self.error_queue))
        self.stdout_thread.daemon = True
        self.stderr_thread.daemon = True
        self.stdout_thread.start()
        self.stderr_thread.start()

    def _read_output(self, pipe, out_queue):
        # out_queue (not "queue") so the parameter does not shadow the module
        try:
            for line in iter(pipe.readline, ''):
                out_queue.put(line.rstrip())
        finally:
            pipe.close()

    def send_input(self, text):
        if self.process.stdin:
            self.process.stdin.write(text + '\n')
            self.process.stdin.flush()

    def get_output(self, timeout=0.1):
        outputs = []
        try:
            while True:
                outputs.append(self.output_queue.get(timeout=timeout))
        except queue.Empty:
            pass
        return outputs

    def get_errors(self, timeout=0.1):
        errors = []
        try:
            while True:
                errors.append(self.error_queue.get(timeout=timeout))
        except queue.Empty:
            pass
        return errors

    def is_running(self):
        return self.process.poll() is None

    def terminate(self):
        if self.is_running():
            self.process.terminate()
            try:
                self.process.wait(timeout=5)
            except subprocess.TimeoutExpired:
                self.process.kill()

# Example: Interactive Python REPL
def interactive_python_example():
    # -u: the child's stdout is block-buffered when it is a pipe, so without
    # it the output may not arrive until the child exits
    python_process = InteractiveProcessManager(['python3', '-u', '-i'])
    # Send some commands
    python_process.send_input('x = 42')
    python_process.send_input('print(f"The answer is {x}")')
    python_process.send_input('import math')
    python_process.send_input('print(f"Pi is approximately {math.pi:.4f}")')
    time.sleep(1)  # Wait for processing
    outputs = python_process.get_output()
    for output in outputs:
        print(f"Output: {output}")
    python_process.terminate()

interactive_python_example()
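When you only need to read a child's output as it is produced, without sending input, a single loop over the stdout pipe is often enough. The following is a minimal sketch under that assumption; the child is again a hypothetical Python one-liner, and only stdout is piped, so there is no second pipe that could fill up and block.

```python
import subprocess
import sys

# Hypothetical child: prints three lines; -u disables the child's output buffering.
child_code = "for i in range(3): print(f'line {i}')"

lines_seen = []
with subprocess.Popen(
    [sys.executable, "-u", "-c", child_code],
    stdout=subprocess.PIPE,
    text=True,
) as proc:
    for line in proc.stdout:          # yields each line as the child writes it
        lines_seen.append(line.rstrip("\n"))
        print(f"got: {lines_seen[-1]}")

# Leaving the with-block waits for the child, so returncode is set here
print(f"exit code: {proc.returncode}")
```

Threads, as in the class above, become necessary once you read stdout and stderr separately or interleave reads with writes to stdin.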
Security Considerations and Best Practices
Security should be your top priority when executing external commands, especially in production environments. Here are the essential practices:
Avoiding Shell Injection
import re
import subprocess
import shlex

# DANGEROUS - Don't do this
def unsafe_command(user_input):
    # This allows shell injection attacks
    subprocess.run(f"ls {user_input}", shell=True)

# SAFE - Always use list format and avoid shell=True
def safe_command(user_input):
    # Validate and sanitize input first
    if not user_input.replace('/', '').replace('.', '').replace('-', '').replace('_', '').isalnum():
        raise ValueError("Invalid characters in input")
    # Use list format to prevent injection; '--' stops ls from treating
    # input such as "-la" as an option
    subprocess.run(['ls', '--', user_input], check=True)

# For complex shell commands, use shlex.quote()
def safe_shell_command(filename):
    quoted_filename = shlex.quote(filename)
    command = f"find /var/log -name {quoted_filename} -type f"
    subprocess.run(command, shell=True, check=True)

# Best practice: Input validation function
def validate_filename(filename):
    # Only allow alphanumeric characters, dots, hyphens, and underscores
    if not re.match(r'^[a-zA-Z0-9._-]+$', filename):
        raise ValueError(f"Invalid filename: {filename}")
    # Prevent directory traversal
    if '..' in filename or filename.startswith('/'):
        raise ValueError(f"Directory traversal attempt: {filename}")
    return filename

# Secure file processing example
def process_user_file(user_filename):
    try:
        safe_filename = validate_filename(user_filename)
        result = subprocess.run(
            ['wc', '-l', f'/safe/directory/{safe_filename}'],
            capture_output=True,
            text=True,
            check=True,
            timeout=30  # Always set timeouts
        )
        return int(result.stdout.split()[0])
    except (ValueError, subprocess.CalledProcessError, subprocess.TimeoutExpired) as e:
        print(f"Error processing file: {e}")
        return None
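The difference between the list form and a shell string can be checked directly. In this sketch the hostile value and the echoing child (a Python one-liner that prints its first argument) are illustrative assumptions. With a list, the whole string arrives in the child as one argument and the `;` is never interpreted.

```python
import shlex
import subprocess
import sys

hostile = "notes.txt; echo INJECTED"

# List form: no shell is involved, so the value is passed through literally.
echo_arg = [sys.executable, "-c", "import sys; print(sys.argv[1])", hostile]
result = subprocess.run(echo_arg, capture_output=True, text=True, check=True)
print(result.stdout.strip())

# shlex.quote() wraps the value so a POSIX shell treats it as a single word.
quoted = shlex.quote(hostile)
print(quoted)
```

shlex.quote() is designed for POSIX shells; it should not be relied on for cmd.exe on Windows.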
Environment and Permission Management
import subprocess
import os
from pathlib import Path

def secure_command_execution(command, work_dir=None, restricted_env=True):
    # Create minimal environment if needed
    if restricted_env:
        safe_env = {
            'PATH': '/usr/bin:/bin',
            'LANG': 'C',
            'LC_ALL': 'C'
        }
    else:
        safe_env = os.environ.copy()
    # Validate working directory
    if work_dir:
        work_path = Path(work_dir).resolve()
        allowed_paths = [Path('/tmp'), Path('/var/tmp'), Path('/home/webapp')]
        # Compare path components, not string prefixes: a plain startswith()
        # check would also accept a sibling such as /tmp-evil
        if not any(work_path == allowed or allowed in work_path.parents
                   for allowed in allowed_paths):
            raise ValueError(f"Working directory not allowed: {work_path}")
    try:
        result = subprocess.run(
            command,
            env=safe_env,
            cwd=work_dir,
            capture_output=True,
            text=True,
            check=True,
            timeout=60
        )
        return result
    except subprocess.TimeoutExpired:
        print("Command timed out")
        return None
    except subprocess.CalledProcessError as e:
        print(f"Command failed: {e}")
        return None

# Example usage (the working directory must be under one of allowed_paths;
# this path is illustrative)
result = secure_command_execution(
    ['grep', '-r', 'ERROR', '.'],
    work_dir='/home/webapp/logs',
    restricted_env=True
)
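The effect of env= is easy to observe: when it is given, the child sees exactly that mapping and nothing from the parent. The sketch below assumes a Unix-like system (on Windows a child usually also needs variables such as SYSTEMROOT), and the variable name APP_SECRET_TOKEN is hypothetical.

```python
import os
import subprocess
import sys

os.environ["APP_SECRET_TOKEN"] = "example-value"  # hypothetical secret in the parent

child_code = "import os; print('APP_SECRET_TOKEN' in os.environ)"
minimal_env = {"PATH": os.environ.get("PATH", "/usr/bin:/bin"), "LANG": "C"}

# Default: the child inherits the parent's full environment
inherited = subprocess.run([sys.executable, "-c", child_code],
                           capture_output=True, text=True, check=True)

# env=...: the child sees only the variables listed in the mapping
restricted = subprocess.run([sys.executable, "-c", child_code],
                            env=minimal_env,
                            capture_output=True, text=True, check=True)

print("inherited env sees token:", inherited.stdout.strip())
print("restricted env sees token:", restricted.stdout.strip())
```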
Error Handling and Troubleshooting
Robust error handling is essential for production applications. Here's a comprehensive approach:
import subprocess
import logging
import shutil
import time  # needed for time.sleep() in the retry loop

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class SubprocessManager:
    def __init__(self, timeout=30):
        self.timeout = timeout

    def execute_with_retry(self, command, max_retries=3, retry_delay=1):
        """Execute command with automatic retry logic"""
        for attempt in range(max_retries + 1):
            try:
                result = subprocess.run(
                    command,
                    capture_output=True,
                    text=True,
                    check=True,
                    timeout=self.timeout
                )
                logger.info(f"Command succeeded on attempt {attempt + 1}: {' '.join(command)}")
                return result
            except subprocess.CalledProcessError as e:
                logger.warning(f"Command failed (attempt {attempt + 1}): {e}")
                logger.warning(f"Return code: {e.returncode}")
                logger.warning(f"Stderr: {e.stderr}")
                if attempt == max_retries:
                    raise
                time.sleep(retry_delay)
            except subprocess.TimeoutExpired:
                logger.error(f"Command timed out after {self.timeout} seconds: {' '.join(command)}")
                if attempt == max_retries:
                    raise
                time.sleep(retry_delay)
            except FileNotFoundError:
                logger.error(f"Command not found: {command[0]}")
                raise  # Don't retry for missing commands

    def safe_execute(self, command, input_data=None):
        """Execute command with comprehensive error handling"""
        # Pre-execution validation happens before the try block, because the
        # handlers below call ' '.join(command) and need a real list
        if not command or not isinstance(command, list):
            return {
                'success': False,
                'error': 'invalid',
                'message': 'Command must be a non-empty list',
                'command': repr(command)
            }
        try:
            # Check if command exists (shutil.which avoids spawning 'which')
            if shutil.which(command[0]) is None:
                raise FileNotFoundError(f"Command not found: {command[0]}")
            # Execute with timeout
            result = subprocess.run(
                command,
                input=input_data,
                capture_output=True,
                text=True,
                timeout=self.timeout
            )
            return {
                'success': result.returncode == 0,
                'returncode': result.returncode,
                'stdout': result.stdout,
                'stderr': result.stderr,
                'command': ' '.join(command)
            }
        except subprocess.TimeoutExpired:
            return {
                'success': False,
                'error': 'timeout',
                'message': f'Command timed out after {self.timeout} seconds',
                'command': ' '.join(command)
            }
        except FileNotFoundError as e:
            return {
                'success': False,
                'error': 'not_found',
                'message': str(e),
                'command': ' '.join(command)
            }
        except Exception as e:
            return {
                'success': False,
                'error': 'unknown',
                'message': str(e),
                'command': ' '.join(command)
            }

# Usage examples
manager = SubprocessManager(timeout=60)

# Example 1: Retry logic for network-dependent commands
try:
    result = manager.execute_with_retry(['curl', '-f', 'https://api.example.com/health'], max_retries=3)
    print("API is healthy")
except (subprocess.CalledProcessError, subprocess.TimeoutExpired):
    print("API health check failed after retries")

# Example 2: Safe execution with detailed error info
result = manager.safe_execute(['ffmpeg', '-i', 'input.mp4', 'output.avi'])
if result['success']:
    print("Video conversion successful")
else:
    # A non-zero exit has no 'message' key, so fall back to the return code
    print(f"Conversion failed: {result.get('message', result.get('returncode'))}")
    if result.get('stderr'):
        print(f"Error details: {result['stderr']}")
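The timeout path is worth seeing on its own, since it behaves differently from a non-zero exit. This is a minimal sketch with a hypothetical helper run_with_timeout and two stand-in children: one that sleeps longer than the timeout and one that exits immediately with status 3.

```python
import subprocess
import sys

def run_with_timeout(command, timeout):
    try:
        result = subprocess.run(command, capture_output=True, text=True, timeout=timeout)
        return {"timed_out": False, "returncode": result.returncode}
    except subprocess.TimeoutExpired:
        # run() kills the child and waits for it before raising TimeoutExpired
        return {"timed_out": True, "returncode": None}

slow = [sys.executable, "-c", "import time; time.sleep(5)"]
fast = [sys.executable, "-c", "raise SystemExit(3)"]

print(run_with_timeout(slow, timeout=0.5))
print(run_with_timeout(fast, timeout=5))
```

Without check=True, a non-zero exit is reported through returncode rather than an exception, so the two failure modes stay distinguishable.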
Performance Optimization Techniques
When dealing with high-frequency subprocess calls or resource-intensive operations, optimization becomes crucial:
import subprocess
import concurrent.futures
import time
import threading
from functools import lru_cache

class OptimizedSubprocessManager:
    def __init__(self, max_workers=4):
        self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=max_workers)
        self.command_cache = {}
        self.cache_lock = threading.Lock()

    @lru_cache(maxsize=128)
    def get_command_path(self, command):
        """Cache command paths to avoid repeated 'which' calls"""
        try:
            result = subprocess.run(['which', command], capture_output=True, text=True, check=True)
            return result.stdout.strip()
        except subprocess.CalledProcessError:
            return None

    def execute_batch(self, commands, max_workers=None):
        """Execute multiple commands concurrently"""
        if max_workers is None:
            max_workers = min(len(commands), 4)

        def execute_single(cmd_data):
            command, input_data = cmd_data if isinstance(cmd_data, tuple) else (cmd_data, None)
            start_time = time.time()
            try:
                result = subprocess.run(
                    command,
                    input=input_data,
                    capture_output=True,
                    text=True,
                    check=True
                )
                return {
                    'command': ' '.join(command),
                    'success': True,
                    'stdout': result.stdout,
                    'stderr': result.stderr,
                    'execution_time': time.time() - start_time
                }
            except subprocess.CalledProcessError as e:
                return {
                    'command': ' '.join(command),
                    'success': False,
                    'returncode': e.returncode,
                    'stdout': e.stdout,
                    'stderr': e.stderr,
                    'execution_time': time.time() - start_time
                }

        with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
            results = list(executor.map(execute_single, commands))
        return results

    def streaming_execution(self, command, line_processor):
        """Execute command with real-time line processing"""
        process = subprocess.Popen(
            command,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            bufsize=1
        )

        def process_stream(stream, processor):
            try:
                for line in iter(stream.readline, ''):
                    if line:
                        processor(line.rstrip())
            finally:
                stream.close()

        # Process stdout and stderr concurrently
        stdout_thread = threading.Thread(
            target=process_stream,
            args=(process.stdout, lambda line: line_processor('stdout', line))
        )
        stderr_thread = threading.Thread(
            target=process_stream,
            args=(process.stderr, lambda line: line_processor('stderr', line))
        )
        stdout_thread.start()
        stderr_thread.start()
        # Wait for process completion
        return_code = process.wait()
        stdout_thread.join()
        stderr_thread.join()
        return return_code

# Example: Parallel log processing
def parallel_log_analysis():
    manager = OptimizedSubprocessManager()
    # Commands to run in parallel
    log_commands = [
        (['grep', '-c', 'ERROR', '/var/log/app1.log'], None),
        (['grep', '-c', 'WARNING', '/var/log/app1.log'], None),
        (['wc', '-l', '/var/log/app1.log'], None),
        (['tail', '-n', '100', '/var/log/app1.log'], None)
    ]
    start_time = time.time()
    results = manager.execute_batch(log_commands)
    total_time = time.time() - start_time
    print(f"Processed {len(log_commands)} commands in {total_time:.2f} seconds")
    for result in results:
        if result['success']:
            print(f"{result['command']}: {result['stdout'].strip()}")
        else:
            print(f"{result['command']} failed: {result['stderr']}")

# Example: Real-time log monitoring
def monitor_logs():
    manager = OptimizedSubprocessManager()

    def process_log_line(stream_type, line):
        if 'ERROR' in line:
            print(f"🚨 {stream_type}: {line}")
        elif 'WARNING' in line:
            print(f"⚠️ {stream_type}: {line}")
        else:
            print(f"ℹ️ {stream_type}: {line}")

    # Monitor log file in real-time (tail -f runs until it is interrupted)
    return_code = manager.streaming_execution(
        ['tail', '-f', '/var/log/application.log'],
        process_log_line
    )

# Run examples
if __name__ == "__main__":
    parallel_log_analysis()
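A smaller variation on execute_batch() is to submit each command as its own future and collect results as they finish, which also surfaces worker exceptions at the point where you read each result. This sketch uses a hypothetical workload, a child interpreter that squares a number, in place of the log commands.

```python
import concurrent.futures
import subprocess
import sys

def square_in_child(n):
    # Hypothetical workload: a child interpreter computing n * n
    result = subprocess.run(
        [sys.executable, "-c", f"print({n} * {n})"],
        capture_output=True, text=True, check=True,
    )
    return n, int(result.stdout)

squares = {}
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as pool:
    futures = [pool.submit(square_in_child, n) for n in range(1, 6)]
    for future in concurrent.futures.as_completed(futures):
        n, value = future.result()   # re-raises any exception from the worker
        squares[n] = value

print(sorted(squares.items()))
```

Threads are sufficient here because each worker spends its time waiting on a child process rather than running Python code.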
Integration with System Services
For production environments, especially when managing services on dedicated servers, integration with system services is often necessary:
import subprocess
import json
import time
from enum import Enum

class ServiceStatus(Enum):
    ACTIVE = "active"
    INACTIVE = "inactive"
    FAILED = "failed"
    UNKNOWN = "unknown"

class SystemServiceManager:
    def __init__(self):
        self.systemctl_path = self._find_systemctl()

    def _find_systemctl(self):
        try:
            result = subprocess.run(['which', 'systemctl'], capture_output=True, text=True, check=True)
            return result.stdout.strip()
        except subprocess.CalledProcessError:
            raise RuntimeError("systemctl not found - systemd required")

    def get_service_status(self, service_name):
        """Get detailed service status"""
        try:
            # Get basic status
            result = subprocess.run(
                ['systemctl', 'is-active', service_name],
                capture_output=True,
                text=True
            )
            basic_status = result.stdout.strip()
            # Map the reported state onto the enum; states without a matching
            # member (for example "activating") become UNKNOWN
            try:
                status = ServiceStatus(basic_status)
            except ValueError:
                status = ServiceStatus.UNKNOWN
            # Get detailed status
            detail_result = subprocess.run(
                ['systemctl', 'status', service_name, '--no-pager', '--lines=0'],
                capture_output=True,
                text=True
            )
            # Parse the status output
            status_lines = detail_result.stdout.split('\n')
            status_info = {}
            for line in status_lines:
                if 'Active:' in line:
                    status_info['active'] = line.split('Active:')[1].strip()
                elif 'Main PID:' in line:
                    status_info['main_pid'] = line.split('Main PID:')[1].strip()
                elif 'Memory:' in line:
                    status_info['memory'] = line.split('Memory:')[1].strip()
                elif 'CPU:' in line:
                    status_info['cpu'] = line.split('CPU:')[1].strip()
            return {
                'service': service_name,
                'status': status,
                'details': status_info
            }
        except subprocess.CalledProcessError as e:
            return {
                'service': service_name,
                'status': ServiceStatus.FAILED,
                'error': e.stderr
            }

    def restart_service(self, service_name, wait_for_startup=True):
        """Restart service with optional startup verification"""
        try:
            # Stop the service
            subprocess.run(['systemctl', 'stop', service_name], check=True)
            time.sleep(2)
            # Start the service
            subprocess.run(['systemctl', 'start', service_name], check=True)
            if wait_for_startup:
                # Wait and verify startup
                for attempt in range(10):
                    time.sleep(1)
                    status = self.get_service_status(service_name)
                    if status['status'] == ServiceStatus.ACTIVE:
                        return {'success': True, 'message': f'{service_name} restarted successfully'}
                return {'success': False, 'message': f'{service_name} failed to start properly'}
            return {'success': True, 'message': f'{service_name} restart initiated'}
        except subprocess.CalledProcessError as e:
            return {'success': False, 'message': f'Failed to restart {service_name}: {e}'}

    def get_service_logs(self, service_name, lines=50, since=None):
        """Get service logs using journalctl"""
        cmd = ['journalctl', '-u', service_name, '--no-pager', f'--lines={lines}']
        if since:
            cmd.extend(['--since', since])
        try:
            result = subprocess.run(cmd, capture_output=True, text=True, check=True)
            return result.stdout.split('\n')
        except subprocess.CalledProcessError as e:
            return [f"Error retrieving logs: {e}"]

# Usage example for service management
def manage_web_services():
    service_manager = SystemServiceManager()
    services = ['nginx', 'mysql', 'redis-server']
    print("Service Status Report:")
    print("=" * 50)
    for service in services:
        status = service_manager.get_service_status(service)
        print(f"\n{service.upper()}:")
        print(f" Status: {status['status'].value}")
        if 'details' in status:
            for key, value in status['details'].items():
                print(f" {key.title()}: {value}")
        if status['status'] != ServiceStatus.ACTIVE:
            print(f" Attempting to restart {service}...")
            restart_result = service_manager.restart_service(service)
            print(f" Result: {restart_result['message']}")
    # Get recent logs for failed services
    print("\nRecent logs for any issues:")
    print("=" * 50)
    problematic_services = [s for s in services
                            if service_manager.get_service_status(s)['status'] != ServiceStatus.ACTIVE]
    for service in problematic_services:
        print(f"\nLogs for {service}:")
        logs = service_manager.get_service_logs(service, lines=10, since='1 hour ago')
        for log_line in logs[-5:]:  # Show last 5 lines
            print(f" {log_line}")

if __name__ == "__main__":
    manage_web_services()
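Parsing the human-readable output of systemctl status is fragile, because its layout is meant for people. One alternative, sketched here as an assumption rather than as part of the class above, is to run `systemctl show <unit> --property=ActiveState,SubState,MainPID`, which prints KEY=VALUE lines, and parse those. The function name parse_systemctl_show is hypothetical and the sample text is illustrative, not captured from a real system.

```python
def parse_systemctl_show(output):
    """Parse KEY=VALUE lines as printed by `systemctl show`."""
    properties = {}
    for line in output.splitlines():
        key, sep, value = line.partition("=")
        if sep:                      # skip lines without an '=' separator
            properties[key] = value
    return properties

# Illustrative sample, not captured from a real system
sample = "ActiveState=active\nSubState=running\nMainPID=1234\n"
props = parse_systemctl_show(sample)
print(props["ActiveState"], props["SubState"])
```

Keeping the parser separate from the subprocess call also makes it testable without systemd being present.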
The subprocess module is an indispensable tool for Python developers working with system integration, automation, and infrastructure management. From simple command execution to complex process orchestration, mastering these techniques will significantly enhance your ability to build robust, production-ready applications. Remember to always prioritize security, implement proper error handling, and consider performance implications when designing subprocess-based solutions.
For additional information and advanced use cases, refer to the official Python subprocess documentation and the shlex module documentation for secure shell command construction.
