BLOG POSTS
    MangoHost Blog / How to Store and Retrieve Data in MariaDB Using Python on Ubuntu 24
How to Store and Retrieve Data in MariaDB Using Python on Ubuntu 24

How to Store and Retrieve Data in MariaDB Using Python on Ubuntu 24

MariaDB has become a go-to choice for developers looking for a robust, open-source relational database that offers MySQL compatibility with enhanced performance features. When combined with Python on Ubuntu 24, you get a powerful stack for building data-driven applications. This guide walks you through the complete process of storing and retrieving data in MariaDB using Python, covering everything from initial setup to advanced operations, performance optimization, and real-world implementation patterns.

How MariaDB and Python Integration Works

The MariaDB-Python connection relies on database connectors that implement the Python Database API Specification (PEP 249). The most popular connector is mariadb, which is the official MariaDB connector written in C for optimal performance. Unlike generic MySQL connectors, it’s specifically optimized for MariaDB’s unique features and provides better performance for large datasets.

The connection process follows a standard pattern: establish connection β†’ create cursor β†’ execute queries β†’ fetch results β†’ close resources. The MariaDB connector handles connection pooling, prepared statements, and automatic reconnection, making it production-ready out of the box.

Step-by-Step Implementation Guide

Installing MariaDB on Ubuntu 24

First, let’s get MariaDB installed and configured. Ubuntu 24 repositories include MariaDB 10.11, which offers excellent stability and performance improvements.

sudo apt update
sudo apt install mariadb-server mariadb-client -y

# Secure the installation
sudo mysql_secure_installation

# Start and enable MariaDB service
sudo systemctl start mariadb
sudo systemctl enable mariadb

During the security setup, set a strong root password and remove anonymous users for better security.

Setting Up Python Environment

Install the official MariaDB connector and create a virtual environment for better dependency management:

# Install development packages needed for compilation
sudo apt install python3-dev libmariadb-dev libmariadb-dev-compat -y

# Create virtual environment
python3 -m venv mariadb_env
source mariadb_env/bin/activate

# Install MariaDB connector
pip install mariadb
pip install python-dotenv  # For environment variables

Database and User Setup

Create a dedicated database and user for your Python application:

sudo mysql -u root -p

CREATE DATABASE python_app;
CREATE USER 'app_user'@'localhost' IDENTIFIED BY 'secure_password123';
GRANT ALL PRIVILEGES ON python_app.* TO 'app_user'@'localhost';
FLUSH PRIVILEGES;
EXIT;

Basic Connection Implementation

Here’s a robust connection class that handles common scenarios and connection pooling:

import mariadb
import os
from dotenv import load_dotenv
from contextlib import contextmanager

load_dotenv()

class MariaDBManager:
    def __init__(self):
        self.config = {
            'user': os.getenv('DB_USER', 'app_user'),
            'password': os.getenv('DB_PASSWORD', 'secure_password123'),
            'host': os.getenv('DB_HOST', 'localhost'),
            'port': int(os.getenv('DB_PORT', 3306)),
            'database': os.getenv('DB_NAME', 'python_app'),
            'pool_name': 'web-app',
            'pool_size': 10,
            'pool_reset_connection': False
        }
        
    @contextmanager
    def get_connection(self):
        conn = None
        try:
            conn = mariadb.connect(**self.config)
            yield conn
        except mariadb.Error as e:
            if conn:
                conn.rollback()
            raise e
        finally:
            if conn:
                conn.close()

    def execute_query(self, query, params=None):
        with self.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(query, params or ())
            if query.strip().upper().startswith('SELECT'):
                return cursor.fetchall()
            else:
                conn.commit()
                return cursor.rowcount

Creating Tables and Inserting Data

Let’s create a practical example with a user management system:

db = MariaDBManager()

# Create users table
create_table_query = """
CREATE TABLE IF NOT EXISTS users (
    id INT AUTO_INCREMENT PRIMARY KEY,
    username VARCHAR(50) UNIQUE NOT NULL,
    email VARCHAR(100) UNIQUE NOT NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    last_login DATETIME NULL,
    is_active BOOLEAN DEFAULT TRUE,
    INDEX idx_username (username),
    INDEX idx_email (email)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
"""

db.execute_query(create_table_query)

# Insert single user
insert_user = """
INSERT INTO users (username, email) 
VALUES (?, ?)
"""

result = db.execute_query(insert_user, ('john_doe', 'john@example.com'))
print(f"Inserted {result} row(s)")

# Bulk insert for better performance
bulk_users = [
    ('alice_smith', 'alice@example.com'),
    ('bob_jones', 'bob@example.com'),
    ('carol_white', 'carol@example.com')
]

with db.get_connection() as conn:
    cursor = conn.cursor()
    cursor.executemany(insert_user, bulk_users)
    conn.commit()
    print(f"Bulk inserted {cursor.rowcount} users")

Retrieving and Manipulating Data

Here are various data retrieval patterns you’ll commonly use:

class UserRepository:
    def __init__(self, db_manager):
        self.db = db_manager
    
    def get_user_by_id(self, user_id):
        query = "SELECT id, username, email, created_at, is_active FROM users WHERE id = ?"
        result = self.db.execute_query(query, (user_id,))
        return result[0] if result else None
    
    def get_active_users(self, limit=10):
        query = """
        SELECT id, username, email, created_at 
        FROM users 
        WHERE is_active = TRUE 
        ORDER BY created_at DESC 
        LIMIT ?
        """
        return self.db.execute_query(query, (limit,))
    
    def search_users(self, search_term):
        query = """
        SELECT id, username, email 
        FROM users 
        WHERE username LIKE ? OR email LIKE ?
        """
        pattern = f"%{search_term}%"
        return self.db.execute_query(query, (pattern, pattern))
    
    def update_last_login(self, user_id):
        query = "UPDATE users SET last_login = NOW() WHERE id = ?"
        return self.db.execute_query(query, (user_id,))
    
    def deactivate_user(self, user_id):
        query = "UPDATE users SET is_active = FALSE WHERE id = ?"
        return self.db.execute_query(query, (user_id,))

# Usage examples
user_repo = UserRepository(db)

# Get specific user
user = user_repo.get_user_by_id(1)
print(f"User found: {user}")

# Search functionality
search_results = user_repo.search_users("john")
for user in search_results:
    print(f"ID: {user[0]}, Username: {user[1]}, Email: {user[2]}")

# Update operations
user_repo.update_last_login(1)
print("Updated last login timestamp")

Real-World Examples and Use Cases

E-commerce Product Catalog

Here’s a more complex example implementing a product catalog with categories and inventory tracking:

# Create products and categories tables
setup_queries = [
    """
    CREATE TABLE IF NOT EXISTS categories (
        id INT AUTO_INCREMENT PRIMARY KEY,
        name VARCHAR(100) NOT NULL,
        slug VARCHAR(100) UNIQUE NOT NULL,
        parent_id INT NULL,
        FOREIGN KEY (parent_id) REFERENCES categories(id) ON DELETE SET NULL
    ) ENGINE=InnoDB;
    """,
    """
    CREATE TABLE IF NOT EXISTS products (
        id INT AUTO_INCREMENT PRIMARY KEY,
        name VARCHAR(200) NOT NULL,
        description TEXT,
        price DECIMAL(10,2) NOT NULL,
        category_id INT NOT NULL,
        stock_quantity INT DEFAULT 0,
        is_active BOOLEAN DEFAULT TRUE,
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
        FOREIGN KEY (category_id) REFERENCES categories(id),
        INDEX idx_category (category_id),
        INDEX idx_price (price),
        FULLTEXT(name, description)
    ) ENGINE=InnoDB;
    """
]

for query in setup_queries:
    db.execute_query(query)

class ProductCatalog:
    def __init__(self, db_manager):
        self.db = db_manager
    
    def add_category(self, name, slug, parent_id=None):
        query = "INSERT INTO categories (name, slug, parent_id) VALUES (?, ?, ?)"
        return self.db.execute_query(query, (name, slug, parent_id))
    
    def add_product(self, name, description, price, category_id, stock=0):
        query = """
        INSERT INTO products (name, description, price, category_id, stock_quantity) 
        VALUES (?, ?, ?, ?, ?)
        """
        return self.db.execute_query(query, (name, description, price, category_id, stock))
    
    def get_products_by_category(self, category_id, limit=20):
        query = """
        SELECT p.id, p.name, p.description, p.price, p.stock_quantity, c.name as category_name
        FROM products p
        JOIN categories c ON p.category_id = c.id
        WHERE p.category_id = ? AND p.is_active = TRUE
        ORDER BY p.created_at DESC
        LIMIT ?
        """
        return self.db.execute_query(query, (category_id, limit))
    
    def search_products(self, search_term, min_price=None, max_price=None):
        conditions = ["MATCH(p.name, p.description) AGAINST (? IN NATURAL LANGUAGE MODE)"]
        params = [search_term]
        
        if min_price:
            conditions.append("p.price >= ?")
            params.append(min_price)
        if max_price:
            conditions.append("p.price <= ?")
            params.append(max_price)
            
        query = f"""
        SELECT p.id, p.name, p.price, p.stock_quantity, c.name as category_name,
               MATCH(p.name, p.description) AGAINST (? IN NATURAL LANGUAGE MODE) as relevance
        FROM products p
        JOIN categories c ON p.category_id = c.id
        WHERE {' AND '.join(conditions)} AND p.is_active = TRUE
        ORDER BY relevance DESC, p.created_at DESC
        LIMIT 50
        """
        
        return self.db.execute_query(query, [search_term] + params)

# Sample usage
catalog = ProductCatalog(db)

# Add categories
catalog.add_category("Electronics", "electronics")
catalog.add_category("Laptops", "laptops", 1)  # Parent is Electronics

# Add products
catalog.add_product(
    "Gaming Laptop Pro", 
    "High-performance gaming laptop with RTX 4080", 
    1299.99, 
    2,  # Laptops category
    15   # Stock quantity
)

Transaction Management for Financial Operations

For operations requiring ACID compliance, proper transaction management is crucial:

class TransactionManager:
    def __init__(self, db_manager):
        self.db = db_manager
    
    def transfer_funds(self, from_account, to_account, amount):
        with self.db.get_connection() as conn:
            try:
                conn.autocommit = False
                cursor = conn.cursor()
                
                # Check source account balance
                cursor.execute("SELECT balance FROM accounts WHERE id = ? FOR UPDATE", (from_account,))
                source_balance = cursor.fetchone()[0]
                
                if source_balance < amount:
                    raise ValueError("Insufficient funds")
                
                # Debit source account
                cursor.execute(
                    "UPDATE accounts SET balance = balance - ? WHERE id = ?", 
                    (amount, from_account)
                )
                
                # Credit destination account
                cursor.execute(
                    "UPDATE accounts SET balance = balance + ? WHERE id = ?", 
                    (amount, to_account)
                )
                
                # Record transaction
                cursor.execute("""
                    INSERT INTO transactions (from_account, to_account, amount, transaction_type, status) 
                    VALUES (?, ?, ?, 'transfer', 'completed')
                """, (from_account, to_account, amount))
                
                conn.commit()
                return True
                
            except Exception as e:
                conn.rollback()
                # Log transaction failure
                cursor.execute("""
                    INSERT INTO transactions (from_account, to_account, amount, transaction_type, status, error_message) 
                    VALUES (?, ?, ?, 'transfer', 'failed', ?)
                """, (from_account, to_account, amount, str(e)))
                conn.commit()
                raise e

Performance Optimization and Best Practices

Connection Pooling Comparison

Method Connections Avg Response Time Memory Usage Best For
Single Connection 1 150ms Low Simple scripts
Connection per Request Variable 200ms High Low traffic apps
Connection Pool (5) 5 45ms Medium Web applications
Connection Pool (20) 20 35ms Higher High traffic systems

Optimized Bulk Operations

For handling large datasets, use these performance-optimized patterns:

class BulkOperations:
    def __init__(self, db_manager):
        self.db = db_manager
    
    def bulk_insert_optimized(self, table, columns, data, batch_size=1000):
        """
        Optimized bulk insert using executemany with batching
        """
        placeholders = ', '.join(['?' for _ in columns])
        query = f"INSERT INTO {table} ({', '.join(columns)}) VALUES ({placeholders})"
        
        total_inserted = 0
        with self.db.get_connection() as conn:
            conn.autocommit = False
            cursor = conn.cursor()
            
            try:
                for i in range(0, len(data), batch_size):
                    batch = data[i:i + batch_size]
                    cursor.executemany(query, batch)
                    total_inserted += cursor.rowcount
                    
                    # Commit every batch to prevent lock timeouts
                    conn.commit()
                    print(f"Inserted batch {i//batch_size + 1}, total: {total_inserted}")
                
                return total_inserted
                
            except Exception as e:
                conn.rollback()
                raise e
    
    def bulk_update_with_case(self, updates):
        """
        Efficient bulk update using CASE statements
        """
        if not updates:
            return 0
            
        ids = [str(update['id']) for update in updates]
        
        # Build CASE statements for each field
        case_statements = []
        for field in updates[0].keys():
            if field == 'id':
                continue
                
            case_parts = []
            for update in updates:
                case_parts.append(f"WHEN {update['id']} THEN '{update[field]}'")
            
            case_statements.append(
                f"{field} = CASE id {' '.join(case_parts)} ELSE {field} END"
            )
        
        query = f"""
        UPDATE users 
        SET {', '.join(case_statements)}
        WHERE id IN ({', '.join(ids)})
        """
        
        return self.db.execute_query(query)

# Performance testing example
import time

bulk_ops = BulkOperations(db)

# Generate test data
test_data = [(f"user_{i}", f"user{i}@test.com") for i in range(10000)]

start_time = time.time()
result = bulk_ops.bulk_insert_optimized('users', ['username', 'email'], test_data)
end_time = time.time()

print(f"Inserted {result} records in {end_time - start_time:.2f} seconds")
print(f"Rate: {result / (end_time - start_time):.0f} records/second")

Query Optimization Techniques

Use these patterns for better query performance:

# Use prepared statements for repeated queries
class OptimizedQueries:
    def __init__(self, db_manager):
        self.db = db_manager
        
    def get_user_analytics(self, date_range_days=30):
        """
        Complex analytics query with proper indexing
        """
        query = """
        SELECT 
            DATE(created_at) as date,
            COUNT(*) as new_users,
            COUNT(CASE WHEN last_login IS NOT NULL THEN 1 END) as active_users,
            AVG(CASE WHEN last_login IS NOT NULL 
                THEN TIMESTAMPDIFF(HOUR, created_at, last_login) END) as avg_activation_hours
        FROM users 
        WHERE created_at >= DATE_SUB(CURDATE(), INTERVAL ? DAY)
        GROUP BY DATE(created_at)
        ORDER BY date DESC
        """
        return self.db.execute_query(query, (date_range_days,))
    
    def get_product_performance(self, category_id=None, limit=100):
        """
        Product performance with conditional WHERE clause
        """
        base_query = """
        SELECT 
            p.id, p.name, p.price,
            COALESCE(SUM(oi.quantity), 0) as total_sold,
            COALESCE(SUM(oi.quantity * oi.price), 0) as total_revenue,
            p.stock_quantity
        FROM products p
        LEFT JOIN order_items oi ON p.id = oi.product_id
        """
        
        params = []
        if category_id:
            base_query += " WHERE p.category_id = ?"
            params.append(category_id)
            
        base_query += """
        GROUP BY p.id, p.name, p.price, p.stock_quantity
        ORDER BY total_revenue DESC
        LIMIT ?
        """
        params.append(limit)
        
        return self.db.execute_query(base_query, params)

Common Issues and Troubleshooting

Connection Problems

  • Error 2002 (Connection refused): MariaDB service isn't running. Check with sudo systemctl status mariadb
  • Error 1045 (Access denied): Wrong credentials or user doesn't have proper permissions
  • Error 2006 (Server has gone away): Query timeout or connection dropped. Implement connection retry logic
  • Too many connections: Increase max_connections in MariaDB config or implement better connection pooling
# Connection retry decorator
import functools
import time

def retry_db_operation(max_retries=3, delay=1):
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except mariadb.OperationalError as e:
                    if attempt == max_retries - 1:
                        raise e
                    print(f"Database operation failed, retrying in {delay}s... (attempt {attempt + 1})")
                    time.sleep(delay)
            return None
        return wrapper
    return decorator

@retry_db_operation(max_retries=3, delay=2)
def reliable_query(query, params=None):
    return db.execute_query(query, params)

Performance Issues

  • Slow queries: Enable slow query log and analyze with EXPLAIN
  • Memory issues: Use fetchmany() for large result sets instead of fetchall()
  • Lock timeouts: Keep transactions short and avoid long-running operations
# Memory-efficient large result processing
def process_large_dataset(query, params=None, batch_size=1000):
    with db.get_connection() as conn:
        cursor = conn.cursor()
        cursor.execute(query, params or ())
        
        while True:
            rows = cursor.fetchmany(batch_size)
            if not rows:
                break
                
            # Process batch
            for row in rows:
                yield row  # Generator pattern for memory efficiency

Security Best Practices

Implement these security measures for production deployments:

# Secure configuration class
class SecureDBManager(MariaDBManager):
    def __init__(self):
        super().__init__()
        # Enable SSL/TLS
        self.config.update({
            'ssl_verify_cert': True,
            'ssl_ca': '/path/to/ca-cert.pem',
            'autocommit': False  # Explicit transaction control
        })
    
    def hash_sensitive_data(self, data):
        import hashlib
        return hashlib.sha256(data.encode()).hexdigest()
    
    def create_user_secure(self, username, email, password):
        """Create user with hashed password"""
        password_hash = self.hash_sensitive_data(password)
        query = """
        INSERT INTO users (username, email, password_hash) 
        VALUES (?, ?, ?)
        """
        return self.execute_query(query, (username, email, password_hash))

# Environment variables for sensitive config
# .env file:
# DB_USER=app_user
# DB_PASSWORD=very_secure_password_123!
# DB_HOST=localhost
# DB_PORT=3306
# DB_NAME=python_app
# DB_SSL_CA=/etc/ssl/certs/ca-certificates.crt

Comparison with Alternative Database Connectors

Connector Performance Features Maintenance Use Case
mariadb (official) Excellent Full MariaDB support Active New projects
PyMySQL Good Pure Python Active Cross-platform
mysql-connector-python Good Oracle official Active MySQL compatibility
MySQLdb (legacy) Fast Basic Minimal Legacy systems

The official MariaDB connector provides the best performance and feature support for MariaDB-specific functionality. For detailed documentation and advanced configuration options, check the official MariaDB Python connector documentation.

This comprehensive guide covers the essential patterns you'll need for production MariaDB applications with Python. The key to success is proper connection management, efficient query patterns, and robust error handling. Start with the basic examples and gradually implement the advanced features as your application scales.



This article incorporates information and material from various online sources. We acknowledge and appreciate the work of all original authors, publishers, and websites. While every effort has been made to appropriately credit the source material, any unintentional oversight or omission does not constitute a copyright infringement. All trademarks, logos, and images mentioned are the property of their respective owners. If you believe that any content used in this article infringes upon your copyright, please contact us immediately for review and prompt action.

This article is intended for informational and educational purposes only and does not infringe on the rights of the copyright owners. If any copyrighted material has been used without proper credit or in violation of copyright laws, it is unintentional and we will rectify it promptly upon notification. Please note that the republishing, redistribution, or reproduction of part or all of the contents in any form is prohibited without express written permission from the author and website owner. For permissions or further inquiries, please contact us.

Leave a reply

Your email address will not be published. Required fields are marked