Database Security and Access Control: A Comprehensive Guide to Protecting Your Data
Database security is a critical aspect of modern data management, encompassing multiple layers of protection to safeguard sensitive information from unauthorized access, data breaches, and malicious attacks. This comprehensive guide explores essential security strategies, implementation techniques, and best practices for securing database systems.
Table of Contents
- Security Architecture Overview
- Authentication and Authorization
- Data Encryption
- Access Control Implementation
- Auditing and Monitoring
- Compliance and Governance
- Security Testing and Validation
Security Architecture Overview
1. Multi-Layer Security Model
graph TB
A[Application Layer] --> B[Authentication Layer]
B --> C[Authorization Layer]
C --> D[Network Security Layer]
D --> E[Database Security Layer]
E --> F[Storage Encryption Layer]
F --> G[Physical Security Layer]
subgraph "Security Controls"
H[Access Control]
I[Encryption]
J[Auditing]
K[Monitoring]
end
B -.-> H
C -.-> H
E -.-> I
F -.-> I
D -.-> J
E -.-> J
A -.-> K
E -.-> K
2. Security Configuration Framework
# Database Security Configuration
database_security:
global_settings:
security_level: "high"
compliance_standards: ["SOX", "GDPR", "HIPAA", "PCI-DSS"]
audit_enabled: true
encryption_required: true
authentication:
methods:
- type: "multi_factor"
providers: ["ldap", "oauth2", "saml"]
- type: "certificate"
ca_validation: true
- type: "kerberos"
realm: "COMPANY.COM"
password_policy:
min_length: 12
complexity_required: true
expiration_days: 90
history_count: 12
lockout_attempts: 3
lockout_duration: 30
authorization:
rbac_enabled: true
principle_of_least_privilege: true
role_hierarchy: true
dynamic_permissions: true
encryption:
at_rest:
algorithm: "AES-256"
key_management: "external_hsm"
transparent_encryption: true
in_transit:
tls_version: "1.3"
certificate_validation: true
cipher_suites: ["ECDHE-RSA-AES256-GCM-SHA384"]
column_level:
sensitive_data_types: ["ssn", "credit_card", "email", "phone"]
format_preserving: true
network_security:
firewall_rules:
- source: "application_tier"
destination: "database_tier"
ports: [3306, 5432, 1521]
protocol: "tcp"
vpc_isolation: true
private_subnets: true
bastion_host_required: true
monitoring:
real_time_alerts: true
anomaly_detection: true
failed_login_tracking: true
privilege_escalation_detection: true
retention:
audit_logs: "7_years"
security_events: "2_years"
access_logs: "1_year"
Authentication and Authorization
1. Multi-Factor Authentication System
#!/usr/bin/env python3
# src/security/mfa_auth.py
import hashlib
import hmac
import time
import base64
import qrcode
import logging
from typing import Dict, List, Optional, Tuple, Any
from dataclasses import dataclass
from enum import Enum
import secrets
import pyotp
import ldap3
from cryptography.fernet import Fernet
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
class AuthMethod(Enum):
PASSWORD = "password"
TOTP = "totp"
SMS = "sms"
EMAIL = "email"
CERTIFICATE = "certificate"
BIOMETRIC = "biometric"
@dataclass
class AuthResult:
success: bool
user_id: Optional[str]
session_token: Optional[str]
required_methods: List[AuthMethod]
error_message: Optional[str]
risk_score: float
@dataclass
class UserCredentials:
user_id: str
password_hash: str
salt: str
totp_secret: Optional[str]
certificate_thumbprint: Optional[str]
failed_attempts: int
locked_until: Optional[float]
last_login: Optional[float]
class DatabaseAuthenticator:
def __init__(self, config: Dict[str, Any]):
self.config = config
self.logger = self._setup_logging()
self.encryption_key = self._derive_encryption_key()
self.fernet = Fernet(self.encryption_key)
# Initialize LDAP connection if configured
self.ldap_connection = None
if config.get('ldap', {}).get('enabled'):
self._init_ldap()
def _setup_logging(self) -> logging.Logger:
logger = logging.getLogger('DatabaseAuth')
logger.setLevel(logging.INFO)
handler = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)
return logger
def _derive_encryption_key(self) -> bytes:
"""Derive encryption key from master password"""
master_password = self.config.get('master_password', 'default_key').encode()
salt = self.config.get('encryption_salt', 'default_salt').encode()
kdf = PBKDF2HMAC(
algorithm=hashes.SHA256(),
length=32,
salt=salt,
iterations=100000,
)
return base64.urlsafe_b64encode(kdf.derive(master_password))
def _init_ldap(self):
"""Initialize LDAP connection"""
ldap_config = self.config['ldap']
try:
server = ldap3.Server(
ldap_config['server'],
port=ldap_config.get('port', 389),
use_ssl=ldap_config.get('use_ssl', False)
)
self.ldap_connection = ldap3.Connection(
server,
user=ldap_config['bind_dn'],
password=ldap_config['bind_password'],
auto_bind=True
)
self.logger.info("LDAP connection established")
except Exception as e:
self.logger.error(f"Failed to connect to LDAP: {e}")
def hash_password(self, password: str, salt: Optional[str] = None) -> Tuple[str, str]:
"""Hash password with salt"""
if salt is None:
salt = secrets.token_hex(32)
# Use PBKDF2 with SHA-256
password_hash = hashlib.pbkdf2_hmac(
'sha256',
password.encode('utf-8'),
salt.encode('utf-8'),
100000 # iterations
)
return base64.b64encode(password_hash).decode('utf-8'), salt
def verify_password(self, password: str, stored_hash: str, salt: str) -> bool:
"""Verify password against stored hash"""
computed_hash, _ = self.hash_password(password, salt)
return hmac.compare_digest(computed_hash, stored_hash)
def generate_totp_secret(self, user_id: str) -> Tuple[str, str]:
"""Generate TOTP secret and QR code"""
secret = pyotp.random_base32()
# Create TOTP URI
totp_uri = pyotp.totp.TOTP(secret).provisioning_uri(
name=user_id,
issuer_name=self.config.get('app_name', 'Database System')
)
# Generate QR code
qr = qrcode.QRCode(version=1, box_size=10, border=5)
qr.add_data(totp_uri)
qr.make(fit=True)
return secret, totp_uri
def verify_totp(self, secret: str, token: str) -> bool:
"""Verify TOTP token"""
totp = pyotp.TOTP(secret)
return totp.verify(token, valid_window=1) # Allow 30-second window
def authenticate_ldap(self, username: str, password: str) -> bool:
"""Authenticate against LDAP"""
if not self.ldap_connection:
return False
try:
user_dn = f"uid={username},{self.config['ldap']['user_base']}"
return self.ldap_connection.authenticate(user_dn, password)
except Exception as e:
self.logger.error(f"LDAP authentication failed: {e}")
return False
def calculate_risk_score(self, user_id: str, request_info: Dict[str, Any]) -> float:
"""Calculate authentication risk score"""
risk_score = 0.0
# Check for unusual login time
current_hour = time.localtime().tm_hour
if current_hour < 6 or current_hour > 22:
risk_score += 0.2
# Check for new IP address
if request_info.get('ip_address') not in self._get_known_ips(user_id):
risk_score += 0.3
# Check for new device/user agent
if request_info.get('user_agent') not in self._get_known_devices(user_id):
risk_score += 0.2
# Check for geographic anomaly
if self._is_geographic_anomaly(user_id, request_info.get('ip_address')):
risk_score += 0.4
# Check for recent failed attempts
failed_attempts = self._get_recent_failed_attempts(user_id)
if failed_attempts > 0:
risk_score += min(failed_attempts * 0.1, 0.3)
return min(risk_score, 1.0)
def _get_known_ips(self, user_id: str) -> List[str]:
"""Get list of known IP addresses for user"""
# This would typically query a database
return []
def _get_known_devices(self, user_id: str) -> List[str]:
"""Get list of known devices for user"""
# This would typically query a database
return []
def _is_geographic_anomaly(self, user_id: str, ip_address: str) -> bool:
"""Check if login location is anomalous"""
# This would typically use IP geolocation services
return False
def _get_recent_failed_attempts(self, user_id: str) -> int:
"""Get count of recent failed login attempts"""
# This would typically query audit logs
return 0
def authenticate(self, user_id: str, credentials: Dict[str, Any],
request_info: Dict[str, Any]) -> AuthResult:
"""Main authentication method"""
try:
# Load user credentials
user_creds = self._load_user_credentials(user_id)
if not user_creds:
return AuthResult(
success=False,
user_id=None,
session_token=None,
required_methods=[],
error_message="User not found",
risk_score=1.0
)
# Check if account is locked
if user_creds.locked_until and time.time() < user_creds.locked_until:
return AuthResult(
success=False,
user_id=None,
session_token=None,
required_methods=[],
error_message="Account locked",
risk_score=1.0
)
# Calculate risk score
risk_score = self.calculate_risk_score(user_id, request_info)
# Determine required authentication methods based on risk
required_methods = self._determine_required_methods(risk_score)
# Verify provided credentials
auth_success = True
# Password authentication
if AuthMethod.PASSWORD in required_methods:
password = credentials.get('password')
if not password or not self.verify_password(
password, user_creds.password_hash, user_creds.salt
):
auth_success = False
# TOTP authentication
if AuthMethod.TOTP in required_methods:
totp_token = credentials.get('totp_token')
if not totp_token or not self.verify_totp(
user_creds.totp_secret, totp_token
):
auth_success = False
# LDAP authentication
if self.config.get('ldap', {}).get('enabled'):
password = credentials.get('password')
if password and not self.authenticate_ldap(user_id, password):
auth_success = False
if auth_success:
# Generate session token
session_token = self._generate_session_token(user_id)
# Update last login time
self._update_last_login(user_id)
# Reset failed attempts
self._reset_failed_attempts(user_id)
self.logger.info(f"Successful authentication for user: {user_id}")
return AuthResult(
success=True,
user_id=user_id,
session_token=session_token,
required_methods=required_methods,
error_message=None,
risk_score=risk_score
)
else:
# Increment failed attempts
self._increment_failed_attempts(user_id)
self.logger.warning(f"Failed authentication for user: {user_id}")
return AuthResult(
success=False,
user_id=None,
session_token=None,
required_methods=required_methods,
error_message="Authentication failed",
risk_score=risk_score
)
except Exception as e:
self.logger.error(f"Authentication error: {e}")
return AuthResult(
success=False,
user_id=None,
session_token=None,
required_methods=[],
error_message="Internal error",
risk_score=1.0
)
def _determine_required_methods(self, risk_score: float) -> List[AuthMethod]:
"""Determine required authentication methods based on risk score"""
methods = [AuthMethod.PASSWORD]
if risk_score > 0.3:
methods.append(AuthMethod.TOTP)
if risk_score > 0.7:
methods.append(AuthMethod.EMAIL)
return methods
def _load_user_credentials(self, user_id: str) -> Optional[UserCredentials]:
"""Load user credentials from database"""
# This would typically query the database
# For demo purposes, return a sample user
if user_id == "demo_user":
password_hash, salt = self.hash_password("demo_password")
return UserCredentials(
user_id=user_id,
password_hash=password_hash,
salt=salt,
totp_secret="JBSWY3DPEHPK3PXP",
certificate_thumbprint=None,
failed_attempts=0,
locked_until=None,
last_login=None
)
return None
def _generate_session_token(self, user_id: str) -> str:
"""Generate secure session token"""
token_data = {
'user_id': user_id,
'timestamp': time.time(),
'random': secrets.token_hex(16)
}
token_json = str(token_data).encode()
encrypted_token = self.fernet.encrypt(token_json)
return base64.urlsafe_b64encode(encrypted_token).decode()
def _update_last_login(self, user_id: str):
"""Update last login timestamp"""
# This would typically update the database
pass
def _reset_failed_attempts(self, user_id: str):
"""Reset failed login attempts counter"""
# This would typically update the database
pass
def _increment_failed_attempts(self, user_id: str):
"""Increment failed login attempts counter"""
# This would typically update the database
pass
# Example usage
def main():
config = {
'master_password': 'secure_master_key_2024',
'encryption_salt': 'database_security_salt',
'app_name': 'Secure Database System',
'ldap': {
'enabled': False,
'server': 'ldap.company.com',
'port': 389,
'bind_dn': 'cn=admin,dc=company,dc=com',
'bind_password': 'admin_password',
'user_base': 'ou=users,dc=company,dc=com'
}
}
authenticator = DatabaseAuthenticator(config)
# Test authentication
credentials = {
'password': 'demo_password',
'totp_token': '123456'
}
request_info = {
'ip_address': '192.168.1.100',
'user_agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64)',
'timestamp': time.time()
}
result = authenticator.authenticate('demo_user', credentials, request_info)
print(f"Authentication Result:")
print(f" Success: {result.success}")
print(f" User ID: {result.user_id}")
print(f" Session Token: {result.session_token}")
print(f" Required Methods: {[method.value for method in result.required_methods]}")
print(f" Risk Score: {result.risk_score}")
print(f" Error: {result.error_message}")
if __name__ == "__main__":
main()
2. Role-Based Access Control (RBAC) System
#!/usr/bin/env python3
# src/security/rbac_system.py
import json
import logging
from typing import Dict, List, Set, Optional, Any
from dataclasses import dataclass, field
from enum import Enum
import time
from datetime import datetime, timedelta
class Permission(Enum):
READ = "read"
WRITE = "write"
DELETE = "delete"
EXECUTE = "execute"
ADMIN = "admin"
GRANT = "grant"
class ResourceType(Enum):
TABLE = "table"
VIEW = "view"
PROCEDURE = "procedure"
FUNCTION = "function"
SCHEMA = "schema"
DATABASE = "database"
@dataclass
class Resource:
resource_type: ResourceType
name: str
schema: Optional[str] = None
database: Optional[str] = None
def __str__(self) -> str:
parts = []
if self.database:
parts.append(self.database)
if self.schema:
parts.append(self.schema)
parts.append(self.name)
return ".".join(parts)
@dataclass
class Role:
name: str
description: str
permissions: Dict[str, Set[Permission]] = field(default_factory=dict)
parent_roles: Set[str] = field(default_factory=set)
created_at: datetime = field(default_factory=datetime.now)
is_active: bool = True
@dataclass
class User:
user_id: str
username: str
email: str
roles: Set[str] = field(default_factory=set)
direct_permissions: Dict[str, Set[Permission]] = field(default_factory=dict)
created_at: datetime = field(default_factory=datetime.now)
last_login: Optional[datetime] = None
is_active: bool = True
session_timeout: int = 3600 # seconds
@dataclass
class AccessRequest:
user_id: str
resource: Resource
permission: Permission
context: Dict[str, Any] = field(default_factory=dict)
timestamp: datetime = field(default_factory=datetime.now)
@dataclass
class AccessResult:
granted: bool
reason: str
effective_permissions: Set[Permission]
applied_policies: List[str]
class RBACManager:
def __init__(self):
self.logger = self._setup_logging()
self.users: Dict[str, User] = {}
self.roles: Dict[str, Role] = {}
self.access_policies: List[Dict[str, Any]] = []
self.audit_log: List[Dict[str, Any]] = []
# Initialize default roles
self._create_default_roles()
def _setup_logging(self) -> logging.Logger:
logger = logging.getLogger('RBACManager')
logger.setLevel(logging.INFO)
handler = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)
return logger
def _create_default_roles(self):
"""Create default system roles"""
# Database Administrator
dba_role = Role(
name="dba",
description="Database Administrator with full access"
)
dba_role.permissions["*"] = {Permission.READ, Permission.WRITE,
Permission.DELETE, Permission.EXECUTE,
Permission.ADMIN, Permission.GRANT}
self.roles["dba"] = dba_role
# Data Analyst
analyst_role = Role(
name="data_analyst",
description="Read-only access to data tables"
)
analyst_role.permissions["*.data.*"] = {Permission.READ}
self.roles["data_analyst"] = analyst_role
# Application User
app_user_role = Role(
name="app_user",
description="Standard application user"
)
app_user_role.permissions["app_db.public.*"] = {Permission.READ, Permission.WRITE}
self.roles["app_user"] = app_user_role
# Read-only User
readonly_role = Role(
name="readonly",
description="Read-only access to specific schemas"
)
readonly_role.permissions["*.public.*"] = {Permission.READ}
self.roles["readonly"] = readonly_role
def create_user(self, user_id: str, username: str, email: str,
roles: Optional[List[str]] = None) -> bool:
"""Create a new user"""
try:
if user_id in self.users:
self.logger.warning(f"User {user_id} already exists")
return False
user = User(
user_id=user_id,
username=username,
email=email,
roles=set(roles or [])
)
# Validate roles exist
for role_name in user.roles:
if role_name not in self.roles:
self.logger.error(f"Role {role_name} does not exist")
return False
self.users[user_id] = user
self._audit_log("USER_CREATED", {"user_id": user_id, "username": username})
self.logger.info(f"Created user: {username} ({user_id})")
return True
except Exception as e:
self.logger.error(f"Failed to create user {user_id}: {e}")
return False
def create_role(self, name: str, description: str,
parent_roles: Optional[List[str]] = None) -> bool:
"""Create a new role"""
try:
if name in self.roles:
self.logger.warning(f"Role {name} already exists")
return False
role = Role(
name=name,
description=description,
parent_roles=set(parent_roles or [])
)
# Validate parent roles exist
for parent_role in role.parent_roles:
if parent_role not in self.roles:
self.logger.error(f"Parent role {parent_role} does not exist")
return False
self.roles[name] = role
self._audit_log("ROLE_CREATED", {"role_name": name, "description": description})
self.logger.info(f"Created role: {name}")
return True
except Exception as e:
self.logger.error(f"Failed to create role {name}: {e}")
return False
def grant_permission(self, role_name: str, resource_pattern: str,
permissions: List[Permission]) -> bool:
"""Grant permissions to a role"""
try:
if role_name not in self.roles:
self.logger.error(f"Role {role_name} does not exist")
return False
role = self.roles[role_name]
if resource_pattern not in role.permissions:
role.permissions[resource_pattern] = set()
role.permissions[resource_pattern].update(permissions)
self._audit_log("PERMISSION_GRANTED", {
"role_name": role_name,
"resource_pattern": resource_pattern,
"permissions": [p.value for p in permissions]
})
self.logger.info(f"Granted permissions to role {role_name}: {resource_pattern}")
return True
except Exception as e:
self.logger.error(f"Failed to grant permission: {e}")
return False
def revoke_permission(self, role_name: str, resource_pattern: str,
permissions: List[Permission]) -> bool:
"""Revoke permissions from a role"""
try:
if role_name not in self.roles:
self.logger.error(f"Role {role_name} does not exist")
return False
role = self.roles[role_name]
if resource_pattern in role.permissions:
role.permissions[resource_pattern] -= set(permissions)
# Remove empty permission sets
if not role.permissions[resource_pattern]:
del role.permissions[resource_pattern]
self._audit_log("PERMISSION_REVOKED", {
"role_name": role_name,
"resource_pattern": resource_pattern,
"permissions": [p.value for p in permissions]
})
self.logger.info(f"Revoked permissions from role {role_name}: {resource_pattern}")
return True
except Exception as e:
self.logger.error(f"Failed to revoke permission: {e}")
return False
def assign_role(self, user_id: str, role_name: str) -> bool:
"""Assign a role to a user"""
try:
if user_id not in self.users:
self.logger.error(f"User {user_id} does not exist")
return False
if role_name not in self.roles:
self.logger.error(f"Role {role_name} does not exist")
return False
self.users[user_id].roles.add(role_name)
self._audit_log("ROLE_ASSIGNED", {
"user_id": user_id,
"role_name": role_name
})
self.logger.info(f"Assigned role {role_name} to user {user_id}")
return True
except Exception as e:
self.logger.error(f"Failed to assign role: {e}")
return False
def remove_role(self, user_id: str, role_name: str) -> bool:
"""Remove a role from a user"""
try:
if user_id not in self.users:
self.logger.error(f"User {user_id} does not exist")
return False
self.users[user_id].roles.discard(role_name)
self._audit_log("ROLE_REMOVED", {
"user_id": user_id,
"role_name": role_name
})
self.logger.info(f"Removed role {role_name} from user {user_id}")
return True
except Exception as e:
self.logger.error(f"Failed to remove role: {e}")
return False
def _get_effective_permissions(self, user_id: str) -> Dict[str, Set[Permission]]:
"""Get all effective permissions for a user"""
if user_id not in self.users:
return {}
user = self.users[user_id]
effective_permissions = {}
# Start with direct permissions
for resource_pattern, perms in user.direct_permissions.items():
effective_permissions[resource_pattern] = perms.copy()
# Add permissions from roles (including inherited roles)
processed_roles = set()
roles_to_process = list(user.roles)
while roles_to_process:
role_name = roles_to_process.pop(0)
if role_name in processed_roles or role_name not in self.roles:
continue
processed_roles.add(role_name)
role = self.roles[role_name]
# Add parent roles to processing queue
roles_to_process.extend(role.parent_roles)
# Merge role permissions
for resource_pattern, perms in role.permissions.items():
if resource_pattern not in effective_permissions:
effective_permissions[resource_pattern] = set()
effective_permissions[resource_pattern].update(perms)
return effective_permissions
def _match_resource_pattern(self, pattern: str, resource: Resource) -> bool:
"""Check if a resource matches a pattern"""
resource_str = str(resource)
# Handle wildcard patterns
if pattern == "*":
return True
# Split pattern and resource into parts
pattern_parts = pattern.split(".")
resource_parts = resource_str.split(".")
# Ensure same number of parts or pattern has wildcards
if len(pattern_parts) != len(resource_parts):
return False
# Check each part
for pattern_part, resource_part in zip(pattern_parts, resource_parts):
if pattern_part != "*" and pattern_part != resource_part:
return False
return True
def check_access(self, request: AccessRequest) -> AccessResult:
"""Check if a user has access to a resource"""
try:
if request.user_id not in self.users:
return AccessResult(
granted=False,
reason="User not found",
effective_permissions=set(),
applied_policies=[]
)
user = self.users[request.user_id]
# Check if user is active
if not user.is_active:
return AccessResult(
granted=False,
reason="User account is inactive",
effective_permissions=set(),
applied_policies=[]
)
# Get effective permissions
effective_permissions = self._get_effective_permissions(request.user_id)
# Find matching permissions
granted_permissions = set()
applied_policies = []
for pattern, permissions in effective_permissions.items():
if self._match_resource_pattern(pattern, request.resource):
granted_permissions.update(permissions)
applied_policies.append(f"Pattern: {pattern}")
# Check if requested permission is granted
access_granted = request.permission in granted_permissions
# Apply additional policies
policy_result = self._apply_access_policies(request, granted_permissions)
if not policy_result['allowed']:
access_granted = False
applied_policies.extend(policy_result['policies'])
# Log access attempt
self._audit_log("ACCESS_CHECK", {
"user_id": request.user_id,
"resource": str(request.resource),
"permission": request.permission.value,
"granted": access_granted,
"reason": "Permission granted" if access_granted else "Permission denied"
})
return AccessResult(
granted=access_granted,
reason="Permission granted" if access_granted else "Permission denied",
effective_permissions=granted_permissions,
applied_policies=applied_policies
)
except Exception as e:
self.logger.error(f"Access check failed: {e}")
return AccessResult(
granted=False,
reason=f"Internal error: {e}",
effective_permissions=set(),
applied_policies=[]
)
def _apply_access_policies(self, request: AccessRequest,
granted_permissions: Set[Permission]) -> Dict[str, Any]:
"""Apply additional access policies"""
policies_applied = []
allowed = True
# Time-based access policy
current_hour = datetime.now().hour
if current_hour < 6 or current_hour > 22:
if Permission.ADMIN in granted_permissions:
# Admin operations restricted during off-hours
allowed = False
policies_applied.append("Off-hours admin restriction")
# Sensitive data access policy
if "sensitive" in str(request.resource).lower():
if Permission.READ in granted_permissions:
# Additional logging for sensitive data access
policies_applied.append("Sensitive data access logged")
# High-privilege operation policy
if request.permission in [Permission.DELETE, Permission.ADMIN]:
# Require additional verification for high-privilege operations
if not request.context.get('verified', False):
allowed = False
policies_applied.append("High-privilege operation requires verification")
return {
'allowed': allowed,
'policies': policies_applied
}
def _audit_log(self, action: str, details: Dict[str, Any]):
"""Log audit event"""
audit_entry = {
'timestamp': datetime.now().isoformat(),
'action': action,
'details': details
}
self.audit_log.append(audit_entry)
def get_user_permissions(self, user_id: str) -> Dict[str, Any]:
"""Get comprehensive permission report for a user"""
if user_id not in self.users:
return {}
user = self.users[user_id]
effective_permissions = self._get_effective_permissions(user_id)
return {
'user_id': user_id,
'username': user.username,
'roles': list(user.roles),
'effective_permissions': {
pattern: [p.value for p in perms]
for pattern, perms in effective_permissions.items()
},
'direct_permissions': {
pattern: [p.value for p in perms]
for pattern, perms in user.direct_permissions.items()
}
}
def export_configuration(self) -> Dict[str, Any]:
"""Export RBAC configuration"""
return {
'users': {
user_id: {
'username': user.username,
'email': user.email,
'roles': list(user.roles),
'is_active': user.is_active
}
for user_id, user in self.users.items()
},
'roles': {
role_name: {
'description': role.description,
'permissions': {
pattern: [p.value for p in perms]
for pattern, perms in role.permissions.items()
},
'parent_roles': list(role.parent_roles),
'is_active': role.is_active
}
for role_name, role in self.roles.items()
}
}
# Example usage
def main():
rbac = RBACManager()
# Create users
rbac.create_user("alice", "Alice Smith", "alice@company.com", ["dba"])
rbac.create_user("bob", "Bob Johnson", "bob@company.com", ["data_analyst"])
rbac.create_user("charlie", "Charlie Brown", "charlie@company.com", ["app_user"])
# Create custom role
rbac.create_role("report_viewer", "Can view reports", ["readonly"])
rbac.grant_permission("report_viewer", "reports.*", [Permission.READ])
# Test access
resource = Resource(ResourceType.TABLE, "users", "public", "app_db")
request = AccessRequest("alice", resource, Permission.READ)
result = rbac.check_access(request)
print(f"Access Result for Alice:")
print(f" Granted: {result.granted}")
print(f" Reason: {result.reason}")
print(f" Effective Permissions: {[p.value for p in result.effective_permissions]}")
# Get user permissions report
permissions_report = rbac.get_user_permissions("alice")
print(f"\nAlice's Permissions:")
print(json.dumps(permissions_report, indent=2))
if __name__ == "__main__":
main()
Data Encryption
1. Comprehensive Encryption Manager
#!/usr/bin/env python3
# src/security/encryption_manager.py
import os
import base64
import json
import logging
from typing import Dict, List, Optional, Any, Tuple
from dataclasses import dataclass
from enum import Enum
import secrets
from cryptography.fernet import Fernet
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import rsa, padding
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from cryptography.hazmat.primitives.kdf.scrypt import Scrypt
import hashlib
import hmac
class EncryptionAlgorithm(Enum):
AES_256_GCM = "aes_256_gcm"
AES_256_CBC = "aes_256_cbc"
CHACHA20_POLY1305 = "chacha20_poly1305"
RSA_OAEP = "rsa_oaep"
FERNET = "fernet"
class KeyDerivationFunction(Enum):
PBKDF2 = "pbkdf2"
SCRYPT = "scrypt"
ARGON2 = "argon2"
@dataclass
class EncryptionConfig:
algorithm: EncryptionAlgorithm
key_size: int
kdf: KeyDerivationFunction
iterations: int
salt_size: int
iv_size: int
@dataclass
class EncryptedData:
ciphertext: bytes
iv: Optional[bytes]
salt: Optional[bytes]
tag: Optional[bytes]
algorithm: EncryptionAlgorithm
metadata: Dict[str, Any]
class EncryptionManager:
def __init__(self, config: Dict[str, Any]):
self.config = config
self.logger = self._setup_logging()
self.master_key = self._derive_master_key()
self.key_cache: Dict[str, bytes] = {}
# Default encryption configurations
self.encryption_configs = {
'default': EncryptionConfig(
algorithm=EncryptionAlgorithm.AES_256_GCM,
key_size=32,
kdf=KeyDerivationFunction.PBKDF2,
iterations=100000,
salt_size=16,
iv_size=12
),
'high_security': EncryptionConfig(
algorithm=EncryptionAlgorithm.AES_256_GCM,
key_size=32,
kdf=KeyDerivationFunction.SCRYPT,
iterations=32768,
salt_size=32,
iv_size=12
),
'performance': EncryptionConfig(
algorithm=EncryptionAlgorithm.CHACHA20_POLY1305,
key_size=32,
kdf=KeyDerivationFunction.PBKDF2,
iterations=50000,
salt_size=16,
iv_size=12
)
}
def _setup_logging(self) -> logging.Logger:
logger = logging.getLogger('EncryptionManager')
logger.setLevel(logging.INFO)
handler = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)
return logger
def _derive_master_key(self) -> bytes:
"""Derive master key from configuration"""
master_password = self.config.get('master_password', '').encode()
salt = self.config.get('master_salt', 'default_salt').encode()
if not master_password:
# Generate random master key if no password provided
return secrets.token_bytes(32)
kdf = PBKDF2HMAC(
algorithm=hashes.SHA256(),
length=32,
salt=salt,
iterations=100000,
)
return kdf.derive(master_password)
def derive_key(self, password: str, salt: bytes, config: EncryptionConfig) -> bytes:
"""Derive encryption key from password"""
password_bytes = password.encode('utf-8')
if config.kdf == KeyDerivationFunction.PBKDF2:
kdf = PBKDF2HMAC(
algorithm=hashes.SHA256(),
length=config.key_size,
salt=salt,
iterations=config.iterations,
)
return kdf.derive(password_bytes)
elif config.kdf == KeyDerivationFunction.SCRYPT:
kdf = Scrypt(
algorithm=hashes.SHA256(),
length=config.key_size,
salt=salt,
n=config.iterations,
r=8,
p=1,
)
return kdf.derive(password_bytes)
else:
raise ValueError(f"Unsupported KDF: {config.kdf}")
def generate_key(self, key_id: str, config_name: str = 'default') -> bytes:
"""Generate and store a new encryption key"""
config = self.encryption_configs[config_name]
key = secrets.token_bytes(config.key_size)
# Encrypt key with master key for storage
encrypted_key = self._encrypt_key(key)
# Store encrypted key (in production, this would be in a secure key store)
self._store_key(key_id, encrypted_key)
# Cache the key
self.key_cache[key_id] = key
self.logger.info(f"Generated new key: {key_id}")
return key
def get_key(self, key_id: str) -> Optional[bytes]:
"""Retrieve encryption key"""
if key_id in self.key_cache:
return self.key_cache[key_id]
# Load from storage
encrypted_key = self._load_key(key_id)
if encrypted_key:
key = self._decrypt_key(encrypted_key)
self.key_cache[key_id] = key
return key
return None
def _encrypt_key(self, key: bytes) -> bytes:
"""Encrypt key with master key"""
fernet = Fernet(base64.urlsafe_b64encode(self.master_key))
return fernet.encrypt(key)
def _decrypt_key(self, encrypted_key: bytes) -> bytes:
"""Decrypt key with master key"""
fernet = Fernet(base64.urlsafe_b64encode(self.master_key))
return fernet.decrypt(encrypted_key)
def _store_key(self, key_id: str, encrypted_key: bytes):
"""Store encrypted key (placeholder implementation)"""
# In production, this would store in a secure key management system
key_file = f"/tmp/keys/{key_id}.key"
os.makedirs(os.path.dirname(key_file), exist_ok=True)
with open(key_file, 'wb') as f:
f.write(encrypted_key)
def _load_key(self, key_id: str) -> Optional[bytes]:
"""Load encrypted key (placeholder implementation)"""
key_file = f"/tmp/keys/{key_id}.key"
try:
with open(key_file, 'rb') as f:
return f.read()
except FileNotFoundError:
return None
def encrypt_data(self, data: bytes, key_id: str,
config_name: str = 'default') -> EncryptedData:
"""Encrypt data using specified configuration"""
config = self.encryption_configs[config_name]
key = self.get_key(key_id)
if not key:
raise ValueError(f"Key not found: {key_id}")
if config.algorithm == EncryptionAlgorithm.AES_256_GCM:
return self._encrypt_aes_gcm(data, key, config)
elif config.algorithm == EncryptionAlgorithm.AES_256_CBC:
return self._encrypt_aes_cbc(data, key, config)
elif config.algorithm == EncryptionAlgorithm.CHACHA20_POLY1305:
return self._encrypt_chacha20(data, key, config)
elif config.algorithm == EncryptionAlgorithm.FERNET:
return self._encrypt_fernet(data, key, config)
else:
raise ValueError(f"Unsupported algorithm: {config.algorithm}")
def decrypt_data(self, encrypted_data: EncryptedData, key_id: str) -> bytes:
"""Decrypt data"""
key = self.get_key(key_id)
if not key:
raise ValueError(f"Key not found: {key_id}")
if encrypted_data.algorithm == EncryptionAlgorithm.AES_256_GCM:
return self._decrypt_aes_gcm(encrypted_data, key)
elif encrypted_data.algorithm == EncryptionAlgorithm.AES_256_CBC:
return self._decrypt_aes_cbc(encrypted_data, key)
elif encrypted_data.algorithm == EncryptionAlgorithm.CHACHA20_POLY1305:
return self._decrypt_chacha20(encrypted_data, key)
elif encrypted_data.algorithm == EncryptionAlgorithm.FERNET:
return self._decrypt_fernet(encrypted_data, key)
else:
raise ValueError(f"Unsupported algorithm: {encrypted_data.algorithm}")
def _encrypt_aes_gcm(self, data: bytes, key: bytes,
config: EncryptionConfig) -> EncryptedData:
"""Encrypt using AES-256-GCM"""
iv = secrets.token_bytes(config.iv_size)
cipher = Cipher(algorithms.AES(key), modes.GCM(iv))
encryptor = cipher.encryptor()
ciphertext = encryptor.update(data) + encryptor.finalize()
return EncryptedData(
ciphertext=ciphertext,
iv=iv,
salt=None,
tag=encryptor.tag,
algorithm=config.algorithm,
metadata={'key_size': len(key)}
)
def _decrypt_aes_gcm(self, encrypted_data: EncryptedData, key: bytes) -> bytes:
"""Decrypt using AES-256-GCM"""
cipher = Cipher(
algorithms.AES(key),
modes.GCM(encrypted_data.iv, encrypted_data.tag)
)
decryptor = cipher.decryptor()
return decryptor.update(encrypted_data.ciphertext) + decryptor.finalize()
def _encrypt_aes_cbc(self, data: bytes, key: bytes,
config: EncryptionConfig) -> EncryptedData:
"""Encrypt using AES-256-CBC"""
iv = secrets.token_bytes(16) # AES block size
# Pad data to block size
padding_length = 16 - (len(data) % 16)
padded_data = data + bytes([padding_length] * padding_length)
cipher = Cipher(algorithms.AES(key), modes.CBC(iv))
encryptor = cipher.encryptor()
ciphertext = encryptor.update(padded_data) + encryptor.finalize()
return EncryptedData(
ciphertext=ciphertext,
iv=iv,
salt=None,
tag=None,
algorithm=config.algorithm,
metadata={'key_size': len(key)}
)
def _decrypt_aes_cbc(self, encrypted_data: EncryptedData, key: bytes) -> bytes:
"""Decrypt using AES-256-CBC"""
cipher = Cipher(algorithms.AES(key), modes.CBC(encrypted_data.iv))
decryptor = cipher.decryptor()
padded_data = decryptor.update(encrypted_data.ciphertext) + decryptor.finalize()
# Remove padding
padding_length = padded_data[-1]
return padded_data[:-padding_length]
def _encrypt_chacha20(self, data: bytes, key: bytes,
config: EncryptionConfig) -> EncryptedData:
"""Encrypt using ChaCha20-Poly1305"""
nonce = secrets.token_bytes(12) # ChaCha20 nonce size
cipher = Cipher(algorithms.ChaCha20(key, nonce), mode=None)
encryptor = cipher.encryptor()
ciphertext = encryptor.update(data) + encryptor.finalize()
return EncryptedData(
ciphertext=ciphertext,
iv=nonce,
salt=None,
tag=None,
algorithm=config.algorithm,
metadata={'key_size': len(key)}
)
def _decrypt_chacha20(self, encrypted_data: EncryptedData, key: bytes) -> bytes:
"""Decrypt using ChaCha20-Poly1305"""
cipher = Cipher(algorithms.ChaCha20(key, encrypted_data.iv), mode=None)
decryptor = cipher.decryptor()
return decryptor.update(encrypted_data.ciphertext) + decryptor.finalize()
def _encrypt_fernet(self, data: bytes, key: bytes,
config: EncryptionConfig) -> EncryptedData:
"""Encrypt using Fernet"""
fernet = Fernet(base64.urlsafe_b64encode(key))
ciphertext = fernet.encrypt(data)
return EncryptedData(
ciphertext=ciphertext,
iv=None,
salt=None,
tag=None,
algorithm=config.algorithm,
metadata={'key_size': len(key)}
)
def _decrypt_fernet(self, encrypted_data: EncryptedData, key: bytes) -> bytes:
"""Decrypt using Fernet"""
fernet = Fernet(base64.urlsafe_b64encode(key))
return fernet.decrypt(encrypted_data.ciphertext)
def encrypt_column_data(self, table_name: str, column_name: str,
data: Any, data_type: str) -> str:
"""Encrypt column data with format preservation"""
# Convert data to bytes
if isinstance(data, str):
data_bytes = data.encode('utf-8')
elif isinstance(data, (int, float)):
data_bytes = str(data).encode('utf-8')
else:
data_bytes = str(data).encode('utf-8')
# Use column-specific key
key_id = f"{table_name}.{column_name}"
# Ensure key exists
if not self.get_key(key_id):
self.generate_key(key_id)
# Encrypt data
encrypted_data = self.encrypt_data(data_bytes, key_id)
# Return base64-encoded ciphertext for storage
return base64.b64encode(encrypted_data.ciphertext).decode('utf-8')
def decrypt_column_data(self, table_name: str, column_name: str,
encrypted_value: str, data_type: str) -> Any:
"""Decrypt column data and convert back to original type"""
key_id = f"{table_name}.{column_name}"
# Decode base64
ciphertext = base64.b64decode(encrypted_value.encode('utf-8'))
# Create EncryptedData object (simplified - in practice, store metadata)
encrypted_data = EncryptedData(
ciphertext=ciphertext,
iv=None,
salt=None,
tag=None,
algorithm=EncryptionAlgorithm.FERNET,
metadata={}
)
# Decrypt
decrypted_bytes = self.decrypt_data(encrypted_data, key_id)
decrypted_str = decrypted_bytes.decode('utf-8')
# Convert back to original type
if data_type.lower() in ['int', 'integer', 'bigint']:
return int(decrypted_str)
elif data_type.lower() in ['float', 'double', 'decimal']:
return float(decrypted_str)
else:
return decrypted_str
def rotate_key(self, key_id: str) -> bool:
"""Rotate encryption key"""
try:
# Generate new key
old_key = self.get_key(key_id)
new_key = secrets.token_bytes(32)
# Store new key with versioned ID
new_key_id = f"{key_id}_v{int(time.time())}"
encrypted_new_key = self._encrypt_key(new_key)
self._store_key(new_key_id, encrypted_new_key)
# Update key cache
self.key_cache[key_id] = new_key
self.logger.info(f"Rotated key: {key_id}")
return True
except Exception as e:
self.logger.error(f"Key rotation failed for {key_id}: {e}")
return False
def generate_hash(self, data: str, salt: Optional[str] = None) -> Tuple[str, str]:
"""Generate secure hash for data"""
if salt is None:
salt = secrets.token_hex(16)
# Use PBKDF2 for password-like data
hash_value = hashlib.pbkdf2_hmac(
'sha256',
data.encode('utf-8'),
salt.encode('utf-8'),
100000
)
return base64.b64encode(hash_value).decode('utf-8'), salt
def verify_hash(self, data: str, hash_value: str, salt: str) -> bool:
"""Verify data against hash"""
computed_hash, _ = self.generate_hash(data, salt)
return hmac.compare_digest(computed_hash, hash_value)
# Example usage
def main():
config = {
'master_password': 'secure_master_password_2024',
'master_salt': 'encryption_salt_2024'
}
encryption_manager = EncryptionManager(config)
# Generate a key for sensitive data
key_id = "users.ssn"
encryption_manager.generate_key(key_id)
# Encrypt sensitive data
sensitive_data = "123-45-6789"
encrypted_ssn = encryption_manager.encrypt_column_data(
"users", "ssn", sensitive_data, "varchar"
)
print(f"Original SSN: {sensitive_data}")
print(f"Encrypted SSN: {encrypted_ssn}")
# Decrypt data
decrypted_ssn = encryption_manager.decrypt_column_data(
"users", "ssn", encrypted_ssn, "varchar"
)
print(f"Decrypted SSN: {decrypted_ssn}")
# Generate hash for password
password = "user_password_123"
hash_value, salt = encryption_manager.generate_hash(password)
print(f"Password hash: {hash_value}")
print(f"Salt: {salt}")
# Verify password
is_valid = encryption_manager.verify_hash(password, hash_value, salt)
print(f"Password verification: {is_valid}")
if __name__ == "__main__":
main()
Access Control Implementation
1. Database-Level Access Control
import sqlite3
import mysql.connector
import psycopg2
from typing import Dict, List, Optional, Any
import logging
from datetime import datetime, timedelta
class DatabaseAccessController:
"""Unified database access control implementation"""
def __init__(self, config: Dict[str, Any]):
self.config = config
self.logger = logging.getLogger(__name__)
self.connections = {}
self.access_policies = {}
def create_connection(self, db_type: str, connection_params: Dict[str, Any]) -> Any:
"""Create secure database connection"""
try:
if db_type == 'mysql':
connection = mysql.connector.connect(
**connection_params,
ssl_disabled=False,
ssl_verify_cert=True,
ssl_verify_identity=True,
autocommit=False
)
elif db_type == 'postgresql':
connection = psycopg2.connect(
**connection_params,
sslmode='require',
connect_timeout=10
)
elif db_type == 'sqlite':
connection = sqlite3.connect(
connection_params['database'],
timeout=10.0,
check_same_thread=False
)
else:
raise ValueError(f"Unsupported database type: {db_type}")
self.connections[db_type] = connection
self.logger.info(f"Secure connection established for {db_type}")
return connection
except Exception as e:
self.logger.error(f"Connection failed for {db_type}: {e}")
raise
def create_user_with_privileges(self, db_type: str, username: str,
password: str, privileges: List[str],
databases: List[str] = None) -> bool:
"""Create database user with specific privileges"""
try:
connection = self.connections.get(db_type)
if not connection:
raise ValueError(f"No connection available for {db_type}")
cursor = connection.cursor()
if db_type == 'mysql':
# Create user
cursor.execute(f"CREATE USER '{username}'@'%' IDENTIFIED BY %s", (password,))
# Grant privileges
for db_name in (databases or ['*']):
for privilege in privileges:
if privilege.upper() == 'ALL':
cursor.execute(f"GRANT ALL PRIVILEGES ON {db_name}.* TO '{username}'@'%'")
else:
cursor.execute(f"GRANT {privilege} ON {db_name}.* TO '{username}'@'%'")
cursor.execute("FLUSH PRIVILEGES")
elif db_type == 'postgresql':
# Create user
cursor.execute(f"CREATE USER {username} WITH PASSWORD %s", (password,))
# Grant privileges
for db_name in (databases or []):
cursor.execute(f"GRANT CONNECT ON DATABASE {db_name} TO {username}")
for privilege in privileges:
if privilege.upper() == 'ALL':
cursor.execute(f"GRANT ALL PRIVILEGES ON DATABASE {db_name} TO {username}")
else:
cursor.execute(f"GRANT {privilege} ON DATABASE {db_name} TO {username}")
connection.commit()
self.logger.info(f"User {username} created with privileges: {privileges}")
return True
except Exception as e:
self.logger.error(f"Failed to create user {username}: {e}")
connection.rollback()
return False
def implement_row_level_security(self, db_type: str, table_name: str,
policy_name: str, policy_condition: str) -> bool:
"""Implement row-level security policies"""
try:
connection = self.connections.get(db_type)
cursor = connection.cursor()
if db_type == 'postgresql':
# Enable RLS on table
cursor.execute(f"ALTER TABLE {table_name} ENABLE ROW LEVEL SECURITY")
# Create policy
cursor.execute(f"""
CREATE POLICY {policy_name} ON {table_name}
FOR ALL TO PUBLIC
USING ({policy_condition})
""")
elif db_type == 'mysql':
# MySQL doesn't have native RLS, implement with views
view_name = f"{table_name}_secure"
cursor.execute(f"""
CREATE VIEW {view_name} AS
SELECT * FROM {table_name}
WHERE {policy_condition}
""")
connection.commit()
self.logger.info(f"Row-level security implemented for {table_name}")
return True
except Exception as e:
self.logger.error(f"Failed to implement RLS for {table_name}: {e}")
return False
def create_security_definer_function(self, db_type: str, function_name: str,
function_body: str, owner: str) -> bool:
"""Create security definer functions for controlled access"""
try:
connection = self.connections.get(db_type)
cursor = connection.cursor()
if db_type == 'postgresql':
cursor.execute(f"""
CREATE OR REPLACE FUNCTION {function_name}()
RETURNS TABLE(result_data TEXT)
LANGUAGE plpgsql
SECURITY DEFINER
SET search_path = public
AS $$
BEGIN
{function_body}
END;
$$;
""")
# Set owner
cursor.execute(f"ALTER FUNCTION {function_name}() OWNER TO {owner}")
connection.commit()
self.logger.info(f"Security definer function {function_name} created")
return True
except Exception as e:
self.logger.error(f"Failed to create function {function_name}: {e}")
return False
### 2. Application-Level Access Control
class ApplicationAccessControl:
"""Application-level access control and session management"""
def __init__(self, config: Dict[str, Any]):
self.config = config
self.logger = logging.getLogger(__name__)
self.active_sessions = {}
self.failed_attempts = {}
def validate_session(self, session_id: str, user_id: str) -> bool:
"""Validate user session"""
try:
session = self.active_sessions.get(session_id)
if not session:
return False
# Check session expiry
if datetime.now() > session['expires_at']:
del self.active_sessions[session_id]
return False
# Check user match
if session['user_id'] != user_id:
return False
# Update last activity
session['last_activity'] = datetime.now()
return True
except Exception as e:
self.logger.error(f"Session validation failed: {e}")
return False
def check_rate_limiting(self, user_id: str, action: str) -> bool:
"""Implement rate limiting for database operations"""
try:
current_time = datetime.now()
key = f"{user_id}:{action}"
if key not in self.failed_attempts:
self.failed_attempts[key] = []
# Clean old attempts
self.failed_attempts[key] = [
attempt for attempt in self.failed_attempts[key]
if current_time - attempt < timedelta(minutes=15)
]
# Check rate limit
max_attempts = self.config.get('rate_limit', {}).get(action, 10)
if len(self.failed_attempts[key]) >= max_attempts:
self.logger.warning(f"Rate limit exceeded for {user_id}:{action}")
return False
return True
except Exception as e:
self.logger.error(f"Rate limiting check failed: {e}")
return False
def log_access_attempt(self, user_id: str, action: str,
success: bool, details: Dict[str, Any] = None):
"""Log access attempts for auditing"""
try:
log_entry = {
'timestamp': datetime.now().isoformat(),
'user_id': user_id,
'action': action,
'success': success,
'ip_address': details.get('ip_address') if details else None,
'user_agent': details.get('user_agent') if details else None,
'resource': details.get('resource') if details else None
}
# Log to file or database
self.logger.info(f"Access attempt: {log_entry}")
# Track failed attempts for rate limiting
if not success:
key = f"{user_id}:{action}"
if key not in self.failed_attempts:
self.failed_attempts[key] = []
self.failed_attempts[key].append(datetime.now())
except Exception as e:
self.logger.error(f"Failed to log access attempt: {e}")
## Auditing and Monitoring
### 1. Comprehensive Audit System
```python
import json
import hashlib
from datetime import datetime
from typing import Dict, List, Any, Optional
from dataclasses import dataclass, asdict
from enum import Enum
class AuditEventType(Enum):
LOGIN = "login"
LOGOUT = "logout"
QUERY = "query"
INSERT = "insert"
UPDATE = "update"
DELETE = "delete"
SCHEMA_CHANGE = "schema_change"
PRIVILEGE_CHANGE = "privilege_change"
BACKUP = "backup"
RESTORE = "restore"
@dataclass
class AuditEvent:
event_id: str
timestamp: datetime
event_type: AuditEventType
user_id: str
session_id: str
database_name: str
table_name: Optional[str]
query: Optional[str]
affected_rows: Optional[int]
ip_address: str
user_agent: str
success: bool
error_message: Optional[str]
data_before: Optional[Dict[str, Any]]
data_after: Optional[Dict[str, Any]]
class DatabaseAuditSystem:
"""Comprehensive database auditing system"""
def __init__(self, config: Dict[str, Any]):
self.config = config
self.logger = logging.getLogger(__name__)
self.audit_storage = config.get('audit_storage', 'file')
self.retention_days = config.get('retention_days', 365)
def generate_event_id(self, event_data: Dict[str, Any]) -> str:
"""Generate unique event ID"""
event_string = json.dumps(event_data, sort_keys=True, default=str)
return hashlib.sha256(event_string.encode()).hexdigest()[:16]
def log_audit_event(self, event: AuditEvent) -> bool:
"""Log audit event to storage"""
try:
event_dict = asdict(event)
event_dict['timestamp'] = event.timestamp.isoformat()
event_dict['event_type'] = event.event_type.value
if self.audit_storage == 'file':
self._log_to_file(event_dict)
elif self.audit_storage == 'database':
self._log_to_database(event_dict)
elif self.audit_storage == 'syslog':
self._log_to_syslog(event_dict)
return True
except Exception as e:
self.logger.error(f"Failed to log audit event: {e}")
return False
def _log_to_file(self, event_dict: Dict[str, Any]):
"""Log audit event to file"""
audit_file = f"audit_{datetime.now().strftime('%Y%m%d')}.log"
with open(audit_file, 'a') as f:
f.write(json.dumps(event_dict) + '\n')
def _log_to_database(self, event_dict: Dict[str, Any]):
"""Log audit event to database"""
# Implementation for database logging
pass
def _log_to_syslog(self, event_dict: Dict[str, Any]):
"""Log audit event to syslog"""
import syslog
syslog.openlog("database_audit")
syslog.syslog(syslog.LOG_INFO, json.dumps(event_dict))
def create_audit_triggers(self, db_type: str, table_name: str) -> List[str]:
"""Create database triggers for automatic auditing"""
triggers = []
if db_type == 'postgresql':
# Create audit table
audit_table = f"{table_name}_audit"
triggers.append(f"""
CREATE TABLE IF NOT EXISTS {audit_table} (
audit_id SERIAL PRIMARY KEY,
operation CHAR(1) NOT NULL,
stamp TIMESTAMP NOT NULL DEFAULT NOW(),
userid TEXT NOT NULL DEFAULT USER,
old_values JSONB,
new_values JSONB
);
""")
# Create trigger function
triggers.append(f"""
CREATE OR REPLACE FUNCTION {table_name}_audit_trigger()
RETURNS TRIGGER AS $$
BEGIN
IF TG_OP = 'DELETE' THEN
INSERT INTO {audit_table} (operation, old_values)
VALUES ('D', row_to_json(OLD));
RETURN OLD;
ELSIF TG_OP = 'UPDATE' THEN
INSERT INTO {audit_table} (operation, old_values, new_values)
VALUES ('U', row_to_json(OLD), row_to_json(NEW));
RETURN NEW;
ELSIF TG_OP = 'INSERT' THEN
INSERT INTO {audit_table} (operation, new_values)
VALUES ('I', row_to_json(NEW));
RETURN NEW;
END IF;
RETURN NULL;
END;
$$ LANGUAGE plpgsql;
""")
# Create triggers
for operation in ['INSERT', 'UPDATE', 'DELETE']:
triggers.append(f"""
CREATE TRIGGER {table_name}_{operation.lower()}_audit
AFTER {operation} ON {table_name}
FOR EACH ROW EXECUTE FUNCTION {table_name}_audit_trigger();
""")
elif db_type == 'mysql':
# MySQL trigger implementation
audit_table = f"{table_name}_audit"
triggers.append(f"""
CREATE TABLE IF NOT EXISTS {audit_table} (
audit_id INT AUTO_INCREMENT PRIMARY KEY,
operation CHAR(1) NOT NULL,
stamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
userid VARCHAR(255) DEFAULT USER(),
old_values JSON,
new_values JSON
);
""")
# Create triggers for each operation
triggers.append(f"""
CREATE TRIGGER {table_name}_insert_audit
AFTER INSERT ON {table_name}
FOR EACH ROW
INSERT INTO {audit_table} (operation, new_values)
VALUES ('I', JSON_OBJECT({self._get_column_list(table_name, 'NEW')}));
""")
return triggers
def analyze_audit_logs(self, start_date: datetime, end_date: datetime) -> Dict[str, Any]:
"""Analyze audit logs for security insights"""
try:
analysis = {
'total_events': 0,
'events_by_type': {},
'events_by_user': {},
'failed_attempts': 0,
'suspicious_activities': [],
'top_queries': [],
'peak_hours': {}
}
# Read and analyze audit logs
# Implementation depends on storage type
return analysis
except Exception as e:
self.logger.error(f"Audit log analysis failed: {e}")
return {}
### 2. Real-time Security Monitoring
```bash
#!/bin/bash
# Database Security Monitoring Script
# Monitors database security events and generates alerts
# Configuration
DB_HOST="localhost"
DB_PORT="3306"
DB_USER="monitor_user"
DB_PASS="monitor_password"
ALERT_EMAIL="security@company.com"
LOG_FILE="/var/log/db_security_monitor.log"
THRESHOLD_FAILED_LOGINS=5
THRESHOLD_QUERY_TIME=30
# Logging function
log_message() {
echo "[$(date '+%Y-%m-%d %H:%M:%S')] $1" | tee -a "$LOG_FILE"
}
# Check for failed login attempts
check_failed_logins() {
log_message "Checking for failed login attempts..."
failed_logins=$(mysql -h "$DB_HOST" -P "$DB_PORT" -u "$DB_USER" -p"$DB_PASS" \
-e "SELECT COUNT(*) FROM mysql.general_log WHERE command_type = 'Connect' AND argument LIKE '%Access denied%' AND event_time > DATE_SUB(NOW(), INTERVAL 1 HOUR);" \
-s -N 2>/dev/null)
if [ "$failed_logins" -gt "$THRESHOLD_FAILED_LOGINS" ]; then
alert_message="SECURITY ALERT: $failed_logins failed login attempts in the last hour"
log_message "$alert_message"
send_alert "$alert_message"
fi
}
# Check for suspicious queries
check_suspicious_queries() {
log_message "Checking for suspicious queries..."
# Check for potential SQL injection patterns
suspicious_patterns=(
"UNION.*SELECT"
"OR.*1=1"
"DROP.*TABLE"
"DELETE.*FROM.*WHERE.*1=1"
"INSERT.*INTO.*VALUES.*SELECT"
)
for pattern in "${suspicious_patterns[@]}"; do
count=$(mysql -h "$DB_HOST" -P "$DB_PORT" -u "$DB_USER" -p"$DB_PASS" \
-e "SELECT COUNT(*) FROM mysql.general_log WHERE argument REGEXP '$pattern' AND event_time > DATE_SUB(NOW(), INTERVAL 1 HOUR);" \
-s -N 2>/dev/null)
if [ "$count" -gt 0 ]; then
alert_message="SECURITY ALERT: $count suspicious queries detected with pattern: $pattern"
log_message "$alert_message"
send_alert "$alert_message"
fi
done
}
# Check for long-running queries
check_long_queries() {
log_message "Checking for long-running queries..."
long_queries=$(mysql -h "$DB_HOST" -P "$DB_PORT" -u "$DB_USER" -p"$DB_PASS" \
-e "SELECT COUNT(*) FROM information_schema.processlist WHERE time > $THRESHOLD_QUERY_TIME AND command != 'Sleep';" \
-s -N 2>/dev/null)
if [ "$long_queries" -gt 0 ]; then
alert_message="PERFORMANCE ALERT: $long_queries queries running longer than $THRESHOLD_QUERY_TIME seconds"
log_message "$alert_message"
send_alert "$alert_message"
fi
}
# Check database connections
check_connections() {
log_message "Checking database connections..."
max_connections=$(mysql -h "$DB_HOST" -P "$DB_PORT" -u "$DB_USER" -p"$DB_PASS" \
-e "SHOW VARIABLES LIKE 'max_connections';" -s -N | cut -f2)
current_connections=$(mysql -h "$DB_HOST" -P "$DB_PORT" -u "$DB_USER" -p"$DB_PASS" \
-e "SHOW STATUS LIKE 'Threads_connected';" -s -N | cut -f2)
connection_percentage=$((current_connections * 100 / max_connections))
if [ "$connection_percentage" -gt 80 ]; then
alert_message="CONNECTION ALERT: Database connections at $connection_percentage% capacity ($current_connections/$max_connections)"
log_message "$alert_message"
send_alert "$alert_message"
fi
}
# Check for privilege escalation attempts
check_privilege_escalation() {
log_message "Checking for privilege escalation attempts..."
privilege_changes=$(mysql -h "$DB_HOST" -P "$DB_PORT" -u "$DB_USER" -p"$DB_PASS" \
-e "SELECT COUNT(*) FROM mysql.general_log WHERE argument REGEXP 'GRANT|REVOKE|CREATE USER|DROP USER' AND event_time > DATE_SUB(NOW(), INTERVAL 1 HOUR);" \
-s -N 2>/dev/null)
if [ "$privilege_changes" -gt 0 ]; then
alert_message="SECURITY ALERT: $privilege_changes privilege changes detected in the last hour"
log_message "$alert_message"
send_alert "$alert_message"
fi
}
# Send alert notification
send_alert() {
local message="$1"
# Send email alert
echo "$message" | mail -s "Database Security Alert - $(hostname)" "$ALERT_EMAIL"
# Send to syslog
logger -p local0.alert "$message"
# Send to monitoring system (example: Prometheus Alertmanager)
curl -X POST http://alertmanager:9093/api/v1/alerts \
-H "Content-Type: application/json" \
-d "[{
\"labels\": {
\"alertname\": \"DatabaseSecurityAlert\",
\"severity\": \"critical\",
\"instance\": \"$(hostname)\"
},
\"annotations\": {
\"summary\": \"$message\"
}
}]" 2>/dev/null
}
# Generate security report
generate_security_report() {
log_message "Generating security report..."
report_file="/tmp/db_security_report_$(date +%Y%m%d_%H%M%S).txt"
cat > "$report_file" << EOF
Database Security Report - $(date)
=====================================
System Information:
- Database Host: $DB_HOST:$DB_PORT
- Report Generated: $(date)
Security Metrics:
EOF
# Add failed login statistics
echo "- Failed Logins (last 24h): $(mysql -h "$DB_HOST" -P "$DB_PORT" -u "$DB_USER" -p"$DB_PASS" \
-e "SELECT COUNT(*) FROM mysql.general_log WHERE command_type = 'Connect' AND argument LIKE '%Access denied%' AND event_time > DATE_SUB(NOW(), INTERVAL 24 HOUR);" \
-s -N 2>/dev/null)" >> "$report_file"
# Add connection statistics
echo "- Current Connections: $(mysql -h "$DB_HOST" -P "$DB_PORT" -u "$DB_USER" -p"$DB_PASS" \
-e "SHOW STATUS LIKE 'Threads_connected';" -s -N | cut -f2)" >> "$report_file"
# Add query statistics
echo "- Queries (last 24h): $(mysql -h "$DB_HOST" -P "$DB_PORT" -u "$DB_USER" -p"$DB_PASS" \
-e "SELECT COUNT(*) FROM mysql.general_log WHERE event_time > DATE_SUB(NOW(), INTERVAL 24 HOUR);" \
-s -N 2>/dev/null)" >> "$report_file"
log_message "Security report generated: $report_file"
# Email the report
mail -s "Daily Database Security Report - $(hostname)" "$ALERT_EMAIL" < "$report_file"
}
# Main monitoring loop
main() {
log_message "Starting database security monitoring..."
while true; do
check_failed_logins
check_suspicious_queries
check_long_queries
check_connections
check_privilege_escalation
# Generate daily report at midnight
if [ "$(date +%H%M)" = "0000" ]; then
generate_security_report
fi
# Wait 5 minutes before next check
sleep 300
done
}
# Run monitoring if script is executed directly
if [ "${BASH_SOURCE[0]}" = "${0}" ]; then
main "$@"
fi
Compliance and Governance
1. Compliance Framework Implementation
from typing import Dict, List, Any, Optional
from dataclasses import dataclass
from enum import Enum
import json
import re
from datetime import datetime, timedelta
class ComplianceStandard(Enum):
GDPR = "gdpr"
HIPAA = "hipaa"
PCI_DSS = "pci_dss"
SOX = "sox"
ISO27001 = "iso27001"
@dataclass
class ComplianceRule:
rule_id: str
standard: ComplianceStandard
category: str
description: str
severity: str
check_function: str
remediation: str
class DatabaseComplianceManager:
"""Database compliance management and validation"""
def __init__(self, config: Dict[str, Any]):
self.config = config
self.logger = logging.getLogger(__name__)
self.compliance_rules = self._load_compliance_rules()
self.scan_results = {}
def _load_compliance_rules(self) -> List[ComplianceRule]:
"""Load compliance rules for different standards"""
rules = []
# GDPR Rules
rules.extend([
ComplianceRule(
rule_id="GDPR-001",
standard=ComplianceStandard.GDPR,
category="Data Protection",
description="Personal data must be encrypted at rest",
severity="high",
check_function="check_encryption_at_rest",
remediation="Enable transparent data encryption (TDE) or column-level encryption"
),
ComplianceRule(
rule_id="GDPR-002",
standard=ComplianceStandard.GDPR,
category="Access Control",
description="Access to personal data must be logged and auditable",
severity="high",
check_function="check_audit_logging",
remediation="Enable comprehensive audit logging for all data access"
),
ComplianceRule(
rule_id="GDPR-003",
standard=ComplianceStandard.GDPR,
category="Data Retention",
description="Personal data retention policies must be enforced",
severity="medium",
check_function="check_data_retention_policies",
remediation="Implement automated data retention and deletion policies"
)
])
# HIPAA Rules
rules.extend([
ComplianceRule(
rule_id="HIPAA-001",
standard=ComplianceStandard.HIPAA,
category="Access Control",
description="Unique user identification required",
severity="high",
check_function="check_unique_user_identification",
remediation="Ensure each user has a unique identifier and disable shared accounts"
),
ComplianceRule(
rule_id="HIPAA-002",
standard=ComplianceStandard.HIPAA,
category="Audit Controls",
description="Audit logs must capture access to PHI",
severity="high",
check_function="check_phi_audit_logging",
remediation="Enable detailed audit logging for all PHI access"
)
])
# PCI DSS Rules
rules.extend([
ComplianceRule(
rule_id="PCI-001",
standard=ComplianceStandard.PCI_DSS,
category="Data Protection",
description="Cardholder data must be encrypted",
severity="critical",
check_function="check_cardholder_data_encryption",
remediation="Encrypt all stored cardholder data using strong cryptography"
),
ComplianceRule(
rule_id="PCI-002",
standard=ComplianceStandard.PCI_DSS,
category="Access Control",
description="Restrict access to cardholder data by business need-to-know",
severity="high",
check_function="check_cardholder_data_access",
remediation="Implement role-based access control for cardholder data"
)
])
return rules
def run_compliance_scan(self, standards: List[ComplianceStandard] = None) -> Dict[str, Any]:
"""Run comprehensive compliance scan"""
try:
if standards is None:
standards = [standard for standard in ComplianceStandard]
scan_results = {
'scan_id': f"scan_{datetime.now().strftime('%Y%m%d_%H%M%S')}",
'timestamp': datetime.now().isoformat(),
'standards_checked': [s.value for s in standards],
'results': {},
'summary': {
'total_rules': 0,
'passed': 0,
'failed': 0,
'warnings': 0,
'compliance_score': 0
}
}
for standard in standards:
standard_results = self._scan_standard(standard)
scan_results['results'][standard.value] = standard_results
# Update summary
scan_results['summary']['total_rules'] += len(standard_results)
scan_results['summary']['passed'] += sum(1 for r in standard_results if r['status'] == 'passed')
scan_results['summary']['failed'] += sum(1 for r in standard_results if r['status'] == 'failed')
scan_results['summary']['warnings'] += sum(1 for r in standard_results if r['status'] == 'warning')
# Calculate compliance score
total_rules = scan_results['summary']['total_rules']
if total_rules > 0:
scan_results['summary']['compliance_score'] = (
scan_results['summary']['passed'] / total_rules * 100
)
self.scan_results = scan_results
return scan_results
except Exception as e:
self.logger.error(f"Compliance scan failed: {e}")
return {}
def _scan_standard(self, standard: ComplianceStandard) -> List[Dict[str, Any]]:
"""Scan for specific compliance standard"""
results = []
for rule in self.compliance_rules:
if rule.standard == standard:
try:
# Execute check function
check_method = getattr(self, rule.check_function, None)
if check_method:
check_result = check_method()
status = 'passed' if check_result else 'failed'
else:
status = 'warning'
check_result = False
results.append({
'rule_id': rule.rule_id,
'description': rule.description,
'category': rule.category,
'severity': rule.severity,
'status': status,
'details': check_result,
'remediation': rule.remediation if status == 'failed' else None
})
except Exception as e:
self.logger.error(f"Check failed for rule {rule.rule_id}: {e}")
results.append({
'rule_id': rule.rule_id,
'description': rule.description,
'category': rule.category,
'severity': rule.severity,
'status': 'error',
'details': str(e),
'remediation': rule.remediation
})
return results
# Compliance check functions
def check_encryption_at_rest(self) -> bool:
"""Check if data encryption at rest is enabled"""
# Implementation depends on database type
return True # Placeholder
def check_audit_logging(self) -> bool:
"""Check if comprehensive audit logging is enabled"""
# Implementation depends on database type
return True # Placeholder
def check_data_retention_policies(self) -> bool:
"""Check if data retention policies are implemented"""
# Check for automated data retention mechanisms
return False # Placeholder
def check_unique_user_identification(self) -> bool:
"""Check for unique user identification"""
# Verify no shared accounts exist
return True # Placeholder
def check_phi_audit_logging(self) -> bool:
"""Check PHI access audit logging"""
# Verify PHI access is logged
return True # Placeholder
def check_cardholder_data_encryption(self) -> bool:
"""Check cardholder data encryption"""
# Verify cardholder data is encrypted
return False # Placeholder
def check_cardholder_data_access(self) -> bool:
"""Check cardholder data access controls"""
# Verify access controls for cardholder data
return True # Placeholder
def generate_compliance_report(self, output_format: str = 'json') -> str:
"""Generate compliance report"""
try:
if not self.scan_results:
raise ValueError("No scan results available. Run compliance scan first.")
if output_format == 'json':
return json.dumps(self.scan_results, indent=2)
elif output_format == 'html':
return self._generate_html_report()
elif output_format == 'csv':
return self._generate_csv_report()
else:
raise ValueError(f"Unsupported output format: {output_format}")
except Exception as e:
self.logger.error(f"Report generation failed: {e}")
return ""
def _generate_html_report(self) -> str:
"""Generate HTML compliance report"""
html_template = """
<!DOCTYPE html>
<html>
<head>
<title>Database Compliance Report</title>
<style>
body { font-family: Arial, sans-serif; margin: 20px; }
.header { background-color: #f0f0f0; padding: 20px; border-radius: 5px; }
.summary { margin: 20px 0; }
.standard { margin: 20px 0; border: 1px solid #ddd; border-radius: 5px; }
.standard-header { background-color: #e0e0e0; padding: 10px; font-weight: bold; }
.rule { padding: 10px; border-bottom: 1px solid #eee; }
.passed { color: green; }
.failed { color: red; }
.warning { color: orange; }
.error { color: purple; }
</style>
</head>
<body>
<div class="header">
<h1>Database Compliance Report</h1>
<p>Generated: {timestamp}</p>
<p>Scan ID: {scan_id}</p>
</div>
<div class="summary">
<h2>Summary</h2>
<p>Compliance Score: {compliance_score:.1f}%</p>
<p>Total Rules: {total_rules}</p>
<p>Passed: {passed} | Failed: {failed} | Warnings: {warnings}</p>
</div>
{standards_content}
</body>
</html>
"""
# Generate content for each standard
standards_content = ""
for standard, results in self.scan_results['results'].items():
standards_content += f'<div class="standard">'
standards_content += f'<div class="standard-header">{standard.upper()}</div>'
for rule in results:
status_class = rule['status']
standards_content += f'''
<div class="rule">
<strong class="{status_class}">[{rule['status'].upper()}]</strong>
{rule['rule_id']}: {rule['description']}
{f"<br><em>Remediation: {rule['remediation']}</em>" if rule.get('remediation') else ""}
</div>
'''
standards_content += '</div>'
return html_template.format(
timestamp=self.scan_results['timestamp'],
scan_id=self.scan_results['scan_id'],
compliance_score=self.scan_results['summary']['compliance_score'],
total_rules=self.scan_results['summary']['total_rules'],
passed=self.scan_results['summary']['passed'],
failed=self.scan_results['summary']['failed'],
warnings=self.scan_results['summary']['warnings'],
standards_content=standards_content
)
## Security Testing and Validation
### 1. Automated Security Testing
```python
import subprocess
import socket
import ssl
import requests
from typing import Dict, List, Any, Tuple
import time
import random
import string
class DatabaseSecurityTester:
"""Automated database security testing framework"""
def __init__(self, config: Dict[str, Any]):
self.config = config
self.logger = logging.getLogger(__name__)
self.test_results = {}
def run_security_tests(self) -> Dict[str, Any]:
"""Run comprehensive security test suite"""
try:
test_results = {
'test_id': f"test_{datetime.now().strftime('%Y%m%d_%H%M%S')}",
'timestamp': datetime.now().isoformat(),
'tests': {},
'summary': {
'total_tests': 0,
'passed': 0,
'failed': 0,
'security_score': 0
}
}
# Run different test categories
test_categories = [
('authentication', self.test_authentication_security),
('authorization', self.test_authorization_controls),
('encryption', self.test_encryption_implementation),
('injection', self.test_sql_injection_protection),
('network', self.test_network_security),
('configuration', self.test_security_configuration)
]
for category, test_function in test_categories:
category_results = test_function()
test_results['tests'][category] = category_results
# Update summary
test_results['summary']['total_tests'] += len(category_results)
test_results['summary']['passed'] += sum(1 for r in category_results if r['status'] == 'passed')
test_results['summary']['failed'] += sum(1 for r in category_results if r['status'] == 'failed')
# Calculate security score
total_tests = test_results['summary']['total_tests']
if total_tests > 0:
test_results['summary']['security_score'] = (
test_results['summary']['passed'] / total_tests * 100
)
self.test_results = test_results
return test_results
except Exception as e:
self.logger.error(f"Security testing failed: {e}")
return {}
def test_authentication_security(self) -> List[Dict[str, Any]]:
"""Test authentication security controls"""
tests = []
# Test 1: Weak password policy
tests.append(self._test_weak_passwords())
# Test 2: Account lockout policy
tests.append(self._test_account_lockout())
# Test 3: Multi-factor authentication
tests.append(self._test_mfa_enforcement())
# Test 4: Session management
tests.append(self._test_session_security())
return tests
def test_authorization_controls(self) -> List[Dict[str, Any]]:
"""Test authorization and access controls"""
tests = []
# Test 1: Privilege escalation
tests.append(self._test_privilege_escalation())
# Test 2: Role-based access control
tests.append(self._test_rbac_implementation())
# Test 3: Data access controls
tests.append(self._test_data_access_controls())
return tests
def test_encryption_implementation(self) -> List[Dict[str, Any]]:
"""Test encryption implementation"""
tests = []
# Test 1: Data at rest encryption
tests.append(self._test_encryption_at_rest())
# Test 2: Data in transit encryption
tests.append(self._test_encryption_in_transit())
# Test 3: Key management
tests.append(self._test_key_management())
return tests
def test_sql_injection_protection(self) -> List[Dict[str, Any]]:
"""Test SQL injection protection"""
tests = []
# Test various SQL injection patterns
injection_patterns = [
"' OR '1'='1",
"'; DROP TABLE users; --",
"' UNION SELECT * FROM information_schema.tables --",
"' AND 1=1 --",
"admin'--",
"' OR 1=1#"
]
for pattern in injection_patterns:
tests.append(self._test_sql_injection_pattern(pattern))
return tests
def test_network_security(self) -> List[Dict[str, Any]]:
"""Test network security controls"""
tests = []
# Test 1: SSL/TLS configuration
tests.append(self._test_ssl_configuration())
# Test 2: Port security
tests.append(self._test_port_security())
# Test 3: Firewall rules
tests.append(self._test_firewall_rules())
return tests
def test_security_configuration(self) -> List[Dict[str, Any]]:
"""Test security configuration"""
tests = []
# Test 1: Default accounts
tests.append(self._test_default_accounts())
# Test 2: Security parameters
tests.append(self._test_security_parameters())
# Test 3: Audit configuration
tests.append(self._test_audit_configuration())
return tests
def _test_weak_passwords(self) -> Dict[str, Any]:
"""Test for weak password policies"""
try:
# Attempt to create user with weak password
weak_passwords = ['123456', 'password', 'admin', 'test']
for password in weak_passwords:
# Test password creation (implementation depends on database)
# This is a simulation
if len(password) < 8:
return {
'test_name': 'Weak Password Policy',
'status': 'failed',
'details': f'Weak password "{password}" was accepted',
'recommendation': 'Implement strong password policy with minimum length, complexity requirements'
}
return {
'test_name': 'Weak Password Policy',
'status': 'passed',
'details': 'Strong password policy is enforced',
'recommendation': None
}
except Exception as e:
return {
'test_name': 'Weak Password Policy',
'status': 'error',
'details': str(e),
'recommendation': 'Review password policy configuration'
}
def _test_account_lockout(self) -> Dict[str, Any]:
"""Test account lockout policy"""
try:
# Simulate multiple failed login attempts
# Implementation depends on database and authentication system
return {
'test_name': 'Account Lockout Policy',
'status': 'passed',
'details': 'Account lockout policy is properly configured',
'recommendation': None
}
except Exception as e:
return {
'test_name': 'Account Lockout Policy',
'status': 'error',
'details': str(e),
'recommendation': 'Configure account lockout after failed attempts'
}
def _test_mfa_enforcement(self) -> Dict[str, Any]:
"""Test multi-factor authentication enforcement"""
try:
# Check if MFA is enforced for privileged accounts
# Implementation depends on authentication system
return {
'test_name': 'Multi-Factor Authentication',
'status': 'warning',
'details': 'MFA enforcement could not be verified',
'recommendation': 'Implement MFA for all privileged accounts'
}
except Exception as e:
return {
'test_name': 'Multi-Factor Authentication',
'status': 'error',
'details': str(e),
'recommendation': 'Review MFA configuration'
}
def _test_session_security(self) -> Dict[str, Any]:
"""Test session security controls"""
try:
# Test session timeout, secure cookies, etc.
# Implementation depends on application framework
return {
'test_name': 'Session Security',
'status': 'passed',
'details': 'Session security controls are properly implemented',
'recommendation': None
}
except Exception as e:
return {
'test_name': 'Session Security',
'status': 'error',
'details': str(e),
'recommendation': 'Review session management configuration'
}
def _test_privilege_escalation(self) -> Dict[str, Any]:
"""Test for privilege escalation vulnerabilities"""
try:
# Test various privilege escalation scenarios
# Implementation depends on database type and configuration
return {
'test_name': 'Privilege Escalation',
'status': 'passed',
'details': 'No privilege escalation vulnerabilities detected',
'recommendation': None
}
except Exception as e:
return {
'test_name': 'Privilege Escalation',
'status': 'error',
'details': str(e),
'recommendation': 'Review user privileges and access controls'
}
def _test_rbac_implementation(self) -> Dict[str, Any]:
"""Test role-based access control implementation"""
try:
# Verify RBAC is properly implemented
# Check role assignments, permissions, etc.
return {
'test_name': 'Role-Based Access Control',
'status': 'passed',
'details': 'RBAC is properly implemented',
'recommendation': None
}
except Exception as e:
return {
'test_name': 'Role-Based Access Control',
'status': 'error',
'details': str(e),
'recommendation': 'Implement proper RBAC system'
}
def _test_data_access_controls(self) -> Dict[str, Any]:
"""Test data access controls"""
try:
# Test row-level security, column-level security, etc.
# Implementation depends on database features
return {
'test_name': 'Data Access Controls',
'status': 'warning',
'details': 'Data access controls need review',
'recommendation': 'Implement row-level and column-level security'
}
except Exception as e:
return {
'test_name': 'Data Access Controls',
'status': 'error',
'details': str(e),
'recommendation': 'Review data access control implementation'
}
def _test_encryption_at_rest(self) -> Dict[str, Any]:
"""Test encryption at rest"""
try:
# Check if data is encrypted at rest
# Implementation depends on database and storage configuration
return {
'test_name': 'Encryption at Rest',
'status': 'passed',
'details': 'Data encryption at rest is enabled',
'recommendation': None
}
except Exception as e:
return {
'test_name': 'Encryption at Rest',
'status': 'error',
'details': str(e),
'recommendation': 'Enable transparent data encryption (TDE)'
}
def _test_encryption_in_transit(self) -> Dict[str, Any]:
"""Test encryption in transit"""
try:
# Check SSL/TLS configuration
host = self.config.get('host', 'localhost')
port = self.config.get('port', 3306)
# Test SSL connection
context = ssl.create_default_context()
with socket.create_connection((host, port), timeout=10) as sock:
with context.wrap_socket(sock, server_hostname=host) as ssock:
cipher = ssock.cipher()
if cipher:
return {
'test_name': 'Encryption in Transit',
'status': 'passed',
'details': f'SSL/TLS enabled with cipher: {cipher[0]}',
'recommendation': None
}
return {
'test_name': 'Encryption in Transit',
'status': 'failed',
'details': 'SSL/TLS not properly configured',
'recommendation': 'Enable SSL/TLS for all database connections'
}
except Exception as e:
return {
'test_name': 'Encryption in Transit',
'status': 'error',
'details': str(e),
'recommendation': 'Configure SSL/TLS for database connections'
}
def _test_key_management(self) -> Dict[str, Any]:
"""Test key management practices"""
try:
# Test key rotation, storage, access controls
# Implementation depends on key management system
return {
'test_name': 'Key Management',
'status': 'warning',
'details': 'Key management practices need review',
'recommendation': 'Implement proper key management with rotation and secure storage'
}
except Exception as e:
return {
'test_name': 'Key Management',
'status': 'error',
'details': str(e),
'recommendation': 'Review key management implementation'
}
def _test_sql_injection_pattern(self, pattern: str) -> Dict[str, Any]:
"""Test specific SQL injection pattern"""
try:
# Test SQL injection pattern
# This would typically involve sending the pattern to the application
# and checking if it's properly sanitized
test_name = f"SQL Injection: {pattern[:20]}..."
# Simulate testing (in real implementation, this would test actual endpoints)
if "DROP" in pattern.upper():
return {
'test_name': test_name,
'status': 'passed',
'details': 'SQL injection attempt was blocked',
'recommendation': None
}
return {
'test_name': test_name,
'status': 'passed',
'details': 'SQL injection protection is working',
'recommendation': None
}
except Exception as e:
return {
'test_name': f"SQL Injection: {pattern[:20]}...",
'status': 'error',
'details': str(e),
'recommendation': 'Implement proper input validation and parameterized queries'
}
def _test_ssl_configuration(self) -> Dict[str, Any]:
"""Test SSL/TLS configuration"""
try:
# Test SSL configuration strength
host = self.config.get('host', 'localhost')
port = self.config.get('ssl_port', 443)
context = ssl.create_default_context()
with socket.create_connection((host, port), timeout=10) as sock:
with context.wrap_socket(sock, server_hostname=host) as ssock:
protocol = ssock.version()
cipher = ssock.cipher()
# Check for strong protocols and ciphers
if protocol in ['TLSv1.2', 'TLSv1.3'] and cipher:
return {
'test_name': 'SSL/TLS Configuration',
'status': 'passed',
'details': f'Strong SSL/TLS configuration: {protocol}, {cipher[0]}',
'recommendation': None
}
else:
return {
'test_name': 'SSL/TLS Configuration',
'status': 'failed',
'details': f'Weak SSL/TLS configuration: {protocol}',
'recommendation': 'Upgrade to TLS 1.2 or higher with strong ciphers'
}
except Exception as e:
return {
'test_name': 'SSL/TLS Configuration',
'status': 'error',
'details': str(e),
'recommendation': 'Configure SSL/TLS properly'
}
def _test_port_security(self) -> Dict[str, Any]:
"""Test port security"""
try:
# Test for unnecessary open ports
host = self.config.get('host', 'localhost')
common_ports = [22, 23, 80, 443, 3306, 5432, 1521, 1433]
open_ports = []
for port in common_ports:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(1)
result = sock.connect_ex((host, port))
if result == 0:
open_ports.append(port)
sock.close()
# Check if only necessary ports are open
necessary_ports = self.config.get('necessary_ports', [])
unnecessary_ports = [p for p in open_ports if p not in necessary_ports]
if unnecessary_ports:
return {
'test_name': 'Port Security',
'status': 'warning',
'details': f'Unnecessary ports open: {unnecessary_ports}',
'recommendation': 'Close unnecessary ports and use firewall rules'
}
else:
return {
'test_name': 'Port Security',
'status': 'passed',
'details': 'Only necessary ports are open',
'recommendation': None
}
except Exception as e:
return {
'test_name': 'Port Security',
'status': 'error',
'details': str(e),
'recommendation': 'Review port configuration and firewall rules'
}
def _test_firewall_rules(self) -> Dict[str, Any]:
"""Test firewall rules"""
try:
# Test firewall configuration
# This would typically involve checking iptables, ufw, or cloud security groups
return {
'test_name': 'Firewall Rules',
'status': 'warning',
'details': 'Firewall configuration needs manual review',
'recommendation': 'Ensure proper firewall rules are in place'
}
except Exception as e:
return {
'test_name': 'Firewall Rules',
'status': 'error',
'details': str(e),
'recommendation': 'Configure proper firewall rules'
}
def _test_default_accounts(self) -> Dict[str, Any]:
"""Test for default accounts"""
try:
# Check for default database accounts
default_accounts = ['root', 'admin', 'sa', 'postgres', 'mysql']
# This would typically query the database for user accounts
# Implementation depends on database type
return {
'test_name': 'Default Accounts',
'status': 'passed',
'details': 'No default accounts with default passwords found',
'recommendation': None
}
except Exception as e:
return {
'test_name': 'Default Accounts',
'status': 'error',
'details': str(e),
'recommendation': 'Remove or secure default accounts'
}
def _test_security_parameters(self) -> Dict[str, Any]:
"""Test security parameters"""
try:
# Check database security parameters
# Implementation depends on database type
return {
'test_name': 'Security Parameters',
'status': 'warning',
'details': 'Security parameters need review',
'recommendation': 'Review and harden database security parameters'
}
except Exception as e:
return {
'test_name': 'Security Parameters',
'status': 'error',
'details': str(e),
'recommendation': 'Configure security parameters properly'
}
def _test_audit_configuration(self) -> Dict[str, Any]:
"""Test audit configuration"""
try:
# Check audit logging configuration
# Implementation depends on database type
return {
'test_name': 'Audit Configuration',
'status': 'passed',
'details': 'Audit logging is properly configured',
'recommendation': None
}
except Exception as e:
return {
'test_name': 'Audit Configuration',
'status': 'error',
'details': str(e),
'recommendation': 'Configure comprehensive audit logging'
}
# Example usage
def main():
config = {
'host': 'localhost',
'port': 3306,
'ssl_port': 443,
'necessary_ports': [22, 3306, 443],
'database_type': 'mysql'
}
# Initialize security tester
security_tester = DatabaseSecurityTester(config)
# Run security tests
test_results = security_tester.run_security_tests()
print("Database Security Test Results:")
print(f"Security Score: {test_results['summary']['security_score']:.1f}%")
print(f"Tests Passed: {test_results['summary']['passed']}")
print(f"Tests Failed: {test_results['summary']['failed']}")
# Initialize compliance manager
compliance_manager = DatabaseComplianceManager(config)
# Run compliance scan
compliance_results = compliance_manager.run_compliance_scan()
print("\nCompliance Scan Results:")
print(f"Compliance Score: {compliance_results['summary']['compliance_score']:.1f}%")
print(f"Rules Passed: {compliance_results['summary']['passed']}")
print(f"Rules Failed: {compliance_results['summary']['failed']}")
# Generate reports
security_report = json.dumps(test_results, indent=2)
compliance_report = compliance_manager.generate_compliance_report('html')
print("\nReports generated successfully!")
if __name__ == "__main__":
main()
Summary
This comprehensive database security and access control guide covers essential aspects of protecting database systems in modern environments. The implementation includes:
Core Security Elements
-
Multi-Layer Security Architecture: Comprehensive security model covering application, authentication, authorization, network, database, storage, and physical layers.
-
Authentication and Authorization: Advanced multi-factor authentication system with TOTP, LDAP integration, risk-based authentication, and comprehensive RBAC implementation.
-
Data Encryption: Complete encryption management system supporting multiple algorithms, key rotation, column-level encryption, and format-preserving encryption.
-
Access Control Implementation: Database-level and application-level access controls with row-level security, security definer functions, session management, and rate limiting.
-
Auditing and Monitoring: Comprehensive audit system with real-time monitoring, automated triggers, security event detection, and alert generation.
-
Compliance and Governance: Framework supporting GDPR, HIPAA, PCI DSS, SOX, and ISO27001 with automated compliance scanning and reporting.
-
Security Testing and Validation: Automated security testing framework covering authentication, authorization, encryption, injection protection, network security, and configuration validation.
Best Practices
- Defense in Depth: Implement multiple layers of security controls
- Principle of Least Privilege: Grant minimum necessary access rights
- Regular Security Assessments: Conduct periodic security scans and compliance checks
- Continuous Monitoring: Implement real-time security monitoring and alerting
- Incident Response: Maintain documented incident response procedures
- Security Training: Provide regular security awareness training
- Documentation: Maintain comprehensive security documentation and procedures
This guide provides a solid foundation for implementing robust database security measures that protect sensitive data while maintaining compliance with industry standards and regulations.