Database Security and Access Control: A Comprehensive Guide to Protecting Your Data

Database security protects sensitive information from unauthorized access, data breaches, and malicious attacks through several layers of defense. This guide covers security architecture, authentication and authorization, encryption, access control, auditing and monitoring, compliance, and security testing, with configuration and code for each.

Table of Contents

  1. Security Architecture Overview
  2. Authentication and Authorization
  3. Data Encryption
  4. Access Control Implementation
  5. Auditing and Monitoring
  6. Compliance and Governance
  7. Security Testing and Validation

Security Architecture Overview

1. Multi-Layer Security Model

graph TB
    A[Application Layer] --> B[Authentication Layer]
    B --> C[Authorization Layer]
    C --> D[Network Security Layer]
    D --> E[Database Security Layer]
    E --> F[Storage Encryption Layer]
    F --> G[Physical Security Layer]
    
    subgraph "Security Controls"
        H[Access Control]
        I[Encryption]
        J[Auditing]
        K[Monitoring]
    end
    
    B -.-> H
    C -.-> H
    E -.-> I
    F -.-> I
    D -.-> J
    E -.-> J
    A -.-> K
    E -.-> K
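
One way to read the diagram is as an ordered chain of checks: a request reaches the database only if every layer in front of it lets the request through. The following is a minimal sketch of that idea, not part of the original design. The `Request` shape, the three layer names, and the check functions are all assumptions made for illustration.

```python
from dataclasses import dataclass, field
from typing import Callable, List, Optional, Tuple


@dataclass
class Request:
    """Hypothetical request shape, used only for this illustration."""
    user: Optional[str]
    authenticated: bool
    permissions: set = field(default_factory=set)
    source_tier: str = "unknown"


# Each layer is a (name, check) pair; a check returns True to let the request pass.
Layer = Tuple[str, Callable[[Request], bool]]

# Assumed ordering, following the first layers of the diagram.
LAYERS: List[Layer] = [
    ("authentication", lambda r: r.authenticated),
    ("authorization", lambda r: "read" in r.permissions),
    ("network", lambda r: r.source_tier == "application_tier"),
]


def first_failing_layer(request: Request,
                        layers: List[Layer] = LAYERS) -> Optional[str]:
    """Return the name of the first layer that rejects the request, or None."""
    for name, check in layers:
        if not check(request):
            return name
    return None
```

Returning the name of the rejecting layer, rather than a bare boolean, gives the auditing and monitoring controls something concrete to record.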

2. Security Configuration Framework

# Database Security Configuration
database_security:
  global_settings:
    security_level: "high"
    compliance_standards: ["SOX", "GDPR", "HIPAA", "PCI-DSS"]
    audit_enabled: true
    encryption_required: true
    
  authentication:
    methods:
      - type: "multi_factor"
        providers: ["ldap", "oauth2", "saml"]
      - type: "certificate"
        ca_validation: true
      - type: "kerberos"
        realm: "COMPANY.COM"
    
    password_policy:
      min_length: 12
      complexity_required: true
      expiration_days: 90
      history_count: 12
      lockout_attempts: 3
      lockout_duration: 30
    
  authorization:
    rbac_enabled: true
    principle_of_least_privilege: true
    role_hierarchy: true
    dynamic_permissions: true
    
  encryption:
    at_rest:
      algorithm: "AES-256"
      key_management: "external_hsm"
      transparent_encryption: true
    
    in_transit:
      tls_version: "1.3"
      certificate_validation: true
      # TLS 1.3 suite name; "ECDHE-RSA-AES256-GCM-SHA384" is a TLS 1.2 suite
      cipher_suites: ["TLS_AES_256_GCM_SHA384"]
    
    column_level:
      sensitive_data_types: ["ssn", "credit_card", "email", "phone"]
      format_preserving: true
      
  network_security:
    firewall_rules:
      - source: "application_tier"
        destination: "database_tier"
        ports: [3306, 5432, 1521]
        protocol: "tcp"
      
    vpc_isolation: true
    private_subnets: true
    bastion_host_required: true
    
  monitoring:
    real_time_alerts: true
    anomaly_detection: true
    failed_login_tracking: true
    privilege_escalation_detection: true
    
    retention:
      audit_logs: "7_years"
      security_events: "2_years"
      access_logs: "1_year"
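
The `password_policy` block above only declares values; something still has to enforce them. The sketch below shows one possible enforcement of `min_length`, `complexity_required`, `lockout_attempts`, and `lockout_duration`. Two points are assumptions rather than facts from the configuration: `lockout_duration` is taken to be in minutes, and "complexity" is taken to mean at least one lowercase letter, uppercase letter, digit, and symbol. The names `PasswordPolicy`, `password_violations`, and `locked_until` are hypothetical.

```python
import string
from dataclasses import dataclass
from typing import List, Optional


@dataclass(frozen=True)
class PasswordPolicy:
    min_length: int = 12
    complexity_required: bool = True
    lockout_attempts: int = 3
    lockout_duration_minutes: int = 30  # assumption: the config states no unit


def password_violations(password: str, policy: PasswordPolicy) -> List[str]:
    """Return a list of policy violations; an empty list means the password passes."""
    problems = []
    if len(password) < policy.min_length:
        problems.append(f"shorter than {policy.min_length} characters")
    if policy.complexity_required:
        # Assumed definition of complexity: one character from each class.
        classes = {
            "lowercase letter": any(c.islower() for c in password),
            "uppercase letter": any(c.isupper() for c in password),
            "digit": any(c.isdigit() for c in password),
            "symbol": any(c in string.punctuation for c in password),
        }
        problems.extend(f"missing {name}" for name, ok in classes.items() if not ok)
    return problems


def locked_until(failed_attempts: int, last_failure: float,
                 policy: PasswordPolicy) -> Optional[float]:
    """Return the unlock timestamp once the failure threshold is reached, else None."""
    if failed_attempts >= policy.lockout_attempts:
        return last_failure + policy.lockout_duration_minutes * 60
    return None
```

Reporting every violation at once, instead of stopping at the first, lets a user fix a rejected password in a single attempt.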

Authentication and Authorization

1. Multi-Factor Authentication System

#!/usr/bin/env python3
# src/security/mfa_auth.py

import hashlib
import hmac
import time
import base64
import qrcode
import logging
from typing import Dict, List, Optional, Tuple, Any
from dataclasses import dataclass
from enum import Enum
import secrets
import pyotp
import ldap3
from cryptography.fernet import Fernet
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC

class AuthMethod(Enum):
    PASSWORD = "password"
    TOTP = "totp"
    SMS = "sms"
    EMAIL = "email"
    CERTIFICATE = "certificate"
    BIOMETRIC = "biometric"

@dataclass
class AuthResult:
    success: bool
    user_id: Optional[str]
    session_token: Optional[str]
    required_methods: List[AuthMethod]
    error_message: Optional[str]
    risk_score: float

@dataclass
class UserCredentials:
    user_id: str
    password_hash: str
    salt: str
    totp_secret: Optional[str]
    certificate_thumbprint: Optional[str]
    failed_attempts: int
    locked_until: Optional[float]
    last_login: Optional[float]

class DatabaseAuthenticator:
    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.logger = self._setup_logging()
        self.encryption_key = self._derive_encryption_key()
        self.fernet = Fernet(self.encryption_key)
        
        # Initialize LDAP connection if configured
        self.ldap_connection = None
        if config.get('ldap', {}).get('enabled'):
            self._init_ldap()
    
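    def _setup_logging(self) -> logging.Logger:
        logger = logging.getLogger('DatabaseAuth')
        logger.setLevel(logging.INFO)
        # getLogger returns the same object on every call, so attach the handler
        # only once; otherwise each new instance duplicates every log line
        if not logger.handlers:
            handler = logging.StreamHandler()
            formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
            handler.setFormatter(formatter)
            logger.addHandler(handler)
        return logger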
    
    def _derive_encryption_key(self) -> bytes:
        """Derive the Fernet key from the configured master password"""
        # Fail fast with KeyError when either value is missing; a hard-coded
        # fallback would give every misconfigured deployment the same key
        master_password = self.config['master_password'].encode()
        salt = self.config['encryption_salt'].encode()
        
        kdf = PBKDF2HMAC(
            algorithm=hashes.SHA256(),
            length=32,
            salt=salt,
            iterations=100000,
        )
        return base64.urlsafe_b64encode(kdf.derive(master_password))
    
    def _init_ldap(self):
        """Initialize LDAP connection"""
        ldap_config = self.config['ldap']
        try:
            server = ldap3.Server(
                ldap_config['server'],
                port=ldap_config.get('port', 389),
                use_ssl=ldap_config.get('use_ssl', False)
            )
            self.ldap_connection = ldap3.Connection(
                server,
                user=ldap_config['bind_dn'],
                password=ldap_config['bind_password'],
                auto_bind=True
            )
            self.logger.info("LDAP connection established")
        except Exception as e:
            self.logger.error(f"Failed to connect to LDAP: {e}")
    
    def hash_password(self, password: str, salt: Optional[str] = None) -> Tuple[str, str]:
        """Hash password with salt"""
        if salt is None:
            salt = secrets.token_hex(32)
        
        # Use PBKDF2 with SHA-256
        password_hash = hashlib.pbkdf2_hmac(
            'sha256',
            password.encode('utf-8'),
            salt.encode('utf-8'),
            100000  # iterations
        )
        
        return base64.b64encode(password_hash).decode('utf-8'), salt
    
    def verify_password(self, password: str, stored_hash: str, salt: str) -> bool:
        """Verify password against stored hash"""
        computed_hash, _ = self.hash_password(password, salt)
        return hmac.compare_digest(computed_hash, stored_hash)
    
    def generate_totp_secret(self, user_id: str) -> Tuple[str, str]:
        """Generate a TOTP secret and its provisioning URI"""
        secret = pyotp.random_base32()
        
        # Create TOTP URI
        totp_uri = pyotp.totp.TOTP(secret).provisioning_uri(
            name=user_id,
            issuer_name=self.config.get('app_name', 'Database System')
        )
        
        # The URI is what an authenticator app scans; a caller can render it
        # as a QR image with qrcode.make(totp_uri)
        return secret, totp_uri
    
    def verify_totp(self, secret: str, token: str) -> bool:
        """Verify TOTP token"""
        totp = pyotp.TOTP(secret)
        # valid_window=1 also accepts the codes for the previous and next 30-second step
        return totp.verify(token, valid_window=1)
    
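    def authenticate_ldap(self, username: str, password: str) -> bool:
        """Authenticate against LDAP by binding as the user"""
        # Reject empty passwords: some servers treat them as an anonymous bind
        if not self.ldap_connection or not password:
            return False
        
        try:
            from ldap3.utils.dn import escape_rdn
            # Escape the username so it cannot alter the DN structure
            user_dn = f"uid={escape_rdn(username)},{self.config['ldap']['user_base']}"
            # ldap3 checks credentials with a bind; a separate connection keeps
            # the service account connection on its own identity
            user_connection = ldap3.Connection(
                self.ldap_connection.server, user=user_dn, password=password
            )
            authenticated = user_connection.bind()
            user_connection.unbind()
            return authenticated
        except Exception as e:
            self.logger.error(f"LDAP authentication failed: {e}")
            return False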
    
    def calculate_risk_score(self, user_id: str, request_info: Dict[str, Any]) -> float:
        """Calculate authentication risk score"""
        risk_score = 0.0
        
        # Check for unusual login time
        current_hour = time.localtime().tm_hour
        if current_hour < 6 or current_hour > 22:
            risk_score += 0.2
        
        # Check for new IP address
        if request_info.get('ip_address') not in self._get_known_ips(user_id):
            risk_score += 0.3
        
        # Check for new device/user agent
        if request_info.get('user_agent') not in self._get_known_devices(user_id):
            risk_score += 0.2
        
        # Check for geographic anomaly
        if self._is_geographic_anomaly(user_id, request_info.get('ip_address')):
            risk_score += 0.4
        
        # Check for recent failed attempts
        failed_attempts = self._get_recent_failed_attempts(user_id)
        if failed_attempts > 0:
            risk_score += min(failed_attempts * 0.1, 0.3)
        
        return min(risk_score, 1.0)
    
    def _get_known_ips(self, user_id: str) -> List[str]:
        """Get list of known IP addresses for user"""
        # This would typically query a database
        return []
    
    def _get_known_devices(self, user_id: str) -> List[str]:
        """Get list of known devices for user"""
        # This would typically query a database
        return []
    
    def _is_geographic_anomaly(self, user_id: str, ip_address: str) -> bool:
        """Check if login location is anomalous"""
        # This would typically use IP geolocation services
        return False
    
    def _get_recent_failed_attempts(self, user_id: str) -> int:
        """Get count of recent failed login attempts"""
        # This would typically query audit logs
        return 0
    
    def authenticate(self, user_id: str, credentials: Dict[str, Any], 
                    request_info: Dict[str, Any]) -> AuthResult:
        """Main authentication method"""
        try:
            # Load user credentials
            user_creds = self._load_user_credentials(user_id)
            if not user_creds:
                return AuthResult(
                    success=False,
                    user_id=None,
                    session_token=None,
                    required_methods=[],
                    # Same message as a wrong password, so responses do not
                    # reveal which user IDs exist
                    error_message="Authentication failed",
                    risk_score=1.0
                )
            
            # Check if account is locked
            if user_creds.locked_until and time.time() < user_creds.locked_until:
                return AuthResult(
                    success=False,
                    user_id=None,
                    session_token=None,
                    required_methods=[],
                    error_message="Account locked",
                    risk_score=1.0
                )
            
            # Calculate risk score
            risk_score = self.calculate_risk_score(user_id, request_info)
            
            # Determine required authentication methods based on risk
            required_methods = self._determine_required_methods(risk_score)
            
            # Verify provided credentials
            auth_success = True
            
            # Password authentication
            if AuthMethod.PASSWORD in required_methods:
                password = credentials.get('password')
                if not password or not self.verify_password(
                    password, user_creds.password_hash, user_creds.salt
                ):
                    auth_success = False
            
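            # TOTP authentication
            if AuthMethod.TOTP in required_methods:
                totp_token = credentials.get('totp_token')
                # A user without an enrolled secret cannot satisfy TOTP
                if not totp_token or not user_creds.totp_secret or not self.verify_totp(
                    user_creds.totp_secret, totp_token
                ):
                    auth_success = False
            
            # Fail closed on required methods with no verification step here
            # (EMAIL is required above a risk score of 0.7 but never checked)
            verified_methods = {AuthMethod.PASSWORD, AuthMethod.TOTP}
            if any(method not in verified_methods for method in required_methods):
                auth_success = False
            
            # LDAP authentication
            if self.config.get('ldap', {}).get('enabled'):
                password = credentials.get('password')
                # A missing password must fail the LDAP check, not skip it
                if not password or not self.authenticate_ldap(user_id, password):
                    auth_success = False
            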
            if auth_success:
                # Generate session token
                session_token = self._generate_session_token(user_id)
                
                # Update last login time
                self._update_last_login(user_id)
                
                # Reset failed attempts
                self._reset_failed_attempts(user_id)
                
                self.logger.info(f"Successful authentication for user: {user_id}")
                
                return AuthResult(
                    success=True,
                    user_id=user_id,
                    session_token=session_token,
                    required_methods=required_methods,
                    error_message=None,
                    risk_score=risk_score
                )
            else:
                # Increment failed attempts
                self._increment_failed_attempts(user_id)
                
                self.logger.warning(f"Failed authentication for user: {user_id}")
                
                return AuthResult(
                    success=False,
                    user_id=None,
                    session_token=None,
                    required_methods=required_methods,
                    error_message="Authentication failed",
                    risk_score=risk_score
                )
        
        except Exception as e:
            self.logger.error(f"Authentication error: {e}")
            return AuthResult(
                success=False,
                user_id=None,
                session_token=None,
                required_methods=[],
                error_message="Internal error",
                risk_score=1.0
            )
    
    def _determine_required_methods(self, risk_score: float) -> List[AuthMethod]:
        """Determine required authentication methods based on risk score"""
        methods = [AuthMethod.PASSWORD]
        
        if risk_score > 0.3:
            methods.append(AuthMethod.TOTP)
        
        if risk_score > 0.7:
            methods.append(AuthMethod.EMAIL)
        
        return methods
    
    def _load_user_credentials(self, user_id: str) -> Optional[UserCredentials]:
        """Load user credentials from database"""
        # This would typically query the database
        # For demo purposes, return a sample user
        if user_id == "demo_user":
            password_hash, salt = self.hash_password("demo_password")
            return UserCredentials(
                user_id=user_id,
                password_hash=password_hash,
                salt=salt,
                totp_secret="JBSWY3DPEHPK3PXP",
                certificate_thumbprint=None,
                failed_attempts=0,
                locked_until=None,
                last_login=None
            )
        return None
    
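    def _generate_session_token(self, user_id: str) -> str:
        """Generate secure session token"""
        import json  # local import keeps this method self-contained
        
        token_data = {
            'user_id': user_id,
            'timestamp': time.time(),
            'random': secrets.token_hex(16)
        }
        
        # Serialize as real JSON so the payload can be parsed after decryption;
        # str(dict) produces a Python repr, not JSON
        token_json = json.dumps(token_data).encode()
        # Fernet tokens are already URL-safe base64, so no second encoding is needed
        return self.fernet.encrypt(token_json).decode()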
    
    def _update_last_login(self, user_id: str):
        """Update last login timestamp"""
        # This would typically update the database
        pass
    
    def _reset_failed_attempts(self, user_id: str):
        """Reset failed login attempts counter"""
        # This would typically update the database
        pass
    
    def _increment_failed_attempts(self, user_id: str):
        """Increment failed login attempts counter"""
        # This would typically update the database
        pass

# Example usage
def main():
    config = {
        'master_password': 'secure_master_key_2024',
        'encryption_salt': 'database_security_salt',
        'app_name': 'Secure Database System',
        'ldap': {
            'enabled': False,
            'server': 'ldap.company.com',
            'port': 389,
            'bind_dn': 'cn=admin,dc=company,dc=com',
            'bind_password': 'admin_password',
            'user_base': 'ou=users,dc=company,dc=com'
        }
    }
    
    authenticator = DatabaseAuthenticator(config)
    
    # Test authentication
    credentials = {
        'password': 'demo_password',
        'totp_token': '123456'
    }
    
    request_info = {
        'ip_address': '192.168.1.100',
        'user_agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64)',
        'timestamp': time.time()
    }
    
    result = authenticator.authenticate('demo_user', credentials, request_info)
    
    print("Authentication Result:")
    print(f"  Success: {result.success}")
    print(f"  User ID: {result.user_id}")
    print(f"  Session Token: {result.session_token}")
    print(f"  Required Methods: {[method.value for method in result.required_methods]}")
    print(f"  Risk Score: {result.risk_score}")
    print(f"  Error: {result.error_message}")

if __name__ == "__main__":
    main()
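
The `verify_totp` method above delegates to `pyotp`, which hides how a time-based code is actually derived. As a rough illustration of the mechanism, here is a standard-library-only sketch of the TOTP computation from RFC 6238 (built on HOTP from RFC 4226), using SHA-1, 6 digits, and a 30-second step. It is meant to explain what the `valid_window` argument does, not to replace `pyotp`; the function names `hotp`, `totp`, and `verify` are chosen for this example.

```python
import base64
import hashlib
import hmac
import struct
import time
from typing import Optional


def hotp(key: bytes, counter: int, digits: int = 6) -> str:
    """RFC 4226 HOTP: HMAC-SHA1 over the 8-byte counter, then dynamic truncation."""
    digest = hmac.new(key, struct.pack(">Q", counter), hashlib.sha1).digest()
    offset = digest[-1] & 0x0F  # low nibble of the last byte selects 4 bytes
    code = struct.unpack(">I", digest[offset:offset + 4])[0] & 0x7FFFFFFF
    return str(code % 10 ** digits).zfill(digits)


def totp(secret_b32: str, at: Optional[float] = None, step: int = 30) -> str:
    """RFC 6238 TOTP: HOTP with the counter set to the current time step."""
    key = base64.b32decode(secret_b32, casefold=True)
    now = time.time() if at is None else at
    return hotp(key, int(now // step))


def verify(secret_b32: str, token: str, at: Optional[float] = None,
           window: int = 1, step: int = 30) -> bool:
    """Accept the code for the current step and for `window` steps on either side."""
    key = base64.b32decode(secret_b32, casefold=True)
    now = time.time() if at is None else at
    counter = int(now // step)
    return any(
        # Constant-time comparison, as in verify_password above
        hmac.compare_digest(hotp(key, counter + drift).encode(), token.encode())
        for drift in range(-window, window + 1)
    )
```

With the RFC 6238 test secret (the ASCII bytes `12345678901234567890`, base32-encoded), `totp(secret, at=59)` returns `"287082"`, the last six digits of the RFC's published 8-digit value for that timestamp. A window of 1 tolerates roughly 30 seconds of clock drift in either direction, at the cost of tripling the number of codes that are valid at any moment.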

2. Role-Based Access Control (RBAC) System

#!/usr/bin/env python3
# src/security/rbac_system.py

import json
import logging
from typing import Dict, List, Set, Optional, Any
from dataclasses import dataclass, field
from enum import Enum
import time
from datetime import datetime, timedelta

class Permission(Enum):
    READ = "read"
    WRITE = "write"
    DELETE = "delete"
    EXECUTE = "execute"
    ADMIN = "admin"
    GRANT = "grant"

class ResourceType(Enum):
    TABLE = "table"
    VIEW = "view"
    PROCEDURE = "procedure"
    FUNCTION = "function"
    SCHEMA = "schema"
    DATABASE = "database"

@dataclass
class Resource:
    resource_type: ResourceType
    name: str
    schema: Optional[str] = None
    database: Optional[str] = None
    
    def __str__(self) -> str:
        parts = []
        if self.database:
            parts.append(self.database)
        if self.schema:
            parts.append(self.schema)
        parts.append(self.name)
        return ".".join(parts)

@dataclass
class Role:
    name: str
    description: str
    permissions: Dict[str, Set[Permission]] = field(default_factory=dict)
    parent_roles: Set[str] = field(default_factory=set)
    created_at: datetime = field(default_factory=datetime.now)
    is_active: bool = True

@dataclass
class User:
    user_id: str
    username: str
    email: str
    roles: Set[str] = field(default_factory=set)
    direct_permissions: Dict[str, Set[Permission]] = field(default_factory=dict)
    created_at: datetime = field(default_factory=datetime.now)
    last_login: Optional[datetime] = None
    is_active: bool = True
    session_timeout: int = 3600  # seconds

@dataclass
class AccessRequest:
    user_id: str
    resource: Resource
    permission: Permission
    context: Dict[str, Any] = field(default_factory=dict)
    timestamp: datetime = field(default_factory=datetime.now)

@dataclass
class AccessResult:
    granted: bool
    reason: str
    effective_permissions: Set[Permission]
    applied_policies: List[str]

class RBACManager:
    def __init__(self):
        self.logger = self._setup_logging()
        self.users: Dict[str, User] = {}
        self.roles: Dict[str, Role] = {}
        self.access_policies: List[Dict[str, Any]] = []
        self.audit_log: List[Dict[str, Any]] = []
        
        # Initialize default roles
        self._create_default_roles()
    
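    def _setup_logging(self) -> logging.Logger:
        logger = logging.getLogger('RBACManager')
        logger.setLevel(logging.INFO)
        # getLogger returns the same object on every call, so attach the handler
        # only once; otherwise each new instance duplicates every log line
        if not logger.handlers:
            handler = logging.StreamHandler()
            formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
            handler.setFormatter(formatter)
            logger.addHandler(handler)
        return logger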
    
    def _create_default_roles(self):
        """Create default system roles"""
        # Database Administrator
        dba_role = Role(
            name="dba",
            description="Database Administrator with full access"
        )
        dba_role.permissions["*"] = {Permission.READ, Permission.WRITE, 
                                   Permission.DELETE, Permission.EXECUTE, 
                                   Permission.ADMIN, Permission.GRANT}
        self.roles["dba"] = dba_role
        
        # Data Analyst
        analyst_role = Role(
            name="data_analyst",
            description="Read-only access to data tables"
        )
        analyst_role.permissions["*.data.*"] = {Permission.READ}
        self.roles["data_analyst"] = analyst_role
        
        # Application User
        app_user_role = Role(
            name="app_user",
            description="Standard application user"
        )
        app_user_role.permissions["app_db.public.*"] = {Permission.READ, Permission.WRITE}
        self.roles["app_user"] = app_user_role
        
        # Read-only User
        readonly_role = Role(
            name="readonly",
            description="Read-only access to specific schemas"
        )
        readonly_role.permissions["*.public.*"] = {Permission.READ}
        self.roles["readonly"] = readonly_role
    
    def create_user(self, user_id: str, username: str, email: str, 
                   roles: Optional[List[str]] = None) -> bool:
        """Create a new user"""
        try:
            if user_id in self.users:
                self.logger.warning(f"User {user_id} already exists")
                return False
            
            user = User(
                user_id=user_id,
                username=username,
                email=email,
                roles=set(roles or [])
            )
            
            # Validate roles exist
            for role_name in user.roles:
                if role_name not in self.roles:
                    self.logger.error(f"Role {role_name} does not exist")
                    return False
            
            self.users[user_id] = user
            self._audit_log("USER_CREATED", {"user_id": user_id, "username": username})
            self.logger.info(f"Created user: {username} ({user_id})")
            return True
            
        except Exception as e:
            self.logger.error(f"Failed to create user {user_id}: {e}")
            return False
    
    def create_role(self, name: str, description: str, 
                   parent_roles: Optional[List[str]] = None) -> bool:
        """Create a new role"""
        try:
            if name in self.roles:
                self.logger.warning(f"Role {name} already exists")
                return False
            
            role = Role(
                name=name,
                description=description,
                parent_roles=set(parent_roles or [])
            )
            
            # Validate parent roles exist
            for parent_role in role.parent_roles:
                if parent_role not in self.roles:
                    self.logger.error(f"Parent role {parent_role} does not exist")
                    return False
            
            self.roles[name] = role
            self._audit_log("ROLE_CREATED", {"role_name": name, "description": description})
            self.logger.info(f"Created role: {name}")
            return True
            
        except Exception as e:
            self.logger.error(f"Failed to create role {name}: {e}")
            return False
    
    def grant_permission(self, role_name: str, resource_pattern: str, 
                        permissions: List[Permission]) -> bool:
        """Grant permissions to a role"""
        try:
            if role_name not in self.roles:
                self.logger.error(f"Role {role_name} does not exist")
                return False
            
            role = self.roles[role_name]
            if resource_pattern not in role.permissions:
                role.permissions[resource_pattern] = set()
            
            role.permissions[resource_pattern].update(permissions)
            
            self._audit_log("PERMISSION_GRANTED", {
                "role_name": role_name,
                "resource_pattern": resource_pattern,
                "permissions": [p.value for p in permissions]
            })
            
            self.logger.info(f"Granted permissions to role {role_name}: {resource_pattern}")
            return True
            
        except Exception as e:
            self.logger.error(f"Failed to grant permission: {e}")
            return False
    
    def revoke_permission(self, role_name: str, resource_pattern: str, 
                         permissions: List[Permission]) -> bool:
        """Revoke permissions from a role"""
        try:
            if role_name not in self.roles:
                self.logger.error(f"Role {role_name} does not exist")
                return False
            
            role = self.roles[role_name]
            if resource_pattern in role.permissions:
                role.permissions[resource_pattern] -= set(permissions)
                
                # Remove empty permission sets
                if not role.permissions[resource_pattern]:
                    del role.permissions[resource_pattern]
            
            self._audit_log("PERMISSION_REVOKED", {
                "role_name": role_name,
                "resource_pattern": resource_pattern,
                "permissions": [p.value for p in permissions]
            })
            
            self.logger.info(f"Revoked permissions from role {role_name}: {resource_pattern}")
            return True
            
        except Exception as e:
            self.logger.error(f"Failed to revoke permission: {e}")
            return False
    
    def assign_role(self, user_id: str, role_name: str) -> bool:
        """Assign a role to a user"""
        try:
            if user_id not in self.users:
                self.logger.error(f"User {user_id} does not exist")
                return False
            
            if role_name not in self.roles:
                self.logger.error(f"Role {role_name} does not exist")
                return False
            
            self.users[user_id].roles.add(role_name)
            
            self._audit_log("ROLE_ASSIGNED", {
                "user_id": user_id,
                "role_name": role_name
            })
            
            self.logger.info(f"Assigned role {role_name} to user {user_id}")
            return True
            
        except Exception as e:
            self.logger.error(f"Failed to assign role: {e}")
            return False
    
    def remove_role(self, user_id: str, role_name: str) -> bool:
        """Remove a role from a user"""
        try:
            if user_id not in self.users:
                self.logger.error(f"User {user_id} does not exist")
                return False
            
            self.users[user_id].roles.discard(role_name)
            
            self._audit_log("ROLE_REMOVED", {
                "user_id": user_id,
                "role_name": role_name
            })
            
            self.logger.info(f"Removed role {role_name} from user {user_id}")
            return True
            
        except Exception as e:
            self.logger.error(f"Failed to remove role: {e}")
            return False
    
    def _get_effective_permissions(self, user_id: str) -> Dict[str, Set[Permission]]:
        """Get all effective permissions for a user"""
        if user_id not in self.users:
            return {}
        
        user = self.users[user_id]
        effective_permissions = {}
        
        # Start with direct permissions
        for resource_pattern, perms in user.direct_permissions.items():
            effective_permissions[resource_pattern] = perms.copy()
        
        # Add permissions from roles (including inherited roles)
        processed_roles = set()
        roles_to_process = list(user.roles)
        
        while roles_to_process:
            role_name = roles_to_process.pop(0)
            if role_name in processed_roles or role_name not in self.roles:
                continue
            
            processed_roles.add(role_name)
            role = self.roles[role_name]
            
            # Add parent roles to processing queue
            roles_to_process.extend(role.parent_roles)
            
            # Merge role permissions
            for resource_pattern, perms in role.permissions.items():
                if resource_pattern not in effective_permissions:
                    effective_permissions[resource_pattern] = set()
                effective_permissions[resource_pattern].update(perms)
        
        return effective_permissions
    
    def _match_resource_pattern(self, pattern: str, resource: Resource) -> bool:
        """Check if a resource matches a pattern"""
        resource_str = str(resource)
        
        # Handle wildcard patterns
        if pattern == "*":
            return True
        
        # Split pattern and resource into parts
        pattern_parts = pattern.split(".")
        resource_parts = resource_str.split(".")
        
        # Pattern and resource must have the same number of parts;
        # each "*" matches exactly one part
        if len(pattern_parts) != len(resource_parts):
            return False
        
        # Check each part
        for pattern_part, resource_part in zip(pattern_parts, resource_parts):
            if pattern_part != "*" and pattern_part != resource_part:
                return False
        
        return True
    
    def check_access(self, request: AccessRequest) -> AccessResult:
        """Check if a user has access to a resource"""
        try:
            if request.user_id not in self.users:
                return AccessResult(
                    granted=False,
                    reason="User not found",
                    effective_permissions=set(),
                    applied_policies=[]
                )
            
            user = self.users[request.user_id]
            
            # Check if user is active
            if not user.is_active:
                return AccessResult(
                    granted=False,
                    reason="User account is inactive",
                    effective_permissions=set(),
                    applied_policies=[]
                )
            
            # Get effective permissions
            effective_permissions = self._get_effective_permissions(request.user_id)
            
            # Find matching permissions
            granted_permissions = set()
            applied_policies = []
            
            for pattern, permissions in effective_permissions.items():
                if self._match_resource_pattern(pattern, request.resource):
                    granted_permissions.update(permissions)
                    applied_policies.append(f"Pattern: {pattern}")
            
            # Check if requested permission is granted
            access_granted = request.permission in granted_permissions
            
            # Apply additional policies and record every policy that fired,
            # including informational ones on allowed requests
            policy_result = self._apply_access_policies(request, granted_permissions)
            applied_policies.extend(policy_result['policies'])
            if not policy_result['allowed']:
                access_granted = False
            
            # Log access attempt
            self._audit_log("ACCESS_CHECK", {
                "user_id": request.user_id,
                "resource": str(request.resource),
                "permission": request.permission.value,
                "granted": access_granted,
                "reason": "Permission granted" if access_granted else "Permission denied"
            })
            
            return AccessResult(
                granted=access_granted,
                reason="Permission granted" if access_granted else "Permission denied",
                effective_permissions=granted_permissions,
                applied_policies=applied_policies
            )
            
        except Exception as e:
            self.logger.error(f"Access check failed: {e}")
            return AccessResult(
                granted=False,
                reason="Internal error",  # details stay in the server log above
                effective_permissions=set(),
                applied_policies=[]
            )
    
    def _apply_access_policies(self, request: AccessRequest, 
                              granted_permissions: Set[Permission]) -> Dict[str, Any]:
        """Apply additional access policies"""
        policies_applied = []
        allowed = True
        
        # Time-based access policy: admin operations are blocked outside 06:00-22:59
        current_hour = datetime.now().hour
        if current_hour < 6 or current_hour > 22:
            if request.permission == Permission.ADMIN:
                # Admin operations restricted during off-hours
                allowed = False
                policies_applied.append("Off-hours admin restriction")
        
        # Sensitive data access policy
        if "sensitive" in str(request.resource).lower():
            if Permission.READ in granted_permissions:
                # Additional logging for sensitive data access
                policies_applied.append("Sensitive data access logged")
        
        # High-privilege operation policy
        if request.permission in [Permission.DELETE, Permission.ADMIN]:
            # Require additional verification for high-privilege operations
            if not request.context.get('verified', False):
                allowed = False
                policies_applied.append("High-privilege operation requires verification")
        
        return {
            'allowed': allowed,
            'policies': policies_applied
        }
    
    def _audit_log(self, action: str, details: Dict[str, Any]):
        """Log audit event"""
        audit_entry = {
            'timestamp': datetime.now().isoformat(),
            'action': action,
            'details': details
        }
        self.audit_log.append(audit_entry)
    
    def get_user_permissions(self, user_id: str) -> Dict[str, Any]:
        """Get comprehensive permission report for a user"""
        if user_id not in self.users:
            return {}
        
        user = self.users[user_id]
        effective_permissions = self._get_effective_permissions(user_id)
        
        return {
            'user_id': user_id,
            'username': user.username,
            'roles': list(user.roles),
            'effective_permissions': {
                pattern: [p.value for p in perms]
                for pattern, perms in effective_permissions.items()
            },
            'direct_permissions': {
                pattern: [p.value for p in perms]
                for pattern, perms in user.direct_permissions.items()
            }
        }
    
    def export_configuration(self) -> Dict[str, Any]:
        """Export RBAC configuration"""
        return {
            'users': {
                user_id: {
                    'username': user.username,
                    'email': user.email,
                    'roles': list(user.roles),
                    'is_active': user.is_active
                }
                for user_id, user in self.users.items()
            },
            'roles': {
                role_name: {
                    'description': role.description,
                    'permissions': {
                        pattern: [p.value for p in perms]
                        for pattern, perms in role.permissions.items()
                    },
                    'parent_roles': list(role.parent_roles),
                    'is_active': role.is_active
                }
                for role_name, role in self.roles.items()
            }
        }

# Example usage
def main():
    rbac = RBACManager()
    
    # Create users
    rbac.create_user("alice", "Alice Smith", "alice@company.com", ["dba"])
    rbac.create_user("bob", "Bob Johnson", "bob@company.com", ["data_analyst"])
    rbac.create_user("charlie", "Charlie Brown", "charlie@company.com", ["app_user"])
    
    # Create custom role
    rbac.create_role("report_viewer", "Can view reports", ["readonly"])
    rbac.grant_permission("report_viewer", "reports.*", [Permission.READ])
    
    # Test access
    resource = Resource(ResourceType.TABLE, "users", "public", "app_db")
    request = AccessRequest("alice", resource, Permission.READ)
    
    result = rbac.check_access(request)
    print("Access Result for Alice:")
    print(f"  Granted: {result.granted}")
    print(f"  Reason: {result.reason}")
    print(f"  Effective Permissions: {[p.value for p in result.effective_permissions]}")
    
    # Get user permissions report
    permissions_report = rbac.get_user_permissions("alice")
    print("\nAlice's Permissions:")
    print(json.dumps(permissions_report, indent=2))

if __name__ == "__main__":
    main()
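
The policy checks in `_apply_access_policies` read the wall clock directly, which makes them hard to unit-test. The sketch below shows one way the same two rules could be written as a pure function that accepts the current time as a parameter. It is an illustration, not part of `RBACManager`: the `Permission` enum here is a stand-in for the guide's own, `evaluate_policies` and `is_off_hours` are hypothetical names, and the off-hours rule is assumed to apply to the requested permission.

```python
from datetime import datetime
from enum import Enum
from typing import Any, Dict, List, Optional, Tuple


class Permission(Enum):
    """Stand-in for the guide's Permission enum (assumed values)."""
    READ = "read"
    DELETE = "delete"
    ADMIN = "admin"


def is_off_hours(now: datetime, start_hour: int = 6, end_hour: int = 22) -> bool:
    """True before start_hour:00 or after end_hour:59, matching the check above."""
    return now.hour < start_hour or now.hour > end_hour


def evaluate_policies(permission: Permission, context: Dict[str, Any],
                      now: Optional[datetime] = None) -> Tuple[bool, List[str]]:
    """Return (allowed, names of the policies that fired)."""
    now = now or datetime.now()
    allowed = True
    applied: List[str] = []

    # Assumption: the off-hours rule keys on the *requested* permission
    if permission is Permission.ADMIN and is_off_hours(now):
        allowed = False
        applied.append("Off-hours admin restriction")

    # High-privilege requests need an explicit 'verified' flag in the context
    if permission in (Permission.DELETE, Permission.ADMIN):
        if not context.get("verified", False):
            allowed = False
            applied.append("High-privilege operation requires verification")

    return allowed, applied


# Usage: a fixed timestamp makes the result reproducible
late_night = datetime(2024, 1, 1, 23, 30)
print(evaluate_policies(Permission.ADMIN, {"verified": True}, late_night))
```

Passing `now` explicitly keeps the default behavior unchanged while letting tests cover both sides of the time window.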

Data Encryption

1. Comprehensive Encryption Manager

#!/usr/bin/env python3
# src/security/encryption_manager.py

import os
import base64
import json
import logging
from typing import Dict, List, Optional, Any, Tuple
from dataclasses import dataclass
from enum import Enum
import secrets
import time  # used by rotate_key() for versioned key IDs
from cryptography.fernet import Fernet
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import rsa, padding
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from cryptography.hazmat.primitives.kdf.scrypt import Scrypt
import hashlib
import hmac

class EncryptionAlgorithm(Enum):
    AES_256_GCM = "aes_256_gcm"
    AES_256_CBC = "aes_256_cbc"
    CHACHA20_POLY1305 = "chacha20_poly1305"
    RSA_OAEP = "rsa_oaep"
    FERNET = "fernet"

class KeyDerivationFunction(Enum):
    PBKDF2 = "pbkdf2"
    SCRYPT = "scrypt"
    ARGON2 = "argon2"

@dataclass
class EncryptionConfig:
    algorithm: EncryptionAlgorithm
    key_size: int
    kdf: KeyDerivationFunction
    iterations: int
    salt_size: int
    iv_size: int

@dataclass
class EncryptedData:
    ciphertext: bytes
    iv: Optional[bytes]
    salt: Optional[bytes]
    tag: Optional[bytes]
    algorithm: EncryptionAlgorithm
    metadata: Dict[str, Any]

class EncryptionManager:
    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.logger = self._setup_logging()
        self.master_key = self._derive_master_key()
        self.key_cache: Dict[str, bytes] = {}
        
        # Default encryption configurations
        self.encryption_configs = {
            'default': EncryptionConfig(
                algorithm=EncryptionAlgorithm.AES_256_GCM,
                key_size=32,
                kdf=KeyDerivationFunction.PBKDF2,
                iterations=100000,
                salt_size=16,
                iv_size=12
            ),
            'high_security': EncryptionConfig(
                algorithm=EncryptionAlgorithm.AES_256_GCM,
                key_size=32,
                kdf=KeyDerivationFunction.SCRYPT,
                iterations=32768,
                salt_size=32,
                iv_size=12
            ),
            'performance': EncryptionConfig(
                algorithm=EncryptionAlgorithm.CHACHA20_POLY1305,
                key_size=32,
                kdf=KeyDerivationFunction.PBKDF2,
                iterations=50000,
                salt_size=16,
                iv_size=12
            )
        }
    
    def _setup_logging(self) -> logging.Logger:
        logger = logging.getLogger('EncryptionManager')
        logger.setLevel(logging.INFO)
        handler = logging.StreamHandler()
        formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
        handler.setFormatter(formatter)
        logger.addHandler(handler)
        return logger
    
    def _derive_master_key(self) -> bytes:
        """Derive master key from configuration"""
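        master_password = self.config.get('master_password', '').encode()
        # The fallback salt is a fixed placeholder; configure a unique, random
        # 'master_salt' for any real deployment
        salt = self.config.get('master_salt', 'default_salt').encode()
        
        if not master_password:
            # No password configured: fall back to a random in-memory master key.
            # Keys wrapped with it cannot be unwrapped after the process restarts.
            return secrets.token_bytes(32)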
        
        kdf = PBKDF2HMAC(
            algorithm=hashes.SHA256(),
            length=32,
            salt=salt,
            iterations=100000,
        )
        return kdf.derive(master_password)
    
    def derive_key(self, password: str, salt: bytes, config: EncryptionConfig) -> bytes:
        """Derive encryption key from password"""
        password_bytes = password.encode('utf-8')
        
        if config.kdf == KeyDerivationFunction.PBKDF2:
            kdf = PBKDF2HMAC(
                algorithm=hashes.SHA256(),
                length=config.key_size,
                salt=salt,
                iterations=config.iterations,
            )
            return kdf.derive(password_bytes)
        
        elif config.kdf == KeyDerivationFunction.SCRYPT:
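            # Scrypt takes no hash algorithm argument; 'iterations' is used as
            # the CPU/memory cost parameter n, which must be a power of 2
            kdf = Scrypt(
                salt=salt,
                length=config.key_size,
                n=config.iterations,
                r=8,
                p=1,
            )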
            return kdf.derive(password_bytes)
        
        else:
            raise ValueError(f"Unsupported KDF: {config.kdf}")
    
    def generate_key(self, key_id: str, config_name: str = 'default') -> bytes:
        """Generate and store a new encryption key"""
        config = self.encryption_configs[config_name]
        key = secrets.token_bytes(config.key_size)
        
        # Encrypt key with master key for storage
        encrypted_key = self._encrypt_key(key)
        
        # Store encrypted key (in production, this would be in a secure key store)
        self._store_key(key_id, encrypted_key)
        
        # Cache the key
        self.key_cache[key_id] = key
        
        self.logger.info(f"Generated new key: {key_id}")
        return key
    
    def get_key(self, key_id: str) -> Optional[bytes]:
        """Retrieve encryption key"""
        if key_id in self.key_cache:
            return self.key_cache[key_id]
        
        # Load from storage
        encrypted_key = self._load_key(key_id)
        if encrypted_key:
            key = self._decrypt_key(encrypted_key)
            self.key_cache[key_id] = key
            return key
        
        return None
    
    def _encrypt_key(self, key: bytes) -> bytes:
        """Encrypt key with master key"""
        fernet = Fernet(base64.urlsafe_b64encode(self.master_key))
        return fernet.encrypt(key)
    
    def _decrypt_key(self, encrypted_key: bytes) -> bytes:
        """Decrypt key with master key"""
        fernet = Fernet(base64.urlsafe_b64encode(self.master_key))
        return fernet.decrypt(encrypted_key)
    
    def _store_key(self, key_id: str, encrypted_key: bytes):
        """Store encrypted key (placeholder implementation)"""
        # In production, this would store in a secure key management system
        key_file = f"/tmp/keys/{key_id}.key"
        os.makedirs(os.path.dirname(key_file), exist_ok=True)
        with open(key_file, 'wb') as f:
            f.write(encrypted_key)
    
    def _load_key(self, key_id: str) -> Optional[bytes]:
        """Load encrypted key (placeholder implementation)"""
        key_file = f"/tmp/keys/{key_id}.key"
        try:
            with open(key_file, 'rb') as f:
                return f.read()
        except FileNotFoundError:
            return None
    
    def encrypt_data(self, data: bytes, key_id: str, 
                    config_name: str = 'default') -> EncryptedData:
        """Encrypt data using specified configuration"""
        config = self.encryption_configs[config_name]
        key = self.get_key(key_id)
        
        if not key:
            raise ValueError(f"Key not found: {key_id}")
        
        if config.algorithm == EncryptionAlgorithm.AES_256_GCM:
            return self._encrypt_aes_gcm(data, key, config)
        elif config.algorithm == EncryptionAlgorithm.AES_256_CBC:
            return self._encrypt_aes_cbc(data, key, config)
        elif config.algorithm == EncryptionAlgorithm.CHACHA20_POLY1305:
            return self._encrypt_chacha20(data, key, config)
        elif config.algorithm == EncryptionAlgorithm.FERNET:
            return self._encrypt_fernet(data, key, config)
        else:
            raise ValueError(f"Unsupported algorithm: {config.algorithm}")
    
    def decrypt_data(self, encrypted_data: EncryptedData, key_id: str) -> bytes:
        """Decrypt data"""
        key = self.get_key(key_id)
        
        if not key:
            raise ValueError(f"Key not found: {key_id}")
        
        if encrypted_data.algorithm == EncryptionAlgorithm.AES_256_GCM:
            return self._decrypt_aes_gcm(encrypted_data, key)
        elif encrypted_data.algorithm == EncryptionAlgorithm.AES_256_CBC:
            return self._decrypt_aes_cbc(encrypted_data, key)
        elif encrypted_data.algorithm == EncryptionAlgorithm.CHACHA20_POLY1305:
            return self._decrypt_chacha20(encrypted_data, key)
        elif encrypted_data.algorithm == EncryptionAlgorithm.FERNET:
            return self._decrypt_fernet(encrypted_data, key)
        else:
            raise ValueError(f"Unsupported algorithm: {encrypted_data.algorithm}")
    
    def _encrypt_aes_gcm(self, data: bytes, key: bytes, 
                        config: EncryptionConfig) -> EncryptedData:
        """Encrypt using AES-256-GCM"""
        iv = secrets.token_bytes(config.iv_size)
        
        cipher = Cipher(algorithms.AES(key), modes.GCM(iv))
        encryptor = cipher.encryptor()
        
        ciphertext = encryptor.update(data) + encryptor.finalize()
        
        return EncryptedData(
            ciphertext=ciphertext,
            iv=iv,
            salt=None,
            tag=encryptor.tag,
            algorithm=config.algorithm,
            metadata={'key_size': len(key)}
        )
    
    def _decrypt_aes_gcm(self, encrypted_data: EncryptedData, key: bytes) -> bytes:
        """Decrypt using AES-256-GCM"""
        cipher = Cipher(
            algorithms.AES(key), 
            modes.GCM(encrypted_data.iv, encrypted_data.tag)
        )
        decryptor = cipher.decryptor()
        
        return decryptor.update(encrypted_data.ciphertext) + decryptor.finalize()
    
    def _encrypt_aes_cbc(self, data: bytes, key: bytes, 
                        config: EncryptionConfig) -> EncryptedData:
        """Encrypt using AES-256-CBC"""
        iv = secrets.token_bytes(16)  # AES block size
        
        # Pad data to block size
        padding_length = 16 - (len(data) % 16)
        padded_data = data + bytes([padding_length] * padding_length)
        
        cipher = Cipher(algorithms.AES(key), modes.CBC(iv))
        encryptor = cipher.encryptor()
        
        ciphertext = encryptor.update(padded_data) + encryptor.finalize()
        
        return EncryptedData(
            ciphertext=ciphertext,
            iv=iv,
            salt=None,
            tag=None,
            algorithm=config.algorithm,
            metadata={'key_size': len(key)}
        )
    
    def _decrypt_aes_cbc(self, encrypted_data: EncryptedData, key: bytes) -> bytes:
        """Decrypt using AES-256-CBC"""
        cipher = Cipher(algorithms.AES(key), modes.CBC(encrypted_data.iv))
        decryptor = cipher.decryptor()
        
        padded_data = decryptor.update(encrypted_data.ciphertext) + decryptor.finalize()
        
        # Remove padding
        padding_length = padded_data[-1]
        return padded_data[:-padding_length]
    
    def _encrypt_chacha20(self, data: bytes, key: bytes, 
                         config: EncryptionConfig) -> EncryptedData:
        """Encrypt using ChaCha20-Poly1305"""
        # The AEAD class provides the Poly1305 authentication tag; the bare
        # ChaCha20 cipher is unauthenticated and expects a 16-byte nonce
        from cryptography.hazmat.primitives.ciphers.aead import ChaCha20Poly1305
        
        nonce = secrets.token_bytes(12)  # ChaCha20-Poly1305 nonce size
        
        # encrypt() returns the ciphertext with the 16-byte tag appended
        sealed = ChaCha20Poly1305(key).encrypt(nonce, data, None)
        
        return EncryptedData(
            ciphertext=sealed[:-16],
            iv=nonce,
            salt=None,
            tag=sealed[-16:],
            algorithm=config.algorithm,
            metadata={'key_size': len(key)}
        )
    
    def _decrypt_chacha20(self, encrypted_data: EncryptedData, key: bytes) -> bytes:
        """Decrypt using ChaCha20-Poly1305"""
        from cryptography.hazmat.primitives.ciphers.aead import ChaCha20Poly1305
        
        # Raises InvalidTag if the ciphertext or tag was modified
        return ChaCha20Poly1305(key).decrypt(
            encrypted_data.iv, encrypted_data.ciphertext + encrypted_data.tag, None
        )
    
    def _encrypt_fernet(self, data: bytes, key: bytes, 
                       config: EncryptionConfig) -> EncryptedData:
        """Encrypt using Fernet"""
        fernet = Fernet(base64.urlsafe_b64encode(key))
        ciphertext = fernet.encrypt(data)
        
        return EncryptedData(
            ciphertext=ciphertext,
            iv=None,
            salt=None,
            tag=None,
            algorithm=config.algorithm,
            metadata={'key_size': len(key)}
        )
    
    def _decrypt_fernet(self, encrypted_data: EncryptedData, key: bytes) -> bytes:
        """Decrypt using Fernet"""
        fernet = Fernet(base64.urlsafe_b64encode(key))
        return fernet.decrypt(encrypted_data.ciphertext)
    
    def encrypt_column_data(self, table_name: str, column_name: str, 
                           data: Any, data_type: str) -> str:
        """Encrypt column data and return it base64-encoded for storage"""
        # Convert data to bytes
        if isinstance(data, str):
            data_bytes = data.encode('utf-8')
        elif isinstance(data, (int, float)):
            data_bytes = str(data).encode('utf-8')
        else:
            data_bytes = str(data).encode('utf-8')
        
        # Use column-specific key
        key_id = f"{table_name}.{column_name}"
        
        # Ensure key exists
        if not self.get_key(key_id):
            self.generate_key(key_id)
        
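        # Encrypt data (the default configuration uses AES-256-GCM)
        encrypted_data = self.encrypt_data(data_bytes, key_id)
        
        # Pack IV + 16-byte tag + ciphertext; GCM cannot decrypt from the
        # ciphertext alone, so all three parts must be stored together
        packed = encrypted_data.iv + encrypted_data.tag + encrypted_data.ciphertext
        return base64.b64encode(packed).decode('utf-8')
    
    def decrypt_column_data(self, table_name: str, column_name: str, 
                           encrypted_value: str, data_type: str) -> Any:
        """Decrypt column data and convert back to original type"""
        key_id = f"{table_name}.{column_name}"
        
        # Decode base64 and split the value back into IV, tag, and ciphertext
        packed = base64.b64decode(encrypted_value.encode('utf-8'))
        iv_size = self.encryption_configs['default'].iv_size
        iv = packed[:iv_size]
        tag = packed[iv_size:iv_size + 16]
        ciphertext = packed[iv_size + 16:]
        
        # Rebuild the EncryptedData object for the default AES-256-GCM config
        encrypted_data = EncryptedData(
            ciphertext=ciphertext,
            iv=iv,
            salt=None,
            tag=tag,
            algorithm=EncryptionAlgorithm.AES_256_GCM,
            metadata={}
        )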
        
        # Decrypt
        decrypted_bytes = self.decrypt_data(encrypted_data, key_id)
        decrypted_str = decrypted_bytes.decode('utf-8')
        
        # Convert back to original type
        if data_type.lower() in ['int', 'integer', 'bigint']:
            return int(decrypted_str)
        elif data_type.lower() in ['float', 'double', 'decimal']:
            return float(decrypted_str)
        else:
            return decrypted_str
    
    def rotate_key(self, key_id: str) -> bool:
        """Rotate encryption key"""
        try:
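            # Generate new key. Data encrypted under the old key must be
            # re-encrypted (or the old key retained) before the old key is dropped.
            old_key = self.get_key(key_id)
            new_key = secrets.token_bytes(32)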
            
            # Store new key with versioned ID
            new_key_id = f"{key_id}_v{int(time.time())}"
            encrypted_new_key = self._encrypt_key(new_key)
            self._store_key(new_key_id, encrypted_new_key)
            
            # Update key cache
            self.key_cache[key_id] = new_key
            
            self.logger.info(f"Rotated key: {key_id}")
            return True
            
        except Exception as e:
            self.logger.error(f"Key rotation failed for {key_id}: {e}")
            return False
    
    def generate_hash(self, data: str, salt: Optional[str] = None) -> Tuple[str, str]:
        """Generate secure hash for data"""
        if salt is None:
            salt = secrets.token_hex(16)
        
        # Use PBKDF2 for password-like data
        hash_value = hashlib.pbkdf2_hmac(
            'sha256',
            data.encode('utf-8'),
            salt.encode('utf-8'),
            100000
        )
        
        return base64.b64encode(hash_value).decode('utf-8'), salt
    
    def verify_hash(self, data: str, hash_value: str, salt: str) -> bool:
        """Verify data against hash"""
        computed_hash, _ = self.generate_hash(data, salt)
        return hmac.compare_digest(computed_hash, hash_value)

# Example usage
def main():
    config = {
        'master_password': 'secure_master_password_2024',
        'master_salt': 'encryption_salt_2024'
    }
    
    encryption_manager = EncryptionManager(config)
    
    # Generate a key for sensitive data
    key_id = "users.ssn"
    encryption_manager.generate_key(key_id)
    
    # Encrypt sensitive data
    sensitive_data = "123-45-6789"
    encrypted_ssn = encryption_manager.encrypt_column_data(
        "users", "ssn", sensitive_data, "varchar"
    )
    
    print(f"Original SSN: {sensitive_data}")
    print(f"Encrypted SSN: {encrypted_ssn}")
    
    # Decrypt data
    decrypted_ssn = encryption_manager.decrypt_column_data(
        "users", "ssn", encrypted_ssn, "varchar"
    )
    
    print(f"Decrypted SSN: {decrypted_ssn}")
    
    # Generate hash for password
    password = "user_password_123"
    hash_value, salt = encryption_manager.generate_hash(password)
    
    print(f"Password hash: {hash_value}")
    print(f"Salt: {salt}")
    
    # Verify password
    is_valid = encryption_manager.verify_hash(password, hash_value, salt)
    print(f"Password verification: {is_valid}")

if __name__ == "__main__":
    main()
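
The comment in `decrypt_column_data` notes that, in practice, the metadata should be stored alongside the ciphertext. The sketch below shows one possible storage envelope that keeps the algorithm name, a key version, the IV, and the tag together with the ciphertext in a single string. It uses only the standard library and performs no encryption itself; `pack_envelope`, `unpack_envelope`, and the short field names (`alg`, `kv`, `iv`, `tag`, `ct`) are hypothetical choices for illustration.

```python
import base64
import json
from typing import Any, Dict, Optional


def _b64(value: Optional[bytes]) -> Optional[str]:
    """Base64-encode bytes for JSON; pass None through unchanged."""
    return base64.b64encode(value).decode("ascii") if value is not None else None


def pack_envelope(algorithm: str, key_version: int, ciphertext: bytes,
                  iv: Optional[bytes] = None, tag: Optional[bytes] = None) -> str:
    """Serialize ciphertext plus the metadata needed to decrypt it later."""
    envelope = {
        "alg": algorithm,      # which cipher produced the ciphertext
        "kv": key_version,     # which key version to load after a rotation
        "iv": _b64(iv),
        "tag": _b64(tag),
        "ct": _b64(ciphertext),
    }
    raw = json.dumps(envelope, sort_keys=True).encode("utf-8")
    return base64.b64encode(raw).decode("ascii")


def unpack_envelope(stored: str) -> Dict[str, Any]:
    """Reverse pack_envelope, restoring the binary fields to bytes."""
    envelope = json.loads(base64.b64decode(stored))
    for field in ("iv", "tag", "ct"):
        if envelope[field] is not None:
            envelope[field] = base64.b64decode(envelope[field])
    return envelope


# Usage with placeholder bytes standing in for real cipher output
stored = pack_envelope("aes_256_gcm", 2, b"\x00\x01secret", iv=b"n" * 12, tag=b"t" * 16)
restored = unpack_envelope(stored)
print(restored["alg"], restored["kv"], len(restored["iv"]), len(restored["tag"]))
```

Recording the key version with each value is what would let a rotation keep old rows readable until they are re-encrypted.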

Access Control Implementation

1. Database-Level Access Control

import sqlite3
import mysql.connector
import psycopg2
from typing import Dict, List, Optional, Any
import logging
from datetime import datetime, timedelta

class DatabaseAccessController:
    """Unified database access control implementation"""
    
    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.logger = logging.getLogger(__name__)
        self.connections = {}
        self.access_policies = {}
        
    def create_connection(self, db_type: str, connection_params: Dict[str, Any]) -> Any:
        """Create secure database connection"""
        try:
            if db_type == 'mysql':
                connection = mysql.connector.connect(
                    **connection_params,
                    ssl_disabled=False,
                    ssl_verify_cert=True,
                    ssl_verify_identity=True,
                    autocommit=False
                )
            elif db_type == 'postgresql':
                connection = psycopg2.connect(
                    **connection_params,
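                    # 'require' encrypts traffic but does not verify the server
                    # certificate; 'verify-full' with sslrootcert does both
                    sslmode='require',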
                    connect_timeout=10
                )
            elif db_type == 'sqlite':
                connection = sqlite3.connect(
                    connection_params['database'],
                    timeout=10.0,
                    check_same_thread=False
                )
            else:
                raise ValueError(f"Unsupported database type: {db_type}")
                
            self.connections[db_type] = connection
            self.logger.info(f"Secure connection established for {db_type}")
            return connection
            
        except Exception as e:
            self.logger.error(f"Connection failed for {db_type}: {e}")
            raise
    
    def create_user_with_privileges(self, db_type: str, username: str, 
                                  password: str, privileges: List[str],
                                  databases: Optional[List[str]] = None) -> bool:
        """Create database user with specific privileges"""
        try:
            connection = self.connections.get(db_type)
            if not connection:
                raise ValueError(f"No connection available for {db_type}")
                
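            # Identifiers and privilege names cannot be bound as query
            # parameters, so validate them before interpolating them into SQL
            import re
            identifier = re.compile(r'[A-Za-z_][A-Za-z0-9_]*')
            if not identifier.fullmatch(username):
                raise ValueError(f"Invalid username: {username!r}")
            for db_name in (databases or []):
                if not identifier.fullmatch(db_name):
                    raise ValueError(f"Invalid database name: {db_name!r}")
            for privilege in privileges:
                if not re.fullmatch(r'[A-Za-z ]+', privilege):
                    raise ValueError(f"Invalid privilege: {privilege!r}")
            
            cursor = connection.cursor()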
            
            if db_type == 'mysql':
                # Create user
                cursor.execute(f"CREATE USER '{username}'@'%' IDENTIFIED BY %s", (password,))
                
                # Grant privileges
                for db_name in (databases or ['*']):
                    for privilege in privileges:
                        if privilege.upper() == 'ALL':
                            cursor.execute(f"GRANT ALL PRIVILEGES ON {db_name}.* TO '{username}'@'%'")
                        else:
                            cursor.execute(f"GRANT {privilege} ON {db_name}.* TO '{username}'@'%'")
                
                cursor.execute("FLUSH PRIVILEGES")
                
            elif db_type == 'postgresql':
                # Create user
                cursor.execute(f"CREATE USER {username} WITH PASSWORD %s", (password,))
                
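                # Grant database-level privileges. PostgreSQL accepts only CREATE,
                # CONNECT, and TEMPORARY here; table privileges such as SELECT
                # must be granted ON TABLE or ON ALL TABLES IN SCHEMA instead.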
                for db_name in (databases or []):
                    cursor.execute(f"GRANT CONNECT ON DATABASE {db_name} TO {username}")
                    for privilege in privileges:
                        if privilege.upper() == 'ALL':
                            cursor.execute(f"GRANT ALL PRIVILEGES ON DATABASE {db_name} TO {username}")
                        else:
                            cursor.execute(f"GRANT {privilege} ON DATABASE {db_name} TO {username}")
            
            connection.commit()
            self.logger.info(f"User {username} created with privileges: {privileges}")
            return True
            
        except Exception as e:
            self.logger.error(f"Failed to create user {username}: {e}")
            if connection:  # None when no connection was registered for db_type
                connection.rollback()
            return False
    
    def implement_row_level_security(self, db_type: str, table_name: str,
                                   policy_name: str, policy_condition: str) -> bool:
        """Implement row-level security policies"""
        try:
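            connection = self.connections.get(db_type)
            if not connection:
                raise ValueError(f"No connection available for {db_type}")
            cursor = connection.cursor()
            # policy_condition is interpolated verbatim below, so it must come
            # from trusted code, never from user input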
            
            if db_type == 'postgresql':
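                # Enable RLS on table (the table owner is exempt unless FORCE ROW
                # LEVEL SECURITY is also set; superusers always bypass RLS)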
                cursor.execute(f"ALTER TABLE {table_name} ENABLE ROW LEVEL SECURITY")
                
                # Create policy
                cursor.execute(f"""
                    CREATE POLICY {policy_name} ON {table_name}
                    FOR ALL TO PUBLIC
                    USING ({policy_condition})
                """)
                
            elif db_type == 'mysql':
                # MySQL has no native RLS; approximate it with a filtering view.
                # Users must be granted access to the view only, not the base table.
                view_name = f"{table_name}_secure"
                cursor.execute(f"""
                    CREATE VIEW {view_name} AS
                    SELECT * FROM {table_name}
                    WHERE {policy_condition}
                """)
            
            connection.commit()
            self.logger.info(f"Row-level security implemented for {table_name}")
            return True
            
        except Exception as e:
            self.logger.error(f"Failed to implement RLS for {table_name}: {e}")
            return False
    
    def create_security_definer_function(self, db_type: str, function_name: str,
                                       function_body: str, owner: str) -> bool:
        """Create security definer functions for controlled access"""
        try:
            connection = self.connections.get(db_type)
            cursor = connection.cursor()
            
            if db_type == 'postgresql':
                cursor.execute(f"""
                    CREATE OR REPLACE FUNCTION {function_name}()
                    RETURNS TABLE(result_data TEXT)
                    LANGUAGE plpgsql
                    SECURITY DEFINER
                    SET search_path = public
                    AS $$
                    BEGIN
                        {function_body}
                    END;
                    $$;
                """)
                
                # Set owner
                cursor.execute(f"ALTER FUNCTION {function_name}() OWNER TO {owner}")
                
            connection.commit()
            self.logger.info(f"Security definer function {function_name} created")
            return True
            
        except Exception as e:
            self.logger.error(f"Failed to create function {function_name}: {e}")
            return False
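
The view-based fallback in the MySQL branch of `implement_row_level_security` can be tried locally with the standard-library `sqlite3` module. The sketch below is an illustration under stated assumptions: the `orders` table, its columns, and the fixed `tenant = 'acme'` condition are hypothetical, and SQLite has no user accounts, so it only demonstrates how a filtering view hides rows from anyone who queries the view instead of the base table.

```python
import sqlite3


def create_filtered_view(conn: sqlite3.Connection) -> None:
    """Create a base table and a view that exposes only one tenant's rows."""
    conn.executescript("""
        CREATE TABLE orders (
            id INTEGER PRIMARY KEY,
            tenant TEXT NOT NULL,
            amount REAL NOT NULL
        );
        INSERT INTO orders (tenant, amount) VALUES
            ('acme', 10.0), ('acme', 25.5), ('globex', 99.0);
        -- Same shape as the <table_name>_secure view created for MySQL
        CREATE VIEW orders_secure AS
            SELECT id, tenant, amount FROM orders
            WHERE tenant = 'acme';
    """)


conn = sqlite3.connect(":memory:")
create_filtered_view(conn)

# The base table holds every row; the view returns only the filtered subset
all_rows = conn.execute("SELECT COUNT(*) FROM orders").fetchone()[0]
visible = conn.execute(
    "SELECT tenant, amount FROM orders_secure ORDER BY id"
).fetchall()
print(all_rows, visible)
```

On a multi-user server the view only protects data if accounts hold privileges on the view and none on the underlying table.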

2. Application-Level Access Control

class ApplicationAccessControl:
    """Application-level access control and session management"""
    
    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.logger = logging.getLogger(__name__)
        self.active_sessions = {}
        self.failed_attempts = {}
        
    def validate_session(self, session_id: str, user_id: str) -> bool:
        """Validate user session"""
        try:
            session = self.active_sessions.get(session_id)
            if not session:
                return False
                
            # Check session expiry
            if datetime.now() > session['expires_at']:
                del self.active_sessions[session_id]
                return False
                
            # Check user match
            if session['user_id'] != user_id:
                return False
                
            # Update last activity
            session['last_activity'] = datetime.now()
            return True
            
        except Exception as e:
            self.logger.error(f"Session validation failed: {e}")
            return False
    
    def check_rate_limiting(self, user_id: str, action: str) -> bool:
        """Implement rate limiting for database operations"""
        try:
            current_time = datetime.now()
            key = f"{user_id}:{action}"
            
            if key not in self.failed_attempts:
                self.failed_attempts[key] = []
            
            # Clean old attempts
            self.failed_attempts[key] = [
                attempt for attempt in self.failed_attempts[key]
                if current_time - attempt < timedelta(minutes=15)
            ]
            
            # Check rate limit
            max_attempts = self.config.get('rate_limit', {}).get(action, 10)
            if len(self.failed_attempts[key]) >= max_attempts:
                self.logger.warning(f"Rate limit exceeded for {user_id}:{action}")
                return False
                
            return True
            
        except Exception as e:
            self.logger.error(f"Rate limiting check failed: {e}")
            return False
    
    def log_access_attempt(self, user_id: str, action: str, 
                          success: bool, details: Optional[Dict[str, Any]] = None):
        """Log access attempts for auditing"""
        try:
            log_entry = {
                'timestamp': datetime.now().isoformat(),
                'user_id': user_id,
                'action': action,
                'success': success,
                'ip_address': details.get('ip_address') if details else None,
                'user_agent': details.get('user_agent') if details else None,
                'resource': details.get('resource') if details else None
            }
            
            # Log to file or database
            self.logger.info(f"Access attempt: {log_entry}")
            
            # Track failed attempts for rate limiting
            if not success:
                key = f"{user_id}:{action}"
                if key not in self.failed_attempts:
                    self.failed_attempts[key] = []
                self.failed_attempts[key].append(datetime.now())
                
        except Exception as e:
            self.logger.error(f"Failed to log access attempt: {e}")
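
The rate-limiting logic above (prune attempts older than 15 minutes, then compare the remaining count with a per-action limit) is easier to verify in isolation. The following is a minimal standalone sketch of that sliding-window idea, not the class's actual implementation. `SlidingWindowLimiter`, its parameters, and the injectable `clock` are illustrative names introduced here so the window can be tested without waiting.

```python
from collections import defaultdict
from datetime import datetime, timedelta
from typing import Callable, DefaultDict, List


class SlidingWindowLimiter:
    """Counts failed attempts per (user, action) inside a sliding time window."""

    def __init__(self, max_attempts: int = 10,
                 window: timedelta = timedelta(minutes=15),
                 clock: Callable[[], datetime] = datetime.now):
        self.max_attempts = max_attempts
        self.window = window
        self.clock = clock  # hypothetical hook: lets tests control "now"
        self.failed: DefaultDict[str, List[datetime]] = defaultdict(list)

    def record_failure(self, user_id: str, action: str) -> None:
        """Remember one failed attempt at the current time."""
        self.failed[f"{user_id}:{action}"].append(self.clock())

    def allowed(self, user_id: str, action: str) -> bool:
        """Return True while the user is still under the limit."""
        key = f"{user_id}:{action}"
        now = self.clock()
        # Drop failures that have aged out of the window
        self.failed[key] = [t for t in self.failed[key] if now - t < self.window]
        return len(self.failed[key]) < self.max_attempts
```

Keeping the clock injectable means the expiry behaviour can be checked by advancing a fake time value rather than sleeping for 15 minutes.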

Auditing and Monitoring

1. Comprehensive Audit System

import json
import hashlib
import logging
from datetime import datetime
from typing import Dict, List, Any, Optional
from dataclasses import dataclass, asdict
from enum import Enum

class AuditEventType(Enum):
    LOGIN = "login"
    LOGOUT = "logout"
    QUERY = "query"
    INSERT = "insert"
    UPDATE = "update"
    DELETE = "delete"
    SCHEMA_CHANGE = "schema_change"
    PRIVILEGE_CHANGE = "privilege_change"
    BACKUP = "backup"
    RESTORE = "restore"

@dataclass
class AuditEvent:
    event_id: str
    timestamp: datetime
    event_type: AuditEventType
    user_id: str
    session_id: str
    database_name: str
    table_name: Optional[str]
    query: Optional[str]
    affected_rows: Optional[int]
    ip_address: str
    user_agent: str
    success: bool
    error_message: Optional[str]
    data_before: Optional[Dict[str, Any]]
    data_after: Optional[Dict[str, Any]]

class DatabaseAuditSystem:
    """Comprehensive database auditing system"""
    
    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.logger = logging.getLogger(__name__)
        self.audit_storage = config.get('audit_storage', 'file')
        self.retention_days = config.get('retention_days', 365)
        
    def generate_event_id(self, event_data: Dict[str, Any]) -> str:
        """Generate unique event ID"""
        event_string = json.dumps(event_data, sort_keys=True, default=str)
        return hashlib.sha256(event_string.encode()).hexdigest()[:16]
    
    def log_audit_event(self, event: AuditEvent) -> bool:
        """Log audit event to storage"""
        try:
            event_dict = asdict(event)
            event_dict['timestamp'] = event.timestamp.isoformat()
            event_dict['event_type'] = event.event_type.value
            
            if self.audit_storage == 'file':
                self._log_to_file(event_dict)
            elif self.audit_storage == 'database':
                self._log_to_database(event_dict)
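            elif self.audit_storage == 'syslog':
                self._log_to_syslog(event_dict)
            else:
                # Fail loudly instead of silently dropping the event
                raise ValueError(f"Unsupported audit storage: {self.audit_storage}")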
            
            return True
            
        except Exception as e:
            self.logger.error(f"Failed to log audit event: {e}")
            return False
    
    def _log_to_file(self, event_dict: Dict[str, Any]):
        """Log audit event to file"""
        audit_file = f"audit_{datetime.now().strftime('%Y%m%d')}.log"
        with open(audit_file, 'a') as f:
            f.write(json.dumps(event_dict) + '\n')
    
    def _log_to_database(self, event_dict: Dict[str, Any]):
        """Log audit event to database"""
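        # Not implemented in this example; raise so events are never silently dropped
        raise NotImplementedError("Database audit storage is not implemented")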
    
    def _log_to_syslog(self, event_dict: Dict[str, Any]):
        """Log audit event to syslog"""
        import syslog
        syslog.openlog("database_audit")
        syslog.syslog(syslog.LOG_INFO, json.dumps(event_dict))
    
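    def create_audit_triggers(self, db_type: str, table_name: str) -> List[str]:
        """Create database triggers for automatic auditing"""
        import re
        # table_name is interpolated into DDL, so accept plain identifiers only
        if not re.fullmatch(r'[A-Za-z_][A-Za-z0-9_]*', table_name):
            raise ValueError(f"Invalid table name: {table_name!r}")
        
        triggers = []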
        
        if db_type == 'postgresql':
            # Create audit table
            audit_table = f"{table_name}_audit"
            triggers.append(f"""
                CREATE TABLE IF NOT EXISTS {audit_table} (
                    audit_id SERIAL PRIMARY KEY,
                    operation CHAR(1) NOT NULL,
                    stamp TIMESTAMP NOT NULL DEFAULT NOW(),
                    userid TEXT NOT NULL DEFAULT USER,
                    old_values JSONB,
                    new_values JSONB
                );
            """)
            
            # Create trigger function
            triggers.append(f"""
                CREATE OR REPLACE FUNCTION {table_name}_audit_trigger()
                RETURNS TRIGGER AS $$
                BEGIN
                    IF TG_OP = 'DELETE' THEN
                        INSERT INTO {audit_table} (operation, old_values)
                        VALUES ('D', row_to_json(OLD));
                        RETURN OLD;
                    ELSIF TG_OP = 'UPDATE' THEN
                        INSERT INTO {audit_table} (operation, old_values, new_values)
                        VALUES ('U', row_to_json(OLD), row_to_json(NEW));
                        RETURN NEW;
                    ELSIF TG_OP = 'INSERT' THEN
                        INSERT INTO {audit_table} (operation, new_values)
                        VALUES ('I', row_to_json(NEW));
                        RETURN NEW;
                    END IF;
                    RETURN NULL;
                END;
                $$ LANGUAGE plpgsql;
            """)
            
            # Create triggers
            for operation in ['INSERT', 'UPDATE', 'DELETE']:
                triggers.append(f"""
                    CREATE TRIGGER {table_name}_{operation.lower()}_audit
                    AFTER {operation} ON {table_name}
                    FOR EACH ROW EXECUTE FUNCTION {table_name}_audit_trigger();
                """)
        
        elif db_type == 'mysql':
            # MySQL trigger implementation (an expression default such as
            # DEFAULT (USER()) needs MySQL 8.0.13+ and must be parenthesized)
            audit_table = f"{table_name}_audit"
            triggers.append(f"""
                CREATE TABLE IF NOT EXISTS {audit_table} (
                    audit_id INT AUTO_INCREMENT PRIMARY KEY,
                    operation CHAR(1) NOT NULL,
                    stamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    userid VARCHAR(255) DEFAULT (USER()),
                    old_values JSON,
                    new_values JSON
                );
            """)
            
            # Create the INSERT trigger; UPDATE and DELETE triggers follow the
            # same pattern using the OLD and NEW row aliases
            triggers.append(f"""
                CREATE TRIGGER {table_name}_insert_audit
                AFTER INSERT ON {table_name}
                FOR EACH ROW
                INSERT INTO {audit_table} (operation, new_values)
                VALUES ('I', JSON_OBJECT({self._get_column_list(table_name, 'NEW')}));
            """)
        
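        return triggers
    
    def _get_column_list(self, table_name: str, row_alias: str) -> str:
        """Build the JSON_OBJECT argument list, e.g. 'id', NEW.id, 'name', NEW.name"""
        # Assumption: column names come from config['audit_columns'][table_name];
        # a real implementation would read them from information_schema.columns.
        columns = self.config.get('audit_columns', {}).get(table_name, [])
        return ', '.join(f"'{col}', {row_alias}.{col}" for col in columns)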
    
    def analyze_audit_logs(self, start_date: datetime, end_date: datetime) -> Dict[str, Any]:
        """Analyze audit logs for security insights"""
        try:
            analysis = {
                'total_events': 0,
                'events_by_type': {},
                'events_by_user': {},
                'failed_attempts': 0,
                'suspicious_activities': [],
                'top_queries': [],
                'peak_hours': {}
            }
            
            # Read and analyze audit logs
            # Implementation depends on storage type
            
            return analysis
            
        except Exception as e:
            self.logger.error(f"Audit log analysis failed: {e}")
            return {}
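
`analyze_audit_logs` above leaves the actual analysis open. As one possible approach for the file backend, the sketch below aggregates the JSON-lines records that `_log_to_file` writes. The function name `summarize_audit_lines` is hypothetical, and the sketch assumes naive ISO-8601 timestamps as produced by `datetime.isoformat()`.

```python
import json
from collections import Counter
from datetime import datetime
from typing import Any, Dict, Iterable


def summarize_audit_lines(lines: Iterable[str], start: datetime,
                          end: datetime) -> Dict[str, Any]:
    """Aggregate JSON-lines audit records whose timestamp falls in [start, end]."""
    by_type: Counter = Counter()
    by_user: Counter = Counter()
    by_hour: Counter = Counter()
    failed = 0
    total = 0

    for line in lines:
        line = line.strip()
        if not line:
            continue
        try:
            record = json.loads(line)
            ts = datetime.fromisoformat(record['timestamp'])
        except (ValueError, KeyError, TypeError):
            continue  # skip malformed records rather than abort the analysis
        if not (start <= ts <= end):
            continue
        total += 1
        by_type[record.get('event_type', 'unknown')] += 1
        by_user[record.get('user_id', 'unknown')] += 1
        by_hour[ts.hour] += 1
        if not record.get('success', True):
            failed += 1

    return {
        'total_events': total,
        'events_by_type': dict(by_type),
        'events_by_user': dict(by_user),
        'failed_attempts': failed,
        'peak_hours': dict(by_hour),
    }
```

Passing an open file object as `lines` keeps memory use flat, since records are processed one at a time.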

2. Real-time Security Monitoring

#!/bin/bash

# Database Security Monitoring Script
# Monitors database security events and generates alerts

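# Configuration
# Note: a password passed with -p is visible in the process list. In production,
# prefer a mysql option file readable only by the monitoring account
# (for example via --defaults-extra-file) over DB_PASS.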
DB_HOST="localhost"
DB_PORT="3306"
DB_USER="monitor_user"
DB_PASS="monitor_password"
ALERT_EMAIL="security@company.com"
LOG_FILE="/var/log/db_security_monitor.log"
THRESHOLD_FAILED_LOGINS=5
THRESHOLD_QUERY_TIME=30

# Logging function
log_message() {
    echo "[$(date '+%Y-%m-%d %H:%M:%S')] $1" | tee -a "$LOG_FILE"
}

# Check for failed login attempts
check_failed_logins() {
    log_message "Checking for failed login attempts..."
    
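    # Requires general_log=ON with log_output=TABLE; otherwise mysql.general_log stays empty
    failed_logins=$(mysql -h "$DB_HOST" -P "$DB_PORT" -u "$DB_USER" -p"$DB_PASS" \
        -e "SELECT COUNT(*) FROM mysql.general_log WHERE command_type = 'Connect' AND argument LIKE '%Access denied%' AND event_time > DATE_SUB(NOW(), INTERVAL 1 HOUR);" \
        -s -N 2>/dev/null)
    
    # Default to 0 so a failed query does not break the numeric comparison
    if [ "${failed_logins:-0}" -gt "$THRESHOLD_FAILED_LOGINS" ]; then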
        alert_message="SECURITY ALERT: $failed_logins failed login attempts in the last hour"
        log_message "$alert_message"
        send_alert "$alert_message"
    fi
}

# Check for suspicious queries
check_suspicious_queries() {
    log_message "Checking for suspicious queries..."
    
    # Check for potential SQL injection patterns
    suspicious_patterns=(
        "UNION.*SELECT"
        "OR.*1=1"
        "DROP.*TABLE"
        "DELETE.*FROM.*WHERE.*1=1"
        "INSERT.*INTO.*VALUES.*SELECT"
    )
    
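    for pattern in "${suspicious_patterns[@]}"; do
        # argument is stored as a BLOB in recent MySQL versions, so convert it
        # before applying REGEXP, and skip this script's own queries, which
        # contain the patterns themselves
        count=$(mysql -h "$DB_HOST" -P "$DB_PORT" -u "$DB_USER" -p"$DB_PASS" \
            -e "SELECT COUNT(*) FROM mysql.general_log WHERE CONVERT(argument USING utf8mb4) REGEXP '$pattern' AND user_host NOT LIKE '${DB_USER}[%' AND event_time > DATE_SUB(NOW(), INTERVAL 1 HOUR);" \
            -s -N 2>/dev/null)
        
        if [ "${count:-0}" -gt 0 ]; then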
            alert_message="SECURITY ALERT: $count suspicious queries detected with pattern: $pattern"
            log_message "$alert_message"
            send_alert "$alert_message"
        fi
    done
}

# Check for long-running queries
check_long_queries() {
    log_message "Checking for long-running queries..."
    
    long_queries=$(mysql -h "$DB_HOST" -P "$DB_PORT" -u "$DB_USER" -p"$DB_PASS" \
        -e "SELECT COUNT(*) FROM information_schema.processlist WHERE time > $THRESHOLD_QUERY_TIME AND command != 'Sleep';" \
        -s -N 2>/dev/null)
    
    if [ "${long_queries:-0}" -gt 0 ]; then
        alert_message="PERFORMANCE ALERT: $long_queries queries running longer than $THRESHOLD_QUERY_TIME seconds"
        log_message "$alert_message"
        send_alert "$alert_message"
    fi
}

# Check database connections
check_connections() {
    log_message "Checking database connections..."
    
    max_connections=$(mysql -h "$DB_HOST" -P "$DB_PORT" -u "$DB_USER" -p"$DB_PASS" \
        -e "SHOW VARIABLES LIKE 'max_connections';" -s -N | cut -f2)
    
    current_connections=$(mysql -h "$DB_HOST" -P "$DB_PORT" -u "$DB_USER" -p"$DB_PASS" \
        -e "SHOW STATUS LIKE 'Threads_connected';" -s -N | cut -f2)
    
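    # Skip the check if either value could not be read (avoids division by zero)
    if [ -z "$max_connections" ] || [ -z "$current_connections" ] || [ "$max_connections" -eq 0 ]; then
        log_message "Could not read connection statistics; skipping check"
        return
    fi
    
    connection_percentage=$((current_connections * 100 / max_connections))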
    
    if [ "$connection_percentage" -gt 80 ]; then
        alert_message="CONNECTION ALERT: Database connections at $connection_percentage% capacity ($current_connections/$max_connections)"
        log_message "$alert_message"
        send_alert "$alert_message"
    fi
}

# Check for privilege escalation attempts
check_privilege_escalation() {
    log_message "Checking for privilege escalation attempts..."
    
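    # Convert the BLOB argument column for REGEXP and exclude this script's own
    # queries, which would otherwise match the keywords they search for
    privilege_changes=$(mysql -h "$DB_HOST" -P "$DB_PORT" -u "$DB_USER" -p"$DB_PASS" \
        -e "SELECT COUNT(*) FROM mysql.general_log WHERE CONVERT(argument USING utf8mb4) REGEXP 'GRANT|REVOKE|CREATE USER|DROP USER' AND user_host NOT LIKE '${DB_USER}[%' AND event_time > DATE_SUB(NOW(), INTERVAL 1 HOUR);" \
        -s -N 2>/dev/null)
    
    if [ "${privilege_changes:-0}" -gt 0 ]; then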
        alert_message="SECURITY ALERT: $privilege_changes privilege changes detected in the last hour"
        log_message "$alert_message"
        send_alert "$alert_message"
    fi
}

# Send alert notification
send_alert() {
    local message="$1"
    
    # Send email alert
    echo "$message" | mail -s "Database Security Alert - $(hostname)" "$ALERT_EMAIL"
    
    # Send to syslog
    logger -p local0.alert "$message"
    
    # Send to monitoring system (example: Prometheus Alertmanager; the v1 API
    # was removed in Alertmanager 0.27, so the v2 endpoint is used here)
    curl -X POST http://alertmanager:9093/api/v2/alerts \
        -H "Content-Type: application/json" \
        -d "[{
            \"labels\": {
                \"alertname\": \"DatabaseSecurityAlert\",
                \"severity\": \"critical\",
                \"instance\": \"$(hostname)\"
            },
            \"annotations\": {
                \"summary\": \"$message\"
            }
        }]" 2>/dev/null
}

# Generate security report
generate_security_report() {
    log_message "Generating security report..."
    
    # mktemp avoids a predictable file name in the shared /tmp directory
    report_file=$(mktemp "/tmp/db_security_report_$(date +%Y%m%d_%H%M%S)_XXXXXX")
    
    cat > "$report_file" << EOF
Database Security Report - $(date)
=====================================

System Information:
- Database Host: $DB_HOST:$DB_PORT
- Report Generated: $(date)

Security Metrics:
EOF
    
    # Add failed login statistics
    echo "- Failed Logins (last 24h): $(mysql -h "$DB_HOST" -P "$DB_PORT" -u "$DB_USER" -p"$DB_PASS" \
        -e "SELECT COUNT(*) FROM mysql.general_log WHERE command_type = 'Connect' AND argument LIKE '%Access denied%' AND event_time > DATE_SUB(NOW(), INTERVAL 24 HOUR);" \
        -s -N 2>/dev/null)" >> "$report_file"
    
    # Add connection statistics
    echo "- Current Connections: $(mysql -h "$DB_HOST" -P "$DB_PORT" -u "$DB_USER" -p"$DB_PASS" \
        -e "SHOW STATUS LIKE 'Threads_connected';" -s -N | cut -f2)" >> "$report_file"
    
    # Add query statistics
    echo "- Queries (last 24h): $(mysql -h "$DB_HOST" -P "$DB_PORT" -u "$DB_USER" -p"$DB_PASS" \
        -e "SELECT COUNT(*) FROM mysql.general_log WHERE event_time > DATE_SUB(NOW(), INTERVAL 24 HOUR);" \
        -s -N 2>/dev/null)" >> "$report_file"
    
    log_message "Security report generated: $report_file"
    
    # Email the report
    mail -s "Daily Database Security Report - $(hostname)" "$ALERT_EMAIL" < "$report_file"
}

# Main monitoring loop
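main() {
    log_message "Starting database security monitoring..."
    
    local today last_report_date
    last_report_date=$(date +%Y%m%d)
    
    while true; do
        check_failed_logins
        check_suspicious_queries
        check_long_queries
        check_connections
        check_privilege_escalation
        
        # Generate the daily report on the first cycle after the date changes
        # (an exact 00:00 match would almost always be missed with a 5-minute sleep)
        today=$(date +%Y%m%d)
        if [ "$today" != "$last_report_date" ]; then
            generate_security_report
            last_report_date="$today"
        fi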
        
        # Wait 5 minutes before next check
        sleep 300
    done
}

# Run monitoring if script is executed directly
if [ "${BASH_SOURCE[0]}" = "${0}" ]; then
    main "$@"
fi
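
The regular expressions in `check_suspicious_queries` are coarse, so it can help to try them against sample statements before alerting on them. The sketch below is one way to do that offline. It assumes case-insensitive matching, and `find_suspicious` is an illustrative name rather than part of the script above.

```python
import re
from typing import Dict, Iterable, List

# Same patterns as the suspicious_patterns array in the monitoring script
SUSPICIOUS_PATTERNS = [
    r"UNION.*SELECT",
    r"OR.*1=1",
    r"DROP.*TABLE",
    r"DELETE.*FROM.*WHERE.*1=1",
    r"INSERT.*INTO.*VALUES.*SELECT",
]


def find_suspicious(statements: Iterable[str],
                    patterns: Iterable[str] = SUSPICIOUS_PATTERNS) -> Dict[str, List[str]]:
    """Map each statement to the patterns it matches (unmatched statements are omitted)."""
    compiled = [(p, re.compile(p, re.IGNORECASE)) for p in patterns]
    hits: Dict[str, List[str]] = {}
    for stmt in statements:
        matched = [p for p, rx in compiled if rx.search(stmt)]
        if matched:
            hits[stmt] = matched
    return hits


samples = [
    "SELECT name FROM users WHERE id = 7",
    "SELECT * FROM users WHERE name = '' OR 1=1",
    "SELECT * FROM orders WHERE line1=1",  # benign, but matches OR.*1=1 via "orders"
    "DROP TABLE users",
]
hits = find_suspicious(samples)
```

The third sample shows why these patterns are best treated as a trigger for review rather than proof of an attack: `OR.*1=1` also matches harmless statements that merely contain the letters "or" followed by `1=1`.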

Compliance and Governance

1. Compliance Framework Implementation

from typing import Dict, List, Any, Optional
from dataclasses import dataclass
from enum import Enum
import json
import logging
import re
from datetime import datetime, timedelta

class ComplianceStandard(Enum):
    GDPR = "gdpr"
    HIPAA = "hipaa"
    PCI_DSS = "pci_dss"
    SOX = "sox"
    ISO27001 = "iso27001"

@dataclass
class ComplianceRule:
    rule_id: str
    standard: ComplianceStandard
    category: str
    description: str
    severity: str
    check_function: str
    remediation: str

class DatabaseComplianceManager:
    """Database compliance management and validation"""
    
    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.logger = logging.getLogger(__name__)
        self.compliance_rules = self._load_compliance_rules()
        self.scan_results = {}
        
    def _load_compliance_rules(self) -> List[ComplianceRule]:
        """Load compliance rules for different standards"""
        rules = []
        
        # GDPR Rules
        rules.extend([
            ComplianceRule(
                rule_id="GDPR-001",
                standard=ComplianceStandard.GDPR,
                category="Data Protection",
                description="Personal data must be encrypted at rest",
                severity="high",
                check_function="check_encryption_at_rest",
                remediation="Enable transparent data encryption (TDE) or column-level encryption"
            ),
            ComplianceRule(
                rule_id="GDPR-002",
                standard=ComplianceStandard.GDPR,
                category="Access Control",
                description="Access to personal data must be logged and auditable",
                severity="high",
                check_function="check_audit_logging",
                remediation="Enable comprehensive audit logging for all data access"
            ),
            ComplianceRule(
                rule_id="GDPR-003",
                standard=ComplianceStandard.GDPR,
                category="Data Retention",
                description="Personal data retention policies must be enforced",
                severity="medium",
                check_function="check_data_retention_policies",
                remediation="Implement automated data retention and deletion policies"
            )
        ])
        
        # HIPAA Rules
        rules.extend([
            ComplianceRule(
                rule_id="HIPAA-001",
                standard=ComplianceStandard.HIPAA,
                category="Access Control",
                description="Unique user identification required",
                severity="high",
                check_function="check_unique_user_identification",
                remediation="Ensure each user has a unique identifier and disable shared accounts"
            ),
            ComplianceRule(
                rule_id="HIPAA-002",
                standard=ComplianceStandard.HIPAA,
                category="Audit Controls",
                description="Audit logs must capture access to PHI",
                severity="high",
                check_function="check_phi_audit_logging",
                remediation="Enable detailed audit logging for all PHI access"
            )
        ])
        
        # PCI DSS Rules
        rules.extend([
            ComplianceRule(
                rule_id="PCI-001",
                standard=ComplianceStandard.PCI_DSS,
                category="Data Protection",
                description="Cardholder data must be encrypted",
                severity="critical",
                check_function="check_cardholder_data_encryption",
                remediation="Encrypt all stored cardholder data using strong cryptography"
            ),
            ComplianceRule(
                rule_id="PCI-002",
                standard=ComplianceStandard.PCI_DSS,
                category="Access Control",
                description="Restrict access to cardholder data by business need-to-know",
                severity="high",
                check_function="check_cardholder_data_access",
                remediation="Implement role-based access control for cardholder data"
            )
        ])
        
        return rules
    
    def run_compliance_scan(self, standards: Optional[List[ComplianceStandard]] = None) -> Dict[str, Any]:
        """Run comprehensive compliance scan"""
        try:
            if standards is None:
                standards = list(ComplianceStandard)
            
            scan_results = {
                'scan_id': f"scan_{datetime.now().strftime('%Y%m%d_%H%M%S')}",
                'timestamp': datetime.now().isoformat(),
                'standards_checked': [s.value for s in standards],
                'results': {},
                'summary': {
                    'total_rules': 0,
                    'passed': 0,
                    'failed': 0,
                    'warnings': 0,
                    'errors': 0,
                    'compliance_score': 0
                }
            }
            
            for standard in standards:
                standard_results = self._scan_standard(standard)
                scan_results['results'][standard.value] = standard_results
                
                # Update summary (checks that raised are counted separately as errors)
                scan_results['summary']['total_rules'] += len(standard_results)
                scan_results['summary']['passed'] += sum(1 for r in standard_results if r['status'] == 'passed')
                scan_results['summary']['failed'] += sum(1 for r in standard_results if r['status'] == 'failed')
                scan_results['summary']['warnings'] += sum(1 for r in standard_results if r['status'] == 'warning')
                scan_results['summary']['errors'] += sum(1 for r in standard_results if r['status'] == 'error')
            
            # Calculate compliance score
            total_rules = scan_results['summary']['total_rules']
            if total_rules > 0:
                scan_results['summary']['compliance_score'] = (
                    scan_results['summary']['passed'] / total_rules * 100
                )
            
            self.scan_results = scan_results
            return scan_results
            
        except Exception as e:
            self.logger.error(f"Compliance scan failed: {e}")
            return {}
    
    def _scan_standard(self, standard: ComplianceStandard) -> List[Dict[str, Any]]:
        """Scan for specific compliance standard"""
        results = []
        
        for rule in self.compliance_rules:
            if rule.standard == standard:
                try:
                    # Execute check function
                    check_method = getattr(self, rule.check_function, None)
                    if check_method:
                        check_result = check_method()
                        status = 'passed' if check_result else 'failed'
                    else:
                        status = 'warning'
                        check_result = False
                    
                    results.append({
                        'rule_id': rule.rule_id,
                        'description': rule.description,
                        'category': rule.category,
                        'severity': rule.severity,
                        'status': status,
                        'details': check_result,
                        'remediation': rule.remediation if status == 'failed' else None
                    })
                    
                except Exception as e:
                    self.logger.error(f"Check failed for rule {rule.rule_id}: {e}")
                    results.append({
                        'rule_id': rule.rule_id,
                        'description': rule.description,
                        'category': rule.category,
                        'severity': rule.severity,
                        'status': 'error',
                        'details': str(e),
                        'remediation': rule.remediation
                    })
        
        return results
    
    # Compliance check functions
    def check_encryption_at_rest(self) -> bool:
        """Check if data encryption at rest is enabled"""
        # Implementation depends on database type
        return True  # Placeholder
    
    def check_audit_logging(self) -> bool:
        """Check if comprehensive audit logging is enabled"""
        # Implementation depends on database type
        return True  # Placeholder
    
    def check_data_retention_policies(self) -> bool:
        """Check if data retention policies are implemented"""
        # Check for automated data retention mechanisms
        return False  # Placeholder
    
    def check_unique_user_identification(self) -> bool:
        """Check for unique user identification"""
        # Verify no shared accounts exist
        return True  # Placeholder
    
    def check_phi_audit_logging(self) -> bool:
        """Check PHI access audit logging"""
        # Verify PHI access is logged
        return True  # Placeholder
    
    def check_cardholder_data_encryption(self) -> bool:
        """Check cardholder data encryption"""
        # Verify cardholder data is encrypted
        return False  # Placeholder
    
    def check_cardholder_data_access(self) -> bool:
        """Check cardholder data access controls"""
        # Verify access controls for cardholder data
        return True  # Placeholder
    
    def generate_compliance_report(self, output_format: str = 'json') -> str:
        """Generate compliance report"""
        try:
            if not self.scan_results:
                raise ValueError("No scan results available. Run compliance scan first.")
            
            if output_format == 'json':
                return json.dumps(self.scan_results, indent=2)
            
            elif output_format == 'html':
                return self._generate_html_report()
            
            elif output_format == 'csv':
                return self._generate_csv_report()
            
            else:
                raise ValueError(f"Unsupported output format: {output_format}")
                
        except Exception as e:
            self.logger.error(f"Report generation failed: {e}")
            return ""
    
    def _generate_html_report(self) -> str:
        """Generate HTML compliance report"""
        # Literal CSS braces are doubled because the template is filled with str.format()
        html_template = """
        <!DOCTYPE html>
        <html>
        <head>
            <title>Database Compliance Report</title>
            <style>
                body {{ font-family: Arial, sans-serif; margin: 20px; }}
                .header {{ background-color: #f0f0f0; padding: 20px; border-radius: 5px; }}
                .summary {{ margin: 20px 0; }}
                .standard {{ margin: 20px 0; border: 1px solid #ddd; border-radius: 5px; }}
                .standard-header {{ background-color: #e0e0e0; padding: 10px; font-weight: bold; }}
                .rule {{ padding: 10px; border-bottom: 1px solid #eee; }}
                .passed {{ color: green; }}
                .failed {{ color: red; }}
                .warning {{ color: orange; }}
                .error {{ color: purple; }}
            </style>
        </head>
        <body>
            <div class="header">
                <h1>Database Compliance Report</h1>
                <p>Generated: {timestamp}</p>
                <p>Scan ID: {scan_id}</p>
            </div>
            
            <div class="summary">
                <h2>Summary</h2>
                <p>Compliance Score: {compliance_score:.1f}%</p>
                <p>Total Rules: {total_rules}</p>
                <p>Passed: {passed} | Failed: {failed} | Warnings: {warnings}</p>
            </div>
            
            {standards_content}
        </body>
        </html>
        """
        
        # Generate content for each standard (values are HTML-escaped before insertion)
        from html import escape
        standards_content = ""
        for standard, results in self.scan_results['results'].items():
            standards_content += '<div class="standard">'
            standards_content += f'<div class="standard-header">{escape(standard.upper())}</div>'
            
            for rule in results:
                status_class = rule['status']
                remediation = (
                    f"<br><em>Remediation: {escape(rule['remediation'])}</em>"
                    if rule.get('remediation') else ""
                )
                standards_content += f'''
                <div class="rule">
                    <strong class="{status_class}">[{rule['status'].upper()}]</strong>
                    {escape(rule['rule_id'])}: {escape(rule['description'])}
                    {remediation}
                </div>
                '''
            
            standards_content += '</div>'
        
        return html_template.format(
            timestamp=self.scan_results['timestamp'],
            scan_id=self.scan_results['scan_id'],
            compliance_score=self.scan_results['summary']['compliance_score'],
            total_rules=self.scan_results['summary']['total_rules'],
            passed=self.scan_results['summary']['passed'],
            failed=self.scan_results['summary']['failed'],
            warnings=self.scan_results['summary']['warnings'],
            standards_content=standards_content
        )
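
`generate_compliance_report` dispatches to a `_generate_csv_report` helper that is not shown here. The following is a minimal sketch of what such a helper might do, written as a standalone function with the hypothetical name `scan_results_to_csv` and assuming the result structure produced by `run_compliance_scan` (one list of rule dictionaries per standard).

```python
import csv
import io
from typing import Any, Dict

CSV_COLUMNS = ['standard', 'rule_id', 'category', 'severity',
               'status', 'description', 'remediation']


def scan_results_to_csv(scan_results: Dict[str, Any]) -> str:
    """Flatten scan results into CSV text, one row per checked rule."""
    buffer = io.StringIO()
    writer = csv.writer(buffer, lineterminator='\n')
    writer.writerow(CSV_COLUMNS)
    for standard, rules in scan_results.get('results', {}).items():
        for rule in rules:
            writer.writerow([
                standard,
                rule['rule_id'],
                rule['category'],
                rule['severity'],
                rule['status'],
                rule['description'],
                rule.get('remediation') or '',  # passed rules carry no remediation
            ])
    return buffer.getvalue()
```

Using the `csv` module rather than joining strings by hand keeps descriptions that contain commas or quotes correctly escaped.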

Security Testing and Validation

1. Automated Security Testing

import logging
import subprocess
import socket
import ssl
import requests
from datetime import datetime
from typing import Dict, List, Any, Tuple
import time
import random
import string

class DatabaseSecurityTester:
    """Automated database security testing framework"""
    
    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.logger = logging.getLogger(__name__)
        self.test_results = {}
        
    def run_security_tests(self) -> Dict[str, Any]:
        """Run comprehensive security test suite"""
        try:
            test_results = {
                'test_id': f"test_{datetime.now().strftime('%Y%m%d_%H%M%S')}",
                'timestamp': datetime.now().isoformat(),
                'tests': {},
                'summary': {
                    'total_tests': 0,
                    'passed': 0,
                    'failed': 0,
                    'security_score': 0
                }
            }
            
            # Run different test categories
            test_categories = [
                ('authentication', self.test_authentication_security),
                ('authorization', self.test_authorization_controls),
                ('encryption', self.test_encryption_implementation),
                ('injection', self.test_sql_injection_protection),
                ('network', self.test_network_security),
                ('configuration', self.test_security_configuration)
            ]
            
            for category, test_function in test_categories:
                category_results = test_function()
                test_results['tests'][category] = category_results
                
                # Update summary
                test_results['summary']['total_tests'] += len(category_results)
                test_results['summary']['passed'] += sum(1 for r in category_results if r['status'] == 'passed')
                test_results['summary']['failed'] += sum(1 for r in category_results if r['status'] == 'failed')
            
            # Calculate security score
            total_tests = test_results['summary']['total_tests']
            if total_tests > 0:
                test_results['summary']['security_score'] = (
                    test_results['summary']['passed'] / total_tests * 100
                )
            
            self.test_results = test_results
            return test_results
            
        except Exception as e:
            self.logger.error(f"Security testing failed: {e}")
            return {}
    
    def test_authentication_security(self) -> List[Dict[str, Any]]:
        """Test authentication security controls"""
        tests = []
        
        # Test 1: Weak password policy
        tests.append(self._test_weak_passwords())
        
        # Test 2: Account lockout policy
        tests.append(self._test_account_lockout())
        
        # Test 3: Multi-factor authentication
        tests.append(self._test_mfa_enforcement())
        
        # Test 4: Session management
        tests.append(self._test_session_security())
        
        return tests
    
    def test_authorization_controls(self) -> List[Dict[str, Any]]:
        """Test authorization and access controls"""
        tests = []
        
        # Test 1: Privilege escalation
        tests.append(self._test_privilege_escalation())
        
        # Test 2: Role-based access control
        tests.append(self._test_rbac_implementation())
        
        # Test 3: Data access controls
        tests.append(self._test_data_access_controls())
        
        return tests
    
    def test_encryption_implementation(self) -> List[Dict[str, Any]]:
        """Test encryption implementation"""
        tests = []
        
        # Test 1: Data at rest encryption
        tests.append(self._test_encryption_at_rest())
        
        # Test 2: Data in transit encryption
        tests.append(self._test_encryption_in_transit())
        
        # Test 3: Key management
        tests.append(self._test_key_management())
        
        return tests
    
    def test_sql_injection_protection(self) -> List[Dict[str, Any]]:
        """Test SQL injection protection"""
        tests = []
        
        # Test various SQL injection patterns
        injection_patterns = [
            "' OR '1'='1",
            "'; DROP TABLE users; --",
            "' UNION SELECT * FROM information_schema.tables --",
            "' AND 1=1 --",
            "admin'--",
            "' OR 1=1#"
        ]
        
        for pattern in injection_patterns:
            tests.append(self._test_sql_injection_pattern(pattern))
        
        return tests
    
    def test_network_security(self) -> List[Dict[str, Any]]:
        """Test network security controls"""
        tests = []
        
        # Test 1: SSL/TLS configuration
        tests.append(self._test_ssl_configuration())
        
        # Test 2: Port security
        tests.append(self._test_port_security())
        
        # Test 3: Firewall rules
        tests.append(self._test_firewall_rules())
        
        return tests
    
    def test_security_configuration(self) -> List[Dict[str, Any]]:
        """Test security configuration"""
        tests = []
        
        # Test 1: Default accounts
        tests.append(self._test_default_accounts())
        
        # Test 2: Security parameters
        tests.append(self._test_security_parameters())
        
        # Test 3: Audit configuration
        tests.append(self._test_audit_configuration())
        
        return tests
    
    def _test_weak_passwords(self) -> Dict[str, Any]:
        """Test for weak password policies"""
        try:
            # Sample passwords that a sound policy should reject
            weak_passwords = ['123456', 'password', 'admin', 'test']
            # 'password_min_length' is an assumed config key (default: 8)
            min_length = self.config.get('password_min_length', 8)
            
            # Simulation only: a real test would try to create a user with each
            # password (database-specific) and record which ones are accepted.
            # Here a length-only rule stands in for the server's policy.
            accepted = [p for p in weak_passwords if len(p) >= min_length]
            
            if accepted:
                return {
                    'test_name': 'Weak Password Policy',
                    'status': 'failed',
                    'details': f'Weak passwords pass a length-only policy: {accepted}',
                    'recommendation': 'Implement strong password policy with minimum length, complexity requirements'
                }
            
            return {
                'test_name': 'Weak Password Policy',
                'status': 'passed',
                'details': 'Sampled weak passwords are rejected (simulated check)',
                'recommendation': None
            }
            
        except Exception as e:
            return {
                'test_name': 'Weak Password Policy',
                'status': 'error',
                'details': str(e),
                'recommendation': 'Review password policy configuration'
            }
    
    def _test_account_lockout(self) -> Dict[str, Any]:
        """Test account lockout policy"""
        try:
            # Simulate multiple failed login attempts
            # Implementation depends on database and authentication system
            
            return {
                'test_name': 'Account Lockout Policy',
                'status': 'passed',
                'details': 'Account lockout policy is properly configured',
                'recommendation': None
            }
            
        except Exception as e:
            return {
                'test_name': 'Account Lockout Policy',
                'status': 'error',
                'details': str(e),
                'recommendation': 'Configure account lockout after failed attempts'
            }
    
    def _test_mfa_enforcement(self) -> Dict[str, Any]:
        """Test multi-factor authentication enforcement"""
        try:
            # Check if MFA is enforced for privileged accounts
            # Implementation depends on authentication system
            
            return {
                'test_name': 'Multi-Factor Authentication',
                'status': 'warning',
                'details': 'MFA enforcement could not be verified',
                'recommendation': 'Implement MFA for all privileged accounts'
            }
            
        except Exception as e:
            return {
                'test_name': 'Multi-Factor Authentication',
                'status': 'error',
                'details': str(e),
                'recommendation': 'Review MFA configuration'
            }
    
    def _test_session_security(self) -> Dict[str, Any]:
        """Test session security controls"""
        try:
            # Test session timeout, secure cookies, etc.
            # Implementation depends on application framework
            
            return {
                'test_name': 'Session Security',
                'status': 'passed',
                'details': 'Session security controls are properly implemented',
                'recommendation': None
            }
            
        except Exception as e:
            return {
                'test_name': 'Session Security',
                'status': 'error',
                'details': str(e),
                'recommendation': 'Review session management configuration'
            }
    
    def _test_privilege_escalation(self) -> Dict[str, Any]:
        """Test for privilege escalation vulnerabilities"""
        try:
            # Test various privilege escalation scenarios
            # Implementation depends on database type and configuration
            
            return {
                'test_name': 'Privilege Escalation',
                'status': 'passed',
                'details': 'No privilege escalation vulnerabilities detected',
                'recommendation': None
            }
            
        except Exception as e:
            return {
                'test_name': 'Privilege Escalation',
                'status': 'error',
                'details': str(e),
                'recommendation': 'Review user privileges and access controls'
            }
    
    def _test_rbac_implementation(self) -> Dict[str, Any]:
        """Test role-based access control implementation"""
        try:
            # Verify RBAC is properly implemented
            # Check role assignments, permissions, etc.
            
            return {
                'test_name': 'Role-Based Access Control',
                'status': 'passed',
                'details': 'RBAC is properly implemented',
                'recommendation': None
            }
            
        except Exception as e:
            return {
                'test_name': 'Role-Based Access Control',
                'status': 'error',
                'details': str(e),
                'recommendation': 'Implement proper RBAC system'
            }
    
    def _test_data_access_controls(self) -> Dict[str, Any]:
        """Test data access controls"""
        try:
            # Test row-level security, column-level security, etc.
            # Implementation depends on database features
            
            return {
                'test_name': 'Data Access Controls',
                'status': 'warning',
                'details': 'Data access controls need review',
                'recommendation': 'Implement row-level and column-level security'
            }
            
        except Exception as e:
            return {
                'test_name': 'Data Access Controls',
                'status': 'error',
                'details': str(e),
                'recommendation': 'Review data access control implementation'
            }
    
    def _test_encryption_at_rest(self) -> Dict[str, Any]:
        """Test encryption at rest"""
        try:
            # Check if data is encrypted at rest
            # Implementation depends on database and storage configuration
            
            return {
                'test_name': 'Encryption at Rest',
                'status': 'passed',
                'details': 'Data encryption at rest is enabled',
                'recommendation': None
            }
            
        except Exception as e:
            return {
                'test_name': 'Encryption at Rest',
                'status': 'error',
                'details': str(e),
                'recommendation': 'Enable transparent data encryption (TDE)'
            }
    
    def _test_encryption_in_transit(self) -> Dict[str, Any]:
        """Test encryption in transit"""
        try:
            # Check SSL/TLS configuration
            host = self.config.get('host', 'localhost')
            port = self.config.get('port', 3306)
            
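            # Test SSL connection with a direct TLS handshake.
            # Note: this only works for TLS-wrapped endpoints. MySQL and PostgreSQL
            # by default negotiate TLS inside their own wire protocols, so a direct
            # handshake against their native ports (e.g. 3306) is expected to fail;
            # use the database driver's SSL options to verify those connections.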
            context = ssl.create_default_context()
            with socket.create_connection((host, port), timeout=10) as sock:
                with context.wrap_socket(sock, server_hostname=host) as ssock:
                    cipher = ssock.cipher()
                    if cipher:
                        return {
                            'test_name': 'Encryption in Transit',
                            'status': 'passed',
                            'details': f'SSL/TLS enabled with cipher: {cipher[0]}',
                            'recommendation': None
                        }
            
            return {
                'test_name': 'Encryption in Transit',
                'status': 'failed',
                'details': 'SSL/TLS not properly configured',
                'recommendation': 'Enable SSL/TLS for all database connections'
            }
            
        except Exception as e:
            return {
                'test_name': 'Encryption in Transit',
                'status': 'error',
                'details': str(e),
                'recommendation': 'Configure SSL/TLS for database connections'
            }
    
    def _test_key_management(self) -> Dict[str, Any]:
        """Test key management practices"""
        try:
            # Test key rotation, storage, access controls
            # Implementation depends on key management system
            
            return {
                'test_name': 'Key Management',
                'status': 'warning',
                'details': 'Key management practices need review',
                'recommendation': 'Implement proper key management with rotation and secure storage'
            }
            
        except Exception as e:
            return {
                'test_name': 'Key Management',
                'status': 'error',
                'details': str(e),
                'recommendation': 'Review key management implementation'
            }
    
    def _test_sql_injection_pattern(self, pattern: str) -> Dict[str, Any]:
        """Test specific SQL injection pattern"""
        # Truncate long patterns for display; add an ellipsis only when truncated
        label = pattern if len(pattern) <= 20 else f"{pattern[:20]}..."
        test_name = f"SQL Injection: {label}"
        
        try:
            # Simulation only: a real implementation would send the pattern to
            # actual application endpoints and check that it is rejected or
            # safely parameterized. No request is made here, so every pattern
            # yields the same result.
            return {
                'test_name': test_name,
                'status': 'passed',
                'details': 'SQL injection protection is working (simulated check)',
                'recommendation': None
            }
            
        except Exception as e:
            return {
                'test_name': test_name,
                'status': 'error',
                'details': str(e),
                'recommendation': 'Implement proper input validation and parameterized queries'
            }
    
    def _test_ssl_configuration(self) -> Dict[str, Any]:
        """Test SSL/TLS configuration"""
        try:
            # Test SSL configuration strength
            host = self.config.get('host', 'localhost')
            port = self.config.get('ssl_port', 443)
            
            context = ssl.create_default_context()
            with socket.create_connection((host, port), timeout=10) as sock:
                with context.wrap_socket(sock, server_hostname=host) as ssock:
                    protocol = ssock.version()
                    cipher = ssock.cipher()
                    
                    # Check for strong protocols and ciphers
                    if protocol in ['TLSv1.2', 'TLSv1.3'] and cipher:
                        return {
                            'test_name': 'SSL/TLS Configuration',
                            'status': 'passed',
                            'details': f'Strong SSL/TLS configuration: {protocol}, {cipher[0]}',
                            'recommendation': None
                        }
                    else:
                        return {
                            'test_name': 'SSL/TLS Configuration',
                            'status': 'failed',
                            'details': f'Weak SSL/TLS configuration: {protocol}',
                            'recommendation': 'Upgrade to TLS 1.2 or higher with strong ciphers'
                        }
            
        except Exception as e:
            return {
                'test_name': 'SSL/TLS Configuration',
                'status': 'error',
                'details': str(e),
                'recommendation': 'Configure SSL/TLS properly'
            }
    
    def _test_port_security(self) -> Dict[str, Any]:
        """Test port security"""
        try:
            # Test for unnecessary open ports
            host = self.config.get('host', 'localhost')
            common_ports = [22, 23, 80, 443, 3306, 5432, 1521, 1433]
            
            open_ports = []
            for port in common_ports:
                # The context manager closes the socket even if connect_ex raises
                with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
                    sock.settimeout(1)
                    if sock.connect_ex((host, port)) == 0:
                        open_ports.append(port)
            
            # Check if only necessary ports are open
            necessary_ports = self.config.get('necessary_ports', [])
            unnecessary_ports = [p for p in open_ports if p not in necessary_ports]
            
            if unnecessary_ports:
                return {
                    'test_name': 'Port Security',
                    'status': 'warning',
                    'details': f'Unnecessary ports open: {unnecessary_ports}',
                    'recommendation': 'Close unnecessary ports and use firewall rules'
                }
            else:
                return {
                    'test_name': 'Port Security',
                    'status': 'passed',
                    'details': 'Only necessary ports are open',
                    'recommendation': None
                }
            
        except Exception as e:
            return {
                'test_name': 'Port Security',
                'status': 'error',
                'details': str(e),
                'recommendation': 'Review port configuration and firewall rules'
            }
    
    def _test_firewall_rules(self) -> Dict[str, Any]:
        """Test firewall rules"""
        try:
            # Test firewall configuration
            # This would typically involve checking iptables, ufw, or cloud security groups
            
            return {
                'test_name': 'Firewall Rules',
                'status': 'warning',
                'details': 'Firewall configuration needs manual review',
                'recommendation': 'Ensure proper firewall rules are in place'
            }
            
        except Exception as e:
            return {
                'test_name': 'Firewall Rules',
                'status': 'error',
                'details': str(e),
                'recommendation': 'Configure proper firewall rules'
            }
    
    def _test_default_accounts(self) -> Dict[str, Any]:
        """Test for default accounts"""
        try:
            # Check for default database accounts
            default_accounts = ['root', 'admin', 'sa', 'postgres', 'mysql']
            
            # This would typically query the database for user accounts
            # Implementation depends on database type
            
            return {
                'test_name': 'Default Accounts',
                'status': 'passed',
                'details': 'No default accounts with default passwords found',
                'recommendation': None
            }
            
        except Exception as e:
            return {
                'test_name': 'Default Accounts',
                'status': 'error',
                'details': str(e),
                'recommendation': 'Remove or secure default accounts'
            }
    
    def _test_security_parameters(self) -> Dict[str, Any]:
        """Test security parameters"""
        try:
            # Check database security parameters
            # Implementation depends on database type
            
            return {
                'test_name': 'Security Parameters',
                'status': 'warning',
                'details': 'Security parameters need review',
                'recommendation': 'Review and harden database security parameters'
            }
            
        except Exception as e:
            return {
                'test_name': 'Security Parameters',
                'status': 'error',
                'details': str(e),
                'recommendation': 'Configure security parameters properly'
            }
    
    def _test_audit_configuration(self) -> Dict[str, Any]:
        """Test audit configuration"""
        try:
            # Check audit logging configuration
            # Implementation depends on database type
            
            return {
                'test_name': 'Audit Configuration',
                'status': 'passed',
                'details': 'Audit logging is properly configured',
                'recommendation': None
            }
            
        except Exception as e:
            return {
                'test_name': 'Audit Configuration',
                'status': 'error',
                'details': str(e),
                'recommendation': 'Configure comprehensive audit logging'
            }

# Example usage
def main():
    config = {
        'host': 'localhost',
        'port': 3306,
        'ssl_port': 443,
        'necessary_ports': [22, 3306, 443],
        'database_type': 'mysql'
    }
    
    # Initialize security tester
    security_tester = DatabaseSecurityTester(config)
    
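    # Run security tests
    test_results = security_tester.run_security_tests()
    
    # run_security_tests() returns {} when testing fails, so guard before indexing
    if test_results:
        print("Database Security Test Results:")
        print(f"Security Score: {test_results['summary']['security_score']:.1f}%")
        print(f"Tests Passed: {test_results['summary']['passed']}")
        print(f"Tests Failed: {test_results['summary']['failed']}")
    else:
        print("Security testing failed; see the log for details.")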
    
    # Initialize compliance manager
    compliance_manager = DatabaseComplianceManager(config)
    
    # Run compliance scan
    compliance_results = compliance_manager.run_compliance_scan()
    
    print("\nCompliance Scan Results:")
    print(f"Compliance Score: {compliance_results['summary']['compliance_score']:.1f}%")
    print(f"Rules Passed: {compliance_results['summary']['passed']}")
    print(f"Rules Failed: {compliance_results['summary']['failed']}")
    
    # Generate reports
    security_report = json.dumps(test_results, indent=2)
    compliance_report = compliance_manager.generate_compliance_report('html')
    
    print("\nReports generated successfully!")

if __name__ == "__main__":
    main()
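
The `_test_sql_injection_pattern` check in the tester is a simulation and never touches a database. As a rough illustration of what a non-simulated version could look like, the sketch below runs the same injection patterns against a throwaway in-memory SQLite database and compares a parameterized lookup with a deliberately unsafe concatenated one. The table layout and the function names (`make_demo_db`, `lookup_parameterized`, `lookup_concatenated`, `run_injection_checks`) are assumptions made for this example and are not part of `DatabaseSecurityTester`.

```python
import sqlite3
from typing import Any, Dict, List

INJECTION_PATTERNS = [
    "' OR '1'='1",
    "'; DROP TABLE users; --",
    "' UNION SELECT * FROM information_schema.tables --",
    "' AND 1=1 --",
    "admin'--",
    "' OR 1=1#",
]


def make_demo_db() -> sqlite3.Connection:
    """Create a throwaway in-memory database with a single user row."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE users (name TEXT, secret TEXT)")
    conn.execute("INSERT INTO users VALUES ('admin', 's3cret')")
    return conn


def lookup_parameterized(conn: sqlite3.Connection, name: str) -> list:
    """Bind the input as data, so it cannot change the query structure."""
    return conn.execute(
        "SELECT name FROM users WHERE name = ?", (name,)
    ).fetchall()


def lookup_concatenated(conn: sqlite3.Connection, name: str) -> list:
    """Deliberately unsafe: splices the input into the SQL text."""
    try:
        return conn.execute(
            f"SELECT name FROM users WHERE name = '{name}'"
        ).fetchall()
    except (sqlite3.Error, sqlite3.Warning):
        return []  # the spliced text was not valid single-statement SQL


def run_injection_checks(conn: sqlite3.Connection) -> List[Dict[str, Any]]:
    """A pattern passes when the parameterized lookup matches no rows."""
    results = []
    for pattern in INJECTION_PATTERNS:
        rows = lookup_parameterized(conn, pattern)
        results.append({
            'test_name': f"SQL Injection: {pattern}",
            'status': 'passed' if not rows else 'failed',
            'details': f'{len(rows)} row(s) returned',
            'recommendation': None if not rows else 'Use parameterized queries',
        })
    return results


if __name__ == "__main__":
    demo = make_demo_db()
    for result in run_injection_checks(demo):
        print(result['status'], result['test_name'])
    # The unsafe variant shows why binding matters
    print(lookup_concatenated(demo, "' OR '1'='1"))
```

Against a real application the patterns would be sent through its actual endpoints rather than to a local table, but the pass criterion stays the same: injected text must be treated as data, never as SQL.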

Summary

This guide covers the essential aspects of database security and access control in modern environments. The implementation includes:

Core Security Elements

  1. Multi-Layer Security Architecture: Comprehensive security model covering application, authentication, authorization, network, database, storage, and physical layers.

  2. Authentication and Authorization: Advanced multi-factor authentication system with TOTP, LDAP integration, risk-based authentication, and comprehensive RBAC implementation.

  3. Data Encryption: Complete encryption management system supporting multiple algorithms, key rotation, column-level encryption, and format-preserving encryption.

  4. Access Control Implementation: Database-level and application-level access controls with row-level security, security definer functions, session management, and rate limiting.

  5. Auditing and Monitoring: Comprehensive audit system with real-time monitoring, automated triggers, security event detection, and alert generation.

  6. Compliance and Governance: Framework supporting GDPR, HIPAA, PCI-DSS, SOX, and ISO 27001 with automated compliance scanning and reporting.

  7. Security Testing and Validation: Automated security testing framework covering authentication, authorization, encryption, injection protection, network security, and configuration validation.
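
The automated testing in item 7 includes a weak-password check. A minimal sketch of a policy check that goes beyond length alone might look like the following. The rule set, the 12-character default, and the names `COMMON_PASSWORDS` and `password_policy_violations` are illustrative assumptions, not the policy of any particular database.

```python
import re
from typing import List

# Hypothetical deny-list; a real deployment would use a much larger one
COMMON_PASSWORDS = {"123456", "password", "admin", "test"}


def password_policy_violations(password: str, min_length: int = 12) -> List[str]:
    """Return the list of policy rules the password breaks (empty if none)."""
    violations = []
    if len(password) < min_length:
        violations.append("too short")
    if password.lower() in COMMON_PASSWORDS:
        violations.append("common password")
    if not re.search(r"[a-z]", password):
        violations.append("no lowercase letter")
    if not re.search(r"[A-Z]", password):
        violations.append("no uppercase letter")
    if not re.search(r"\d", password):
        violations.append("no digit")
    if not re.search(r"[^A-Za-z0-9]", password):
        violations.append("no symbol")
    return violations


if __name__ == "__main__":
    for candidate in ["123456", "password", "Tr1cky-Horse-Battery"]:
        print(candidate, password_policy_violations(candidate))
```

Returning the list of broken rules, rather than a single boolean, lets a test report say exactly which requirement a rejected password failed.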

Best Practices

  1. Defense in Depth: Implement multiple layers of security controls
  2. Principle of Least Privilege: Grant minimum necessary access rights
  3. Regular Security Assessments: Conduct periodic security scans and compliance checks
  4. Continuous Monitoring: Implement real-time security monitoring and alerting
  5. Incident Response: Maintain documented incident response procedures
  6. Security Training: Provide regular security awareness training
  7. Documentation: Maintain comprehensive security documentation and procedures
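
As one illustration of the least-privilege practice, the sketch below compares the privileges each role has been granted with the privileges it actually needs and reports the excess. The role names, privilege sets, and the function `excess_privileges` are hypothetical; in practice the granted set would be read from the database's own catalog.

```python
from typing import Dict, Set


def excess_privileges(
    granted: Dict[str, Set[str]],
    required: Dict[str, Set[str]],
) -> Dict[str, Set[str]]:
    """Return, per role, the privileges that are granted but not required."""
    report = {}
    for role, privileges in granted.items():
        # A role with no entry in `required` is treated as needing nothing
        extra = privileges - required.get(role, set())
        if extra:
            report[role] = extra
    return report


if __name__ == "__main__":
    granted = {
        "reporting": {"SELECT", "DELETE"},
        "app": {"SELECT", "INSERT", "UPDATE"},
    }
    required = {
        "reporting": {"SELECT"},
        "app": {"SELECT", "INSERT", "UPDATE"},
    }
    print(excess_privileges(granted, required))
```

Running a comparison like this as part of the regular security assessments turns the principle into a repeatable check instead of a one-time review.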

This guide provides a solid foundation for implementing robust database security measures that protect sensitive data while maintaining compliance with industry standards and regulations.
