Database Performance Optimization Strategies: From Query Tuning to System Architecture

Database performance optimization is a critical aspect of modern application development. As data volumes grow and user expectations increase, the ability to efficiently query and manipulate data becomes paramount. This comprehensive guide explores various strategies and techniques for optimizing database performance, from basic query tuning to advanced architectural considerations.

Table of Contents

  1. Performance Analysis and Monitoring
  2. Query Optimization Techniques
  3. Indexing Strategies
  4. Database Schema Design
  5. System-Level Optimization
  6. Caching Strategies
  7. Partitioning and Sharding
  8. Monitoring and Alerting

Performance Analysis and Monitoring

1. Performance Metrics Collection

#!/usr/bin/env python3
# src/monitoring/performance_monitor.py

import time
import psutil
import pymysql
import psycopg2
import logging
import json
from typing import Dict, List, Any, Optional
from dataclasses import dataclass, asdict
from datetime import datetime, timedelta
import threading
from collections import defaultdict, deque

@dataclass
class QueryMetrics:
    query_id: str
    query_text: str
    execution_time: float
    rows_examined: int
    rows_returned: int
    cpu_usage: float
    memory_usage: int
    timestamp: datetime
    database_name: str
    user: str

@dataclass
class SystemMetrics:
    timestamp: datetime
    cpu_percent: float
    memory_percent: float
    disk_io_read: int
    disk_io_write: int
    network_io_sent: int
    network_io_recv: int
    active_connections: int
    slow_queries: int

class DatabasePerformanceMonitor:
    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.logger = self._setup_logging()
        
        # Metrics storage
        self.query_metrics: deque = deque(maxlen=10000)
        self.system_metrics: deque = deque(maxlen=1000)
        
        # Performance thresholds
        self.slow_query_threshold = config.get('slow_query_threshold', 1.0)  # seconds
        self.cpu_threshold = config.get('cpu_threshold', 80.0)  # percent
        self.memory_threshold = config.get('memory_threshold', 85.0)  # percent
        
        # Database connections
        self.db_connections = {}
        self._initialize_connections()
        
        # Monitoring state
        self.monitoring_active = False
        self.monitor_thread = None
        
    def _setup_logging(self) -> logging.Logger:
        """Setup logging configuration"""
        logger = logging.getLogger('PerformanceMonitor')
        logger.setLevel(logging.INFO)
        
        handler = logging.FileHandler('performance_monitor.log')
        formatter = logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        handler.setFormatter(formatter)
        logger.addHandler(handler)
        
        return logger
    
    def _initialize_connections(self):
        """Initialize database connections"""
        for db_config in self.config.get('databases', []):
            db_type = db_config['type']
            db_name = db_config['name']
            
            try:
                if db_type == 'mysql':
                    conn = pymysql.connect(
                        host=db_config['host'],
                        port=db_config['port'],
                        user=db_config['user'],
                        password=db_config['password'],
                        database=db_config['database'],
                        charset='utf8mb4'
                    )
                elif db_type == 'postgresql':
                    conn = psycopg2.connect(
                        host=db_config['host'],
                        port=db_config['port'],
                        user=db_config['user'],
                        password=db_config['password'],
                        database=db_config['database']
                    )
                else:
                    continue
                
                self.db_connections[db_name] = {
                    'connection': conn,
                    'type': db_type,
                    'config': db_config
                }
                
                self.logger.info(f"Connected to {db_type} database: {db_name}")
                
            except Exception as e:
                self.logger.error(f"Failed to connect to {db_name}: {e}")
    
    def start_monitoring(self):
        """Start performance monitoring"""
        if self.monitoring_active:
            self.logger.warning("Monitoring is already active")
            return
        
        self.monitoring_active = True
        self.monitor_thread = threading.Thread(target=self._monitoring_loop)
        self.monitor_thread.daemon = True
        self.monitor_thread.start()
        
        self.logger.info("Performance monitoring started")
    
    def stop_monitoring(self):
        """Stop performance monitoring"""
        self.monitoring_active = False
        if self.monitor_thread:
            self.monitor_thread.join()
        
        self.logger.info("Performance monitoring stopped")
    
    def _monitoring_loop(self):
        """Main monitoring loop"""
        while self.monitoring_active:
            try:
                # Collect system metrics
                system_metrics = self._collect_system_metrics()
                self.system_metrics.append(system_metrics)
                
                # Collect database metrics
                for db_name, db_info in self.db_connections.items():
                    self._collect_database_metrics(db_name, db_info)
                
                # Check for performance issues
                self._check_performance_alerts()
                
                # Sleep for monitoring interval
                time.sleep(self.config.get('monitoring_interval', 30))
                
            except Exception as e:
                self.logger.error(f"Error in monitoring loop: {e}")
                time.sleep(5)
    
    def _collect_system_metrics(self) -> SystemMetrics:
        """Collect system-level metrics"""
        # CPU and Memory
        cpu_percent = psutil.cpu_percent(interval=1)
        memory = psutil.virtual_memory()
        
        # Disk I/O (cumulative byte counters since boot)
        disk_io = psutil.disk_io_counters()
        
        # Network I/O (cumulative byte counters since boot)
        network_io = psutil.net_io_counters()
        
        # Database connections (simplified)
        active_connections = sum(
            self._get_active_connections(db_name, db_info)
            for db_name, db_info in self.db_connections.items()
        )
        
        # Slow queries count
        slow_queries = self._count_recent_slow_queries()
        
        return SystemMetrics(
            timestamp=datetime.now(),
            cpu_percent=cpu_percent,
            memory_percent=memory.percent,
            disk_io_read=disk_io.read_bytes if disk_io else 0,
            disk_io_write=disk_io.write_bytes if disk_io else 0,
            network_io_sent=network_io.bytes_sent if network_io else 0,
            network_io_recv=network_io.bytes_recv if network_io else 0,
            active_connections=active_connections,
            slow_queries=slow_queries
        )
    
    def _collect_database_metrics(self, db_name: str, db_info: Dict[str, Any]):
        """Collect database-specific metrics"""
        try:
            conn = db_info['connection']
            db_type = db_info['type']
            
            if db_type == 'mysql':
                self._collect_mysql_metrics(db_name, conn)
            elif db_type == 'postgresql':
                self._collect_postgresql_metrics(db_name, conn)
                
        except Exception as e:
            self.logger.error(f"Error collecting metrics for {db_name}: {e}")
    
    def _collect_mysql_metrics(self, db_name: str, conn):
        """Collect MySQL-specific metrics"""
        cursor = conn.cursor()
        
        try:
            # Get slow query log entries (requires slow_query_log=ON and log_output=TABLE)
            cursor.execute("""
                SELECT sql_text, query_time, rows_examined, rows_sent, 
                       start_time, user_host
                FROM mysql.slow_log 
                WHERE start_time > %s
                ORDER BY start_time DESC
                LIMIT 100
            """, (datetime.now() - timedelta(minutes=5),))
            
            for row in cursor.fetchall():
                # sql_text may come back as bytes and query_time as a timedelta
                sql_text = row[0].decode('utf-8', 'replace') if isinstance(row[0], bytes) else row[0]
                query_time = row[1]
                execution_seconds = (query_time.total_seconds()
                                     if hasattr(query_time, 'total_seconds')
                                     else float(query_time))
                user_host = row[5].decode('utf-8', 'replace') if isinstance(row[5], bytes) else row[5]
                query_metrics = QueryMetrics(
                    query_id=self._generate_query_id(sql_text),
                    query_text=sql_text[:500],  # Truncate long queries
                    execution_time=execution_seconds,
                    rows_examined=int(row[2]),
                    rows_returned=int(row[3]),
                    cpu_usage=0.0,  # Not available in slow log
                    memory_usage=0,  # Not available in slow log
                    timestamp=row[4],
                    database_name=db_name,
                    user=user_host.split('@')[0] if '@' in user_host else user_host
                )
                self.query_metrics.append(query_metrics)
            
            # Get current process list for active queries
            cursor.execute("SHOW PROCESSLIST")
            active_queries = cursor.fetchall()
            
            for process in active_queries:
                # PROCESSLIST columns: Id, User, Host, db, Command, Time, State, Info
                if process[4] != 'Sleep' and process[7]:
                    # This is an active query
                    query_metrics = QueryMetrics(
                        query_id=self._generate_query_id(process[7] or ''),
                        query_text=(process[7] or '')[:500],
                        execution_time=float(process[5] or 0),
                        rows_examined=0,  # Not available in process list
                        rows_returned=0,  # Not available in process list
                        cpu_usage=0.0,
                        memory_usage=0,
                        timestamp=datetime.now(),
                        database_name=db_name,
                        user=process[1] or ''
                    )
                    self.query_metrics.append(query_metrics)
                    
        except Exception as e:
            self.logger.error(f"Error collecting MySQL metrics: {e}")
        finally:
            cursor.close()
    
    def _collect_postgresql_metrics(self, db_name: str, conn):
        """Collect PostgreSQL-specific metrics"""
        cursor = conn.cursor()
        
        try:
            # Get active queries
            cursor.execute("""
                SELECT query, state, query_start, usename, 
                       backend_start, state_change
                FROM pg_stat_activity 
                WHERE state = 'active' 
                AND query NOT LIKE '%pg_stat_activity%'
            """)
            
            for row in cursor.fetchall():
                # query_start is timezone-aware, so subtract it from an aware "now"
                if row[2]:
                    execution_time = (datetime.now(row[2].tzinfo) - row[2]).total_seconds()
                else:
                    execution_time = 0
                
                query_metrics = QueryMetrics(
                    query_id=self._generate_query_id(row[0]),
                    query_text=row[0][:500],
                    execution_time=execution_time,
                    rows_examined=0,  # Not directly available
                    rows_returned=0,  # Not directly available
                    cpu_usage=0.0,
                    memory_usage=0,
                    timestamp=datetime.now(),
                    database_name=db_name,
                    user=row[3]
                )
                self.query_metrics.append(query_metrics)
            
            # Get query statistics from pg_stat_statements if available.
            # Note: pg_stat_statements has no per-statement timestamp column, and on
            # PostgreSQL 13+ total_time/mean_time are named total_exec_time/mean_exec_time.
            try:
                cursor.execute("""
                    SELECT query, calls, total_time, mean_time, rows
                    FROM pg_stat_statements
                    ORDER BY total_time DESC
                    LIMIT 50
                """)
                
                for row in cursor.fetchall():
                    query_metrics = QueryMetrics(
                        query_id=self._generate_query_id(row[0]),
                        query_text=row[0][:500],
                        execution_time=float(row[3]) / 1000.0,  # Convert to seconds
                        rows_examined=0,
                        rows_returned=int(row[4]),
                        cpu_usage=0.0,
                        memory_usage=0,
                        timestamp=datetime.now(),
                        database_name=db_name,
                        user='system'
                    )
                    self.query_metrics.append(query_metrics)
                    
            except Exception:
                # pg_stat_statements not installed, or column names differ by version
                conn.rollback()  # clear the aborted transaction so later queries still work
                
        except Exception as e:
            self.logger.error(f"Error collecting PostgreSQL metrics: {e}")
        finally:
            cursor.close()
    
    def _get_active_connections(self, db_name: str, db_info: Dict[str, Any]) -> int:
        """Get number of active connections"""
        try:
            conn = db_info['connection']
            db_type = db_info['type']
            cursor = conn.cursor()
            
            if db_type == 'mysql':
                cursor.execute("SHOW STATUS LIKE 'Threads_connected'")
                result = cursor.fetchone()
                return int(result[1]) if result else 0
            elif db_type == 'postgresql':
                cursor.execute("SELECT count(*) FROM pg_stat_activity")
                result = cursor.fetchone()
                return int(result[0]) if result else 0
            
            return 0  # Unsupported database type
        except Exception as e:
            self.logger.error(f"Error getting active connections for {db_name}: {e}")
            return 0
        finally:
            if 'cursor' in locals():
                cursor.close()
    
    def _count_recent_slow_queries(self) -> int:
        """Count slow queries in recent time window"""
        cutoff_time = datetime.now() - timedelta(minutes=5)
        return sum(
            1 for metric in self.query_metrics
            if metric.timestamp > cutoff_time and 
               metric.execution_time > self.slow_query_threshold
        )
    
    def _check_performance_alerts(self):
        """Check for performance issues and generate alerts"""
        if not self.system_metrics:
            return
        
        latest_metrics = self.system_metrics[-1]
        
        # CPU threshold alert
        if latest_metrics.cpu_percent > self.cpu_threshold:
            self._send_alert(
                'HIGH_CPU',
                f"CPU usage is {latest_metrics.cpu_percent:.1f}%, "
                f"exceeding threshold of {self.cpu_threshold}%"
            )
        
        # Memory threshold alert
        if latest_metrics.memory_percent > self.memory_threshold:
            self._send_alert(
                'HIGH_MEMORY',
                f"Memory usage is {latest_metrics.memory_percent:.1f}%, "
                f"exceeding threshold of {self.memory_threshold}%"
            )
        
        # Slow query alert
        if latest_metrics.slow_queries > 10:
            self._send_alert(
                'SLOW_QUERIES',
                f"Detected {latest_metrics.slow_queries} slow queries in the last 5 minutes"
            )
    
    def _send_alert(self, alert_type: str, message: str):
        """Send performance alert"""
        alert = {
            'type': alert_type,
            'message': message,
            'timestamp': datetime.now().isoformat(),
            'severity': 'WARNING'
        }
        
        self.logger.warning(f"ALERT [{alert_type}]: {message}")
        
        # Here you could integrate with alerting systems like:
        # - Email notifications
        # - Slack/Teams webhooks
        # - PagerDuty
        # - Custom monitoring systems
    
    def _generate_query_id(self, query_text: str) -> str:
        """Generate unique ID for query"""
        import hashlib
        return hashlib.md5(query_text.encode()).hexdigest()[:16]
    
    def get_performance_summary(self, hours: int = 24) -> Dict[str, Any]:
        """Get performance summary for specified time period"""
        cutoff_time = datetime.now() - timedelta(hours=hours)
        
        # Filter metrics by time
        recent_queries = [
            m for m in self.query_metrics 
            if m.timestamp > cutoff_time
        ]
        recent_system = [
            m for m in self.system_metrics 
            if m.timestamp > cutoff_time
        ]
        
        if not recent_queries or not recent_system:
            return {'error': 'Insufficient data for analysis'}
        
        # Query analysis
        slow_queries = [q for q in recent_queries if q.execution_time > self.slow_query_threshold]
        avg_execution_time = sum(q.execution_time for q in recent_queries) / len(recent_queries)
        
        # System analysis
        avg_cpu = sum(s.cpu_percent for s in recent_system) / len(recent_system)
        avg_memory = sum(s.memory_percent for s in recent_system) / len(recent_system)
        max_connections = max(s.active_connections for s in recent_system)
        
        # Top slow queries
        top_slow_queries = sorted(
            slow_queries, 
            key=lambda x: x.execution_time, 
            reverse=True
        )[:10]
        
        return {
            'time_period_hours': hours,
            'total_queries': len(recent_queries),
            'slow_queries_count': len(slow_queries),
            'slow_query_percentage': (len(slow_queries) / len(recent_queries)) * 100,
            'average_execution_time': avg_execution_time,
            'average_cpu_usage': avg_cpu,
            'average_memory_usage': avg_memory,
            'max_concurrent_connections': max_connections,
            'top_slow_queries': [
                {
                    'query_text': q.query_text,
                    'execution_time': q.execution_time,
                    'database': q.database_name,
                    'user': q.user
                }
                for q in top_slow_queries
            ]
        }
    
    def export_metrics(self, filename: str, format: str = 'json'):
        """Export collected metrics to file"""
        data = {
            'query_metrics': [asdict(m) for m in self.query_metrics],
            'system_metrics': [asdict(m) for m in self.system_metrics],
            'export_timestamp': datetime.now().isoformat()
        }
        
        if format == 'json':
            with open(filename, 'w') as f:
                json.dump(data, f, indent=2, default=str)
        else:
            raise ValueError(f"Unsupported format: {format}")
        
        self.logger.info(f"Metrics exported to {filename}")

def main():
    # Example configuration
    config = {
        'databases': [
            {
                'name': 'main_db',
                'type': 'mysql',
                'host': 'localhost',
                'port': 3306,
                'user': 'monitor_user',
                'password': 'monitor_pass',
                'database': 'production'
            },
            {
                'name': 'analytics_db',
                'type': 'postgresql',
                'host': 'localhost',
                'port': 5432,
                'user': 'monitor_user',
                'password': 'monitor_pass',
                'database': 'analytics'
            }
        ],
        'slow_query_threshold': 1.0,
        'cpu_threshold': 80.0,
        'memory_threshold': 85.0,
        'monitoring_interval': 30
    }
    
    # Create and start monitor
    monitor = DatabasePerformanceMonitor(config)
    
    try:
        monitor.start_monitoring()
        
        # Let it run for a while
        time.sleep(300)  # 5 minutes
        
        # Get performance summary
        summary = monitor.get_performance_summary(hours=1)
        print("Performance Summary:")
        print(json.dumps(summary, indent=2))
        
        # Export metrics
        monitor.export_metrics('performance_metrics.json')
        
    finally:
        monitor.stop_monitoring()

if __name__ == "__main__":
    main()
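
2. Enabling the Required Instrumentation

The monitor above reads MySQL's mysql.slow_log table and PostgreSQL's pg_stat_statements view, neither of which is active by default. The following is a minimal sketch, assuming the same connection parameters used in the monitor config, of how you might switch them on before starting the monitor; the module path, helper names, and credentials are illustrative. Table-based slow logging adds some overhead, so many teams enable it only during analysis windows.

#!/usr/bin/env python3
# src/monitoring/enable_instrumentation.py (illustrative)

import pymysql
import psycopg2

def enable_mysql_slow_log(host: str, user: str, password: str):
    """Route the slow query log to mysql.slow_log (needs SUPER or SYSTEM_VARIABLES_ADMIN)."""
    conn = pymysql.connect(host=host, user=user, password=password)
    try:
        with conn.cursor() as cur:
            cur.execute("SET GLOBAL slow_query_log = 'ON'")
            cur.execute("SET GLOBAL log_output = 'TABLE'")  # the monitor reads mysql.slow_log
            cur.execute("SET GLOBAL long_query_time = 1")   # seconds; match slow_query_threshold
    finally:
        conn.close()

def has_pg_stat_statements(host: str, user: str, password: str, database: str) -> bool:
    """Check whether pg_stat_statements is installed (it must also be listed in
    shared_preload_libraries, which requires a server restart)."""
    conn = psycopg2.connect(host=host, user=user, password=password, database=database)
    try:
        with conn.cursor() as cur:
            cur.execute("SELECT 1 FROM pg_extension WHERE extname = 'pg_stat_statements'")
            return cur.fetchone() is not None
    finally:
        conn.close()

if __name__ == "__main__":
    enable_mysql_slow_log('localhost', 'monitor_user', 'monitor_pass')
    print(has_pg_stat_statements('localhost', 'monitor_user', 'monitor_pass', 'analytics'))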

Query Optimization Techniques

1. SQL Query Analyzer

#!/usr/bin/env python3
# src/optimization/query_analyzer.py

import re
import sqlparse
from sqlparse.sql import Statement
from typing import List, Optional
from dataclasses import dataclass
from enum import Enum

class OptimizationLevel(Enum):
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"

@dataclass
class OptimizationSuggestion:
    level: OptimizationLevel
    category: str
    description: str
    original_query: str
    optimized_query: Optional[str]
    estimated_improvement: str
    explanation: str

class SQLQueryAnalyzer:
    def __init__(self):
        self.optimization_rules = [
            self._check_select_star,
            self._check_missing_where_clause,
            self._check_function_in_where,
            self._check_or_conditions,
            self._check_like_patterns,
            self._check_subquery_optimization,
            self._check_join_conditions,
            self._check_order_by_optimization,
            self._check_group_by_optimization,
            self._check_index_hints
        ]
    
    def analyze_query(self, query: str) -> List[OptimizationSuggestion]:
        """Analyze SQL query and provide optimization suggestions"""
        suggestions = []
        
        # Parse the query
        try:
            parsed = sqlparse.parse(query)[0]
        except Exception as e:
            return [OptimizationSuggestion(
                level=OptimizationLevel.CRITICAL,
                category="Syntax Error",
                description=f"Query parsing failed: {str(e)}",
                original_query=query,
                optimized_query=None,
                estimated_improvement="N/A",
                explanation="Fix syntax errors before optimization"
            )]
        
        # Apply optimization rules
        for rule in self.optimization_rules:
            try:
                rule_suggestions = rule(query, parsed)
                if rule_suggestions:
                    suggestions.extend(rule_suggestions)
            except Exception as e:
                # Log error but continue with other rules
                print(f"Error in rule {rule.__name__}: {e}")
        
        return suggestions
    
    def _check_select_star(self, query: str, parsed: Statement) -> List[OptimizationSuggestion]:
        """Check for SELECT * usage"""
        suggestions = []
        
        if re.search(r'\bSELECT\s+\*\b', query, re.IGNORECASE):
            suggestions.append(OptimizationSuggestion(
                level=OptimizationLevel.MEDIUM,
                category="Column Selection",
                description="Avoid SELECT * - specify only needed columns",
                original_query=query,
                optimized_query=None,
                estimated_improvement="10-50% reduction in I/O",
                explanation="SELECT * retrieves all columns, increasing network traffic and memory usage. "
                           "Specify only the columns you actually need."
            ))
        
        return suggestions
    
    def _check_missing_where_clause(self, query: str, parsed: Statement) -> List[OptimizationSuggestion]:
        """Check for missing WHERE clause in SELECT/UPDATE/DELETE"""
        suggestions = []
        
        query_upper = query.upper()
        
        # Check for SELECT without WHERE
        if ('SELECT' in query_upper and 
            'WHERE' not in query_upper and 
            'LIMIT' not in query_upper and
            'JOIN' not in query_upper):
            
            suggestions.append(OptimizationSuggestion(
                level=OptimizationLevel.HIGH,
                category="Filtering",
                description="SELECT query without WHERE clause may scan entire table",
                original_query=query,
                optimized_query=None,
                estimated_improvement="90%+ reduction in execution time",
                explanation="Add WHERE clause to filter rows and use indexes effectively"
            ))
        
        # Check for UPDATE/DELETE without WHERE
        if (('UPDATE' in query_upper or 'DELETE' in query_upper) and 
            'WHERE' not in query_upper):
            
            suggestions.append(OptimizationSuggestion(
                level=OptimizationLevel.CRITICAL,
                category="Safety",
                description="UPDATE/DELETE without WHERE clause affects all rows",
                original_query=query,
                optimized_query=None,
                estimated_improvement="Prevents accidental data modification",
                explanation="Always use WHERE clause with UPDATE/DELETE to avoid modifying all rows"
            ))
        
        return suggestions
    
    def _check_function_in_where(self, query: str, parsed: Statement) -> List[OptimizationSuggestion]:
        """Check for functions applied to columns in WHERE clause"""
        suggestions = []
        
        # Common function patterns that prevent index usage
        function_patterns = [
            r'\bUPPER\s*\(\s*\w+\s*\)',
            r'\bLOWER\s*\(\s*\w+\s*\)',
            r'\bSUBSTRING\s*\(\s*\w+',
            r'\bDATE\s*\(\s*\w+\s*\)',
            r'\bYEAR\s*\(\s*\w+\s*\)',
            r'\bMONTH\s*\(\s*\w+\s*\)'
        ]
        
        for pattern in function_patterns:
            if re.search(pattern, query, re.IGNORECASE):
                suggestions.append(OptimizationSuggestion(
                    level=OptimizationLevel.HIGH,
                    category="Index Usage",
                    description="Functions on columns in WHERE clause prevent index usage",
                    original_query=query,
                    optimized_query=None,
                    estimated_improvement="50-90% improvement with proper indexing",
                    explanation="Move functions to the right side of comparison or use functional indexes"
                ))
                break
        
        return suggestions
    
    def _check_or_conditions(self, query: str, parsed: Statement) -> List[OptimizationSuggestion]:
        """Check for OR conditions that might benefit from UNION"""
        suggestions = []
        
        # Count OR conditions in WHERE clause
        where_match = re.search(r'WHERE\s+(.*?)(?:\s+GROUP\s+BY|\s+ORDER\s+BY|\s+LIMIT|$)', 
                               query, re.IGNORECASE | re.DOTALL)
        
        if where_match:
            where_clause = where_match.group(1)
            or_count = len(re.findall(r'\bOR\b', where_clause, re.IGNORECASE))
            
            if or_count >= 3:
                suggestions.append(OptimizationSuggestion(
                    level=OptimizationLevel.MEDIUM,
                    category="Query Structure",
                    description=f"Multiple OR conditions ({or_count}) may benefit from UNION",
                    original_query=query,
                    optimized_query=None,
                    estimated_improvement="20-60% improvement with proper indexing",
                    explanation="Consider rewriting multiple OR conditions as UNION queries "
                               "to better utilize indexes"
                ))
        
        return suggestions
    
    def _check_like_patterns(self, query: str, parsed: Statement) -> List[OptimizationSuggestion]:
        """Check for inefficient LIKE patterns"""
        suggestions = []
        
        # Check for leading wildcard patterns
        leading_wildcard = re.findall(r"LIKE\s+['\"]%[^'\"]*['\"]", query, re.IGNORECASE)
        
        if leading_wildcard:
            suggestions.append(OptimizationSuggestion(
                level=OptimizationLevel.HIGH,
                category="Pattern Matching",
                description="LIKE patterns starting with % cannot use indexes",
                original_query=query,
                optimized_query=None,
                estimated_improvement="Consider full-text search or other alternatives",
                explanation="Leading wildcards force full table scans. "
                           "Consider full-text indexes or alternative search methods"
            ))
        
        return suggestions
    
    def _check_subquery_optimization(self, query: str, parsed: Statement) -> List[OptimizationSuggestion]:
        """Check for subqueries that could be optimized"""
        suggestions = []
        
        # Check for IN (SELECT ...) subqueries
        if re.search(r'WHERE\s+\w+\s+IN\s*\(\s*SELECT', query, re.IGNORECASE):
            suggestions.append(OptimizationSuggestion(
                level=OptimizationLevel.MEDIUM,
                category="Subquery Optimization",
                description="IN subquery might be optimized with JOIN",
                original_query=query,
                optimized_query=None,
                estimated_improvement="20-80% improvement depending on data size",
                explanation="Consider rewriting IN subqueries as JOINs for better performance"
            ))
        
        # Check for EXISTS subqueries
        if re.search(r'WHERE\s+EXISTS\s*\(\s*SELECT', query, re.IGNORECASE):
            suggestions.append(OptimizationSuggestion(
                level=OptimizationLevel.LOW,
                category="Subquery Optimization",
                description="EXISTS subquery detected - ensure proper indexing",
                original_query=query,
                optimized_query=None,
                estimated_improvement="Varies with indexing strategy",
                explanation="EXISTS subqueries can be efficient but require proper indexing "
                           "on correlated columns"
            ))
        
        return suggestions
    
    def _check_join_conditions(self, query: str, parsed: Statement) -> List[OptimizationSuggestion]:
        """Check JOIN conditions and types"""
        suggestions = []
        
        # Check for CROSS JOINs or missing JOIN conditions
        if re.search(r'FROM\s+\w+\s*,\s*\w+', query, re.IGNORECASE):
            suggestions.append(OptimizationSuggestion(
                level=OptimizationLevel.HIGH,
                category="JOIN Optimization",
                description="Comma-separated tables create CROSS JOIN",
                original_query=query,
                optimized_query=None,
                estimated_improvement="Significant - prevents cartesian products",
                explanation="Use explicit JOIN syntax with proper ON conditions "
                           "to avoid cartesian products"
            ))
        
        # Check for JOINs without a matching ON clause
        # (heuristic: compare keyword counts; JOIN ... USING is not covered)
        join_count = len(re.findall(r'\bJOIN\b', query, re.IGNORECASE))
        on_count = len(re.findall(r'\bON\b', query, re.IGNORECASE))
        if join_count > on_count:
            suggestions.append(OptimizationSuggestion(
                level=OptimizationLevel.CRITICAL,
                category="JOIN Optimization",
                description="JOIN without ON clause creates cartesian product",
                original_query=query,
                optimized_query=None,
                estimated_improvement="Critical - prevents query from running efficiently",
                explanation="Always specify ON conditions for JOINs to define the relationship"
            ))
        
        return suggestions
    
    def _check_order_by_optimization(self, query: str, parsed: Statement) -> List[OptimizationSuggestion]:
        """Check ORDER BY optimization opportunities"""
        suggestions = []
        
        # Check for ORDER BY without LIMIT
        if ('ORDER BY' in query.upper() and 
            'LIMIT' not in query.upper() and
            'TOP' not in query.upper()):
            
            suggestions.append(OptimizationSuggestion(
                level=OptimizationLevel.MEDIUM,
                category="Sorting",
                description="ORDER BY without LIMIT sorts entire result set",
                original_query=query,
                optimized_query=None,
                estimated_improvement="Consider adding LIMIT if appropriate",
                explanation="If you only need top N results, add LIMIT to avoid sorting "
                           "the entire result set"
            ))
        
        return suggestions
    
    def _check_group_by_optimization(self, query: str, parsed: Statement) -> List[OptimizationSuggestion]:
        """Check GROUP BY optimization opportunities"""
        suggestions = []
        
        # Check for GROUP BY with ORDER BY on different columns
        group_match = re.search(r'GROUP\s+BY\s+([\w\s,]+)', query, re.IGNORECASE)
        order_match = re.search(r'ORDER\s+BY\s+([\w\s,]+)', query, re.IGNORECASE)
        
        if group_match and order_match:
            group_cols = [col.strip() for col in group_match.group(1).split(',')]
            order_cols = [col.strip().split()[0] for col in order_match.group(1).split(',')]
            
            if not any(col in group_cols for col in order_cols):
                suggestions.append(OptimizationSuggestion(
                    level=OptimizationLevel.MEDIUM,
                    category="Grouping and Sorting",
                    description="ORDER BY columns differ from GROUP BY columns",
                    original_query=query,
                    optimized_query=None,
                    estimated_improvement="Consider ordering by GROUP BY columns",
                    explanation="Ordering by GROUP BY columns can be more efficient "
                               "as the data is already grouped"
                ))
        
        return suggestions
    
    def _check_index_hints(self, query: str, parsed: Statement) -> List[OptimizationSuggestion]:
        """Suggest potential indexing opportunities"""
        suggestions = []
        
        # Extract WHERE conditions
        where_match = re.search(r'WHERE\s+(.*?)(?:\s+GROUP\s+BY|\s+ORDER\s+BY|\s+LIMIT|$)', 
                               query, re.IGNORECASE | re.DOTALL)
        
        if where_match:
            where_clause = where_match.group(1)
            
            # Find equality conditions
            eq_conditions = re.findall(r'(\w+)\s*=\s*', where_clause, re.IGNORECASE)
            
            if eq_conditions:
                suggestions.append(OptimizationSuggestion(
                    level=OptimizationLevel.LOW,
                    category="Indexing",
                    description=f"Consider indexes on columns: {', '.join(set(eq_conditions))}",
                    original_query=query,
                    optimized_query=None,
                    estimated_improvement="Significant with proper indexing",
                    explanation="Equality conditions in WHERE clause benefit from indexes"
                ))
        
        # Check JOIN conditions for indexing
        join_matches = re.findall(r'JOIN\s+\w+\s+ON\s+(\w+\.\w+)\s*=\s*(\w+\.\w+)', 
                                 query, re.IGNORECASE)
        
        if join_matches:
            join_columns = []
            for match in join_matches:
                join_columns.extend([match[0], match[1]])
            
            suggestions.append(OptimizationSuggestion(
                level=OptimizationLevel.LOW,
                category="Indexing",
                description=f"Ensure indexes on JOIN columns: {', '.join(set(join_columns))}",
                original_query=query,
                optimized_query=None,
                estimated_improvement="Critical for JOIN performance",
                explanation="JOIN conditions require indexes on both sides for optimal performance"
            ))
        
        return suggestions

def main():
    # Example usage
    analyzer = SQLQueryAnalyzer()
    
    test_queries = [
        """
        SELECT * FROM users u, orders o 
        WHERE UPPER(u.email) = 'TEST@EXAMPLE.COM'
        ORDER BY u.created_date
        """,
        
        """
        SELECT user_id, name, email FROM users 
        WHERE status = 'active' 
        AND (category = 'premium' OR category = 'gold' OR category = 'platinum')
        ORDER BY created_date DESC
        LIMIT 10
        """,
        
        """
        UPDATE users SET last_login = NOW()
        """,
        
        """
        SELECT u.name, COUNT(o.id) as order_count
        FROM users u
        LEFT JOIN orders o ON u.id = o.user_id
        WHERE u.email LIKE '%@gmail.com'
        GROUP BY u.id, u.name
        ORDER BY order_count DESC
        """
    ]
    
    for i, query in enumerate(test_queries, 1):
        print(f"\n=== Query {i} Analysis ===")
        print(f"Query: {query.strip()}")
        
        suggestions = analyzer.analyze_query(query)
        
        if suggestions:
            print(f"\nOptimization Suggestions ({len(suggestions)}):")
            for j, suggestion in enumerate(suggestions, 1):
                print(f"\n{j}. [{suggestion.level.value.upper()}] {suggestion.category}")
                print(f"   Description: {suggestion.description}")
                print(f"   Improvement: {suggestion.estimated_improvement}")
                print(f"   Explanation: {suggestion.explanation}")
        else:
            print("\nNo optimization suggestions found.")

if __name__ == "__main__":
    main()
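
2. Validating Suggestions with EXPLAIN

The analyzer above works purely on query text, so its findings are heuristics. A useful follow-up is to check them against the optimizer's actual plan. The sketch below assumes a MySQL connection like the ones used earlier and runs EXPLAIN FORMAT=JSON on a candidate query, flagging any table read with a full scan; the module path, connection parameters, and helper functions are illustrative rather than part of the analyzer.

#!/usr/bin/env python3
# src/optimization/explain_check.py (illustrative)

import json
import pymysql

def explain_query(conn, query: str) -> dict:
    """Return the MySQL optimizer plan for a query as a parsed JSON document."""
    with conn.cursor() as cur:
        # Only run this against trusted query text; it is interpolated directly.
        cur.execute(f"EXPLAIN FORMAT=JSON {query}")
        (plan_text,) = cur.fetchone()
    return json.loads(plan_text)

def full_scan_tables(plan: dict) -> list:
    """Walk the plan tree and collect tables accessed with access_type == 'ALL'."""
    hits = []
    def walk(node):
        if isinstance(node, dict):
            table = node.get("table")
            if isinstance(table, dict) and table.get("access_type") == "ALL":
                hits.append(table.get("table_name"))
            for value in node.values():
                walk(value)
        elif isinstance(node, list):
            for item in node:
                walk(item)
    walk(plan)
    return hits

if __name__ == "__main__":
    conn = pymysql.connect(host='localhost', user='monitor_user',
                           password='monitor_pass', database='production')
    plan = explain_query(conn, "SELECT user_id, name FROM users WHERE status = 'active'")
    print("Full table scans:", full_scan_tables(plan))
    conn.close()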

Indexing Strategies

1. Index Recommendation Engine

#!/usr/bin/env python3
# src/indexing/index_advisor.py

import re
import json
import logging
from typing import Dict, List, Any, Optional
from dataclasses import dataclass, asdict
from collections import defaultdict, Counter
from enum import Enum

class IndexType(Enum):
    BTREE = "btree"
    HASH = "hash"
    BITMAP = "bitmap"
    PARTIAL = "partial"
    COMPOSITE = "composite"
    COVERING = "covering"
    FUNCTIONAL = "functional"

@dataclass
class IndexRecommendation:
    table_name: str
    index_name: str
    index_type: IndexType
    columns: List[str]
    priority: int  # 1-10, 10 being highest
    estimated_benefit: str
    creation_sql: str
    rationale: str
    query_patterns: List[str]
    estimated_size_mb: float

@dataclass
class QueryPattern:
    pattern_id: str
    query_template: str
    frequency: int
    avg_execution_time: float
    tables_accessed: List[str]
    where_columns: List[str]
    join_columns: List[str]
    order_by_columns: List[str]
    group_by_columns: List[str]

class IndexAdvisor:
    def __init__(self, database_type: str = 'mysql'):
        self.database_type = database_type.lower()
        self.logger = self._setup_logging()
        
        # Query patterns storage
        self.query_patterns: Dict[str, QueryPattern] = {}
        
        # Table metadata
        self.table_metadata: Dict[str, Dict[str, Any]] = {}
        
        # Existing indexes
        self.existing_indexes: Dict[str, List[Dict[str, Any]]] = {}
        
        # Configuration
        self.min_query_frequency = 10
        self.min_execution_time = 0.1  # seconds
        self.max_index_columns = 5
        
    def _setup_logging(self) -> logging.Logger:
        """Setup logging configuration"""
        logger = logging.getLogger('IndexAdvisor')
        logger.setLevel(logging.INFO)
        
        handler = logging.StreamHandler()
        formatter = logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        handler.setFormatter(formatter)
        logger.addHandler(handler)
        
        return logger
    
    def analyze_query_workload(self, queries: List[Dict[str, Any]]) -> List[IndexRecommendation]:
        """Analyze query workload and generate index recommendations"""
        
        # Step 1: Extract query patterns
        self._extract_query_patterns(queries)
        
        # Step 2: Analyze patterns for indexing opportunities
        recommendations = []
        
        # Single column indexes
        recommendations.extend(self._recommend_single_column_indexes())
        
        # Composite indexes
        recommendations.extend(self._recommend_composite_indexes())
        
        # Covering indexes
        recommendations.extend(self._recommend_covering_indexes())
        
        # Partial indexes
        recommendations.extend(self._recommend_partial_indexes())
        
        # Step 3: Prioritize and filter recommendations
        recommendations = self._prioritize_recommendations(recommendations)
        
        return recommendations
    
    def _extract_query_patterns(self, queries: List[Dict[str, Any]]):
        """Extract patterns from query workload"""
        pattern_counter = defaultdict(list)
        
        for query_data in queries:
            query_text = query_data.get('query', '')
            execution_time = query_data.get('execution_time', 0)
            
            # Normalize query to create pattern
            normalized = self._normalize_query(query_text)
            pattern_id = self._generate_pattern_id(normalized)
            
            pattern_counter[pattern_id].append({
                'query': query_text,
                'normalized': normalized,
                'execution_time': execution_time
            })
        
        # Create QueryPattern objects
        for pattern_id, query_list in pattern_counter.items():
            if len(query_list) < self.min_query_frequency:
                continue
            
            representative_query = query_list[0]['normalized']
            avg_execution_time = sum(q['execution_time'] for q in query_list) / len(query_list)
            
            if avg_execution_time < self.min_execution_time:
                continue
            
            # Analyze query structure
            analysis = self._analyze_query_structure(representative_query)
            
            pattern = QueryPattern(
                pattern_id=pattern_id,
                query_template=representative_query,
                frequency=len(query_list),
                avg_execution_time=avg_execution_time,
                tables_accessed=analysis['tables'],
                where_columns=analysis['where_columns'],
                join_columns=analysis['join_columns'],
                order_by_columns=analysis['order_by_columns'],
                group_by_columns=analysis['group_by_columns']
            )
            
            self.query_patterns[pattern_id] = pattern
    
    def _normalize_query(self, query: str) -> str:
        """Normalize query by replacing literals with placeholders"""
        # Remove extra whitespace
        normalized = re.sub(r'\s+', ' ', query.strip())
        
        # Replace string literals
        normalized = re.sub(r"'[^']*'", "'?'", normalized)
        
        # Replace numeric literals
        normalized = re.sub(r'\b\d+\b', '?', normalized)
        
        # Convert to uppercase for consistency
        normalized = normalized.upper()
        
        return normalized
    
    def _generate_pattern_id(self, normalized_query: str) -> str:
        """Generate unique pattern ID"""
        import hashlib
        return hashlib.md5(normalized_query.encode()).hexdigest()[:16]
    
    def _analyze_query_structure(self, query: str) -> Dict[str, List[str]]:
        """Analyze query structure to extract column usage"""
        analysis = {
            'tables': [],
            'where_columns': [],
            'join_columns': [],
            'order_by_columns': [],
            'group_by_columns': []
        }
        
        # Extract tables
        table_matches = re.findall(r'FROM\s+(\w+)', query, re.IGNORECASE)
        table_matches.extend(re.findall(r'JOIN\s+(\w+)', query, re.IGNORECASE))
        analysis['tables'] = list(set(table_matches))
        
        # Extract WHERE columns
        where_match = re.search(r'WHERE\s+(.*?)(?:\s+GROUP\s+BY|\s+ORDER\s+BY|\s+LIMIT|$)', 
                               query, re.IGNORECASE | re.DOTALL)
        if where_match:
            where_clause = where_match.group(1)
            # Simple column extraction (can be improved)
            columns = re.findall(r'(\w+)\s*[=<>!]', where_clause)
            analysis['where_columns'] = list(set(columns))
        
        # Extract JOIN columns
        join_matches = re.findall(r'ON\s+(\w+)\.(\w+)\s*=\s*(\w+)\.(\w+)', query, re.IGNORECASE)
        for match in join_matches:
            analysis['join_columns'].extend([f"{match[0]}.{match[1]}", f"{match[2]}.{match[3]}"])
        
        # Extract ORDER BY columns
        order_match = re.search(r'ORDER\s+BY\s+(.*?)(?:\s+LIMIT|$)', query, re.IGNORECASE)
        if order_match:
            order_clause = order_match.group(1)
            columns = re.findall(r'(\w+)', order_clause)
            analysis['order_by_columns'] = list(set(columns))
        
        # Extract GROUP BY columns
        group_match = re.search(r'GROUP\s+BY\s+(.*?)(?:\s+ORDER\s+BY|\s+LIMIT|$)', query, re.IGNORECASE)
        if group_match:
            group_clause = group_match.group(1)
            columns = re.findall(r'(\w+)', group_clause)
            analysis['group_by_columns'] = list(set(columns))
        
        return analysis
    
    def _recommend_single_column_indexes(self) -> List[IndexRecommendation]:
        """Recommend single column indexes"""
        recommendations = []
        column_usage = defaultdict(list)
        
        # Collect column usage statistics
        for pattern in self.query_patterns.values():
            for column in pattern.where_columns:
                column_usage[column].append({
                    'pattern': pattern,
                    'usage_type': 'where'
                })
            
            for column in pattern.order_by_columns:
                column_usage[column].append({
                    'pattern': pattern,
                    'usage_type': 'order_by'
                })
        
        # Generate recommendations
        for column, usages in column_usage.items():
            if len(usages) < 2:  # Skip columns used in only one pattern
                continue
            
            # Calculate priority based on usage frequency and execution time
            total_frequency = sum(usage['pattern'].frequency for usage in usages)
            avg_execution_time = sum(usage['pattern'].avg_execution_time for usage in usages) / len(usages)
            
            priority = min(10, int((total_frequency * avg_execution_time) / 10))
            
            # Determine table name (simplified)
            table_name = self._infer_table_name(column, usages)
            
            if table_name:
                recommendation = IndexRecommendation(
                    table_name=table_name,
                    index_name=f"idx_{table_name}_{column}",
                    index_type=IndexType.BTREE,
                    columns=[column],
                    priority=priority,
                    estimated_benefit=f"Improves {len(usages)} query patterns",
                    creation_sql=self._generate_index_sql(table_name, f"idx_{table_name}_{column}", [column]),
                    rationale=f"Column '{column}' is frequently used in WHERE/ORDER BY clauses",
                    query_patterns=[usage['pattern'].pattern_id for usage in usages],
                    estimated_size_mb=self._estimate_index_size(table_name, [column])
                )
                recommendations.append(recommendation)
        
        return recommendations
    
    def _recommend_composite_indexes(self) -> List[IndexRecommendation]:
        """Recommend composite indexes"""
        recommendations = []
        
        for pattern in self.query_patterns.values():
            if len(pattern.where_columns) < 2:
                continue
            
            # Consider combinations of WHERE columns
            for table in pattern.tables_accessed:
                table_columns = [col for col in pattern.where_columns 
                               if '.' not in col or col.startswith(f"{table}.")]
                
                if len(table_columns) >= 2:
                    # Order columns by selectivity (simplified heuristic)
                    ordered_columns = self._order_columns_by_selectivity(table_columns)
                    
                    if len(ordered_columns) <= self.max_index_columns:
                        index_name = f"idx_{table}_composite_{'_'.join(ordered_columns[:3])}"
                        
                        recommendation = IndexRecommendation(
                            table_name=table,
                            index_name=index_name,
                            index_type=IndexType.COMPOSITE,
                            columns=ordered_columns,
                            priority=min(10, pattern.frequency // 5),
                            estimated_benefit=f"Optimizes multi-column WHERE conditions",
                            creation_sql=self._generate_index_sql(table, index_name, ordered_columns),
                            rationale=f"Multiple columns used together in WHERE clause",
                            query_patterns=[pattern.pattern_id],
                            estimated_size_mb=self._estimate_index_size(table, ordered_columns)
                        )
                        recommendations.append(recommendation)
        
        return recommendations
    
    def _recommend_covering_indexes(self) -> List[IndexRecommendation]:
        """Recommend covering indexes"""
        recommendations = []
        
        for pattern in self.query_patterns.values():
            if pattern.frequency < 50:  # Only for high-frequency queries
                continue
            
            for table in pattern.tables_accessed:
                # Get all columns accessed for this table
                accessed_columns = set()
                accessed_columns.update(pattern.where_columns)
                accessed_columns.update(pattern.order_by_columns)
                
                # Remove table prefixes
                clean_columns = [col.split('.')[-1] for col in accessed_columns 
                               if '.' not in col or col.startswith(f"{table}.")]
                
                if 2 <= len(clean_columns) <= self.max_index_columns:
                    # Order: WHERE columns first, then ORDER BY
                    where_cols = [col for col in clean_columns if col in pattern.where_columns]
                    order_cols = [col for col in clean_columns if col in pattern.order_by_columns and col not in where_cols]
                    
                    covering_columns = where_cols + order_cols
                    index_name = f"idx_{table}_covering_{'_'.join(covering_columns[:3])}"
                    
                    recommendation = IndexRecommendation(
                        table_name=table,
                        index_name=index_name,
                        index_type=IndexType.COVERING,
                        columns=covering_columns,
                        priority=min(10, pattern.frequency // 10),
                        estimated_benefit="Eliminates table lookups (index-only scan)",
                        creation_sql=self._generate_index_sql(table, index_name, covering_columns),
                        rationale="Covers all columns needed for query execution",
                        query_patterns=[pattern.pattern_id],
                        estimated_size_mb=self._estimate_index_size(table, covering_columns)
                    )
                    recommendations.append(recommendation)
        
        return recommendations
    
    def _recommend_partial_indexes(self) -> List[IndexRecommendation]:
        """Recommend partial indexes for filtered queries"""
        recommendations = []
        
        if self.database_type not in ['postgresql']:
            return recommendations  # Partial indexes mainly supported in PostgreSQL
        
        for pattern in self.query_patterns.values():
            # Look for queries with constant WHERE conditions
            query = pattern.query_template
            
            # Find constant conditions (simplified)
            constant_conditions = re.findall(r"(\w+)\s*=\s*'[^']*'", query)
            
            if constant_conditions:
                for table in pattern.tables_accessed:
                    for condition_col in constant_conditions:
                        # Create partial index for other columns when this condition is met
                        other_columns = [col for col in pattern.where_columns 
                                       if col != condition_col]
                        
                        if other_columns:
                            index_name = f"idx_{table}_{other_columns[0]}_partial"
                            
                            recommendation = IndexRecommendation(
                                table_name=table,
                                index_name=index_name,
                                index_type=IndexType.PARTIAL,
                                columns=other_columns[:2],  # Limit to 2 columns
                                priority=min(8, pattern.frequency // 20),
                                estimated_benefit="Smaller index size for filtered data",
                                creation_sql=self._generate_partial_index_sql(
                                    table, index_name, other_columns[:2], condition_col
                                ),
                                rationale=f"Frequent queries with constant {condition_col} condition",
                                query_patterns=[pattern.pattern_id],
                                estimated_size_mb=self._estimate_index_size(table, other_columns[:2]) * 0.3
                            )
                            recommendations.append(recommendation)
        
        return recommendations
    
    def _prioritize_recommendations(self, recommendations: List[IndexRecommendation]) -> List[IndexRecommendation]:
        """Prioritize and filter recommendations"""
        # Remove duplicates
        unique_recommendations = {}
        for rec in recommendations:
            key = (rec.table_name, tuple(rec.columns))
            if key not in unique_recommendations or rec.priority > unique_recommendations[key].priority:
                unique_recommendations[key] = rec
        
        # Sort by priority
        sorted_recommendations = sorted(
            unique_recommendations.values(),
            key=lambda x: x.priority,
            reverse=True
        )
        
        # Filter out low-priority recommendations
        filtered_recommendations = [rec for rec in sorted_recommendations if rec.priority >= 3]
        
        return filtered_recommendations[:20]  # Limit to top 20
    
    def _infer_table_name(self, column: str, usages: List[Dict[str, Any]]) -> Optional[str]:
        """Infer table name for a column"""
        # Simple heuristic: use the most common table from patterns
        table_counter = Counter()
        
        for usage in usages:
            for table in usage['pattern'].tables_accessed:
                table_counter[table] += 1
        
        if table_counter:
            return table_counter.most_common(1)[0][0]
        
        return None
    
    def _order_columns_by_selectivity(self, columns: List[str]) -> List[str]:
        """Order columns by estimated selectivity (most selective first)"""
        # Simplified heuristic: assume shorter column names are more selective
        # In practice, you'd use actual cardinality statistics
        return sorted(columns, key=len)
    
    def _generate_index_sql(self, table_name: str, index_name: str, columns: List[str]) -> str:
        """Generate CREATE INDEX SQL (same syntax for MySQL and PostgreSQL)"""
        columns_str = ', '.join(columns)
        return f"CREATE INDEX {index_name} ON {table_name} ({columns_str});"
    
    def _generate_partial_index_sql(self, table_name: str, index_name: str, 
                                  columns: List[str], condition_column: str) -> str:
        """Generate partial index SQL (PostgreSQL). Query normalization discards the
        literal value, so the WHERE predicate here is a placeholder to refine by hand."""
        columns_str = ', '.join(columns)
        return f"CREATE INDEX {index_name} ON {table_name} ({columns_str}) WHERE {condition_column} IS NOT NULL;"
    
    def _estimate_index_size(self, table_name: str, columns: List[str]) -> float:
        """Estimate index size in MB"""
        # Simplified estimation
        base_size = 10.0  # Base size in MB
        column_factor = len(columns) * 2.0
        return base_size + column_factor
    
    def set_table_metadata(self, metadata: Dict[str, Dict[str, Any]]):
        """Set table metadata for better recommendations"""
        self.table_metadata = metadata
    
    def set_existing_indexes(self, indexes: Dict[str, List[Dict[str, Any]]]):
        """Set existing indexes to avoid duplicates"""
        self.existing_indexes = indexes
    
    def export_recommendations(self, recommendations: List[IndexRecommendation], filename: str):
        """Export recommendations returned by analyze_query_workload to a JSON file"""
        # Local imports keep this method self-contained
        import json
        from dataclasses import asdict
        
        with open(filename, 'w') as f:
            # default=str handles non-JSON-serializable fields such as the IndexType enum
            json.dump([asdict(rec) for rec in recommendations], f, indent=2, default=str)

def main():
    # Example usage
    advisor = IndexAdvisor(database_type='mysql')
    
    # Sample query workload
    sample_queries = [
        {
            'query': "SELECT * FROM users WHERE email = 'test@example.com'",
            'execution_time': 0.5,
            'frequency': 100
        },
        {
            'query': "SELECT name, email FROM users WHERE status = 'active' AND created_date > '2023-01-01'",
            'execution_time': 1.2,
            'frequency': 50
        },
        {
            'query': "SELECT u.name, COUNT(o.id) FROM users u JOIN orders o ON u.id = o.user_id WHERE u.status = 'active' GROUP BY u.id ORDER BY COUNT(o.id) DESC",
            'execution_time': 2.5,
            'frequency': 25
        }
    ] * 10  # Repeat the samples to simulate a larger observed workload
    
    # Analyze workload
    recommendations = advisor.analyze_query_workload(sample_queries)
    
    # Display recommendations
    print(f"Generated {len(recommendations)} index recommendations:\n")
    
    for i, rec in enumerate(recommendations, 1):
        print(f"{i}. {rec.index_name} (Priority: {rec.priority})")
        print(f"   Table: {rec.table_name}")
        print(f"   Columns: {', '.join(rec.columns)}")
        print(f"   Type: {rec.index_type.value}")
        print(f"   Benefit: {rec.estimated_benefit}")
        print(f"   Size: {rec.estimated_size_mb:.1f} MB")
        print(f"   SQL: {rec.creation_sql}")
        print(f"   Rationale: {rec.rationale}")
        print()

if __name__ == "__main__":
    main()
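
These recommendations are starting points rather than guaranteed wins; each creation_sql statement should be reviewed and benchmarked before it is applied. As a minimal sketch of that final step (pymysql with placeholder credentials; adapt to your environment), applying a reviewed recommendation might look like this:

#!/usr/bin/env python3
# Hypothetical helper (not part of the advisor above): execute a reviewed
# recommendation's creation_sql via pymysql. Credentials are placeholders.

import time
import pymysql

def apply_recommendation(creation_sql: str) -> None:
    """Execute a reviewed CREATE INDEX statement and report elapsed time."""
    conn = pymysql.connect(host='localhost', user='dba', password='secret', database='app')
    try:
        with conn.cursor() as cursor:
            start = time.monotonic()
            cursor.execute(creation_sql)  # e.g. the top recommendation's SQL
        conn.commit()
        print(f"Applied in {time.monotonic() - start:.1f}s: {creation_sql}")
    finally:
        conn.close()

# Example, after reviewing the advisor's output:
# apply_recommendation(recommendations[0].creation_sql)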

Database Schema Design

1. Schema Optimization Analyzer

#!/usr/bin/env python3
# src/schema/schema_analyzer.py

import json
import logging
import re
from typing import Dict, List, Any, Optional, Set
from dataclasses import dataclass
from enum import Enum

class SchemaIssueType(Enum):
    NORMALIZATION = "normalization"
    DENORMALIZATION = "denormalization"
    DATA_TYPE = "data_type"
    CONSTRAINT = "constraint"
    RELATIONSHIP = "relationship"

@dataclass
class SchemaIssue:
    issue_type: SchemaIssueType
    severity: str  # LOW, MEDIUM, HIGH, CRITICAL
    table_name: str
    column_name: Optional[str]
    description: str
    recommendation: str
    impact: str

class SchemaAnalyzer:
    def __init__(self):
        self.logger = self._setup_logging()
        
    def _setup_logging(self) -> logging.Logger:
        logger = logging.getLogger('SchemaAnalyzer')
        logger.setLevel(logging.INFO)
        handler = logging.StreamHandler()
        formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
        handler.setFormatter(formatter)
        logger.addHandler(handler)
        return logger
    
    def analyze_schema(self, schema_metadata: Dict[str, Any]) -> List[SchemaIssue]:
        """Analyze database schema and identify optimization opportunities"""
        issues = []
        
        tables = schema_metadata.get('tables', {})
        
        for table_name, table_info in tables.items():
            # Check data types
            issues.extend(self._check_data_types(table_name, table_info))
            
            # Check normalization
            issues.extend(self._check_normalization(table_name, table_info))
            
            # Check constraints
            issues.extend(self._check_constraints(table_name, table_info))
            
            # Check relationships
            issues.extend(self._check_relationships(table_name, table_info, tables))
        
        return sorted(issues, key=lambda x: self._severity_weight(x.severity), reverse=True)
    
    def _check_data_types(self, table_name: str, table_info: Dict[str, Any]) -> List[SchemaIssue]:
        """Check for suboptimal data type choices"""
        issues = []
        columns = table_info.get('columns', {})
        
        for column_name, column_info in columns.items():
            data_type = column_info.get('type', '').upper()
            
            # Check for oversized VARCHAR
            if 'VARCHAR' in data_type:
                size_match = re.search(r'VARCHAR\((\d+)\)', data_type)
                if size_match:
                    size = int(size_match.group(1))
                    if size > 1000:
                        issues.append(SchemaIssue(
                            issue_type=SchemaIssueType.DATA_TYPE,
                            severity="MEDIUM",
                            table_name=table_name,
                            column_name=column_name,
                            description=f"VARCHAR({size}) may be oversized",
                            recommendation="Consider using TEXT for large text or smaller VARCHAR",
                            impact="Increased storage and memory usage"
                        ))
            
            # Check for CHAR vs VARCHAR
            if data_type.startswith('CHAR(') and not column_name.endswith('_code'):
                issues.append(SchemaIssue(
                    issue_type=SchemaIssueType.DATA_TYPE,
                    severity="LOW",
                    table_name=table_name,
                    column_name=column_name,
                    description="CHAR used for variable-length data",
                    recommendation="Consider VARCHAR for variable-length strings",
                    impact="Wasted storage space"
                ))
            
            # Check for inappropriate use of TEXT
            if data_type == 'TEXT' and column_name in ['status', 'type', 'category']:
                issues.append(SchemaIssue(
                    issue_type=SchemaIssueType.DATA_TYPE,
                    severity="MEDIUM",
                    table_name=table_name,
                    column_name=column_name,
                    description="TEXT used for short categorical data",
                    recommendation="Use ENUM or small VARCHAR instead",
                    impact="Inefficient storage and indexing"
                ))
        
        return issues
    
    def _check_normalization(self, table_name: str, table_info: Dict[str, Any]) -> List[SchemaIssue]:
        """Check for normalization issues"""
        issues = []
        columns = table_info.get('columns', {})
        
        # Check for repeated column patterns (potential denormalization)
        column_names = list(columns.keys())
        
        # Look for address fields that could be normalized
        address_fields = [col for col in column_names if any(addr in col.lower() 
                         for addr in ['address', 'street', 'city', 'state', 'zip', 'country'])]
        
        if len(address_fields) > 3:
            issues.append(SchemaIssue(
                issue_type=SchemaIssueType.NORMALIZATION,
                severity="MEDIUM",
                table_name=table_name,
                column_name=None,
                description="Multiple address fields in single table",
                recommendation="Consider creating separate address table",
                impact="Data redundancy and maintenance complexity"
            ))
        
        # Check for JSON/TEXT columns that might need normalization
        json_columns = [col for col, info in columns.items() 
                       if info.get('type', '').upper() in ['JSON', 'TEXT'] 
                       and col not in ['description', 'notes', 'content']]
        
        if json_columns:
            issues.append(SchemaIssue(
                issue_type=SchemaIssueType.NORMALIZATION,
                severity="LOW",
                table_name=table_name,
                column_name=', '.join(json_columns),
                description="JSON/TEXT columns may contain structured data",
                recommendation="Consider normalizing if data has consistent structure",
                impact="Limited query capabilities and indexing"
            ))
        
        return issues
    
    def _check_constraints(self, table_name: str, table_info: Dict[str, Any]) -> List[SchemaIssue]:
        """Check for missing or inappropriate constraints"""
        issues = []
        columns = table_info.get('columns', {})
        constraints = table_info.get('constraints', [])
        
        # Check for missing primary key
        has_primary_key = any(constraint.get('type') == 'PRIMARY KEY' for constraint in constraints)
        if not has_primary_key:
            issues.append(SchemaIssue(
                issue_type=SchemaIssueType.CONSTRAINT,
                severity="HIGH",
                table_name=table_name,
                column_name=None,
                description="Table lacks primary key",
                recommendation="Add primary key constraint",
                impact="Poor performance and replication issues"
            ))
        
        # Check for email columns without format constraints
        email_columns = [col for col in columns.keys() if 'email' in col.lower()]
        for email_col in email_columns:
            has_email_constraint = any(
                constraint.get('column') == email_col and 'email' in constraint.get('definition', '').lower()
                for constraint in constraints
            )
            if not has_email_constraint:
                issues.append(SchemaIssue(
                    issue_type=SchemaIssueType.CONSTRAINT,
                    severity="MEDIUM",
                    table_name=table_name,
                    column_name=email_col,
                    description="Email column lacks format validation",
                    recommendation="Add CHECK constraint for email format",
                    impact="Data quality issues"
                ))
        
        return issues
    
    def _check_relationships(self, table_name: str, table_info: Dict[str, Any], 
                           all_tables: Dict[str, Any]) -> List[SchemaIssue]:
        """Check for relationship issues"""
        issues = []
        columns = table_info.get('columns', {})
        foreign_keys = table_info.get('foreign_keys', [])
        
        # Check for potential foreign key columns without constraints
        potential_fk_columns = [col for col in columns.keys() 
                               if col.endswith('_id') and col != 'id']
        
        existing_fk_columns = [fk.get('column') for fk in foreign_keys]
        
        for fk_col in potential_fk_columns:
            if fk_col not in existing_fk_columns:
                # Check if referenced table exists
                referenced_table = fk_col.replace('_id', '') + 's'  # Simple heuristic
                if referenced_table in all_tables:
                    issues.append(SchemaIssue(
                        issue_type=SchemaIssueType.RELATIONSHIP,
                        severity="MEDIUM",
                        table_name=table_name,
                        column_name=fk_col,
                        description=f"Potential foreign key {fk_col} lacks constraint",
                        recommendation=f"Add foreign key constraint to {referenced_table}",
                        impact="Referential integrity not enforced"
                    ))
        
        return issues
    
    def _severity_weight(self, severity: str) -> int:
        """Convert severity to numeric weight for sorting"""
        weights = {'CRITICAL': 4, 'HIGH': 3, 'MEDIUM': 2, 'LOW': 1}
        return weights.get(severity, 0)

# Example usage
def main():
    analyzer = SchemaAnalyzer()
    
    # Sample schema metadata
    schema_metadata = {
        'tables': {
            'users': {
                'columns': {
                    'id': {'type': 'INT', 'nullable': False},
                    'email': {'type': 'VARCHAR(255)', 'nullable': False},
                    'name': {'type': 'VARCHAR(1000)', 'nullable': True},
                    'address': {'type': 'TEXT', 'nullable': True},
                    'city': {'type': 'VARCHAR(100)', 'nullable': True},
                    'state': {'type': 'VARCHAR(50)', 'nullable': True},
                    'zip': {'type': 'VARCHAR(20)', 'nullable': True},
                    'status': {'type': 'TEXT', 'nullable': True}
                },
                'constraints': [
                    {'type': 'PRIMARY KEY', 'column': 'id'}
                ],
                'foreign_keys': []
            },
            'orders': {
                'columns': {
                    'order_id': {'type': 'INT', 'nullable': False},
                    'user_id': {'type': 'INT', 'nullable': False},
                    'total': {'type': 'DECIMAL(10,2)', 'nullable': False}
                },
                'constraints': [],
                'foreign_keys': []
            }
        }
    }
    
    issues = analyzer.analyze_schema(schema_metadata)
    
    print(f"Found {len(issues)} schema issues:\n")
    for issue in issues:
        print(f"[{issue.severity}] {issue.table_name}.{issue.column_name or 'TABLE'}")
        print(f"  Issue: {issue.description}")
        print(f"  Recommendation: {issue.recommendation}")
        print(f"  Impact: {issue.impact}")
        print()

if __name__ == "__main__":
    main()
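
In practice the schema_metadata dictionary would be built from the database catalog rather than written by hand. The following is a minimal loader sketch for MySQL's information_schema (credentials are placeholders; it collects just the column types, table-level constraints, and foreign-key columns that SchemaAnalyzer inspects):

#!/usr/bin/env python3
# Hypothetical loader: build SchemaAnalyzer's schema_metadata from MySQL's
# information_schema. Connection credentials are placeholders.

import pymysql

def load_schema_metadata(schema_name: str) -> dict:
    conn = pymysql.connect(host='localhost', user='dba', password='secret',
                           database='information_schema')
    tables: dict = {}
    try:
        with conn.cursor() as cur:
            # Column names, types, and nullability
            cur.execute(
                "SELECT TABLE_NAME, COLUMN_NAME, COLUMN_TYPE, IS_NULLABLE "
                "FROM COLUMNS WHERE TABLE_SCHEMA = %s", (schema_name,))
            for table, column, col_type, nullable in cur.fetchall():
                entry = tables.setdefault(table, {'columns': {}, 'constraints': [], 'foreign_keys': []})
                entry['columns'][column] = {'type': col_type, 'nullable': nullable == 'YES'}

            # Table-level constraints (PRIMARY KEY, UNIQUE, FOREIGN KEY)
            cur.execute(
                "SELECT TABLE_NAME, CONSTRAINT_TYPE "
                "FROM TABLE_CONSTRAINTS WHERE TABLE_SCHEMA = %s", (schema_name,))
            for table, ctype in cur.fetchall():
                if table in tables:
                    tables[table]['constraints'].append({'type': ctype})

            # Columns that already participate in a foreign key
            cur.execute(
                "SELECT TABLE_NAME, COLUMN_NAME FROM KEY_COLUMN_USAGE "
                "WHERE TABLE_SCHEMA = %s AND REFERENCED_TABLE_NAME IS NOT NULL", (schema_name,))
            for table, column in cur.fetchall():
                if table in tables:
                    tables[table]['foreign_keys'].append({'column': column})
    finally:
        conn.close()

    return {'tables': tables}

# issues = SchemaAnalyzer().analyze_schema(load_schema_metadata('myapp'))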

System-Level Optimization

1. Database Configuration Tuner

#!/bin/bash
# src/tuning/db_config_tuner.sh

# Database Configuration Tuner
# Analyzes system resources and generates optimized database configurations

set -euo pipefail

# Configuration
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
LOG_FILE="${SCRIPT_DIR}/tuning.log"
CONFIG_OUTPUT_DIR="${SCRIPT_DIR}/optimized_configs"

# Logging function
log() {
    echo "[$(date '+%Y-%m-%d %H:%M:%S')] $*" | tee -a "$LOG_FILE"
}

# System information gathering
get_system_info() {
    log "Gathering system information..."
    
    # Memory information
    TOTAL_MEMORY_KB=$(grep MemTotal /proc/meminfo | awk '{print $2}')
    TOTAL_MEMORY_GB=$((TOTAL_MEMORY_KB / 1024 / 1024))
    
    # CPU information
    CPU_CORES=$(nproc)
    CPU_MODEL=$(grep "model name" /proc/cpuinfo | head -1 | cut -d: -f2 | xargs)
    
    # Storage information
    STORAGE_INFO=$(df -h / | tail -1)
    
    # Network information
    NETWORK_INTERFACES=$(ip link show | grep -E "^[0-9]+" | awk -F: '{print $2}' | xargs)
    
    log "System Information:"
    log "  Memory: ${TOTAL_MEMORY_GB}GB"
    log "  CPU Cores: ${CPU_CORES}"
    log "  CPU Model: ${CPU_MODEL}"
    log "  Storage: ${STORAGE_INFO}"
    log "  Network Interfaces: ${NETWORK_INTERFACES}"
}

# MySQL configuration optimization
optimize_mysql_config() {
    local memory_gb=$1
    local cpu_cores=$2
    local config_file="${CONFIG_OUTPUT_DIR}/mysql_optimized.cnf"
    
    log "Generating optimized MySQL configuration..."
    
    # Calculate memory allocations (conservative approach)
    local innodb_buffer_pool=$((memory_gb * 70 / 100))  # 70% of total memory
    local query_cache_size=$((memory_gb * 5 / 100))     # 5% of total memory
    local tmp_table_size=$((memory_gb * 2 / 100))       # 2% of total memory
    
    # Ensure minimum values
    [[ $innodb_buffer_pool -lt 1 ]] && innodb_buffer_pool=1
    [[ $query_cache_size -lt 1 ]] && query_cache_size=1
    [[ $tmp_table_size -lt 1 ]] && tmp_table_size=1
    
    cat > "$config_file" << EOF
# Optimized MySQL Configuration
# Generated on $(date)
# System: ${memory_gb}GB RAM, ${cpu_cores} CPU cores

[mysqld]
# Basic Settings
user = mysql
pid-file = /var/run/mysqld/mysqld.pid
socket = /var/run/mysqld/mysqld.sock
port = 3306
basedir = /usr
datadir = /var/lib/mysql
tmpdir = /tmp

# Memory Settings
innodb_buffer_pool_size = ${innodb_buffer_pool}G
innodb_buffer_pool_instances = $((cpu_cores > 8 ? 8 : cpu_cores))
# Query cache settings apply to MySQL 5.7 and earlier only (removed in MySQL 8.0)
query_cache_size = ${query_cache_size}G
query_cache_type = 1
tmp_table_size = ${tmp_table_size}G
max_heap_table_size = ${tmp_table_size}G

# Connection Settings
max_connections = $((cpu_cores * 50))
max_connect_errors = 1000000
thread_cache_size = $((cpu_cores * 2))
table_open_cache = $((cpu_cores * 200))

# InnoDB Settings
innodb_file_per_table = 1
innodb_flush_log_at_trx_commit = 2
innodb_log_file_size = 256M
innodb_log_buffer_size = 64M
innodb_flush_method = O_DIRECT
innodb_io_capacity = 1000
innodb_io_capacity_max = 2000
innodb_read_io_threads = $((cpu_cores / 2 > 4 ? 4 : cpu_cores / 2))
innodb_write_io_threads = $((cpu_cores / 2 > 4 ? 4 : cpu_cores / 2))

# Query Cache (MySQL 5.7 and earlier)
query_cache_limit = 2M
query_cache_min_res_unit = 2k

# Slow Query Log
slow_query_log = 1
slow_query_log_file = /var/log/mysql/slow.log
long_query_time = 1

# Binary Logging
log_bin = /var/log/mysql/mysql-bin.log
binlog_format = ROW
expire_logs_days = 7
max_binlog_size = 100M

# Performance Schema
performance_schema = ON
performance_schema_max_table_instances = 400
performance_schema_max_table_handles = 4000

# Security
local_infile = 0
skip_show_database

[mysql]
no-auto-rehash

[mysqldump]
quick
quote-names
max_allowed_packet = 16M
EOF

    log "MySQL configuration saved to: $config_file"
}

# PostgreSQL configuration optimization
optimize_postgresql_config() {
    local memory_gb=$1
    local cpu_cores=$2
    local config_file="${CONFIG_OUTPUT_DIR}/postgresql_optimized.conf"
    
    log "Generating optimized PostgreSQL configuration..."
    
    # Calculate memory allocations
    local shared_buffers=$((memory_gb * 25 / 100))      # 25% of total memory
    local effective_cache_size=$((memory_gb * 75 / 100)) # 75% of total memory
    local work_mem=$((memory_gb * 1024 / 4 / (cpu_cores * 25))) # ~25% of RAM split across max_connections (cpu_cores * 25), in MB
    
    # Ensure minimum values
    [[ $shared_buffers -lt 1 ]] && shared_buffers=1
    [[ $effective_cache_size -lt 1 ]] && effective_cache_size=1
    [[ $work_mem -lt 4 ]] && work_mem=4
    
    cat > "$config_file" << EOF
# Optimized PostgreSQL Configuration
# Generated on $(date)
# System: ${memory_gb}GB RAM, ${cpu_cores} CPU cores

# Memory Settings
shared_buffers = ${shared_buffers}GB
effective_cache_size = ${effective_cache_size}GB
work_mem = ${work_mem}MB
maintenance_work_mem = $((work_mem * 10))MB

# Connection Settings
max_connections = $((cpu_cores * 25))
shared_preload_libraries = 'pg_stat_statements'

# WAL Settings
wal_buffers = 64MB
checkpoint_completion_target = 0.9
wal_writer_delay = 200ms
commit_delay = 0
commit_siblings = 5

# Query Planner
random_page_cost = 1.1
effective_io_concurrency = $((cpu_cores * 2))

# Logging
log_destination = 'stderr'
logging_collector = on
log_directory = 'pg_log'
log_filename = 'postgresql-%Y-%m-%d_%H%M%S.log'
log_min_duration_statement = 1000
log_checkpoints = on
log_connections = on
log_disconnections = on
log_lock_waits = on

# Statistics
track_activities = on
track_counts = on
track_io_timing = on
track_functions = pl
stats_temp_directory = '/var/run/postgresql/stats_temp'  # PostgreSQL 14 and earlier only (removed in 15)

# Autovacuum
autovacuum = on
autovacuum_max_workers = $((cpu_cores / 4 > 1 ? cpu_cores / 4 : 1))
autovacuum_naptime = 1min
autovacuum_vacuum_threshold = 50
autovacuum_analyze_threshold = 50

# Background Writer
bgwriter_delay = 200ms
bgwriter_lru_maxpages = 100
bgwriter_lru_multiplier = 2.0

# Parallel Query
max_worker_processes = $cpu_cores
max_parallel_workers_per_gather = $((cpu_cores / 4 > 1 ? cpu_cores / 4 : 1))
max_parallel_workers = $cpu_cores
EOF

    log "PostgreSQL configuration saved to: $config_file"
}

# Operating system optimization
optimize_os_settings() {
    local config_file="${CONFIG_OUTPUT_DIR}/os_optimizations.sh"
    
    log "Generating OS optimization script..."
    
    cat > "$config_file" << 'EOF'
#!/bin/bash
# Operating System Optimizations for Database Performance

# Kernel parameters for database workloads
cat >> /etc/sysctl.conf << 'SYSCTL_EOF'

# Memory management
vm.swappiness = 1
vm.dirty_ratio = 15
vm.dirty_background_ratio = 5
vm.dirty_expire_centisecs = 500
vm.dirty_writeback_centisecs = 100

# Network settings
net.core.rmem_default = 262144
net.core.rmem_max = 16777216
net.core.wmem_default = 262144
net.core.wmem_max = 16777216
net.ipv4.tcp_rmem = 4096 65536 16777216
net.ipv4.tcp_wmem = 4096 65536 16777216
net.core.netdev_max_backlog = 5000

# File system
fs.file-max = 2097152

SYSCTL_EOF

# Apply kernel parameters
sysctl -p

# I/O scheduler optimization
echo "Setting I/O scheduler to deadline for database disks..."
for disk in /sys/block/sd*; do
    if [[ -f "$disk/queue/scheduler" ]]; then
        echo deadline > "$disk/queue/scheduler"
        echo "Set deadline scheduler for $(basename $disk)"
    fi
done

# Disable transparent huge pages (often problematic for databases)
echo "Disabling transparent huge pages..."
echo never > /sys/kernel/mm/transparent_hugepage/enabled
echo never > /sys/kernel/mm/transparent_hugepage/defrag

# Add to startup script
cat >> /etc/rc.local << 'RC_EOF'
# Disable transparent huge pages
echo never > /sys/kernel/mm/transparent_hugepage/enabled
echo never > /sys/kernel/mm/transparent_hugepage/defrag
RC_EOF

echo "OS optimizations applied. Reboot recommended."
EOF

    chmod +x "$config_file"
    log "OS optimization script saved to: $config_file"
}

# Storage optimization recommendations
analyze_storage() {
    log "Analyzing storage configuration..."
    
    local report_file="${CONFIG_OUTPUT_DIR}/storage_analysis.txt"
    
    {
        echo "Storage Analysis Report"
        echo "======================"
        echo "Generated on $(date)"
        echo
        
        echo "Disk Usage:"
        df -h
        echo
        
        echo "Mount Options:"
        mount | grep -E "(ext4|xfs|btrfs)"
        echo
        
        echo "I/O Statistics:"
        iostat -x 1 3 2>/dev/null || echo "iostat not available"
        echo
        
        echo "Recommendations:"
        echo "1. Use separate disks for data, logs, and temp files"
        echo "2. Consider RAID 10 for data files"
        echo "3. Use SSD for transaction logs"
        echo "4. Mount options: noatime,nodiratime for data partitions"
        echo "5. Consider XFS or ext4 with appropriate block size"
        echo
        
        echo "Suggested mount options for database partitions:"
        echo "/dev/sdb1 /var/lib/mysql ext4 defaults,noatime,nodiratime 0 2"
        echo "/dev/sdc1 /var/lib/mysql-logs ext4 defaults,noatime,nodiratime 0 2"
        
    } > "$report_file"
    
    log "Storage analysis saved to: $report_file"
}

# Main execution
main() {
    log "Starting database configuration tuning..."
    
    # Create output directory
    mkdir -p "$CONFIG_OUTPUT_DIR"
    
    # Gather system information
    get_system_info
    
    # Generate optimized configurations
    optimize_mysql_config "$TOTAL_MEMORY_GB" "$CPU_CORES"
    optimize_postgresql_config "$TOTAL_MEMORY_GB" "$CPU_CORES"
    optimize_os_settings
    analyze_storage
    
    log "Configuration tuning completed!"
    log "Generated files in: $CONFIG_OUTPUT_DIR"
    log "Review configurations before applying to production systems"
}

# Run main function
main "$@"

Caching Strategies

1. Multi-Level Cache Manager

#!/usr/bin/env python3
# src/caching/cache_manager.py

import redis
import memcache
import json
import hashlib
import time
import logging
from typing import Any, Optional, Dict, List, Union
from dataclasses import dataclass
from enum import Enum
import threading
from functools import wraps

class CacheLevel(Enum):
    L1_MEMORY = "l1_memory"
    L2_REDIS = "l2_redis"
    L3_MEMCACHED = "l3_memcached"

@dataclass
class CacheConfig:
    l1_max_size: int = 1000
    l1_ttl: int = 300  # 5 minutes
    l2_host: str = 'localhost'
    l2_port: int = 6379
    l2_ttl: int = 3600  # 1 hour
    l3_servers: Optional[List[str]] = None
    l3_ttl: int = 7200  # 2 hours

class MultiLevelCacheManager:
    def __init__(self, config: CacheConfig):
        self.config = config
        self.logger = self._setup_logging()
        
        # L1 Cache: In-memory dictionary with LRU eviction
        self.l1_cache: Dict[str, Dict[str, Any]] = {}
        self.l1_access_order: List[str] = []
        self.l1_lock = threading.RLock()
        
        # L2 Cache: Redis
        try:
            self.redis_client = redis.Redis(
                host=config.l2_host,
                port=config.l2_port,
                decode_responses=True,
                socket_connect_timeout=5,
                socket_timeout=5
            )
            self.redis_client.ping()
            self.l2_available = True
        except Exception as e:
            self.logger.warning(f"Redis not available: {e}")
            self.l2_available = False
        
        # L3 Cache: Memcached
        try:
            if config.l3_servers:
                self.memcached_client = memcache.Client(config.l3_servers)
                # Test connection
                self.memcached_client.set('test', 'test', time=1)
                self.l3_available = True
            else:
                self.l3_available = False
        except Exception as e:
            self.logger.warning(f"Memcached not available: {e}")
            self.l3_available = False
        
        # Statistics
        self.stats = {
            'l1_hits': 0, 'l1_misses': 0,
            'l2_hits': 0, 'l2_misses': 0,
            'l3_hits': 0, 'l3_misses': 0,
            'total_requests': 0
        }
        self.stats_lock = threading.Lock()
    
    def _setup_logging(self) -> logging.Logger:
        logger = logging.getLogger('CacheManager')
        logger.setLevel(logging.INFO)
        handler = logging.StreamHandler()
        formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
        handler.setFormatter(formatter)
        logger.addHandler(handler)
        return logger
    
    def _generate_key(self, key: str) -> str:
        """Generate consistent cache key"""
        return hashlib.md5(key.encode()).hexdigest()
    
    def _update_stats(self, level: str, hit: bool):
        """Update cache statistics"""
        with self.stats_lock:
            self.stats['total_requests'] += 1
            if hit:
                self.stats[f'{level}_hits'] += 1
            else:
                self.stats[f'{level}_misses'] += 1
    
    def _l1_get(self, key: str) -> Optional[Any]:
        """Get from L1 cache"""
        with self.l1_lock:
            if key in self.l1_cache:
                entry = self.l1_cache[key]
                if time.time() < entry['expires']:
                    # Move to end (most recently used)
                    self.l1_access_order.remove(key)
                    self.l1_access_order.append(key)
                    self._update_stats('l1', True)
                    return entry['value']
                else:
                    # Expired
                    del self.l1_cache[key]
                    self.l1_access_order.remove(key)
            
            self._update_stats('l1', False)
            return None
    
    def _l1_set(self, key: str, value: Any, ttl: int):
        """Set in L1 cache"""
        with self.l1_lock:
            # Evict if at capacity
            while len(self.l1_cache) >= self.config.l1_max_size:
                oldest_key = self.l1_access_order.pop(0)
                del self.l1_cache[oldest_key]
            
            self.l1_cache[key] = {
                'value': value,
                'expires': time.time() + ttl
            }
            
            if key in self.l1_access_order:
                self.l1_access_order.remove(key)
            self.l1_access_order.append(key)
    
    def _l2_get(self, key: str) -> Optional[Any]:
        """Get from L2 cache (Redis)"""
        if not self.l2_available:
            self._update_stats('l2', False)
            return None
        
        try:
            value = self.redis_client.get(key)
            if value is not None:
                self._update_stats('l2', True)
                return json.loads(value)
            else:
                self._update_stats('l2', False)
                return None
        except Exception as e:
            self.logger.error(f"Redis get error: {e}")
            self._update_stats('l2', False)
            return None
    
    def _l2_set(self, key: str, value: Any, ttl: int):
        """Set in L2 cache (Redis)"""
        if not self.l2_available:
            return
        
        try:
            self.redis_client.setex(key, ttl, json.dumps(value, default=str))
        except Exception as e:
            self.logger.error(f"Redis set error: {e}")
    
    def _l3_get(self, key: str) -> Optional[Any]:
        """Get from L3 cache (Memcached)"""
        if not self.l3_available:
            self._update_stats('l3', False)
            return None
        
        try:
            value = self.memcached_client.get(key)
            if value is not None:
                self._update_stats('l3', True)
                return json.loads(value)
            else:
                self._update_stats('l3', False)
                return None
        except Exception as e:
            self.logger.error(f"Memcached get error: {e}")
            self._update_stats('l3', False)
            return None
    
    def _l3_set(self, key: str, value: Any, ttl: int):
        """Set in L3 cache (Memcached)"""
        if not self.l3_available:
            return
        
        try:
            self.memcached_client.set(key, json.dumps(value, default=str), time=ttl)
        except Exception as e:
            self.logger.error(f"Memcached set error: {e}")
    
    def get(self, key: str) -> Optional[Any]:
        """Get value from cache (tries all levels)"""
        cache_key = self._generate_key(key)
        
        # Try L1 first
        value = self._l1_get(cache_key)
        if value is not None:
            return value
        
        # Try L2
        value = self._l2_get(cache_key)
        if value is not None:
            # Populate L1
            self._l1_set(cache_key, value, self.config.l1_ttl)
            return value
        
        # Try L3
        value = self._l3_get(cache_key)
        if value is not None:
            # Populate L1 and L2
            self._l1_set(cache_key, value, self.config.l1_ttl)
            self._l2_set(cache_key, value, self.config.l2_ttl)
            return value
        
        return None
    
    def set(self, key: str, value: Any, ttl: Optional[int] = None):
        """Set value in all cache levels"""
        cache_key = self._generate_key(key)
        
        # Use default TTLs if not specified
        l1_ttl = ttl or self.config.l1_ttl
        l2_ttl = ttl or self.config.l2_ttl
        l3_ttl = ttl or self.config.l3_ttl
        
        # Set in all levels
        self._l1_set(cache_key, value, l1_ttl)
        self._l2_set(cache_key, value, l2_ttl)
        self._l3_set(cache_key, value, l3_ttl)
    
    def delete(self, key: str):
        """Delete from all cache levels"""
        cache_key = self._generate_key(key)
        
        # Delete from L1
        with self.l1_lock:
            if cache_key in self.l1_cache:
                del self.l1_cache[cache_key]
                self.l1_access_order.remove(cache_key)
        
        # Delete from L2
        if self.l2_available:
            try:
                self.redis_client.delete(cache_key)
            except Exception as e:
                self.logger.error(f"Redis delete error: {e}")
        
        # Delete from L3
        if self.l3_available:
            try:
                self.memcached_client.delete(cache_key)
            except Exception as e:
                self.logger.error(f"Memcached delete error: {e}")
    
    def clear_all(self):
        """Clear all cache levels"""
        # Clear L1
        with self.l1_lock:
            self.l1_cache.clear()
            self.l1_access_order.clear()
        
        # Clear L2
        if self.l2_available:
            try:
                self.redis_client.flushdb()
            except Exception as e:
                self.logger.error(f"Redis clear error: {e}")
        
        # Clear L3
        if self.l3_available:
            try:
                self.memcached_client.flush_all()
            except Exception as e:
                self.logger.error(f"Memcached clear error: {e}")
    
    def get_stats(self) -> Dict[str, Any]:
        """Get cache statistics"""
        with self.stats_lock:
            total_hits = self.stats['l1_hits'] + self.stats['l2_hits'] + self.stats['l3_hits']
            total_misses = self.stats['l1_misses'] + self.stats['l2_misses'] + self.stats['l3_misses']
            
            hit_rate = (total_hits / (total_hits + total_misses)) * 100 if (total_hits + total_misses) > 0 else 0
            
            return {
                **self.stats,
                'total_hits': total_hits,
                'total_misses': total_misses,
                'hit_rate_percent': round(hit_rate, 2),
                'l1_size': len(self.l1_cache),
                'l2_available': self.l2_available,
                'l3_available': self.l3_available
            }

# Cache decorator
def cached(cache_manager: MultiLevelCacheManager, ttl: Optional[int] = None, key_prefix: str = ""):
    """Decorator for caching function results"""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            # Generate cache key from function name and arguments
            key_parts = [key_prefix, func.__name__]
            key_parts.extend(str(arg) for arg in args)
            key_parts.extend(f"{k}={v}" for k, v in sorted(kwargs.items()))
            cache_key = ":".join(key_parts)
            
            # Try to get from cache
            result = cache_manager.get(cache_key)
            if result is not None:
                return result
            
            # Execute function and cache result
            result = func(*args, **kwargs)
            cache_manager.set(cache_key, result, ttl)
            
            return result
        return wrapper
    return decorator

# Example usage
def main():
    # Configure cache
    config = CacheConfig(
        l1_max_size=500,
        l1_ttl=300,
        l2_host='localhost',
        l2_port=6379,
        l2_ttl=3600,
        l3_servers=['localhost:11211'],
        l3_ttl=7200
    )
    
    # Create cache manager
    cache = MultiLevelCacheManager(config)
    
    # Example cached function
    @cached(cache, ttl=600, key_prefix="user_data")
    def get_user_data(user_id: int) -> Dict[str, Any]:
        # Simulate database query
        time.sleep(0.1)  # Simulate query time
        return {
            'id': user_id,
            'name': f'User {user_id}',
            'email': f'user{user_id}@example.com',
            'created_at': time.time()
        }
    
    # Test caching
    print("Testing multi-level cache...")
    
    # First call - cache miss
    start_time = time.time()
    user_data = get_user_data(123)
    first_call_time = time.time() - start_time
    print(f"First call (cache miss): {first_call_time:.3f}s")
    print(f"User data: {user_data}")
    
    # Second call - cache hit
    start_time = time.time()
    user_data = get_user_data(123)
    second_call_time = time.time() - start_time
    print(f"Second call (cache hit): {second_call_time:.3f}s")
    
    # Show cache statistics
    stats = cache.get_stats()
    print(f"\nCache Statistics:")
    for key, value in stats.items():
        print(f"  {key}: {value}")

if __name__ == "__main__":
    main()
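
One thing the example does not show is invalidation: when the underlying row changes, the cached entry must be deleted explicitly or it will serve stale data until its TTL expires. A minimal sketch of a write path (the UPDATE itself is left as a placeholder; the key is rebuilt exactly the way the @cached decorator builds it):

# Hypothetical write path: persist the change, then invalidate the entry
# cached by get_user_data above so the next read refetches fresh data.

def update_user_email(cache: MultiLevelCacheManager, user_id: int, new_email: str) -> None:
    # 1. Persist the change (placeholder for a real UPDATE against the database).
    # db.execute("UPDATE users SET email = %s WHERE id = %s", (new_email, user_id))

    # 2. Rebuild the decorator's key: prefix, function name, positional args.
    cache_key = ":".join(["user_data", "get_user_data", str(user_id)])
    cache.delete(cache_key)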

Summary

This comprehensive database performance optimization guide provides practical strategies and tools for improving database performance across multiple dimensions:

Key Optimization Areas

  1. Performance Monitoring: Real-time metrics collection and analysis
  2. Query Optimization: Automated query analysis and improvement suggestions
  3. Indexing Strategies: Intelligent index recommendations based on workload analysis
  4. Schema Design: Best practices for efficient database schema design
  5. System Configuration: Optimized database and OS configurations
  6. Caching: Multi-level caching strategies for improved response times

Best Practices

  • Proactive Monitoring: Implement comprehensive monitoring before performance issues arise
  • Data-Driven Decisions: Use actual workload data to guide optimization efforts
  • Incremental Improvements: Apply optimizations gradually and measure their impact
  • Regular Maintenance: Schedule regular performance reviews and optimizations
  • Documentation: Maintain detailed records of all optimization changes

Implementation Strategy

  1. Start with monitoring and baseline establishment
  2. Identify the most impactful optimization opportunities
  3. Implement changes in a controlled manner
  4. Measure and validate improvements (a minimal measurement sketch follows this list)
  5. Iterate and refine based on results
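
A minimal measurement sketch for step 4 (run_query is a placeholder for whatever client call is being optimized; the point is to record the same latency distribution before and after a change and compare the two):

#!/usr/bin/env python3
# Minimal baseline-comparison sketch. run_query is a placeholder; replace it
# with the real query or API call whose latency is being tracked.

import statistics
import time

def run_query() -> None:
    time.sleep(0.01)  # placeholder for a real database call

def measure(samples: int = 50) -> dict:
    latencies = []
    for _ in range(samples):
        start = time.perf_counter()
        run_query()
        latencies.append(time.perf_counter() - start)
    return {
        'p50_ms': statistics.median(latencies) * 1000,
        'p95_ms': statistics.quantiles(latencies, n=20)[18] * 1000,  # 95th percentile
    }

if __name__ == "__main__":
    baseline = measure()
    print(f"Baseline: p50={baseline['p50_ms']:.1f}ms p95={baseline['p95_ms']:.1f}ms")
    # Re-run the same measurement after applying an optimization and compare.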

The tools and techniques presented in this guide can be adapted to various database systems and scaled according to your specific requirements. Remember that performance optimization is an ongoing process that requires continuous attention and refinement.
