广告位联系
返回顶部
分享到

OpenClaw中间件请求拦截、转换与增强的完整指南

Ai 来源:互联网 作者:佚名 发布时间:2026-06-28 18:14:43 人浏览
摘要

中间件是 OpenClaw 处理链路中最灵活的一环。本文从中间件的设计哲学出发,系统讲解中间件的三种模式(前置、后置、环绕)、洋葱模型执行链、请求/响应变换机制,以及流式消息处理的特殊

中间件是 OpenClaw 处理链路中最灵活的一环。本文从中间件的设计哲学出发,系统讲解中间件的三种模式(前置、后置、环绕)、洋葱模型执行链、请求/响应变换机制,以及流式消息处理的特殊考量。通过两个完整实战案例——“消息敏感词过滤中间件"和"请求耗时统计仪表盘中间件”——你将掌握从零开发生产级中间件的全部技能。读完你会理解:为什么 AI Agent 框架需要中间件,以及如何用它优雅地解决横切关注点。

1. 引言:为什么 AI Agent 也需要中间件

1.1 场景还原

先说一个真实遇到过的问题。

你的 OpenClaw Agent 接入了企业微信和飞书两个渠道,一切都运行得很好。直到有一天,安全部门找上门来:“你们 Agent 回复的消息里,有没有可能泄露敏感信息?能不能在消息发出去之前做一次检查?”

你第一反应可能是——在每个处理逻辑里加判断。但是 Agent 的回复路径不止一条:直接回复、自动摘要、子代理消息、心跳输出……每条路径都要改一遍?

更麻烦的是,一周后运维团队又提了新需求:“能不能统计每个渠道的消息响应耗时?我们想做性能监控。” 两周后产品说:“飞书渠道的消息需要自动添加加粗格式,但微信渠道不需要。”

面对这些横切关注点(cross-cutting concerns),逐个修改业务逻辑的代价越来越高。这正是中间件的用武之地。

1.2 中间件的本质

1.2 中间件的本质

1.2 中间件的本质_图2

方案 扩展性 维护性 复用性
无中间件(修改核心) ? 每次加需求改核心 ? 核心代码膨胀 ? 无法复用
有中间件(拦截链) ? 添加新中间件即可 ? 核心保持干净 ? 中间件独立复用

一句话总结:中间件让你在不修改 Agent 核心逻辑的前提下,插入任何需要的前置和后置处理逻辑。

2. 中间件架构设计

2.1 三种中间件模式

OpenClaw 的中间件根据切入时机分为三种:

模式 切入时机 典型用途 能否修改数据
Before(前置) Agent 处理之前 鉴权、参数校验、输入清洗 ? 可修改请求
After(后置) Agent 处理之后 格式化、翻译、脱敏 ? 可修改响应
Around(环绕) 包裹整个处理过程 性能统计、异常捕获、缓存 ? 可控制全流程

2.1 三种中间件模式

2.2 洋葱模型执行链

多个中间件会形成一个洋葱模型的执行链——请求从外层穿过一层层中间件到达核心,响应再从核心穿过一层层中间件回到外层:

2.2 洋葱模型执行链

洋葱模型的执行顺序:

1

2

请求阶段:Mid 1 → Mid 2 → Agent Core

响应阶段:Agent Core → Mid 2 → Mid 1

这个设计保证了:最先拦截请求的中间件,最后看到响应。就像剥洋葱——先接触的外层,最后离开。

2.3 中间件配置体系

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

# openclaw.yaml

middleware:

  enabled: true

  # 全局中间件(对所有渠道生效)

  global:

    - name: "auth-validator"      # 身份验证

      enabled: true

      order: 10                    # 执行顺序(数字越小越靠外)

    - name: "rate-limiter"         # 速率限制

      enabled: true

      order: 20

    - name: "request-logger"       # 请求日志

      enabled: true

      order: 30

  # 按渠道配置中间件

  channels:

    feishu:

      - name: "feishu-formatter"   # 飞书格式转换

        enabled: true

        order: 50

      - name: "sensitive-filter"   # 敏感信息过滤

        enabled: true

        order: 60

        config:

          keywords: ["密码", "token", "密钥"]

          mask_char: "*"

    wecom:

      - name: "wecom-formatter"    # 企微格式转换

        enabled: true

        order: 50

    discord:

      - name: "discord-sanitizer"  # Discord Markdown清洗

        enabled: true

        order: 50

配置策略表:

配置项 说明 建议
global 对所有渠道生效 放基础中间件(鉴权、日志、限流)
channels.<name> 针对特定渠道 放渠道特有的中间件(格式化、清洗)
order 执行顺序 鉴权(order:10) → 限流(20) → 日志(30) → 格式化(50) → 过滤(60)
config 中间件特定配置 通过 config 段传递自定义参数

3. 中间件开发实战:消息敏感词过滤

3.1 需求分析

目标:开发一个敏感信息过滤中间件,在 Agent 回复消息发送前自动检测并脱敏。不阻止消息发送(非阻塞模式),但标记并记录敏感信息。

3.2 完整实现

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

147

148

149

150

151

152

153

154

155

156

157

158

159

160

161

162

163

164

165

166

167

168

169

170

171

172

173

174

175

176

177

178

179

180

181

182

183

184

185

186

187

188

189

190

191

192

193

194

195

196

197

198

199

200

201

202

203

204

205

206

207

208

209

210

211

212

213

214

215

216

217

218

219

220

221

222

223

224

225

226

227

228

229

230

231

232

233

234

235

"""

middleware_sensitive_filter.py

消息敏感词过滤中间件

功能:

1. 检测消息中的敏感信息(手机号、身份证、银行卡、API Key)

2. 自动脱敏替换(如 138****1234)

3. 记录脱敏日志供审计

4. 支持自定义规则和豁免名单

"""

import re

import json

import time

from typing import Dict, Any, Optional, List

from dataclasses import dataclass, field

# ============================================

# 1. 敏感信息检测规则定义

# ============================================

@dataclass

class DetectionRule:

    """检测规则定义"""

    name: str

    pattern: str

    mask_fn: callable  # 脱敏函数

    severity: str = "medium"  # low / medium / high

    enabled: bool = True

# 预置检测规则库

BUILTIN_RULES: List[DetectionRule] = [

    # 中国大陆手机号

    DetectionRule(

        name="phone_cn",

        pattern=r'1[3-9]\d{9}',

        mask_fn=lambda m: m.group()[:3] + "****" + m.group()[-4:],

        severity="high"

    ),

    # 身份证号码

    DetectionRule(

        name="id_card_cn",

        pattern=r'\d{17}[\dXx]',

        mask_fn=lambda m: m.group()[:4] + "**********" + m.group()[-4:],

        severity="high"

    ),

    # 银行卡号(16-19位)

    DetectionRule(

        name="bank_card",

        pattern=r'\d{16,19}',

        mask_fn=lambda m: m.group()[:4] + " **** **** " + m.group()[-4:],

        severity="high"

    ),

    # API Key(常见格式)

    DetectionRule(

        name="api_key",

        pattern=r'(?:api[_-]?key|apikey|token|secret)[:=]\s*["\']?([\w-]{20,})["\']?',

        mask_fn=lambda m: m.group(1)[:4] + "***..." + m.group(1)[-4:]

            if len(m.group(1)) > 8 else "***",

        severity="high"

    ),

    # 邮箱地址

    DetectionRule(

        name="email",

        pattern=r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}',

        mask_fn=lambda m: m.group()[:2] + "****@" + m.group().split("@")[1],

        severity="medium"

    ),

]

# ============================================

# 2. 自定义关键词匹配器

# ============================================

class KeywordMatcher:

    """基于关键词列表的匹配器

    支持:

    - 精确匹配

    - 正则模式匹配 

    - 自定义脱敏字符

    """

    def __init__(self, keywords: List[str], mask_char: str = "*"):

        self.keywords = set(keywords)

        self.mask_char = mask_char

    def check(self, text: str) -> Dict[str, Any]:

        """检查文本是否包含关键词"""

        found = []

        clean_text = text

        for kw in self.keywords:

            if kw.lower() in clean_text.lower():

                found.append(kw)

                # 替换为脱敏字符

                masked = self.mask_char * len(kw)

                clean_text = clean_text.replace(kw, masked)

        return {

            "has_sensitive": len(found) > 0,

            "matched_keywords": found,

            "count": len(found),

            "masked_text": clean_text

        }

# ============================================

# 3. 核心中间件逻辑

# ============================================

class SensitiveFilterMiddleware:

    """敏感信息过滤中间件

    工作流程:

    1. 接收消息 → 正则匹配敏感模式

    2. 关键词匹配

    3. 脱敏替换

    4. 记录审计日志

    5. 返回脱敏后的消息

    """

    def __init__(self, config: Optional[Dict] = None):

        cfg = config or {}

        # 加载规则

        self.rules = BUILTIN_RULES.copy()

        # 关键词过滤器

        keywords = cfg.get("keywords", [])

        mask_char = cfg.get("mask_char", "*")

        self.keyword_matcher = KeywordMatcher(keywords, mask_char)

        # 豁免名单(不对特定用户做过滤)

        self.whitelist = set(cfg.get("whitelist", []))

        # 审计日志

        self.audit_log = []

        # 统计

        self.stats = {

            "total_checked": 0,

            "total_masked": 0,

            "by_rule": {}

        }

    def check(self, text: str, user_id: Optional[str] = None) -> Dict[str, Any]:

        """

        检查并脱敏一条消息

        Args:

            text: 待检查的文本

            user_id: 消息发送者ID(用于豁免检查)

        Returns:

            包含脱敏文本和检测报告的字典

        """

        # 豁免检查

        if user_id and user_id in self.whitelist:

            return {

                "original": text,

                "masked_text": text,

                "masked": False,

                "reason": "whitelist",

                "details": []

            }

        clean_text = text

        details = []

        masked_count = 0

        # 1. 正则规则检测

        for rule in self.rules:

            if not rule.enabled:

                continue

            matches = list(re.finditer(rule.pattern, clean_text, re.IGNORECASE))

            if matches:

                # 应用脱敏

                for match in matches:

                    mask_result = rule.mask_fn(match)

                    clean_text = clean_text.replace(match.group(), mask_result)

                details.append({

                    "rule": rule.name,

                    "severity": rule.severity,

                    "matches": len(matches),

                    "samples": [m.group()[:20] for m in matches[:3]]

                })

                masked_count += len(matches)

                # 更新统计

                self.stats["by_rule"][rule.name] = \

                    self.stats["by_rule"].get(rule.name, 0) + len(matches)

        # 2. 关键词检测

        kw_result = self.keyword_matcher.check(clean_text)

        if kw_result["has_sensitive"]:

            clean_text = kw_result["masked_text"]

            details.append({

                "rule": "keyword_match",

                "severity": "low",

                "matches": kw_result["count"],

                "samples": kw_result["matched_keywords"][:3]

            })

            masked_count += kw_result["count"]

        # 3. 更新统计

        self.stats["total_checked"] += 1

        if masked_count > 0:

            self.stats["total_masked"] += 1

        # 4. 审计日志(不记录原文内容,只记录元数据)

        audit_entry = {

            "timestamp": time.time(),

            "user_id": user_id,

            "text_length": len(text),

            "masked": masked_count > 0,

            "match_count": masked_count,

            "rule_names": [d["rule"] for d in details]

        }

        self.audit_log.append(audit_entry)

        return {

            "original_length": len(text),

            "masked_text": clean_text,

            "masked": masked_count > 0,

            "total_matched": masked_count,

            "details": details,

            "audit_id": len(self.audit_log)

        }

# ============================================

# 4. 中间件钩子函数(OpenClaw 接口)

# ============================================

middleware_instance = None

def on_load(config: Dict = None):

    """中间件加载初始化"""

    global middleware_instance

    middleware_instance = SensitiveFilterMiddleware(config)

    print(f"? 敏感信息过滤中间件已加载 "

          f"(规则: {len(middleware_instance.rules)}, "

          f"关键词: {len(middleware_instance.keyword_matcher.keywords)})")

    return {"status": "loaded"}

def before_message(message: Dict) -> Dict:

    """消息发送前拦截"""

    if not middleware_instance:

        return message

    text = message.get("content", "")

    user_id = message.get("user_id")

    result = middleware_instance.check(text, user_id)

    # 替换为脱敏后的内容

    message["content"] = result["masked_text"]

    # 附加元数据(不影响消息内容,供下游使用)

    message["_meta"] = message.get("_meta", {})

    message["_meta"]["sensitive_filter"] = {

        "masked": result["masked"],

        "count": result["total_matched"]

    }

    return message

def on_unload():

    """卸载前清理"""

    global middleware_instance

    if middleware_instance:

        print(f"???? 敏感信息过滤统计: "

              f"检查 {middleware_instance.stats['total_checked']} 条, "

              f"脱敏 {middleware_instance.stats['total_masked']} 条")

    middleware_instance = None

    print("???? 敏感信息过滤中间件已卸载")

    return {"status": "unloaded"}

3.3 效果演示

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

# 测试敏感词过滤

from middleware_sensitive_filter import SensitiveFilterMiddleware

middleware = SensitiveFilterMiddleware({

    "keywords": ["密码", "密钥", "secret"],

    "mask_char": "*"

})

test_messages = [

    "您的手机号是13812341234,请记录",

    "我的身份证号是310101199001011234",

    "API密钥是sk-abcdefghijklmnopqrstuvw",

    "请帮我重置一下登录密码",

]

for msg in test_messages:

    result = middleware.check(msg)

    status = "????" if not result["masked"] else "????"

    print(f"{status} 原文: {result['original_length']}字")

    print(f"   脱敏后: {result['masked_text']}")

    if result["details"]:

        for d in result["details"]:

            print(f"   └─ {d['rule']}: {d['matches']}处匹配")

    print()

预期输出:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

???? 原文: 16字

   脱敏后: 您的手机号是138****1234,请记录

   └─ phone_cn: 1处匹配

 

???? 原文: 19字

   脱敏后: 我的身份证号是3101**********1234

   └─ id_card_cn: 1处匹配

 

???? 原文: 18字

   脱敏后: API密钥是sk-a***...tuvw

   └─ api_key: 1处匹配

 

???? 原文: 11字

   脱敏后: 请帮我重置一下登录**

   └─ keyword_match: 1处匹配

4. 实战案例二:请求耗时统计仪表盘中间件

4.1 需求分析

目标:统计每个渠道的消息处理耗时,按小时聚合,提供 API 查询仪表盘数据。

4.2 环绕中间件实现

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

"""

middleware_latency_stats.py 

请求耗时统计仪表盘中间件

采用环绕模式(Around)实现对 Agent 处理全流程的耗时统计,

并聚合为小时级数据供监控面板查询。

"""

import time

import json

from collections import defaultdict

from typing import Dict, Any, Optional

from datetime import datetime

class LatencyStatsMiddleware:

    """环绕中间件——包裹Agent处理过程并记录耗时"""

    def __init__(self, config: Optional[Dict] = None):

        # 小时级聚合数据 { "2026-06-21T12": { "feishu": {count, total, min, max} } }

        self.hourly_stats = defaultdict(lambda: defaultdict(

            lambda: {"count": 0, "total_ms": 0, "min": float("inf"), "max": 0, "p99": 0}

        ))

        self.all_durations = []  # 用于计算P99

    def get_hour_key(self) -> str:

        """获取当前小时标识"""

        return datetime.now().strftime("%Y-%m-%dT%H")

    def update_stats(self, channel: str, duration_ms: float):

        """更新统计指标"""

        hour = self.get_hour_key()

        stats = self.hourly_stats[hour][channel]

        stats["count"] += 1

        stats["total_ms"] += duration_ms

        stats["min"] = min(stats["min"], duration_ms)

        stats["max"] = max(stats["max"], duration_ms)

        # 保留最近1000条用于P99计算

        self.all_durations.append(duration_ms)

        if len(self.all_durations) > 1000:

            self.all_durations.pop(0)

    def get_stats(self, hours: int = 24) -> Dict:

        """获取最近 N 小时的统计数据,供监控面板 API 查询"""

        result = {}

        now = datetime.now()

        for h in range(hours):

            key = (now.replace(hour=now.hour - h)).strftime("%Y-%m-%dT%H")

            if key in self.hourly_stats:

                hour_data = {}

                for channel, stats in self.hourly_stats[key].items():

                    avg = stats["total_ms"] / stats["count"] if stats["count"] > 0 else 0

                    hour_data[channel] = {

                        **stats,

                        "avg_ms": round(avg, 2),

                        "min": round(stats["min"], 2) if stats["min"] != float("inf") else 0,

                        "max": round(stats["max"], 2)

                    }

                result[key] = hour_data

        return result

    def calculate_p99(self) -> float:

        """计算P99延迟"""

        if not self.all_durations:

            return 0

        sorted_durations = sorted(self.all_durations)

        idx = int(len(sorted_durations) * 0.99)

        return sorted_durations[idx]

# ============================================

# 环绕中间件——执行包裹

# ============================================

stats_middleware = None

def on_load(config: Dict = None):

    global stats_middleware

    stats_middleware = LatencyStatsMiddleware(config)

    print("? 请求耗时统计中间件已加载")

    return {"status": "loaded"}

# 环绕中间件需要一个包裹函数来同时处理请求前和请求后

# 在OpenClaw中,通过yield语法实现环绕

def around_message(message: Dict, next_handler: callable):

    """

    环绕中间件——包裹完整的消息处理流程

    prev_handler: 返回生成器(yield前是前置,yield后是后置)

    """

    channel = message.get("channel", "unknown")

    start = time.time()

    # ====== 前置:记录开始时间 ======

    # 可以在这里做限流判断

    print(f"???? [{channel}] 开始处理消息...")

    # ====== 调用下一层(Agent处理) ======

    result = next_handler(message)

    # ====== 后置:计算耗时并更新统计 ======

    duration_ms = (time.time() - start) * 1000

    stats_middleware.update_stats(channel, duration_ms)

    print(f"???? [{channel}] 处理完成, 耗时: {duration_ms:.2f}ms")

    # 将耗时信息附加到响应元数据

    if isinstance(result, dict):

        result["_meta"] = result.get("_meta", {})

        result["_meta"]["latency_ms"] = round(duration_ms, 2)

    return result

def get_dashboard_data():

    """查询仪表盘数据的API"""

    if not stats_middleware:

        return {"error": "中间件未初始化"}

    return {

        "p99_ms": round(stats_middleware.calculate_p99(), 2),

        "hourly": stats_middleware.get_stats(24)

    }

EXPORTS = {

    "middleware": {

        "around_message": around_message

    },

    "api": {

        "get_dashboard_data": get_dashboard_data

    }

}

4.3 模拟仪表盘数据查询

1

2

3

4

5

6

7

8

9

# 查询仪表盘数据

data = get_dashboard_data()

for hour_key, channels in sorted(data["hourly"].items()):

    print(f"\n? {hour_key}")

    for channel, stats in channels.items():

        print(f"  ???? {channel}: "

              f"请求数={stats['count']}, "

              f"平均={stats['avg_ms']}ms, "

              f"P99={data['p99_ms']}ms")

5. 中间件开发最佳实践

5.1 中间件设计原则

原则 说明 反例
单一职责 一个中间件只做一件事 既做鉴权又做统计
无副作用 不改变业务逻辑的正确性 过滤中间件把正常内容也删了
可配置 行为由配置控制 硬编码所有参数
幂等 多次执行不会出错 重复脱敏导致信息丢失
快速失败 检查不通过快速拒绝 链路里卡30秒不返回

5.2 执行顺序的最佳实践

1

2

3

4

5

6

最外层(先拦截、最后返回)

  ├── 安全类中间件(鉴权、限流、CORS)

  ├── 日志类中间件(请求日志)

  ├── 业务类中间件(格式化、脱敏)

  ├── 监控类中间件(耗时、错误率)

  └── 最内层:Agent 核心处理

类型 order 建议 必须最先/最后
鉴权/限流 10-19 ? 必须最先(不通过则拒绝)
请求日志 20-29 记录原始请求
输入清洗 30-39 在 Agent 处理前完成
输出格式化 40-49 在 Agent 处理后立即执行
输出脱敏 50-59 ? 必须最后(保证干净输出)
性能统计 60-69 包裹整个流程

6. 总结

本文从零构建了两个生产级中间件,覆盖了 OpenClaw 中间件系统的完整知识点:

核心要点:

  1. 三种模式,一个入口:前置(before)做输入校验,后置(after)做输出增强,环绕(around)做全流程监控
  2. 洋葱模型:多个中间件形成层层嵌套的执行链——请求从外向内穿过,响应从内向外返回
  3. 敏感词过滤:正则匹配 → 关键词筛 → 脱敏替换 → 审计日志,四步完成安全过滤
  4. 耗时统计:环绕中间件记录全流程耗时,按小时+渠道聚合 P99 延迟
  5. 横切关注点分离:中间件的核心价值——不改核心代码,独立添加任何横向能力

思考题:

  1. 你有一个中间件需要耗时较长(如调用外部 API 做内容审核)。如何设计异步中间件,避免阻塞整个消息处理链路?
  2. 两个中间件可能产生冲突——比如"格式化中间件"添加了 Markdown 样式,但"脱敏中间件"把加粗语法也脱敏了。你会如何设计中间件的执行顺序避免这种冲突?
  3. 中间件本身也会出错(比如统计中间件的 Redis 挂了)。如何设计中间件的降级策略——是让整个流程失败,还是跳过出错的中间件继续处理?

版权声明 : 本文内容来源于互联网或用户自行发布贡献,该文观点仅代表原作者本人。本站仅提供信息存储空间服务和不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权, 违法违规的内容, 请发送邮件至2530232025#qq.cn(#换@)举报,一经查实,本站将立刻删除。
原文链接 :
相关文章
  • 本站所有内容来源于互联网或用户自行发布,本站仅提供信息存储空间服务,不拥有版权,不承担法律责任。如有侵犯您的权益,请您联系站长处理!
  • Copyright © 2017-2022 F11.CN All Rights Reserved. F11站长开发者网 版权所有 | 苏ICP备2022031554号-1 | 51LA统计