|
"""
middleware_sensitive_filter.py
消息敏感词过滤中间件
功能:
1. 检测消息中的敏感信息(手机号、身份证、银行卡、API Key)
2. 自动脱敏替换(如 138****1234)
3. 记录脱敏日志供审计
4. 支持自定义规则和豁免名单
"""
import json
import re
import time
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Optional
# ============================================
# 1. 敏感信息检测规则定义
# ============================================
@dataclass
class DetectionRule:
    """One regex-based detection rule and the function that masks its matches."""

    # Rule identifier; reported in detection details and used as a stats key.
    name: str
    # Regular-expression source text for the sensitive pattern.
    pattern: str
    # Receives the regex match object and returns the replacement text
    # that is substituted for the whole match.
    mask_fn: Callable[["re.Match"], str]
    # Severity label: "low", "medium" or "high".
    severity: str = "medium"
    # Rules with enabled=False are meant to be skipped by consumers.
    enabled: bool = True


def _mask_phone(m):
    """Mask the middle four digits of the 11-digit number.

    Slicing from the right keeps an optional country prefix intact, so both
    ``13812345678`` and ``+8613812345678`` lose the same four digits.
    """
    number = m.group()
    return number[:-8] + "****" + number[-4:]


def _mask_email(m):
    """Mask the local part of an address, never revealing all of it."""
    local, _, domain = m.group().partition("@")
    # Show at most two leading characters, and always hide at least one,
    # so a one-character local part cannot drag the "@" into the output.
    visible = local[:min(2, len(local) - 1)]
    return visible + "****@" + domain


# Built-in rule table. Order matters: rules are applied in sequence, so
# every numeric pattern is fenced with digit look-arounds to stop a shorter
# rule from claiming a slice of a longer number (e.g. the phone rule
# matching eleven digits in the middle of an ID-card number).
BUILTIN_RULES: List[DetectionRule] = [
    # Mainland China mobile number, optionally prefixed with +86 / 0086 / 86.
    DetectionRule(
        name="phone_cn",
        pattern=r'(?<!\d)(?:(?:\+|00)?86[- ]?)?1[3-9]\d{9}(?!\d)',
        mask_fn=_mask_phone,
        severity="high",
    ),
    # Resident ID card number: 17 digits plus a digit or X check character.
    DetectionRule(
        name="id_card_cn",
        pattern=r'(?<!\d)\d{17}[\dXx](?!\d)',
        mask_fn=lambda m: m.group()[:4] + "**********" + m.group()[-4:],
        severity="high",
    ),
    # Bank card number (16-19 digits).
    DetectionRule(
        name="bank_card",
        pattern=r'(?<!\d)\d{16,19}(?!\d)',
        mask_fn=lambda m: m.group()[:4] + " **** **** " + m.group()[-4:],
        severity="high",
    ),
    # API key / token / secret assignments. The replacement is the masked
    # key only; the "api_key=" label and any quotes are part of the match
    # and are therefore dropped from the output.
    DetectionRule(
        name="api_key",
        pattern=r'(?:api[_-]?key|apikey|token|secret)[:=]\s*["\']?([\w-]{20,})["\']?',
        mask_fn=lambda m: m.group(1)[:4] + "***..." + m.group(1)[-4:]
        if len(m.group(1)) > 8 else "***",
        severity="high",
    ),
    # E-mail address.
    DetectionRule(
        name="email",
        pattern=r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}',
        mask_fn=_mask_email,
        severity="medium",
    ),
]
# ============================================
# 2. 自定义关键词匹配器
# ============================================
class KeywordMatcher:
    """Case-insensitive literal keyword matcher with masking.

    Every occurrence of a configured keyword, in any letter case, is
    replaced by ``mask_char`` repeated once per matched character.
    Keywords are treated as literal text, not as regular expressions.
    """

    def __init__(self, keywords: List[str], mask_char: str = "*"):
        """Store the keyword set and the masking character.

        Args:
            keywords: Literal keywords to look for; duplicates are collapsed.
            mask_char: Text repeated once per masked character.
        """
        self.keywords = set(keywords or ())
        self.mask_char = mask_char

    def check(self, text: str) -> Dict[str, Any]:
        """Mask every configured keyword found in ``text``.

        Args:
            text: The text to scan.

        Returns:
            A dict with ``has_sensitive`` (bool), ``matched_keywords``
            (the configured keywords that occurred), ``count`` (number of
            distinct keywords that occurred) and ``masked_text``.
        """
        found = []
        clean_text = text
        # Longest keywords first, so a short keyword cannot blank out part
        # of a longer one before it is tested; the tie-break on the text
        # keeps the result independent of set iteration order. Empty
        # keywords are skipped because they would "match" any input.
        ordered = sorted(
            (kw for kw in self.keywords if kw),
            key=lambda kw: (-len(kw), kw),
        )
        for kw in ordered:
            # Detection and replacement share one case-insensitive pattern,
            # so anything reported as found is also masked. A function
            # replacement avoids backslash processing of mask_char.
            pattern = re.compile(re.escape(kw), re.IGNORECASE)
            clean_text, hits = pattern.subn(
                lambda m: self.mask_char * len(m.group()), clean_text
            )
            if hits:
                found.append(kw)
        return {
            "has_sensitive": len(found) > 0,
            "matched_keywords": found,
            "count": len(found),
            "masked_text": clean_text,
        }
# ============================================
# 3. 核心中间件逻辑
# ============================================
class SensitiveFilterMiddleware:
    """Detect and mask sensitive data in messages, keeping an audit trail.

    Per-message workflow:
      1. Run every enabled regex rule over the text and mask the matches.
      2. Run the keyword matcher over the already-masked text.
      3. Update the running statistics.
      4. Append a metadata-only entry to the audit log.
      5. Return the masked text together with a detection report.
    """

    def __init__(self, config: Optional[Dict] = None):
        """Initialise the middleware from an optional config dict.

        Keys read from ``config``: ``keywords`` (keyword list),
        ``mask_char`` (masking character, default ``"*"``) and
        ``whitelist`` (user ids whose messages are returned untouched).
        """
        cfg = config or {}
        # Copy the list so adding or removing rules on this instance leaves
        # the module-level table alone (the rule objects are still shared).
        self.rules = BUILTIN_RULES.copy()
        # Keyword filter.
        keywords = cfg.get("keywords", [])
        mask_char = cfg.get("mask_char", "*")
        self.keyword_matcher = KeywordMatcher(keywords, mask_char)
        # Exemption list: these users are not filtered.
        self.whitelist = set(cfg.get("whitelist", []))
        # Audit entries, one per non-exempt message, in arrival order.
        self.audit_log = []
        # Running counters.
        self.stats = {
            "total_checked": 0,
            "total_masked": 0,
            "by_rule": {}
        }

    def check(self, text: str, user_id: Optional[str] = None) -> Dict[str, Any]:
        """Check one message and mask whatever the rules detect.

        Args:
            text: The text to inspect.
            user_id: Sender id, used only for the whitelist lookup.

        Returns:
            A report dict. For whitelisted senders it carries the untouched
            text with ``masked=False`` and ``total_matched=0``. Otherwise
            it carries ``original_length``, ``masked_text``, ``masked``,
            ``total_matched``, ``details`` (one entry per rule that fired)
            and ``audit_id`` (1-based position of the audit entry).
        """
        # Whitelisted senders bypass filtering, statistics and auditing.
        if user_id and user_id in self.whitelist:
            return {
                "original": text,
                "masked_text": text,
                "masked": False,
                # Present on both return paths so callers can read the
                # match count without special-casing exempt users.
                "total_matched": 0,
                "reason": "whitelist",
                "details": []
            }

        clean_text = text
        details = []
        masked_count = 0

        # 1. Regex rules, applied in list order to the progressively
        #    masked text.
        for rule in self.rules:
            if not rule.enabled:
                continue
            pattern = re.compile(rule.pattern, re.IGNORECASE)
            matches = list(pattern.finditer(clean_text))
            if not matches:
                continue
            # Substitute at the matched positions only. A plain
            # str.replace of the matched text would also rewrite the same
            # characters wherever else they occur, including inside
            # longer tokens the rule did not match.
            clean_text = pattern.sub(rule.mask_fn, clean_text)
            hit_count = len(matches)
            details.append({
                "rule": rule.name,
                "severity": rule.severity,
                "matches": hit_count,
                # NOTE(review): samples hold up to 20 characters of the
                # unmasked match. They go back to the caller only and are
                # not written to the audit log.
                "samples": [m.group()[:20] for m in matches[:3]]
            })
            masked_count += hit_count
            by_rule = self.stats["by_rule"]
            by_rule[rule.name] = by_rule.get(rule.name, 0) + hit_count

        # 2. Keyword detection on the already-masked text.
        kw_result = self.keyword_matcher.check(clean_text)
        if kw_result["has_sensitive"]:
            clean_text = kw_result["masked_text"]
            details.append({
                "rule": "keyword_match",
                "severity": "low",
                "matches": kw_result["count"],
                "samples": kw_result["matched_keywords"][:3]
            })
            masked_count += kw_result["count"]

        # 3. Statistics: messages checked, and messages with any match.
        was_masked = masked_count > 0
        self.stats["total_checked"] += 1
        if was_masked:
            self.stats["total_masked"] += 1

        # 4. Audit entry: metadata only, never the message text.
        self.audit_log.append({
            "timestamp": time.time(),
            "user_id": user_id,
            "text_length": len(text),
            "masked": was_masked,
            "match_count": masked_count,
            "rule_names": [d["rule"] for d in details]
        })

        return {
            "original_length": len(text),
            "masked_text": clean_text,
            "masked": was_masked,
            "total_matched": masked_count,
            "details": details,
            "audit_id": len(self.audit_log)
        }
# ============================================
# 4. 中间件钩子函数(OpenClaw 接口)
# ============================================
# Module-level middleware singleton used by the hook functions:
# set by on_load(), read by before_message(), reset to None by on_unload().
middleware_instance = None
def on_load(config: Dict = None):
    """Create the module-level middleware and report what was loaded.

    Args:
        config: Optional configuration dict handed to
            ``SensitiveFilterMiddleware``.

    Returns:
        ``{"status": "loaded"}``.
    """
    global middleware_instance
    middleware_instance = SensitiveFilterMiddleware(config)
    # Gather the two counts first so the log line stays readable.
    rule_total = len(middleware_instance.rules)
    keyword_total = len(middleware_instance.keyword_matcher.keywords)
    print(f"? 敏感信息过滤中间件已加载 (规则: {rule_total}, 关键词: {keyword_total})")
    return {"status": "loaded"}
def before_message(message: Dict) -> Dict:
    """Intercept an outgoing message and mask its sensitive content.

    Args:
        message: Message dict; ``content`` is the text to scan and
            ``user_id`` (optional) identifies the sender.

    Returns:
        The same dict, mutated in place: ``content`` holds the masked text
        and ``_meta["sensitive_filter"]`` holds ``masked`` / ``count``.
        The message is returned untouched when the middleware is not
        loaded or when ``content`` is explicitly ``None``.
    """
    if not middleware_instance:
        return message

    text = message.get("content", "")
    if text is None:
        # An explicit null body has nothing to scan; treat it like a
        # missing one instead of failing inside the regex engine.
        return message

    result = middleware_instance.check(text, message.get("user_id"))

    # Replace the content with the masked version.
    message["content"] = result["masked_text"]

    # Attach metadata for downstream consumers without touching the content.
    meta = message.get("_meta")
    if meta is None:
        meta = {}
    message["_meta"] = meta
    meta["sensitive_filter"] = {
        "masked": result["masked"],
        # The report for a whitelisted sender may not carry a match count;
        # default to zero instead of raising KeyError.
        "count": result.get("total_matched", 0)
    }
    return message
def on_unload():
    """Print the final statistics, drop the middleware and report unload.

    Returns:
        ``{"status": "unloaded"}``.
    """
    global middleware_instance
    active = middleware_instance
    if active:
        # Summarise once before the instance is discarded.
        checked = active.stats['total_checked']
        masked = active.stats['total_masked']
        print(f"???? 敏感信息过滤统计: 检查 {checked} 条, 脱敏 {masked} 条")
    middleware_instance = None
    print("???? 敏感信息过滤中间件已卸载")
    return {"status": "unloaded"}
|