数据库安全与权限管理:从基础防护到企业级安全策略的完整指南
引言
数据库安全是现代企业信息安全体系的核心组成部分。随着数据泄露事件频发和监管要求日趋严格,建立完善的数据库安全防护体系变得至关重要。本文将深入探讨数据库安全的各个层面,从基础的访问控制到高级的安全策略实施,为企业提供全面的数据库安全解决方案。
数据库安全架构概述
安全防护层次模型
graph TB
subgraph "数据库安全架构"
A[网络安全层] --> B[主机安全层]
B --> C[数据库安全层]
C --> D[应用安全层]
D --> E[数据安全层]
subgraph "网络安全"
F[防火墙]
G[VPN]
H[网络隔离]
end
subgraph "主机安全"
I[操作系统加固]
J[访问控制]
K[日志审计]
end
subgraph "数据库安全"
L[身份认证]
M[权限管理]
N[数据加密]
O[审计监控]
end
subgraph "应用安全"
P[SQL注入防护]
Q[连接池安全]
R[API安全]
end
subgraph "数据安全"
S[敏感数据识别]
T[数据脱敏]
U[数据分类分级]
end
end
身份认证与访问控制
1. 多层身份认证系统
#!/usr/bin/env python3
# scripts/database_auth_manager.py
import hashlib
import secrets
import time
import json
import logging
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple
import mysql.connector
import psycopg2
from cryptography.fernet import Fernet
import ldap3
class DatabaseAuthManager:
def __init__(self, config_file: str):
    """Load the JSON configuration and set up logging, crypto and connections.

    Args:
        config_file: Path to a JSON file containing a ``databases`` mapping
            and, optionally, an ``ldap`` section. An optional top-level
            ``session_key`` entry may hold a urlsafe-base64 Fernet key.
    """
    with open(config_file, 'r', encoding='utf-8') as f:
        self.config = json.load(f)
    self.logger = self._setup_logging()
    # A key generated at start-up makes every issued session token invalid
    # after a restart and unusable by other processes, so prefer a key
    # supplied through the configuration and only fall back to a random one.
    configured_key = self.config.get('session_key')
    if configured_key:
        self.encryption_key = configured_key.encode()
    else:
        self.encryption_key = Fernet.generate_key()
    self.cipher_suite = Fernet(self.encryption_key)
    # Open the admin connections declared in the configuration
    self.db_connections = self._initialize_db_connections()
    # LDAP settings are optional; both attributes are None when absent
    ldap_config = self.config.get('ldap', {})
    self.ldap_server = ldap_config.get('server')
    self.ldap_base_dn = ldap_config.get('base_dn')
def _setup_logging(self) -> logging.Logger:
    """Return the 'DatabaseAuthManager' logger, writing INFO+ to a log file.

    ``logging.getLogger`` hands back the same logger object on every call, so
    the file handler is attached only when the logger has none yet; otherwise
    each additional manager instance would duplicate every log line.
    """
    logger = logging.getLogger('DatabaseAuthManager')
    logger.setLevel(logging.INFO)
    if not logger.handlers:
        handler = logging.FileHandler('/var/log/database_auth.log')
        formatter = logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        handler.setFormatter(formatter)
        logger.addHandler(handler)
    return logger
def _initialize_db_connections(self) -> Dict:
    """Open one admin connection per entry of ``config['databases']``.

    Returns:
        Mapping of configured database name to its open connection. Entries
        that fail to connect, or whose ``type`` is not ``mysql`` or
        ``postgresql``, are logged and left out of the mapping.
    """
    connections = {}
    for db_name, db_config in self.config['databases'].items():
        try:
            db_type = db_config['type']
            if db_type == 'mysql':
                conn = mysql.connector.connect(
                    host=db_config['host'],
                    port=db_config['port'],
                    user=db_config['admin_user'],
                    password=db_config['admin_password'],
                    database=db_config['database']
                )
            elif db_type == 'postgresql':
                conn = psycopg2.connect(
                    host=db_config['host'],
                    port=db_config['port'],
                    user=db_config['admin_user'],
                    password=db_config['admin_password'],
                    database=db_config['database']
                )
            else:
                # Without this branch `conn` would be unbound, or still point
                # at the previous database and be registered under this name.
                raise ValueError(f"不支持的数据库类型: {db_type}")
            connections[db_name] = conn
            self.logger.info(f"成功连接到数据库: {db_name}")
        except Exception as e:
            self.logger.error(f"连接数据库 {db_name} 失败: {e}")
    return connections
def create_user_account(self, username: str, password: str, email: str,
                        role: str, database: str) -> bool:
    """Register a user in the auth database and create the matching DB account.

    The auth record is committed only after the account has been created in
    the target database; if any step fails the pending insert is rolled back,
    so a failed call does not leave an auth record without a database account.

    Args:
        username: Login name; must not already exist in ``users``.
        password: Plain-text password (stored only as a salted PBKDF2 hash).
        email: Contact address stored with the user record.
        role: Role name passed on to ``_create_database_user``.
        database: Key of the target database in ``self.db_connections``.

    Returns:
        True when both the auth record and the database account were created,
        False when the user already exists or any step failed (the failure is
        logged).
    """
    auth_conn = self.db_connections.get('auth_db')
    try:
        if not auth_conn:
            raise Exception("认证数据库连接不可用")
        # Salted PBKDF2-HMAC-SHA256; the salt is stored next to the hash
        salt = secrets.token_hex(16)
        password_hash = hashlib.pbkdf2_hmac('sha256',
                                            password.encode('utf-8'),
                                            salt.encode('utf-8'),
                                            100000)
        cursor = auth_conn.cursor()
        cursor.execute(
            "SELECT username FROM users WHERE username = %s",
            (username,)
        )
        if cursor.fetchone():
            self.logger.warning(f"用户 {username} 已存在")
            return False
        cursor.execute("""
            INSERT INTO users (username, password_hash, salt, email, role,
                               created_at, last_login, is_active, failed_attempts)
            VALUES (%s, %s, %s, %s, %s, %s, NULL, TRUE, 0)
        """, (username, password_hash.hex(), salt, email, role, datetime.now()))
        # Create the real database account before committing the auth record
        self._create_database_user(username, password, role, database)
        auth_conn.commit()
        self.logger.info(f"成功创建用户: {username}")
        return True
    except Exception as e:
        if auth_conn:
            try:
                auth_conn.rollback()
            except Exception as rollback_error:
                self.logger.error(f"回滚用户 {username} 的认证记录失败: {rollback_error}")
        self.logger.error(f"创建用户 {username} 失败: {e}")
        return False
def _create_database_user(self, username: str, password: str,
                          role: str, database: str):
    """Create a database account and grant it the privileges of ``role``.

    Values are never spliced into SQL text as quoted literals: the MySQL
    account name and every password are sent as bound parameters. PostgreSQL
    role names cannot be bound, so they must pass an identifier check before
    being interpolated.

    Args:
        username: Account name to create.
        password: Plain-text password for the new account.
        role: ``admin``, ``developer`` or ``readonly``; any other value
            creates the account without granting privileges.
        database: Key of the target database in ``self.db_connections``.

    Raises:
        Exception: If there is no open connection for ``database``.
        ValueError: If ``username`` is not a plain identifier (PostgreSQL).
    """
    import re  # function-scope import: only this method needs it

    conn = self.db_connections.get(database)
    if not conn:
        raise Exception(f"数据库 {database} 连接不可用")
    cursor = conn.cursor()
    db_config = self.config['databases'][database]
    db_name = db_config['database']
    if db_config['type'] == 'mysql':
        # 'user'@'%' — both parts travel as parameters, quoted by the driver
        account = (username, '%')
        cursor.execute("CREATE USER %s@%s IDENTIFIED BY %s",
                       (username, '%', password))
        if role == 'admin':
            cursor.execute("GRANT ALL PRIVILEGES ON *.* TO %s@%s", account)
        elif role == 'developer':
            cursor.execute(
                f"GRANT SELECT, INSERT, UPDATE, DELETE ON {db_name}.* TO %s@%s",
                account)
        elif role == 'readonly':
            cursor.execute(f"GRANT SELECT ON {db_name}.* TO %s@%s", account)
        cursor.execute("FLUSH PRIVILEGES")
    elif db_config['type'] == 'postgresql':
        if not re.fullmatch(r'[A-Za-z_][A-Za-z0-9_]{0,62}', username):
            raise ValueError(f"非法的用户名: {username!r}")
        cursor.execute(f"CREATE USER {username} WITH PASSWORD %s", (password,))
        if role == 'admin':
            cursor.execute(f"ALTER USER {username} CREATEDB CREATEROLE")
            cursor.execute(f"GRANT ALL PRIVILEGES ON DATABASE {db_name} TO {username}")
        elif role == 'developer':
            cursor.execute(f"GRANT CONNECT ON DATABASE {db_name} TO {username}")
            cursor.execute(f"GRANT USAGE ON SCHEMA public TO {username}")
            cursor.execute(f"GRANT SELECT, INSERT, UPDATE, DELETE ON ALL TABLES IN SCHEMA public TO {username}")
        elif role == 'readonly':
            cursor.execute(f"GRANT CONNECT ON DATABASE {db_name} TO {username}")
            cursor.execute(f"GRANT USAGE ON SCHEMA public TO {username}")
            cursor.execute(f"GRANT SELECT ON ALL TABLES IN SCHEMA public TO {username}")
    conn.commit()
def authenticate_user(self, username: str, password: str,
                      client_ip: str = None) -> Tuple[bool, Dict]:
    """Check a username/password pair against the ``users`` table.

    An account with 5 or more recorded failures is locked for 30 minutes
    after its most recent failure. A successful login resets the failure
    counter, updates ``last_login`` and writes an activity-log entry.

    Args:
        username: Login name to look up.
        password: Plain-text password supplied by the client.
        client_ip: Client address, used only for logging.

    Returns:
        ``(True, {"username", "role", "session_token"})`` on success, or
        ``(False, {"error": <message>})`` on any failure.
    """
    import hmac  # function-scope import: constant-time digest comparison

    lockout_seconds = 1800  # 30-minute lock window
    try:
        auth_conn = self.db_connections.get('auth_db')
        cursor = auth_conn.cursor(dictionary=True)
        cursor.execute("""
            SELECT username, password_hash, salt, role, is_active,
                   failed_attempts, last_failed_attempt
            FROM users WHERE username = %s
        """, (username,))
        user = cursor.fetchone()
        if not user:
            self.logger.warning(f"用户 {username} 不存在,IP: {client_ip}")
            return False, {"error": "用户不存在"}
        if not user['is_active']:
            self.logger.warning(f"用户 {username} 已被禁用,IP: {client_ip}")
            return False, {"error": "用户已被禁用"}
        if user['failed_attempts'] >= 5:
            last_failed = user['last_failed_attempt']
            # total_seconds() includes whole days; timedelta.seconds is only
            # the sub-day remainder and would re-lock the account for 30
            # minutes out of every 24 hours.
            if last_failed and (datetime.now() - last_failed).total_seconds() < lockout_seconds:
                self.logger.warning(f"用户 {username} 账户被锁定,IP: {client_ip}")
                return False, {"error": "账户已被锁定"}
        password_hash = hashlib.pbkdf2_hmac('sha256',
                                            password.encode('utf-8'),
                                            user['salt'].encode('utf-8'),
                                            100000)
        # compare_digest avoids leaking the matching prefix length via timing
        if not hmac.compare_digest(password_hash.hex(), str(user['password_hash'])):
            cursor.execute("""
                UPDATE users SET failed_attempts = failed_attempts + 1,
                                 last_failed_attempt = %s
                WHERE username = %s
            """, (datetime.now(), username))
            auth_conn.commit()
            self.logger.warning(f"用户 {username} 密码错误,IP: {client_ip}")
            return False, {"error": "密码错误"}
        # Success: clear the failure state and stamp the login time
        cursor.execute("""
            UPDATE users SET failed_attempts = 0,
                             last_login = %s,
                             last_failed_attempt = NULL
            WHERE username = %s
        """, (datetime.now(), username))
        auth_conn.commit()
        self._log_user_activity(username, 'login', client_ip, True)
        self.logger.info(f"用户 {username} 认证成功,IP: {client_ip}")
        return True, {
            "username": user['username'],
            "role": user['role'],
            "session_token": self._generate_session_token(username)
        }
    except Exception as e:
        self.logger.error(f"认证用户 {username} 时发生错误: {e}")
        return False, {"error": "认证服务异常"}
def _generate_session_token(self, username: str) -> str:
    """Build an encrypted session token for ``username``.

    The token is a JSON object holding the username, the issue time in ISO
    format and 16 random bytes (hex), encrypted with ``self.cipher_suite``.
    """
    payload = json.dumps({
        "username": username,
        "timestamp": datetime.now().isoformat(),
        "random": secrets.token_hex(16),
    })
    return self.cipher_suite.encrypt(payload.encode()).decode()
def validate_session_token(self, token: str) -> Tuple[bool, str]:
    """Decrypt a session token and check that it is at most 24 hours old.

    Returns:
        ``(True, username)`` for a valid token, otherwise ``(False, reason)``.
        Any decryption or parsing failure is logged and reported as an
        invalid token.
    """
    max_age_seconds = 86400  # 24 hours
    try:
        raw = self.cipher_suite.decrypt(token.encode())
        payload = json.loads(raw.decode())
        issued_at = datetime.fromisoformat(payload['timestamp'])
        age = datetime.now() - issued_at
        if age.total_seconds() > max_age_seconds:
            return False, "令牌已过期"
        return True, payload['username']
    except Exception as e:
        self.logger.error(f"验证会话令牌失败: {e}")
        return False, "无效令牌"
def ldap_authenticate(self, username: str, password: str) -> Tuple[bool, Dict]:
    """Authenticate ``username`` by binding to the configured LDAP server.

    Args:
        username: Value of the user's ``uid`` attribute.
        password: Password for the simple bind; must not be empty.

    Returns:
        ``(True, {"username", "email", "role", "groups"})`` on success,
        ``(False, {"error": <message>})`` otherwise.
    """
    if not self.ldap_server:
        return False, {"error": "LDAP未配置"}
    # A simple bind with an empty password is an "unauthenticated bind",
    # which many servers accept; it must never count as a successful login.
    if not username or not password:
        self.logger.warning("LDAP认证被拒绝: 用户名或密码为空")
        return False, {"error": "LDAP认证失败"}
    conn = None
    try:
        from ldap3.utils.dn import escape_rdn  # function-scope import

        server = ldap3.Server(self.ldap_server)
        # Escape DN metacharacters so the username cannot alter the bind DN
        user_dn = f"uid={escape_rdn(username)},{self.ldap_base_dn}"
        conn = ldap3.Connection(server, user_dn, password, auto_bind=True)
        # Read the attributes needed for the role mapping
        conn.search(user_dn, '(objectClass=*)', attributes=['cn', 'mail', 'memberOf'])
        if conn.entries:
            user_info = conn.entries[0]
            groups = user_info.memberOf.values if hasattr(user_info, 'memberOf') else []
            role = self._map_ldap_groups_to_role(groups)
            self.logger.info(f"LDAP认证成功: {username}")
            return True, {
                "username": username,
                "email": str(user_info.mail) if hasattr(user_info, 'mail') else "",
                "role": role,
                "groups": groups
            }
        return False, {"error": "LDAP认证失败"}
    except Exception as e:
        self.logger.error(f"LDAP认证错误: {e}")
        return False, {"error": "LDAP服务异常"}
    finally:
        # Release the server connection on every path
        if conn is not None:
            try:
                conn.unbind()
            except Exception as unbind_error:
                self.logger.warning(f"关闭LDAP连接失败: {unbind_error}")
def _map_ldap_groups_to_role(self, groups: List[str]) -> str:
    """Translate LDAP group DNs into a database role.

    The first entry of ``groups`` that appears in the configured
    ``ldap.group_mapping`` decides the role; with no match the role is
    ``readonly``.
    """
    mapping = self.config.get('ldap', {}).get('group_mapping', {})
    matched_roles = (mapping[dn] for dn in groups if dn in mapping)
    return next(matched_roles, 'readonly')  # read-only by default
def _log_user_activity(self, username: str, activity: str,
                       client_ip: str, success: bool):
    """Append one row to ``user_activity_log`` in the auth database.

    Failures are logged and swallowed so that activity logging never breaks
    the calling operation.
    """
    record = (username, activity, client_ip, success, datetime.now())
    try:
        connection = self.db_connections.get('auth_db')
        connection.cursor().execute("""
            INSERT INTO user_activity_log (username, activity, client_ip,
                                           success, timestamp)
            VALUES (%s, %s, %s, %s, %s)
        """, record)
        connection.commit()
    except Exception as e:
        self.logger.error(f"记录用户活动日志失败: {e}")
def get_user_permissions(self, username: str, database: str) -> List[str]:
    """List the privileges held by ``username`` in the given database.

    Args:
        username: Account whose privileges are looked up.
        database: Key of the database in ``self.db_connections``.

    Returns:
        Strings of the form ``"<privilege> on <schema>.<table>"``; ``*``
        stands for "all" at the global or schema level. An empty list is
        returned when the lookup fails (the error is logged).
    """
    try:
        conn = self.db_connections.get(database)
        cursor = conn.cursor()
        db_config = self.config['databases'][database]
        if db_config['type'] == 'mysql':
            # information_schema stores grantees as 'user'@'host'
            grantee = f"'{username}'@'%'"
            # USER_PRIVILEGES has no schema/table columns and
            # SCHEMA_PRIVILEGES has no table column, so pad them with '*'
            # to keep the three SELECTs union-compatible.
            cursor.execute("""
                SELECT PRIVILEGE_TYPE, '*' AS TABLE_SCHEMA, '*' AS TABLE_NAME
                FROM information_schema.USER_PRIVILEGES
                WHERE GRANTEE = %s
                UNION
                SELECT PRIVILEGE_TYPE, TABLE_SCHEMA, '*' AS TABLE_NAME
                FROM information_schema.SCHEMA_PRIVILEGES
                WHERE GRANTEE = %s
                UNION
                SELECT PRIVILEGE_TYPE, TABLE_SCHEMA, TABLE_NAME
                FROM information_schema.TABLE_PRIVILEGES
                WHERE GRANTEE = %s
            """, (grantee, grantee, grantee))
        elif db_config['type'] == 'postgresql':
            cursor.execute("""
                SELECT privilege_type, table_schema, table_name
                FROM information_schema.table_privileges
                WHERE grantee = %s
            """, (username,))
        else:
            # Nothing was executed, so there is no result set to fetch
            raise ValueError(f"不支持的数据库类型: {db_config['type']}")
        permissions = cursor.fetchall()
        return [f"{perm[0]} on {perm[1]}.{perm[2]}" for perm in permissions]
    except Exception as e:
        self.logger.error(f"获取用户 {username} 权限失败: {e}")
        return []
def revoke_user_access(self, username: str, database: str) -> bool:
    """Drop the user's database account and deactivate the auth record.

    Args:
        username: Account to remove.
        database: Key of the target database in ``self.db_connections``.

    Returns:
        True when the account was dropped and the auth record deactivated,
        False if any step failed (the error is logged).
    """
    import re  # function-scope import: only this method needs it

    try:
        conn = self.db_connections.get(database)
        cursor = conn.cursor()
        db_config = self.config['databases'][database]
        if db_config['type'] == 'mysql':
            # Account name and host are bound parameters, never SQL text
            cursor.execute("DROP USER %s@%s", (username, '%'))
            cursor.execute("FLUSH PRIVILEGES")
        elif db_config['type'] == 'postgresql':
            # Role names cannot be bound as parameters; allow identifiers only
            if not re.fullmatch(r'[A-Za-z_][A-Za-z0-9_]{0,62}', username):
                raise ValueError(f"非法的用户名: {username!r}")
            # A role that still owns objects or holds grants cannot be
            # dropped, so hand over its objects and revoke its grants first.
            cursor.execute(f"REASSIGN OWNED BY {username} TO CURRENT_USER")
            cursor.execute(f"DROP OWNED BY {username}")
            cursor.execute(f"DROP USER {username}")
        conn.commit()
        # Mark the user inactive in the auth database
        auth_conn = self.db_connections.get('auth_db')
        auth_cursor = auth_conn.cursor()
        auth_cursor.execute(
            "UPDATE users SET is_active = FALSE WHERE username = %s",
            (username,)
        )
        auth_conn.commit()
        self.logger.info(f"成功撤销用户 {username} 的访问权限")
        return True
    except Exception as e:
        self.logger.error(f"撤销用户 {username} 权限失败: {e}")
        return False
def main():
    """Demo: write a sample config, create one user and authenticate it."""
    config_path = '/tmp/auth_config.json'
    # Sample connection settings for the auth store and one managed database
    databases = {
        "auth_db": {
            "type": "mysql",
            "host": "localhost",
            "port": 3306,
            "database": "auth_system",
            "admin_user": "admin",
            "admin_password": "admin_password",
        },
        "production_db": {
            "type": "postgresql",
            "host": "localhost",
            "port": 5432,
            "database": "production",
            "admin_user": "postgres",
            "admin_password": "postgres_password",
        },
    }
    # Sample LDAP settings, including the group-to-role mapping
    ldap_settings = {
        "server": "ldap://ldap.company.com",
        "base_dn": "ou=users,dc=company,dc=com",
        "group_mapping": {
            "cn=dba,ou=groups,dc=company,dc=com": "admin",
            "cn=developers,ou=groups,dc=company,dc=com": "developer",
            "cn=analysts,ou=groups,dc=company,dc=com": "readonly",
        },
    }
    config = {"databases": databases, "ldap": ldap_settings}
    with open(config_path, 'w') as f:
        json.dump(config, f, indent=2)

    auth_manager = DatabaseAuthManager(config_path)

    # Example: create a user
    auth_manager.create_user_account(
        username="john_doe",
        password="secure_password123",
        email="john@company.com",
        role="developer",
        database="production_db",
    )

    # Example: authenticate that user
    success, result = auth_manager.authenticate_user(
        "john_doe", "secure_password123", "192.168.1.100")
    outcome = "认证成功" if success else "认证失败"
    print(f"{outcome}: {result}")


if __name__ == "__main__":
    main()
2. 基于角色的访问控制(RBAC)
-- 创建角色权限管理系统
-- Roles that can be assigned to users
CREATE TABLE roles (
    role_id     INT AUTO_INCREMENT,
    role_name   VARCHAR(50) NOT NULL UNIQUE,
    description TEXT,
    created_at  TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at  TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    PRIMARY KEY (role_id)
);
-- Individual permissions: one row per (resource type, action) pair
CREATE TABLE permissions (
    permission_id   INT AUTO_INCREMENT,
    permission_name VARCHAR(100) NOT NULL UNIQUE,
    resource_type   VARCHAR(50) NOT NULL,
    action          VARCHAR(50) NOT NULL,
    description     TEXT,
    created_at      TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    PRIMARY KEY (permission_id)
);
-- Many-to-many link between roles and permissions;
-- rows disappear automatically when either side is deleted
CREATE TABLE role_permissions (
    role_id       INT,
    permission_id INT,
    granted_at    TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    granted_by    VARCHAR(50),
    PRIMARY KEY (role_id, permission_id),
    FOREIGN KEY (role_id)
        REFERENCES roles (role_id) ON DELETE CASCADE,
    FOREIGN KEY (permission_id)
        REFERENCES permissions (permission_id) ON DELETE CASCADE
);
-- Role assignments per user; an assignment may expire (expires_at)
-- or be switched off (is_active) without deleting the row
CREATE TABLE user_roles (
    user_id     INT,
    role_id     INT,
    assigned_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    assigned_by VARCHAR(50),
    expires_at  TIMESTAMP NULL,
    is_active   BOOLEAN DEFAULT TRUE,
    PRIMARY KEY (user_id, role_id),
    FOREIGN KEY (user_id)
        REFERENCES users (user_id) ON DELETE CASCADE,
    FOREIGN KEY (role_id)
        REFERENCES roles (role_id) ON DELETE CASCADE
);
-- Seed the built-in roles. The descriptions are user-facing data and are
-- stored exactly as written.
INSERT INTO roles (role_name, description) VALUES
('super_admin', '超级管理员,拥有所有权限'),
('dba', '数据库管理员,负责数据库维护和管理'),
('developer', '开发人员,可以进行数据的增删改查'),
('analyst', '数据分析师,只能查询和导出数据'),
('readonly', '只读用户,仅能查看数据'),
('auditor', '审计员,可以查看日志和审计信息');
-- Seed the base permissions; permission_name follows "<resource_type>.<action>"
INSERT INTO permissions (permission_name, resource_type, action, description) VALUES
-- Database-level permissions
('database.create', 'database', 'create', '创建数据库'),
('database.drop', 'database', 'drop', '删除数据库'),
('database.alter', 'database', 'alter', '修改数据库结构'),
-- Table-level permissions
('table.create', 'table', 'create', '创建表'),
('table.drop', 'table', 'drop', '删除表'),
('table.alter', 'table', 'alter', '修改表结构'),
('table.select', 'table', 'select', '查询表数据'),
('table.insert', 'table', 'insert', '插入表数据'),
('table.update', 'table', 'update', '更新表数据'),
('table.delete', 'table', 'delete', '删除表数据'),
-- User-management permissions
('user.create', 'user', 'create', '创建用户'),
('user.drop', 'user', 'drop', '删除用户'),
('user.grant', 'user', 'grant', '授予权限'),
('user.revoke', 'user', 'revoke', '撤销权限'),
-- System-administration permissions
('system.backup', 'system', 'backup', '数据库备份'),
('system.restore', 'system', 'restore', '数据库恢复'),
('system.monitor', 'system', 'monitor', '系统监控'),
('system.audit', 'system', 'audit', '审计日志查看');
-- Grant permissions to the built-in roles.
-- Each statement pairs one role with a set of permissions via a cross join.

-- super_admin: every permission
INSERT INTO role_permissions (role_id, permission_id, granted_by)
SELECT r.role_id, p.permission_id, 'system'
FROM roles AS r
CROSS JOIN permissions AS p
WHERE r.role_name = 'super_admin';

-- dba: everything except dropping databases and reading the audit log
INSERT INTO role_permissions (role_id, permission_id, granted_by)
SELECT r.role_id, p.permission_id, 'system'
FROM roles AS r
CROSS JOIN permissions AS p
WHERE r.role_name = 'dba'
  AND p.permission_name IN (
      'database.create', 'database.alter',
      'table.create', 'table.drop', 'table.alter',
      'table.select', 'table.insert', 'table.update', 'table.delete',
      'user.create', 'user.drop', 'user.grant', 'user.revoke',
      'system.backup', 'system.restore', 'system.monitor'
  );

-- developer: table data access plus creating and altering tables
INSERT INTO role_permissions (role_id, permission_id, granted_by)
SELECT r.role_id, p.permission_id, 'system'
FROM roles AS r
CROSS JOIN permissions AS p
WHERE r.role_name = 'developer'
  AND p.permission_name IN (
      'table.select', 'table.insert', 'table.update', 'table.delete',
      'table.create', 'table.alter'
  );

-- analyst: read table data only
INSERT INTO role_permissions (role_id, permission_id, granted_by)
SELECT r.role_id, p.permission_id, 'system'
FROM roles AS r
CROSS JOIN permissions AS p
WHERE r.role_name = 'analyst'
  AND p.permission_name IN ('table.select');

-- readonly: read table data only
INSERT INTO role_permissions (role_id, permission_id, granted_by)
SELECT r.role_id, p.permission_id, 'system'
FROM roles AS r
CROSS JOIN permissions AS p
WHERE r.role_name = 'readonly'
  AND p.permission_name IN ('table.select');

-- auditor: read table data and the audit log
INSERT INTO role_permissions (role_id, permission_id, granted_by)
SELECT r.role_id, p.permission_id, 'system'
FROM roles AS r
CROSS JOIN permissions AS p
WHERE r.role_name = 'auditor'
  AND p.permission_name IN ('table.select', 'system.audit');
-- Permission check: returns TRUE when the active user p_username holds
-- p_permission through at least one active, unexpired role assignment.
DELIMITER //
CREATE FUNCTION check_user_permission(
    p_username VARCHAR(50),
    p_permission VARCHAR(100)
) RETURNS BOOLEAN
READS SQL DATA
-- The result depends on table contents and on NOW(), so the same arguments
-- can yield different answers over time; declaring the routine
-- DETERMINISTIC would allow the server to reuse a stale result.
NOT DETERMINISTIC
BEGIN
    DECLARE permission_count INT DEFAULT 0;

    SELECT COUNT(*) INTO permission_count
    FROM users u
    JOIN user_roles ur ON u.user_id = ur.user_id
    JOIN role_permissions rp ON ur.role_id = rp.role_id
    JOIN permissions p ON rp.permission_id = p.permission_id
    WHERE u.username = p_username
      AND p.permission_name = p_permission
      AND ur.is_active = TRUE
      AND (ur.expires_at IS NULL OR ur.expires_at > NOW())
      AND u.is_active = TRUE;

    RETURN permission_count > 0;
END //
DELIMITER ;
-- Effective permissions per user: one row per (user, role, permission),
-- limited to active users with active, unexpired role assignments
CREATE VIEW user_permissions_view AS
SELECT
    u.username,
    u.email,
    r.role_name,
    p.permission_name,
    p.resource_type,
    p.action,
    ur.assigned_at,
    ur.expires_at,
    ur.is_active
FROM users AS u
INNER JOIN user_roles AS ur
    ON u.user_id = ur.user_id
INNER JOIN roles AS r
    ON ur.role_id = r.role_id
INNER JOIN role_permissions AS rp
    ON r.role_id = rp.role_id
INNER JOIN permissions AS p
    ON rp.permission_id = p.permission_id
WHERE u.is_active = TRUE
  AND ur.is_active = TRUE
  AND (ur.expires_at IS NULL OR ur.expires_at > NOW());
数据加密与脱敏
1. 数据加密实现
#!/usr/bin/env python3
# scripts/database_encryption.py
import os
import json
import base64
import hashlib
from typing import Dict, Any, Optional, List
from cryptography.fernet import Fernet
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
import mysql.connector
import psycopg2
class DatabaseEncryption:
def __init__(self, config_file: str):
    """Read the JSON configuration, derive the keys and open the connections.

    Args:
        config_file: Path to a JSON file with ``encryption``, ``databases``
            and ``tables`` sections.
    """
    with open(config_file, 'r') as config_handle:
        loaded = json.load(config_handle)
    self.config = loaded
    # Key material: the master key first, then one cipher per encrypted field
    self.master_key = self._derive_master_key()
    self.field_keys = self._generate_field_keys()
    # One open connection per configured database
    self.db_connections = self._initialize_db_connections()
def _derive_master_key(self) -> bytes:
    """Derive the 32-byte master key from the configured password and salt.

    Uses PBKDF2-HMAC-SHA256 with 100000 iterations over
    ``encryption.master_password`` and ``encryption.salt``.
    """
    encryption_config = self.config['encryption']
    secret = encryption_config['master_password'].encode()
    key_derivation = PBKDF2HMAC(
        algorithm=hashes.SHA256(),
        length=32,
        salt=encryption_config['salt'].encode(),
        iterations=100000,
    )
    return key_derivation.derive(secret)
def _generate_field_keys(self) -> Dict[str, Fernet]:
    """Build one Fernet cipher per name in ``encryption.encrypted_fields``.

    Each field key is derived from the master key with PBKDF2-HMAC-SHA256,
    salted with the SHA-256 of ``"<field>_<master key hex>"``.
    """
    def cipher_for(name: str) -> Fernet:
        salt = hashlib.sha256(f"{name}_{self.master_key.hex()}".encode()).digest()
        derivation = PBKDF2HMAC(
            algorithm=hashes.SHA256(),
            length=32,
            salt=salt,
            iterations=100000,
        )
        raw_key = derivation.derive(self.master_key)
        # Fernet expects its 32-byte key in urlsafe base64 form
        return Fernet(base64.urlsafe_b64encode(raw_key))

    ciphers = {}
    for field_name in self.config['encryption']['encrypted_fields']:
        ciphers[field_name] = cipher_for(field_name)
    return ciphers
def _initialize_db_connections(self) -> Dict:
    """Open one connection per entry of ``config['databases']``.

    Returns:
        Mapping of configured database name to its open connection. Entries
        that fail to connect, or whose ``type`` is not ``mysql`` or
        ``postgresql``, are reported on stdout and left out of the mapping.
    """
    connections = {}
    for db_name, db_config in self.config['databases'].items():
        try:
            db_type = db_config['type']
            if db_type == 'mysql':
                conn = mysql.connector.connect(
                    host=db_config['host'],
                    port=db_config['port'],
                    user=db_config['user'],
                    password=db_config['password'],
                    database=db_config['database']
                )
            elif db_type == 'postgresql':
                conn = psycopg2.connect(
                    host=db_config['host'],
                    port=db_config['port'],
                    user=db_config['user'],
                    password=db_config['password'],
                    database=db_config['database']
                )
            else:
                # Without this branch `conn` would be unbound, or still point
                # at the previous database and be registered under this name.
                raise ValueError(f"不支持的数据库类型: {db_type}")
            connections[db_name] = conn
        except Exception as e:
            print(f"连接数据库 {db_name} 失败: {e}")
    return connections
def encrypt_field(self, field_name: str, value: str) -> str:
    """Encrypt ``value`` with the cipher configured for ``field_name``.

    Returns:
        The ciphertext, base64-encoded once more on top of the cipher's own
        output, or None when ``value`` is None.

    Raises:
        ValueError: If no cipher is configured for ``field_name``.
    """
    if field_name not in self.field_keys:
        raise ValueError(f"字段 {field_name} 未配置加密")
    if value is None:
        return None
    cipher = self.field_keys[field_name]
    ciphertext = cipher.encrypt(value.encode())
    return base64.b64encode(ciphertext).decode()
def decrypt_field(self, field_name: str, encrypted_value: str) -> str:
    """Reverse ``encrypt_field`` for one value.

    Returns:
        The decrypted text, or None when ``encrypted_value`` is None.

    Raises:
        ValueError: If no cipher is configured for ``field_name`` or the
            value cannot be decoded or decrypted.
    """
    if field_name not in self.field_keys:
        raise ValueError(f"字段 {field_name} 未配置加密")
    if encrypted_value is None:
        return None
    try:
        ciphertext = base64.b64decode(encrypted_value.encode())
        plaintext = self.field_keys[field_name].decrypt(ciphertext)
        return plaintext.decode()
    except Exception as e:
        raise ValueError(f"解密失败: {e}")
def encrypt_sensitive_data(self, table_name: str, data: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of ``data`` with the table's encrypted fields encrypted.

    The fields come from ``tables.<table_name>.encrypted_fields``; absent or
    None values are left untouched and ``data`` itself is not modified.
    """
    protected = self.config['tables'].get(table_name, {}).get('encrypted_fields', [])
    result = data.copy()
    for name in protected:
        if name not in result or result[name] is None:
            continue
        result[name] = self.encrypt_field(name, str(result[name]))
    return result
def decrypt_sensitive_data(self, table_name: str, data: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of ``data`` with the table's encrypted fields decrypted.

    The fields come from ``tables.<table_name>.encrypted_fields``; absent or
    None values are left untouched and ``data`` itself is not modified.
    """
    protected = self.config['tables'].get(table_name, {}).get('encrypted_fields', [])
    result = data.copy()
    for name in protected:
        if name not in result or result[name] is None:
            continue
        result[name] = self.decrypt_field(name, result[name])
    return result
def create_encrypted_table(self, database: str, table_name: str, schema: Dict[str, str]):
    """Create ``table_name`` from ``schema``, storing encrypted columns as TEXT.

    Args:
        database: Key of the target connection in ``self.db_connections``.
        table_name: Name of the table to create.
        schema: Mapping of column name to SQL column definition. Columns
            listed in the table's ``encrypted_fields`` get the type TEXT
            instead of their declared definition.
    """
    connection = self.db_connections[database]
    cursor = connection.cursor()
    encrypted = self.config['tables'].get(table_name, {}).get('encrypted_fields', [])
    column_defs = [
        f"{name} TEXT" if name in encrypted else f"{name} {definition}"
        for name, definition in schema.items()
    ]
    cursor.execute(f"CREATE TABLE {table_name} ({', '.join(column_defs)})")
    connection.commit()
    print(f"创建加密表 {table_name} 成功")
def insert_encrypted_data(self, database: str, table_name: str, data: Dict[str, Any]):
    """Encrypt the table's sensitive fields in ``data`` and insert the row.

    Column names are taken from the keys of ``data``; the values are passed
    to the driver as bound parameters.
    """
    connection = self.db_connections[database]
    cursor = connection.cursor()
    row = self.encrypt_sensitive_data(table_name, data)
    column_list = ', '.join(row.keys())
    marker_list = ', '.join('%s' for _ in row)
    statement = f"INSERT INTO {table_name} ({column_list}) VALUES ({marker_list})"
    cursor.execute(statement, list(row.values()))
    connection.commit()
    print(f"插入加密数据到表 {table_name} 成功")
def select_encrypted_data(self, database: str, table_name: str,
                          where_clause: str = "", decrypt: bool = True,
                          params: Optional[tuple] = None) -> List[Dict[str, Any]]:
    """Fetch rows from ``table_name`` and optionally decrypt sensitive fields.

    Args:
        database: Key of the connection in ``self.db_connections``.
        table_name: Table to read from.
        where_clause: Optional SQL condition appended after ``WHERE``. It is
            inserted into the statement verbatim, so it must never contain
            untrusted values; put ``%s`` placeholders in it and pass the
            values through ``params`` instead.
        decrypt: When True, every row goes through
            ``decrypt_sensitive_data``.
        params: Values bound to the placeholders of ``where_clause``.

    Returns:
        The rows as returned by a ``dictionary=True`` cursor, decrypted when
        requested.
    """
    conn = self.db_connections[database]
    cursor = conn.cursor(dictionary=True)
    select_sql = f"SELECT * FROM {table_name}"
    if where_clause:
        select_sql += f" WHERE {where_clause}"
    if params:
        cursor.execute(select_sql, params)
    else:
        cursor.execute(select_sql)
    results = cursor.fetchall()
    if decrypt:
        return [self.decrypt_sensitive_data(table_name, row) for row in results]
    return results
def mask_sensitive_data(self, value: str, mask_type: str = 'partial') -> str:
    """Mask ``value`` according to ``mask_type``.

    Supported types:
        full: every character becomes ``*``.
        partial: first and last 2 characters kept (5+ characters).
        email: first character of the local part kept, domain untouched;
            a value without ``@`` is masked as ``partial``.
        phone: first and last 3 characters kept (7+ characters).
        id_card: first and last 4 characters kept (8+ characters).
    Values shorter than the stated minimum are fully masked, None stays
    None, and an unknown ``mask_type`` returns the value unchanged.
    """
    if value is None:
        return None

    def all_stars() -> str:
        return '*' * len(value)

    def keep_edges(edge: int, minimum: int) -> str:
        # Show `edge` characters at both ends once the value is long enough
        if len(value) < minimum:
            return all_stars()
        return value[:edge] + '*' * (len(value) - 2 * edge) + value[-edge:]

    if mask_type == 'full':
        return all_stars()
    if mask_type == 'partial':
        return keep_edges(2, 5)
    if mask_type == 'phone':
        return keep_edges(3, 7)
    if mask_type == 'id_card':
        return keep_edges(4, 8)
    if mask_type == 'email':
        if '@' not in value:
            return self.mask_sensitive_data(value, 'partial')
        local, domain = value.split('@', 1)
        if len(local) > 1:
            masked_local = local[0] + '*' * (len(local) - 1)
        else:
            masked_local = '*'
        return f"{masked_local}@{domain}"
    return value
def get_masked_data(self, database: str, table_name: str,
                    where_clause: str = "") -> List[Dict[str, Any]]:
    """Return decrypted rows with the table's masking rules applied.

    The rules come from ``tables.<table_name>.masking_rules`` (field name to
    mask type); fields that are absent or None in a row are left as they are.
    """
    rows = self.select_encrypted_data(database, table_name, where_clause, decrypt=True)
    rules = self.config['tables'].get(table_name, {}).get('masking_rules', {})

    def apply_rules(row: Dict[str, Any]) -> Dict[str, Any]:
        masked = row.copy()
        for field_name, mask_type in rules.items():
            if field_name not in masked or masked[field_name] is None:
                continue
            masked[field_name] = self.mask_sensitive_data(str(masked[field_name]), mask_type)
        return masked

    return [apply_rules(row) for row in rows]
def rotate_encryption_keys(self, new_master_password: Optional[str] = None) -> str:
    """Re-encrypt all encrypted columns under keys from a new master password.

    The new master key is derived from a password with the same KDF, salt and
    iteration count as ``_derive_master_key``, so it can be reproduced after a
    restart. A purely random key that is stored nowhere would make all data
    unreadable the next time the process starts.

    Only the encrypted columns of each row are rewritten. Nothing is committed
    until every table of every database has been processed; on any error all
    touched connections are rolled back and the current keys stay in use.
    NOTE(review): with several databases the final commits are still not
    atomic across connections.

    Args:
        new_master_password: Password to derive the new master key from. When
            omitted, a random one is generated.

    Returns:
        The master password now in effect. The caller MUST persist it as
        ``encryption.master_password`` in the configuration file; only the
        in-memory configuration is updated here.
    """
    print("开始密钥轮换...")
    encryption_config = self.config['encryption']
    if new_master_password is None:
        new_master_password = base64.urlsafe_b64encode(os.urandom(32)).decode()
    master_kdf = PBKDF2HMAC(
        algorithm=hashes.SHA256(),
        length=32,
        salt=encryption_config['salt'].encode(),
        iterations=100000,
    )
    new_master_key = master_kdf.derive(new_master_password.encode())

    # Derive the per-field ciphers exactly as _generate_field_keys does
    new_field_keys = {}
    for field_name in encryption_config['encrypted_fields']:
        field_salt = hashlib.sha256(f"{field_name}_{new_master_key.hex()}".encode()).digest()
        kdf = PBKDF2HMAC(
            algorithm=hashes.SHA256(),
            length=32,
            salt=field_salt,
            iterations=100000,
        )
        field_key = kdf.derive(new_master_key)
        new_field_keys[field_name] = Fernet(base64.urlsafe_b64encode(field_key))

    touched = []
    try:
        for database_name, conn in self.db_connections.items():
            cursor = conn.cursor()
            touched.append(conn)
            for table_name, table_config in self.config['tables'].items():
                encrypted_fields = table_config.get('encrypted_fields', [])
                if not encrypted_fields:
                    continue
                print(f"重新加密表 {table_name} 的数据...")
                # Rows come back decrypted with the keys currently in use
                old_data = self.select_encrypted_data(database_name, table_name, decrypt=True)
                for row in old_data:
                    columns = [name for name in encrypted_fields
                               if name in row and row[name] is not None]
                    if not columns:
                        continue
                    values = [
                        base64.b64encode(
                            new_field_keys[name].encrypt(str(row[name]).encode())
                        ).decode()
                        for name in columns
                    ]
                    set_clause = ', '.join(f"{name} = %s" for name in columns)
                    update_sql = f"UPDATE {table_name} SET {set_clause} WHERE id = %s"
                    cursor.execute(update_sql, values + [row['id']])
    except Exception:
        # Leave every database on the old keys if anything went wrong
        for conn in touched:
            conn.rollback()
        raise
    for conn in touched:
        conn.commit()

    # Switch to the new keys only after all data has been rewritten
    self.master_key = new_master_key
    self.field_keys = new_field_keys
    encryption_config['master_password'] = new_master_password
    print("密钥轮换完成")
    return new_master_password
def main():
    """Demo: write a sample config, create a table, insert and read one row."""
    config_path = '/tmp/encryption_config.json'
    # Sample key-derivation settings and the fields that receive a cipher
    encryption_settings = {
        "master_password": "your_very_secure_master_password",
        "salt": "your_unique_salt_value",
        "encrypted_fields": ["email", "phone", "id_card", "credit_card"],
    }
    databases = {
        "user_db": {
            "type": "mysql",
            "host": "localhost",
            "port": 3306,
            "user": "app_user",
            "password": "app_password",
            "database": "user_data",
        },
    }
    # Per-table encrypted columns and the mask type applied to each
    tables = {
        "users": {
            "encrypted_fields": ["email", "phone", "id_card"],
            "masking_rules": {
                "email": "email",
                "phone": "phone",
                "id_card": "id_card",
            },
        },
        "payments": {
            "encrypted_fields": ["credit_card"],
            "masking_rules": {
                "credit_card": "partial",
            },
        },
    }
    config = {
        "encryption": encryption_settings,
        "databases": databases,
        "tables": tables,
    }
    with open(config_path, 'w') as f:
        json.dump(config, f, indent=2)

    encryption = DatabaseEncryption(config_path)

    # Example: create a table with encrypted columns
    user_schema = {
        "id": "INT PRIMARY KEY AUTO_INCREMENT",
        "username": "VARCHAR(50) NOT NULL",
        "email": "VARCHAR(100)",
        "phone": "VARCHAR(20)",
        "id_card": "VARCHAR(20)",
        "created_at": "TIMESTAMP DEFAULT CURRENT_TIMESTAMP",
    }
    encryption.create_encrypted_table("user_db", "users", user_schema)

    # Example: insert one row; sensitive fields are encrypted on the way in
    user_data = {
        "username": "john_doe",
        "email": "john.doe@example.com",
        "phone": "13800138000",
        "id_card": "110101199001011234",
    }
    encryption.insert_encrypted_data("user_db", "users", user_data)

    # Example: read the rows back in masked form
    masked_data = encryption.get_masked_data("user_db", "users")
    print("脱敏数据:", masked_data)


if __name__ == "__main__":
    main()
审计日志与监控
1. 数据库审计系统
#!/bin/bash
# scripts/database_audit_system.sh
set -euo pipefail

# Settings: each one keeps the value from the environment when set and
# non-empty, and falls back to the default shown here otherwise.
: "${AUDIT_DB_HOST:=localhost}"
: "${AUDIT_DB_PORT:=3306}"
: "${AUDIT_DB_USER:=audit_user}"
: "${AUDIT_DB_PASSWORD:=audit_password}"
: "${AUDIT_DB_NAME:=audit_system}"
: "${LOG_DIR:=/var/log/database_audit}"
: "${ALERT_EMAIL:=admin@company.com}"
: "${RETENTION_DAYS:=90}"
# Write a timestamped message to stderr.
#   $1 - message text
log() {
    printf '[%s] %s\n' "$(date '+%Y-%m-%d %H:%M:%S')" "$1" >&2
}
# Create the audit log directory and install its logrotate policy.
# Writes /etc/logrotate.d/database_audit, so it needs write access to /etc.
create_log_directory() {
    mkdir -p "$LOG_DIR"
    chmod 750 "$LOG_DIR"
    # Unquoted heredoc on purpose: $LOG_DIR is expanded into the policy file.
    # A postrotate script must be closed with "endscript"; logrotate rejects
    # the file when the block is terminated with any other word.
    cat > /etc/logrotate.d/database_audit << EOF
$LOG_DIR/*.log {
    daily
    rotate 30
    compress
    delaycompress
    missingok
    notifempty
    create 640 mysql mysql
    postrotate
        /usr/bin/systemctl reload rsyslog > /dev/null 2>&1 || true
    endscript
}
EOF
    log "审计日志目录创建完成: $LOG_DIR"
}
# Create the audit schema in the database named by $AUDIT_DB_NAME.
# All statements use IF NOT EXISTS / INSERT IGNORE.
# Tables created: connection_audit, query_audit, privilege_audit,
# data_change_audit, security_audit and audit_config (plus default settings).
create_audit_tables() {
log "创建审计数据库表结构"
# Quoted heredoc ('EOF'): the SQL below is passed to mysql verbatim, with no
# shell expansion. Its "--" lines are SQL comments and part of that text.
mysql -h"$AUDIT_DB_HOST" -P"$AUDIT_DB_PORT" -u"$AUDIT_DB_USER" -p"$AUDIT_DB_PASSWORD" "$AUDIT_DB_NAME" << 'EOF'
-- 创建连接审计表
CREATE TABLE IF NOT EXISTS connection_audit (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
server_host VARCHAR(255),
server_port INT,
username VARCHAR(100),
client_host VARCHAR(255),
client_port INT,
connection_type ENUM('CONNECT', 'DISCONNECT', 'FAILED_CONNECT'),
connection_id BIGINT,
database_name VARCHAR(100),
ssl_used BOOLEAN,
error_code INT,
error_message TEXT,
INDEX idx_timestamp (timestamp),
INDEX idx_username (username),
INDEX idx_client_host (client_host),
INDEX idx_connection_type (connection_type)
);
-- 创建查询审计表
CREATE TABLE IF NOT EXISTS query_audit (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
connection_id BIGINT,
username VARCHAR(100),
client_host VARCHAR(255),
database_name VARCHAR(100),
command_type VARCHAR(50),
sql_text LONGTEXT,
affected_rows BIGINT,
execution_time_ms BIGINT,
error_code INT,
error_message TEXT,
query_hash VARCHAR(64),
INDEX idx_timestamp (timestamp),
INDEX idx_username (username),
INDEX idx_command_type (command_type),
INDEX idx_query_hash (query_hash),
INDEX idx_execution_time (execution_time_ms)
);
-- 创建权限变更审计表
CREATE TABLE IF NOT EXISTS privilege_audit (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
admin_username VARCHAR(100),
admin_host VARCHAR(255),
target_username VARCHAR(100),
operation_type ENUM('GRANT', 'REVOKE', 'CREATE_USER', 'DROP_USER', 'ALTER_USER'),
privilege_type VARCHAR(100),
object_type VARCHAR(50),
object_name VARCHAR(255),
with_grant_option BOOLEAN,
sql_text TEXT,
INDEX idx_timestamp (timestamp),
INDEX idx_admin_username (admin_username),
INDEX idx_target_username (target_username),
INDEX idx_operation_type (operation_type)
);
-- 创建数据变更审计表
CREATE TABLE IF NOT EXISTS data_change_audit (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
username VARCHAR(100),
client_host VARCHAR(255),
database_name VARCHAR(100),
table_name VARCHAR(100),
operation_type ENUM('INSERT', 'UPDATE', 'DELETE'),
primary_key_values JSON,
old_values JSON,
new_values JSON,
affected_columns JSON,
transaction_id BIGINT,
INDEX idx_timestamp (timestamp),
INDEX idx_username (username),
INDEX idx_table_name (table_name),
INDEX idx_operation_type (operation_type)
);
-- 创建安全事件审计表
CREATE TABLE IF NOT EXISTS security_audit (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
event_type VARCHAR(50),
severity ENUM('LOW', 'MEDIUM', 'HIGH', 'CRITICAL'),
username VARCHAR(100),
client_host VARCHAR(255),
database_name VARCHAR(100),
description TEXT,
additional_info JSON,
resolved BOOLEAN DEFAULT FALSE,
resolved_at TIMESTAMP NULL,
resolved_by VARCHAR(100),
INDEX idx_timestamp (timestamp),
INDEX idx_event_type (event_type),
INDEX idx_severity (severity),
INDEX idx_resolved (resolved)
);
-- 创建审计配置表
CREATE TABLE IF NOT EXISTS audit_config (
id INT PRIMARY KEY AUTO_INCREMENT,
config_key VARCHAR(100) UNIQUE,
config_value TEXT,
description TEXT,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
updated_by VARCHAR(100)
);
-- 插入默认配置
INSERT IGNORE INTO audit_config (config_key, config_value, description, updated_by) VALUES
('audit_enabled', 'true', '是否启用审计功能', 'system'),
('log_connections', 'true', '是否记录连接事件', 'system'),
('log_queries', 'true', '是否记录查询语句', 'system'),
('log_privilege_changes', 'true', '是否记录权限变更', 'system'),
('log_data_changes', 'false', '是否记录数据变更(性能影响较大)', 'system'),
('slow_query_threshold_ms', '1000', '慢查询阈值(毫秒)', 'system'),
('retention_days', '90', '审计日志保留天数', 'system'),
('alert_failed_login_threshold', '5', '登录失败告警阈值', 'system'),
('alert_privilege_escalation', 'true', '是否告警权限提升', 'system');
EOF
log "审计数据库表结构创建完成"
}
# Install the MySQL audit_log plugin if it is missing and write its settings
# to /etc/mysql/mysql.conf.d/audit.cnf. MySQL must be restarted afterwards.
configure_mysql_audit() {
    log "配置MySQL审计插件"
    # SHOW PLUGINS accepts no LIKE clause, so ask information_schema instead.
    # Declaring the variable separately keeps the exit status of mysql
    # visible to `set -e` ("local x=$(...)" would always report success).
    local plugin_count
    plugin_count=$(mysql -h"$AUDIT_DB_HOST" -P"$AUDIT_DB_PORT" -u"$AUDIT_DB_USER" -p"$AUDIT_DB_PASSWORD" --skip-column-names -e "SELECT COUNT(*) FROM information_schema.PLUGINS WHERE PLUGIN_NAME = 'audit_log';")
    if [ "$plugin_count" -eq 0 ]; then
        log "安装MySQL审计插件"
        mysql -h"$AUDIT_DB_HOST" -P"$AUDIT_DB_PORT" -u"$AUDIT_DB_USER" -p"$AUDIT_DB_PASSWORD" -e "INSTALL PLUGIN audit_log SONAME 'audit_log.so';"
    fi
    # Overwrite rather than append: this file belongs to this script, and
    # appending would duplicate the whole section on every run.
    # Unquoted heredoc on purpose: $LOG_DIR is expanded into the file.
    cat > /etc/mysql/mysql.conf.d/audit.cnf << EOF
[mysqld]
# 审计日志配置
audit_log_policy=ALL
audit_log_format=JSON
audit_log_file=$LOG_DIR/mysql_audit.log
audit_log_rotate_on_size=100M
audit_log_rotations=10
# 连接审计
audit_log_connection_policy=ALL
audit_log_statement_policy=ALL
# 排除系统用户
audit_log_exclude_accounts='mysql.session@localhost,mysql.sys@localhost'
# 包含数据库
audit_log_include_databases=''
# 性能优化
audit_log_buffer_size=1M
audit_log_flush=ON
EOF
    log "MySQL审计配置完成,需要重启MySQL服务"
}
# Append the pgAudit and logging settings to every postgresql.conf found
# under /etc/postgresql/*/main and try to create the pgaudit extension.
# PostgreSQL must be restarted afterwards. Returns 1 when no configuration
# file is found.
configure_postgresql_audit() {
    log "配置PostgreSQL审计"
    local conf_file
    local found=0
    # Loop over the glob: a redirection to a pattern that matches several
    # files is an "ambiguous redirect" error.
    for conf_file in /etc/postgresql/*/main/postgresql.conf; do
        [ -f "$conf_file" ] || continue
        found=1
        # The marker line makes reruns idempotent instead of appending the
        # same settings again.
        if grep -q '^# pgAudit配置' "$conf_file"; then
            log "pgAudit配置已存在,跳过: $conf_file"
            continue
        fi
        # Unquoted heredoc on purpose: $LOG_DIR is expanded into the file.
        cat >> "$conf_file" << EOF
# pgAudit配置
shared_preload_libraries = 'pgaudit'
pgaudit.log = 'all'
pgaudit.log_catalog = on
pgaudit.log_client = on
pgaudit.log_level = log
pgaudit.log_parameter = on
pgaudit.log_relation = on
pgaudit.log_statement_once = off
# 日志配置
log_destination = 'csvlog'
logging_collector = on
log_directory = '$LOG_DIR'
log_filename = 'postgresql_audit_%Y%m%d_%H%M%S.log'
log_rotation_age = 1d
log_rotation_size = 100MB
log_min_duration_statement = 1000
log_connections = on
log_disconnections = on
log_line_prefix = '%t [%p]: [%l-1] user=%u,db=%d,app=%a,client=%h '
EOF
    done
    if [ "$found" -eq 0 ]; then
        log "未找到postgresql.conf,无法配置PostgreSQL审计"
        return 1
    fi
    # pgaudit has to be listed in shared_preload_libraries (which needs a
    # restart) before the extension can be created. On a fresh server this
    # command is therefore expected to fail; it must not abort the script
    # before the settings above have been written.
    if ! sudo -u postgres psql -c "CREATE EXTENSION IF NOT EXISTS pgaudit;"; then
        log "pgaudit扩展尚未创建:请重启PostgreSQL后重新运行本步骤"
    fi
    log "PostgreSQL审计配置完成,需要重启PostgreSQL服务"
}
# Create the audit-log parser script.
#
# Writes $LOG_DIR/parse_audit_logs.py (a Python program that loads MySQL
# JSON audit records into the connection_audit / query_audit tables) and marks
# it executable. The heredoc delimiter is quoted, so nothing inside it is
# expanded by the shell.
create_log_parser() {
    cat > "$LOG_DIR/parse_audit_logs.py" << 'EOF'
#!/usr/bin/env python3
"""Load MySQL JSON audit-log records into the audit database."""
import json
import re
import sys
import mysql.connector
from datetime import datetime
import argparse

# Timestamp layouts tried in order. Only the first 19 characters of the field
# are parsed, so a trailing zone suffix or fractional seconds are ignored.
TIMESTAMP_FORMATS = ('%Y-%m-%dT%H:%M:%S', '%Y-%m-%d %H:%M:%S')


def parse_timestamp(value):
    """Return a datetime for an audit timestamp; raise ValueError if unparseable."""
    text = str(value)[:19]
    for fmt in TIMESTAMP_FORMATS:
        try:
            return datetime.strptime(text, fmt)
        except ValueError:
            continue
    raise ValueError(f"unrecognized timestamp: {value!r}")


class AuditLogParser:
    """Reads audit-log lines and inserts them into the audit tables."""

    def __init__(self, db_config):
        self.db_config = db_config
        self.conn = mysql.connector.connect(**db_config)
        self.cursor = self.conn.cursor()

    def close(self):
        """Release the cursor and the database connection."""
        self.cursor.close()
        self.conn.close()

    def parse_mysql_audit_log(self, log_file):
        """Parse *log_file* line by line and store every usable record.

        Returns the number of lines that were skipped because they were not
        valid JSON or lacked the expected fields. All inserts are committed
        once at the end of the file.

        NOTE(review): the whole file is read on every call and no position is
        remembered, so running this twice on the same file inserts the same
        rows twice.
        """
        skipped = 0
        with open(log_file, 'r', encoding='utf-8', errors='replace') as f:
            for line in f:
                # A JSON-format log may be written as one big array: records
                # then end with ',' and the file is wrapped in '[' ... ']'.
                record = line.strip().rstrip(',')
                if not record or record in ('[', ']'):
                    continue
                try:
                    self.process_mysql_log_entry(json.loads(record))
                except (ValueError, KeyError, TypeError):
                    # json.JSONDecodeError is a ValueError; KeyError/TypeError
                    # cover records without 'class'/'timestamp' or non-objects.
                    skipped += 1
        self.conn.commit()
        return skipped

    def process_mysql_log_entry(self, entry):
        """Route one decoded record to the matching insert method.

        Raises KeyError when 'timestamp' or 'class' is missing and ValueError
        when the timestamp cannot be parsed. Records of any class other than
        'connection' or 'general' are ignored.
        """
        timestamp = parse_timestamp(entry['timestamp'])
        if entry['class'] == 'connection':
            self.insert_connection_audit(
                timestamp=timestamp,
                server_host=entry.get('server_host', ''),
                server_port=entry.get('server_port', 0),
                username=entry.get('user', ''),
                client_host=entry.get('host', ''),
                connection_type=entry.get('event', ''),
                connection_id=entry.get('connection_id', 0),
                database_name=entry.get('db', ''),
                error_code=entry.get('error_code', 0)
            )
        elif entry['class'] == 'general':
            self.insert_query_audit(
                timestamp=timestamp,
                connection_id=entry.get('connection_id', 0),
                username=entry.get('user', ''),
                client_host=entry.get('host', ''),
                database_name=entry.get('db', ''),
                command_type=entry.get('command_class', ''),
                sql_text=entry.get('sql_text', ''),
                error_code=entry.get('error_code', 0)
            )

    def insert_connection_audit(self, **kwargs):
        """Insert one row into connection_audit (committed by the caller)."""
        sql = """
            INSERT INTO connection_audit
            (timestamp, server_host, server_port, username, client_host,
             connection_type, connection_id, database_name, error_code)
            VALUES (%(timestamp)s, %(server_host)s, %(server_port)s, %(username)s,
                    %(client_host)s, %(connection_type)s, %(connection_id)s,
                    %(database_name)s, %(error_code)s)
        """
        self.cursor.execute(sql, kwargs)

    def insert_query_audit(self, **kwargs):
        """Insert one row into query_audit (committed by the caller)."""
        sql = """
            INSERT INTO query_audit
            (timestamp, connection_id, username, client_host, database_name,
             command_type, sql_text, error_code)
            VALUES (%(timestamp)s, %(connection_id)s, %(username)s, %(client_host)s,
                    %(database_name)s, %(command_type)s, %(sql_text)s, %(error_code)s)
        """
        self.cursor.execute(sql, kwargs)


def main():
    arg_parser = argparse.ArgumentParser(description='解析数据库审计日志')
    arg_parser.add_argument('--log-file', required=True, help='审计日志文件路径')
    arg_parser.add_argument('--db-host', default='localhost', help='审计数据库主机')
    arg_parser.add_argument('--db-port', type=int, default=3306, help='审计数据库端口')
    arg_parser.add_argument('--db-user', required=True, help='审计数据库用户')
    arg_parser.add_argument('--db-password', required=True, help='审计数据库密码')
    arg_parser.add_argument('--db-name', default='audit_system', help='审计数据库名称')
    args = arg_parser.parse_args()
    db_config = {
        'host': args.db_host,
        'port': args.db_port,
        'user': args.db_user,
        'password': args.db_password,
        'database': args.db_name
    }
    log_parser = AuditLogParser(db_config)
    try:
        skipped = log_parser.parse_mysql_audit_log(args.log_file)
    finally:
        # Closing without a commit discards rows from a run that failed midway.
        log_parser.close()
    if skipped:
        print(f"跳过无法解析的日志行: {skipped}", file=sys.stderr)
    print(f"审计日志解析完成: {args.log_file}")


if __name__ == "__main__":
    main()
EOF
    chmod +x "$LOG_DIR/parse_audit_logs.py"
    log "审计日志解析脚本创建完成"
}
# Create the security-monitoring script.
#
# Writes $LOG_DIR/security_monitor.py (a Python program that scans the audit
# tables for suspicious activity, records security events and e-mails alerts)
# and marks it executable. The heredoc delimiter is quoted, so nothing inside
# it is expanded by the shell.
create_security_monitor() {
    cat > "$LOG_DIR/security_monitor.py" << 'EOF'
#!/usr/bin/env python3
"""Run security checks over the audit tables and raise alerts."""
import mysql.connector
import smtplib
import json
import os
import sys
from datetime import datetime, timedelta
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
import logging

# Template written on first run when no configuration file exists yet.
# The credentials are placeholders and must be edited before real use.
DEFAULT_CONFIG = {
    "database": {
        "host": "localhost",
        "port": 3306,
        "user": "audit_user",
        "password": "audit_password",
        "database": "audit_system"
    },
    "smtp": {
        "server": "smtp.company.com",
        "port": 587,
        "use_tls": True,
        "username": "alert@company.com",
        "password": "smtp_password",
        "from_email": "alert@company.com",
        "to_email": "admin@company.com"
    }
}


class SecurityMonitor:
    """Queries the audit database for suspicious activity and records events."""

    def __init__(self, config_file):
        with open(config_file, 'r') as f:
            self.config = json.load(f)
        self.db_conn = mysql.connector.connect(**self.config['database'])
        self.cursor = self.db_conn.cursor(dictionary=True)
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler('/var/log/security_monitor.log'),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(__name__)

    def close(self):
        """Release the cursor and the database connection."""
        self.cursor.close()
        self.db_conn.close()

    def check_failed_logins(self):
        """Flag user/host pairs whose failed logins in the last hour reach the threshold."""
        threshold = int(self.get_config_value('alert_failed_login_threshold', 5))
        self.cursor.execute("""
            SELECT username, client_host, COUNT(*) as failed_count
            FROM connection_audit
            WHERE connection_type = 'FAILED_CONNECT'
              AND timestamp > DATE_SUB(NOW(), INTERVAL 1 HOUR)
            GROUP BY username, client_host
            HAVING failed_count >= %s
        """, (threshold,))
        failed_logins = self.cursor.fetchall()
        for login in failed_logins:
            self.create_security_event(
                event_type='FAILED_LOGIN_THRESHOLD',
                severity='HIGH',
                username=login['username'],
                client_host=login['client_host'],
                description=f"用户 {login['username']} 从 {login['client_host']} 在1小时内登录失败 {login['failed_count']} 次",
                dedup_hours=1
            )

    def check_privilege_escalation(self):
        """Flag high-privilege grants made in the last 24 hours (if enabled in audit_config)."""
        if self.get_config_value('alert_privilege_escalation', 'true') != 'true':
            return
        self.cursor.execute("""
            SELECT *
            FROM privilege_audit
            WHERE operation_type IN ('GRANT', 'CREATE_USER')
              AND privilege_type IN ('ALL PRIVILEGES', 'SUPER', 'GRANT OPTION')
              AND timestamp > DATE_SUB(NOW(), INTERVAL 24 HOUR)
        """)
        privilege_changes = self.cursor.fetchall()
        for change in privilege_changes:
            self.create_security_event(
                event_type='PRIVILEGE_ESCALATION',
                severity='CRITICAL',
                username=change['target_username'],
                client_host=change['admin_host'],
                description=f"管理员 {change['admin_username']} 为用户 {change['target_username']} 授予了高级权限: {change['privilege_type']}",
                dedup_hours=24
            )

    def check_suspicious_queries(self):
        """Flag statements from the last hour that match known attack patterns."""
        suspicious_patterns = [
            r'UNION.*SELECT.*FROM.*information_schema',
            r'SELECT.*FROM.*mysql\.user',
            r'DROP\s+DATABASE',
            r'TRUNCATE\s+TABLE',
            r'DELETE\s+FROM.*WHERE.*1\s*=\s*1',
            r'UPDATE.*SET.*WHERE.*1\s*=\s*1'
        ]
        for pattern in suspicious_patterns:
            self.cursor.execute("""
                SELECT *
                FROM query_audit
                WHERE sql_text REGEXP %s
                  AND timestamp > DATE_SUB(NOW(), INTERVAL 1 HOUR)
            """, (pattern,))
            suspicious_queries = self.cursor.fetchall()
            for query in suspicious_queries:
                self.create_security_event(
                    event_type='SUSPICIOUS_QUERY',
                    severity='HIGH',
                    username=query['username'],
                    client_host=query['client_host'],
                    description=f"检测到可疑查询: {query['sql_text'][:200]}...",
                    dedup_hours=1
                )

    def check_unusual_access_patterns(self):
        """Flag heavy off-hours access and connections from hosts with no older history."""
        # Weekday connections before 08:00 or from 19:00 on, more than 10 in 24 hours.
        self.cursor.execute("""
            SELECT username, client_host, COUNT(*) as access_count
            FROM connection_audit
            WHERE connection_type = 'CONNECT'
              AND (HOUR(timestamp) < 8 OR HOUR(timestamp) > 18)
              AND WEEKDAY(timestamp) < 5
              AND timestamp > DATE_SUB(NOW(), INTERVAL 24 HOUR)
            GROUP BY username, client_host
            HAVING access_count > 10
        """)
        unusual_access = self.cursor.fetchall()
        for access in unusual_access:
            self.create_security_event(
                event_type='UNUSUAL_ACCESS_TIME',
                severity='MEDIUM',
                username=access['username'],
                client_host=access['client_host'],
                description=f"用户 {access['username']} 在非工作时间从 {access['client_host']} 访问数据库 {access['access_count']} 次",
                dedup_hours=24
            )
        # User/host pairs seen in the last 24 hours that have no record older than 7 days.
        self.cursor.execute("""
            SELECT username, client_host, COUNT(*) as new_ip_count
            FROM connection_audit ca1
            WHERE connection_type = 'CONNECT'
              AND timestamp > DATE_SUB(NOW(), INTERVAL 24 HOUR)
              AND NOT EXISTS (
                  SELECT 1 FROM connection_audit ca2
                  WHERE ca2.username = ca1.username
                    AND ca2.client_host = ca1.client_host
                    AND ca2.timestamp < DATE_SUB(NOW(), INTERVAL 7 DAY)
              )
            GROUP BY username, client_host
        """)
        new_ip_access = self.cursor.fetchall()
        for access in new_ip_access:
            self.create_security_event(
                event_type='NEW_IP_ACCESS',
                severity='MEDIUM',
                username=access['username'],
                client_host=access['client_host'],
                description=f"用户 {access['username']} 从新IP地址 {access['client_host']} 访问数据库",
                dedup_hours=24
            )

    def _is_duplicate_event(self, event_type, username, client_host, description, dedup_hours):
        """Return True if an identical event was already recorded within *dedup_hours*."""
        self.cursor.execute("""
            SELECT 1 AS found
            FROM security_audit
            WHERE event_type = %s
              AND username <=> %s
              AND client_host <=> %s
              AND description = %s
              AND timestamp > DATE_SUB(NOW(), INTERVAL %s HOUR)
            LIMIT 1
        """, (event_type, username, client_host, description, int(dedup_hours)))
        return self.cursor.fetchone() is not None

    def create_security_event(self, event_type, severity, username, client_host,
                              description, dedup_hours=1):
        """Record a security event and, for HIGH/CRITICAL severity, send an alert mail.

        Every check looks back over a time window, so consecutive runs find
        the same rows again. An event identical to one stored within the last
        *dedup_hours* hours is therefore skipped instead of being inserted and
        mailed a second time.
        """
        if self._is_duplicate_event(event_type, username, client_host, description, dedup_hours):
            return
        self.cursor.execute("""
            INSERT INTO security_audit
            (event_type, severity, username, client_host, description)
            VALUES (%s, %s, %s, %s, %s)
        """, (event_type, severity, username, client_host, description))
        self.db_conn.commit()
        self.logger.warning(f"安全事件: {event_type} - {description}")
        if severity in ['HIGH', 'CRITICAL']:
            self.send_alert_email(event_type, severity, description)

    def send_alert_email(self, event_type, severity, description):
        """Send an alert mail using the 'smtp' section of the config; failures are only logged."""
        try:
            smtp_config = self.config['smtp']
            msg = MIMEMultipart()
            msg['From'] = smtp_config['from_email']
            msg['To'] = smtp_config['to_email']
            msg['Subject'] = f"数据库安全告警 - {severity} - {event_type}"
            body = f"""
数据库安全告警
事件类型: {event_type}
严重级别: {severity}
时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
描述: {description}
请及时处理此安全事件。
"""
            msg.attach(MIMEText(body, 'plain', 'utf-8'))
            # The context manager closes the connection even if login or send
            # fails; the timeout stops a dead mail server from stalling the run.
            with smtplib.SMTP(smtp_config['server'], smtp_config['port'], timeout=30) as server:
                if smtp_config.get('use_tls', False):
                    server.starttls()
                if smtp_config.get('username'):
                    server.login(smtp_config['username'], smtp_config['password'])
                server.send_message(msg)
            self.logger.info(f"告警邮件发送成功: {event_type}")
        except Exception as e:
            self.logger.error(f"发送告警邮件失败: {e}")

    def get_config_value(self, key, default_value):
        """Return audit_config.config_value for *key*, or *default_value* when absent."""
        self.cursor.execute(
            "SELECT config_value FROM audit_config WHERE config_key = %s",
            (key,)
        )
        result = self.cursor.fetchone()
        return result['config_value'] if result else default_value

    def run_security_checks(self):
        """Run every check; a failure in one check does not stop the others."""
        self.logger.info("开始安全检查")
        checks = (
            self.check_failed_logins,
            self.check_privilege_escalation,
            self.check_suspicious_queries,
            self.check_unusual_access_patterns,
        )
        failed = False
        for check in checks:
            try:
                check()
            except Exception as e:
                failed = True
                self.logger.error(f"安全检查过程中发生错误 ({check.__name__}): {e}")
        if not failed:
            self.logger.info("安全检查完成")


def main():
    # Config path: first CLI argument, else a file next to this script.
    if len(sys.argv) > 1:
        config_path = sys.argv[1]
    else:
        config_path = os.path.join(
            os.path.dirname(os.path.abspath(__file__)),
            'security_monitor_config.json'
        )
    if not os.path.exists(config_path):
        # First run: write the template with owner-only permissions, because
        # the file holds database and SMTP credentials. O_EXCL refuses to
        # follow a path that appeared in the meantime.
        fd = os.open(config_path, os.O_WRONLY | os.O_CREAT | os.O_EXCL, 0o600)
        with os.fdopen(fd, 'w') as f:
            json.dump(DEFAULT_CONFIG, f, indent=2)
    monitor = SecurityMonitor(config_path)
    try:
        monitor.run_security_checks()
    finally:
        monitor.close()


if __name__ == "__main__":
    main()
EOF
    chmod +x "$LOG_DIR/security_monitor.py"
    log "安全监控脚本创建完成"
}
# Create the audit-report generator script.
#
# Writes $LOG_DIR/generate_audit_report.sh. The generated script takes an
# optional YYYY-MM-DD argument (default: yesterday) and renders an HTML report
# for that day under /var/reports/database_audit.
# Reads: LOG_DIR, AUDIT_DB_HOST, AUDIT_DB_PORT, AUDIT_DB_USER,
#        AUDIT_DB_PASSWORD, AUDIT_DB_NAME.
create_audit_report() {
    local report_script="$LOG_DIR/generate_audit_report.sh"

    # The report runs from cron, where the installer's AUDIT_DB_* variables do
    # not exist, so the connection settings are written into the script header.
    # %q shell-quotes each value so special characters survive.
    {
        printf '#!/bin/bash\n'
        printf 'AUDIT_DB_HOST=%q\n' "$AUDIT_DB_HOST"
        printf 'AUDIT_DB_PORT=%q\n' "$AUDIT_DB_PORT"
        printf 'AUDIT_DB_USER=%q\n' "$AUDIT_DB_USER"
        printf 'AUDIT_DB_PASSWORD=%q\n' "$AUDIT_DB_PASSWORD"
        printf 'AUDIT_DB_NAME=%q\n' "$AUDIT_DB_NAME"
    } > "$report_script"
    # The header contains the database password: owner-only, still executable.
    chmod 700 "$report_script"

    # Quoted delimiter: everything below is copied verbatim and only expanded
    # when the generated script itself runs.
    cat >> "$report_script" << 'EOF'
REPORT_DATE="${1:-$(date -d 'yesterday' '+%Y-%m-%d')}"
# The date is spliced into SQL below, so accept nothing but YYYY-MM-DD.
if ! [[ "$REPORT_DATE" =~ ^[0-9]{4}-[0-9]{2}-[0-9]{2}$ ]]; then
    echo "无效的报告日期: $REPORT_DATE (应为 YYYY-MM-DD)" >&2
    exit 1
fi
REPORT_DIR="/var/reports/database_audit"
REPORT_FILE="$REPORT_DIR/audit_report_$REPORT_DATE.html"
mkdir -p "$REPORT_DIR"

# Run one SQL statement and print the bare, tab-separated result rows.
run_sql() {
    mysql -h"$AUDIT_DB_HOST" -P"$AUDIT_DB_PORT" -u"$AUDIT_DB_USER" -p"$AUDIT_DB_PASSWORD" "$AUDIT_DB_NAME" -N -B -e "$1"
}

# Escape the characters that would otherwise be read as HTML markup.
html_escape() {
    sed -e 's/&/\&amp;/g' -e 's/</\&lt;/g' -e 's/>/\&gt;/g'
}

# Collect the headline numbers first; only their values go into the HTML.
DAY_FILTER="DATE(timestamp) = '$REPORT_DATE'"
TOTAL_CONNECTIONS=$(run_sql "SELECT COUNT(*) FROM connection_audit WHERE $DAY_FILTER;")
FAILED_CONNECTIONS=$(run_sql "SELECT COUNT(*) FROM connection_audit WHERE $DAY_FILTER AND connection_type = 'FAILED_CONNECT';")
TOTAL_QUERIES=$(run_sql "SELECT COUNT(*) FROM query_audit WHERE $DAY_FILTER;")
SLOW_QUERIES=$(run_sql "SELECT COUNT(*) FROM query_audit WHERE $DAY_FILTER AND execution_time_ms > 1000;")
SECURITY_EVENTS=$(run_sql "SELECT COUNT(*) FROM security_audit WHERE $DAY_FILTER;")
CRITICAL_EVENTS=$(run_sql "SELECT COUNT(*) FROM security_audit WHERE $DAY_FILTER AND severity IN ('HIGH', 'CRITICAL');")

# 生成HTML报告
{
cat << HTML_START
<!DOCTYPE html>
<html>
<head>
<title>数据库审计报告 - $REPORT_DATE</title>
<meta charset="UTF-8">
<style>
body { font-family: Arial, sans-serif; margin: 20px; }
.header { background-color: #f0f0f0; padding: 20px; border-radius: 5px; }
.section { margin: 20px 0; }
.metric { display: inline-block; margin: 10px; padding: 15px; background-color: #e8f4f8; border-radius: 5px; }
table { border-collapse: collapse; width: 100%; margin: 10px 0; }
th, td { border: 1px solid #ddd; padding: 8px; text-align: left; }
th { background-color: #f2f2f2; }
.alert { color: red; font-weight: bold; }
.warning { color: orange; font-weight: bold; }
.info { color: blue; }
</style>
</head>
<body>
<div class="header">
<h1>数据库审计报告</h1>
<p>报告日期: $REPORT_DATE</p>
<p>生成时间: $(date)</p>
</div>
<div class="section">
<h2>统计概览</h2>
<div class="metric">
<h3>连接统计</h3>
<p>总连接数: $TOTAL_CONNECTIONS</p>
<p>失败连接数: $FAILED_CONNECTIONS</p>
</div>
<div class="metric">
<h3>查询统计</h3>
<p>总查询数: $TOTAL_QUERIES</p>
<p>慢查询数: $SLOW_QUERIES</p>
</div>
<div class="metric">
<h3>安全事件</h3>
<p>安全事件总数: $SECURITY_EVENTS</p>
<p>高危事件数: $CRITICAL_EVENTS</p>
</div>
</div>
<div class="section">
<h2>详细分析</h2>
<h3>Top 10 活跃用户</h3>
<table>
<tr><th>用户名</th><th>连接次数</th><th>查询次数</th></tr>
HTML_START

# One <tr> per result row. Escaping happens before the split; it leaves the
# tab separators untouched.
run_sql "
SELECT
    ca.username,
    COUNT(DISTINCT ca.id) as connection_count,
    COALESCE(qa.query_count, 0) as query_count
FROM connection_audit ca
LEFT JOIN (
    SELECT username, COUNT(*) as query_count
    FROM query_audit
    WHERE DATE(timestamp) = '$REPORT_DATE'
    GROUP BY username
) qa ON ca.username = qa.username
WHERE DATE(ca.timestamp) = '$REPORT_DATE'
GROUP BY ca.username
ORDER BY connection_count DESC
LIMIT 10;
" | html_escape | while IFS=$'\t' read -r username connection_count query_count; do
    printf '<tr><td>%s</td><td>%s</td><td>%s</td></tr>\n' "$username" "$connection_count" "$query_count"
done

cat << 'HTML_CLOSE'
</table>
</div>
</body>
</html>
HTML_CLOSE
} > "$REPORT_FILE"

echo "审计报告生成完成: $REPORT_FILE"
EOF
    log "审计报告生成脚本创建完成"
}
# Remove audit data older than RETENTION_DAYS.
#
# Deletes *.log files under LOG_DIR whose mtime is past the retention period,
# then deletes expired rows from the audit tables (security_audit rows only
# when they are marked resolved). Returns 1 without deleting anything when
# RETENTION_DAYS is not a plain non-negative integer.
# Reads: LOG_DIR, RETENTION_DAYS, AUDIT_DB_HOST, AUDIT_DB_PORT, AUDIT_DB_USER,
#        AUDIT_DB_PASSWORD, AUDIT_DB_NAME.
cleanup_old_logs() {
    log "清理过期审计日志"

    # RETENTION_DAYS is spliced into find arguments and SQL text below, so an
    # empty or malformed value must be rejected before anything is deleted.
    if ! [[ "${RETENTION_DAYS:-}" =~ ^[0-9]+$ ]]; then
        log "RETENTION_DAYS 无效: '${RETENTION_DAYS:-}',已跳过清理"
        return 1
    fi

    # Log files on disk.
    find "$LOG_DIR" -name "*.log" -type f -mtime +"$RETENTION_DAYS" -delete

    # Audit rows in the database. Unquoted heredoc: $RETENTION_DAYS is expanded.
    mysql -h"$AUDIT_DB_HOST" -P"$AUDIT_DB_PORT" -u"$AUDIT_DB_USER" -p"$AUDIT_DB_PASSWORD" "$AUDIT_DB_NAME" << EOF
DELETE FROM connection_audit WHERE timestamp < DATE_SUB(NOW(), INTERVAL $RETENTION_DAYS DAY);
DELETE FROM query_audit WHERE timestamp < DATE_SUB(NOW(), INTERVAL $RETENTION_DAYS DAY);
DELETE FROM privilege_audit WHERE timestamp < DATE_SUB(NOW(), INTERVAL $RETENTION_DAYS DAY);
DELETE FROM data_change_audit WHERE timestamp < DATE_SUB(NOW(), INTERVAL $RETENTION_DAYS DAY);
DELETE FROM security_audit WHERE timestamp < DATE_SUB(NOW(), INTERVAL $RETENTION_DAYS DAY) AND resolved = TRUE;
EOF
    log "过期审计日志清理完成"
}
# Install the cron entries for monitoring, log parsing, reporting and cleanup.
#
# The entries are kept between two marker comments in the invoking user's
# crontab: every other entry is preserved, and a block left by an earlier run
# is replaced instead of duplicated.
# Reads: LOG_DIR, AUDIT_DB_USER, AUDIT_DB_PASSWORD.
setup_cron_jobs() {
    log "设置定时任务"

    local script_path cron_tmp
    # Cron starts jobs from the user's home directory, so a relative $0 would not resolve.
    script_path="$(readlink -f "$0")"
    # mktemp gives an unpredictable, owner-only file; the entries below carry
    # the database password.
    cron_tmp="$(mktemp)" || return 1

    {
        # Existing crontab minus our previous block. `crontab -l` fails when
        # the user has no crontab yet, which is fine.
        crontab -l 2>/dev/null | sed '/^# >>> database_audit >>>$/,/^# <<< database_audit <<<$/d' || true
        echo "# >>> database_audit >>>"
        cat << EOF
# 每5分钟运行安全监控
*/5 * * * * $LOG_DIR/security_monitor.py
# 每小时解析审计日志
0 * * * * $LOG_DIR/parse_audit_logs.py --log-file $LOG_DIR/mysql_audit.log --db-user $AUDIT_DB_USER --db-password $AUDIT_DB_PASSWORD
# 每天生成审计报告
0 6 * * * $LOG_DIR/generate_audit_report.sh
# 每周清理过期日志
0 2 * * 0 $script_path cleanup
EOF
        echo "# <<< database_audit <<<"
    } > "$cron_tmp"

    crontab "$cron_tmp"
    rm -f "$cron_tmp"
    log "定时任务设置完成"
}
# Entry point: dispatch on the first argument, defaulting to "install".
#   install - set up directories, tables, MySQL auditing, helper scripts and cron
#   cleanup - purge expired logs and audit rows
#   report  - run the generated report script (optional date as second argument)
#   monitor - run the generated security monitor once
# Any other argument prints the usage text and exits with status 1.
main() {
    local action="${1:-install}"

    if [ "$action" = "install" ]; then
        log "开始安装数据库审计系统"
        create_log_directory
        create_audit_tables
        configure_mysql_audit
        create_log_parser
        create_security_monitor
        create_audit_report
        setup_cron_jobs
        log "数据库审计系统安装完成"
    elif [ "$action" = "cleanup" ]; then
        cleanup_old_logs
    elif [ "$action" = "report" ]; then
        "$LOG_DIR/generate_audit_report.sh" "${2:-}"
    elif [ "$action" = "monitor" ]; then
        "$LOG_DIR/security_monitor.py"
    else
        echo "用法: $0 {install|cleanup|report|monitor}"
        echo " install - 安装审计系统"
        echo " cleanup - 清理过期日志"
        echo " report - 生成审计报告"
        echo " monitor - 运行安全监控"
        exit 1
    fi
}

main "$@"
漏洞防护与安全加固
1. SQL注入防护
#!/usr/bin/env python3
# scripts/sql_injection_protection.py
import re
import logging
from typing import List, Dict, Any, Tuple
import mysql.connector
import psycopg2
from sqlparse import parse, tokens
import hashlib
class SQLInjectionProtector:
    """Heuristic SQL-injection screening helpers.

    Combines regular-expression signatures, sqlparse-based structural checks,
    parameter inspection and keyword counting. The checks are heuristics: they
    complement parameterized queries and do not replace them.
    """

    def __init__(self):
        self.logger = self._setup_logging()
        # SQL-injection signatures (regular expressions).
        self.injection_patterns = [
            # UNION-based injection
            r'(?i)\bunion\s+select\b',
            r'(?i)\bunion\s+all\s+select\b',
            # Boolean-based blind injection
            r'(?i)\b(and|or)\s+\d+\s*=\s*\d+',
            r'(?i)\b(and|or)\s+[\'"]?\w+[\'"]?\s*=\s*[\'"]?\w+[\'"]?',
            # Time-based blind injection
            r'(?i)\bsleep\s*\(',
            r'(?i)\bwaitfor\s+delay\b',
            r'(?i)\bbenchmark\s*\(',
            # Error-based injection
            r'(?i)\bextractvalue\s*\(',
            r'(?i)\bupdatexml\s*\(',
            # Stacked queries
            r';\s*drop\s+',
            r';\s*delete\s+',
            r';\s*insert\s+',
            r';\s*update\s+',
            # Comment-based evasion
            r'/\*.*?\*/',
            r'--\s+',
            r'#.*$',
            # File access functions
            r'(?i)\bload_file\s*\(',
            r'(?i)\binto\s+outfile\b',
            r'(?i)\binto\s+dumpfile\b',
            # Metadata disclosure
            r'(?i)\binformation_schema\b',
            r'(?i)\bmysql\.user\b',
            r'(?i)\bpg_user\b',
            # Special characters and encoding helpers
            r'[\'";].*[\'";]',
            r'\bchar\s*\(\s*\d+\s*\)',
            r'\bhex\s*\(',
            r'\bascii\s*\(',
        ]
        # SQL keywords are case-insensitive, so every signature is compiled
        # with IGNORECASE; several patterns above lack the inline (?i) flag and
        # would otherwise miss e.g. "; DROP TABLE".
        self.compiled_patterns = [
            re.compile(pattern, re.IGNORECASE) for pattern in self.injection_patterns
        ]
        # Keywords expected in ordinary statements.
        self.whitelist_keywords = {
            'SELECT', 'FROM', 'WHERE', 'ORDER', 'BY', 'GROUP', 'HAVING',
            'INSERT', 'INTO', 'VALUES', 'UPDATE', 'SET', 'DELETE',
            'JOIN', 'LEFT', 'RIGHT', 'INNER', 'OUTER', 'ON',
            'AND', 'OR', 'NOT', 'IN', 'EXISTS', 'BETWEEN',
            'LIKE', 'IS', 'NULL', 'ASC', 'DESC', 'LIMIT', 'OFFSET'
        }

    def _setup_logging(self) -> logging.Logger:
        """Return the class logger, attaching a handler only on first use.

        logging.getLogger returns the same object for the same name, so
        attaching a handler per instance would write each record once per
        instance created. If the log file cannot be opened, records go to
        stderr instead of making construction fail.
        """
        logger = logging.getLogger('SQLInjectionProtector')
        logger.setLevel(logging.INFO)
        if not logger.handlers:
            try:
                handler = logging.FileHandler('/var/log/sql_injection_protection.log')
            except OSError:
                handler = logging.StreamHandler()
            formatter = logging.Formatter(
                '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
            )
            handler.setFormatter(formatter)
            logger.addHandler(handler)
        return logger

    def detect_sql_injection(self, sql_query: str, parameters: List[Any] = None) -> Tuple[bool, List[str]]:
        """Run every heuristic against *sql_query* and its *parameters*.

        Returns ``(is_malicious, threats)``, where *threats* lists a message
        for each finding and *is_malicious* is True when the list is non-empty.
        Findings are also written to the log.
        """
        threats = []
        # Signature matching.
        for pattern in self.compiled_patterns:
            if pattern.search(sql_query):
                threats.append(f"检测到可疑模式: {pattern.pattern}")
        # Structural analysis.
        threats.extend(self._analyze_sql_syntax(sql_query))
        # Bound parameter inspection.
        if parameters:
            threats.extend(self._check_parameters(parameters))
        # Length check.
        if len(sql_query) > 10000:
            threats.append("SQL查询长度异常")
        # Keyword frequency check.
        threats.extend(self._check_keyword_frequency(sql_query))

        is_malicious = len(threats) > 0
        if is_malicious:
            self.logger.warning(f"检测到SQL注入攻击: {sql_query[:200]}...")
            for threat in threats:
                self.logger.warning(f"威胁详情: {threat}")
        return is_malicious, threats

    def _analyze_sql_syntax(self, sql_query: str) -> List[str]:
        """Inspect the parsed statement structure.

        Reports stacked queries (more than one statement), UNION usage and
        sub-queries nested more than three levels deep. Any parser failure is
        reported as a finding rather than raised.
        """
        threats = []
        try:
            parsed = parse(sql_query)
            # sqlparse yields one Statement per ';'-separated statement;
            # fragments holding only whitespace or ';' are not statements.
            statements = [s for s in parsed if str(s).strip(' \t\r\n;')]
            if len(statements) > 1:
                threats.append("检测到堆叠查询")
            for statement in statements:
                # "UNION ALL" can be lexed as a single keyword token, hence
                # the prefix comparison.
                union_count = sum(
                    1 for token in statement.flatten()
                    if token.ttype in tokens.Keyword
                    and token.value.upper().startswith('UNION')
                )
                if union_count > 0:
                    threats.append(f"检测到UNION查询 ({union_count}个)")
                subquery_depth = self._count_subquery_depth(statement)
                if subquery_depth > 3:
                    threats.append(f"子查询嵌套过深 (深度: {subquery_depth})")
        except Exception as e:
            threats.append(f"SQL语法解析异常: {str(e)}")
        return threats

    def _count_subquery_depth(self, statement, depth=0) -> int:
        """Return *depth* plus the deepest nesting of parenthesised SELECTs.

        Walks the flattened token stream while tracking open parentheses.
        Only a parenthesis that directly contains a SELECT counts as a level,
        so function calls and grouping parentheses add no depth. SELECT is
        typed ``Keyword.DML`` by sqlparse, not plain ``Keyword``.
        """
        open_parens = []  # one flag per open '(': True once a SELECT was seen in it
        deepest = depth
        for token in statement.flatten():
            if token.ttype is tokens.Punctuation and token.value == '(':
                open_parens.append(False)
            elif token.ttype is tokens.Punctuation and token.value == ')':
                if open_parens:
                    open_parens.pop()
            elif token.ttype is tokens.Keyword.DML and token.value.upper() == 'SELECT':
                if open_parens and not open_parens[-1]:
                    open_parens[-1] = True
                    deepest = max(deepest, depth + sum(open_parens))
        return deepest

    def _check_parameters(self, parameters: List[Any]) -> List[str]:
        """Inspect bound parameter values for SQL fragments.

        Only string parameters are examined. Keywords are matched as whole
        words so that values such as ``updated_at`` are not reported.
        """
        threats = []
        sql_keywords = ['SELECT', 'UNION', 'DROP', 'DELETE', 'INSERT', 'UPDATE']
        for i, param in enumerate(parameters):
            if not isinstance(param, str):
                continue
            param_upper = param.upper()
            for keyword in sql_keywords:
                if re.search(rf'\b{keyword}\b', param_upper):
                    threats.append(f"参数 {i} 包含SQL关键字: {keyword}")
            # Quote and statement-terminator characters.
            if re.search(r'[\'";]', param):
                threats.append(f"参数 {i} 包含特殊字符")
            # Percent-encoded bytes, a common filter bypass.
            if re.search(r'%[0-9a-fA-F]{2}', param):
                threats.append(f"参数 {i} 包含URL编码")
        return threats

    def _check_keyword_frequency(self, sql_query: str) -> List[str]:
        """Report keywords that occur unusually often.

        UNION is reported above one occurrence; SELECT/INSERT/UPDATE/DELETE
        above two. UNION is counted in addition to the whitelist, which does
        not contain it.
        """
        threats = []
        counted = self.whitelist_keywords | {'UNION'}
        keyword_counts = {}
        for word in re.findall(r'\b\w+\b', sql_query.upper()):
            if word in counted:
                keyword_counts[word] = keyword_counts.get(word, 0) + 1
        for keyword, count in keyword_counts.items():
            if keyword == 'UNION' and count > 1:
                threats.append(f"UNION关键字出现次数异常: {count}")
            elif keyword in ['SELECT', 'INSERT', 'UPDATE', 'DELETE'] and count > 2:
                threats.append(f"{keyword}关键字出现次数异常: {count}")
        return threats

    def sanitize_input(self, user_input: str) -> str:
        """Strip quote characters and SQL comments from *user_input*.

        Non-string input is returned as ``str(user_input)`` unchanged. The
        result is truncated to 1000 characters and stripped of surrounding
        whitespace.
        """
        if not isinstance(user_input, str):
            return str(user_input)
        # Quotes, semicolons and backslashes.
        sanitized = re.sub(r'[\'";\\]', '', user_input)
        # SQL comments.
        sanitized = re.sub(r'/\*.*?\*/', '', sanitized, flags=re.DOTALL)
        sanitized = re.sub(r'--.*$', '', sanitized, flags=re.MULTILINE)
        sanitized = re.sub(r'#.*$', '', sanitized, flags=re.MULTILINE)
        # Length cap.
        if len(sanitized) > 1000:
            sanitized = sanitized[:1000]
        return sanitized.strip()

    def create_safe_query(self, base_query: str, parameters: Dict[str, Any]) -> Tuple[str, List[Any]]:
        """Validate *base_query* and sanitize its parameters.

        Returns the unchanged query and the parameter values in dict order,
        with string values passed through :meth:`sanitize_input`.

        Raises:
            ValueError: if *base_query* itself triggers the detector.
        """
        is_malicious, threats = self.detect_sql_injection(base_query)
        if is_malicious:
            raise ValueError(f"基础查询包含恶意内容: {threats}")
        safe_params = []
        for value in parameters.values():
            if isinstance(value, str):
                safe_params.append(self.sanitize_input(value))
            else:
                safe_params.append(value)
        return base_query, safe_params

    @staticmethod
    def _is_allowed_identifier(name: str, allowed: List[str]) -> bool:
        """Return True only if *name* is a plain identifier listed in *allowed*.

        The name is checked as given. Stripping illegal characters first
        would approve e.g. ``us'ers`` as ``users`` while the caller still
        holds, and would use, the unsafe original string.
        """
        return bool(re.fullmatch(r'[a-zA-Z0-9_]+', name)) and name in allowed

    def validate_table_name(self, table_name: str, allowed_tables: List[str]) -> bool:
        """Return True if *table_name* is a plain identifier in *allowed_tables*."""
        return self._is_allowed_identifier(table_name, allowed_tables)

    def validate_column_name(self, column_name: str, allowed_columns: List[str]) -> bool:
        """Return True if *column_name* is a plain identifier in *allowed_columns*."""
        return self._is_allowed_identifier(column_name, allowed_columns)

    def create_prepared_statement(self, connection, query: str, parameters: List[Any]):
        """Execute *query* with bound *parameters* and return the cursor.

        A ``mysql.connector.MySQLConnection`` gets a prepared cursor; any
        other object exposing ``cursor()`` gets a regular cursor.

        Raises:
            ValueError: if *connection* has no ``cursor`` attribute.
            Exception: database errors are logged and re-raised.
        """
        try:
            if isinstance(connection, mysql.connector.MySQLConnection):
                cursor = connection.cursor(prepared=True)
                cursor.execute(query, parameters)
                return cursor
            elif hasattr(connection, 'cursor'):  # e.g. a PostgreSQL connection
                cursor = connection.cursor()
                cursor.execute(query, parameters)
                return cursor
            else:
                raise ValueError("不支持的数据库连接类型")
        except Exception as e:
            self.logger.error(f"创建预处理语句失败: {e}")
            raise

    def log_security_event(self, event_type: str, sql_query: str,
                           client_ip: str, user_id: str, threats: List[str]):
        """Write a structured security event to the log."""
        # Imported here because the module header does not import datetime.
        from datetime import datetime

        event_data = {
            'timestamp': datetime.now().isoformat(),
            'event_type': event_type,
            'sql_query': sql_query[:500],  # keep log lines bounded
            'client_ip': client_ip,
            'user_id': user_id,
            'threats': threats,
            # MD5 is only a fingerprint of the full statement here.
            'query_hash': hashlib.md5(sql_query.encode()).hexdigest()
        }
        self.logger.warning(f"安全事件: {event_data}")
        # Hook point for forwarding to a SIEM or alerting system.
def main():
    """Demonstrate the protector on attack samples, a benign query and a sanitized query."""
    guard = SQLInjectionProtector()
    divider = "-" * 50

    # Known-bad statements: every one is expected to be flagged.
    attack_samples = (
        "SELECT * FROM users WHERE id = 1 UNION SELECT username, password FROM admin",
        "SELECT * FROM products WHERE name = 'test' OR 1=1--",
        "SELECT * FROM users WHERE id = 1; DROP TABLE users;",
        "SELECT * FROM users WHERE id = 1 AND SLEEP(5)",
        "SELECT * FROM users WHERE id = extractvalue(1, concat(0x7e, (SELECT password FROM admin LIMIT 1), 0x7e))",
    )
    for sample in attack_samples:
        flagged, findings = guard.detect_sql_injection(sample)
        print(f"查询: {sample[:50]}...")
        print(f"恶意: {flagged}")
        print(f"威胁: {findings}")
        print(divider)

    # A parameterized statement with harmless values.
    benign_sql = "SELECT * FROM users WHERE id = %s AND status = %s"
    benign_args = [1, 'active']
    flagged, findings = guard.detect_sql_injection(benign_sql, benign_args)
    print(f"安全查询恶意检测: {flagged}")

    # Build a query whose parameter carries an injection attempt.
    try:
        result_query, result_params = guard.create_safe_query(
            "SELECT * FROM users WHERE name = %s",
            {"name": "John'; DROP TABLE users; --"},
        )
    except ValueError as err:
        print(f"查询被拒绝: {err}")
    else:
        print(f"清理后的查询: {result_query}")
        print(f"清理后的参数: {result_params}")


if __name__ == "__main__":
    main()
总结
数据库安全是一个多层次、全方位的防护体系,需要从以下几个方面进行综合考虑:
核心安全要素
- 身份认证与访问控制
- 强密码策略和多因素认证
- 基于角色的权限管理(RBAC)
- 最小权限原则
- 定期权限审查
- 数据保护
- 敏感数据加密存储
- 传输过程加密
- 数据脱敏和匿名化
- 密钥管理和轮换
- 审计监控
- 全面的审计日志记录
- 实时安全监控
- 异常行为检测
- 安全事件响应
- 漏洞防护
- SQL注入防护
- 数据库软件及时更新
- 安全配置加固
- 网络隔离和防火墙
最佳实践建议
- 制定安全策略:建立完整的数据库安全管理制度
- 定期安全评估:进行漏洞扫描和渗透测试
- 员工安全培训:提高安全意识和技能水平
- 应急响应计划:制定安全事件处理流程
- 合规性管理:满足相关法规和标准要求
通过实施这些安全措施,可以有效保护数据库免受各种安全威胁,确保数据的机密性、完整性和可用性。