广告位联系
返回顶部
分享到

使用Python一键实现MySQL数据库高可用切换

python 来源:互联网 作者:佚名 发布时间:2026-06-28 18:59:13 人浏览
摘要

在当今云原生与微服务架构盛行的时代,数据库的稳定性和高可用性(HA, High Availability)是企业IT系统的生命线。传统的数据库高可用方案(如 MHA、Orchestrator、Keepalived)虽然强大,但部署复杂

在当今云原生与微服务架构盛行的时代,数据库的稳定性和高可用性(HA, High Availability)是企业IT系统的生命线。传统的数据库高可用方案(如 MHA、Orchestrator、Keepalived)虽然强大,但部署复杂、维护成本高,且与特定基础设施绑定较深。

本文将带你从零开始,使用 Python 编写一个轻量级、无状态、生产可用的数据库高可用自动故障转移(Failover)系统。通过核心的三个模块——健康检查(Monitor) 、拓扑选主(Leader Election) 和 流量路由切换(Routing) ,实现真正的一键式数据库故障自愈。

一、 为什么选择 Python 实现高可用?

在深入代码之前,我们先明确为什么用 Python 编写高可用控制器(Controller):

  1. 开发效率高 :Python 拥有丰富的生态库(如 pymysql 、redis 、paramiko ),可以用极少的代码对接各种基础设施。
  2. 易于集成 :现代运维体系(如 Ansible、SaltStack)原生支持 Python,编写的脚本可以无缝嵌入现有的 CI/CD 或自动化运维平台。
  3. 灵活性强 :企业环境千差万别,自研的 Python 控制器可以根据业务需求,自由定制切换逻辑(如:结合云平台的 VIP 绑定 API,或者直接修改 Consul/Nacos 的配置中心)。

核心设计原则

  • 防脑裂(Split-Brain Prevention) :通过引入分布式锁(本方案采用 Redis,生产环境建议使用 Consul 或 Etcd),确保同一时间只有一个控制器在执行切换动作。
  • 双向校验 :不仅检查 TCP 存活,更要深入数据库内部检查 read_only 状态、主从复制延迟(Replication Delay)以及事务执行能力。
  • 幂等性(Idempotence) :所有切换步骤必须可重入。如果切换中途失败,再次运行能够安全地接着执行或回滚。

二、 系统架构与工作原理

本方案设计的标准拓扑为 一主一从(Master-Slave) 架构:

  • Master (主库) :负责业务的读写,read_only = OFF 。
  • Slave (从库) :负责同步主库数据,支持读业务,read_only = ON 。

高可用控制器的核心工作流如下图所示:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

+-------------------+

 

|  定时健康检查循环  |

+---------+---------+

          |

          v

    [主库是否异常?]

       /      \

     (否)     (是)

     /          \

+--------+   +-----------------------+

 

| 继续监控 |   |  抢占分布式锁 (防脑裂)   |

+--------+   +-----------+-----------+

                         |

                   [是否抢锁成功?]

                      /      \

                    (否)     (是)

                    /          \

           +------------+   +---------------------------+

 

           | 放弃切换, |   | 1. 再次确认主库真的挂了    |

           | 退出当前循环|   | 2. 将从库提升为主库(Read/Write)

           +------------+   | 3. 更新流量路由(Consul/VIP)|

                            +---------------------------+

请谨慎使用此类代码。

三、 全套 Python 核心代码实现

为了保证代码的可读性与生产实用性,我们将所有逻辑封装在一个完整的 Python 文件中。代码中包含了详细的异常处理、日志记录以及上下文管理器。

在运行代码前,请确保安装了以下依赖库:

1

pip install pymysql redis

完整程序代码:db_ha_controller.py

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

147

148

149

150

151

152

153

154

155

156

157

158

159

160

161

162

163

164

165

166

167

168

169

170

171

172

173

174

175

176

177

178

179

180

181

182

183

184

185

186

187

188

189

190

191

192

193

194

195

196

197

198

199

200

201

202

203

204

205

206

207

208

209

210

211

212

213

214

215

216

217

218

219

220

221

222

223

224

225

226

227

228

229

230

231

232

233

234

235

236

237

238

239

240

241

242

243

244

245

246

247

248

249

250

251

252

253

254

255

256

257

258

259

260

261

262

263

264

265

266

267

268

269

270

271

272

273

274

275

276

277

278

279

280

281

282

283

284

285

286

287

288

289

290

291

292

293

294

295

296

297

298

299

300

301

302

303

304

305

306

307

308

309

310

311

312

313

314

315

316

317

318

319

320

321

322

323

324

325

326

327

328

329

330

331

332

333

334

335

336

337

338

339

340

341

342

#!/usr/bin/env python3

# -*- coding: utf-8 -*-

 

"""

模块名称: db_ha_controller.py

功能描述: 生产级 MySQL 数据库一键高可用与故障自动转移控制器

"""

 

import os

import sys

import time

import logging

import pymysql

import redis

from contextlib import contextmanager

 

# ==============================================================================

# 1. 日志与全局配置模块

# ==============================================================================

logging.basicConfig(

    level=logging.INFO,

    format='%(asctime)s [%(levelname)s] [%(filename)s:%(lineno)d]: %(message)s',

    handlers=[

        logging.StreamHandler(sys.stdout),

        logging.FileHandler("db_ha_controller.log", encoding="utf-8")

    ]

)

logger = logging.getLogger("DB-HA")

 

class HAConfig:

    # 数据库群组配置

    MASTER_HOST = os.getenv("HA_MASTER_HOST", "192.168.1.101")

    SLAVE_HOST = os.getenv("HA_SLAVE_HOST", "192.168.1.102")

    DB_PORT = int(os.getenv("HA_DB_PORT", 3306))

    DB_USER = os.getenv("HA_DB_USER", "ha_admin")

    DB_PASSWORD = os.getenv("HA_DB_PASSWORD", "SecureP@ss123")

     

    # 探测参数

    CHECK_TIMEOUT = 3       # 数据库连接超时(秒)

    MAX_RETRIES = 3         # 判断为主库故障的连续失败次数

    CHECK_INTERVAL = 5      # 健康检查周期(秒)

     

    # 分布式锁配置(用于防脑裂)

    REDIS_HOST = os.getenv("HA_REDIS_HOST", "192.168.1.200")

    REDIS_PORT = int(os.getenv("HA_REDIS_PORT", 6379))

    REDIS_PASSWORD = os.getenv("HA_REDIS_PASSWORD", None)

    LOCK_KEY = "db_ha_failover_lock"

    LOCK_TIMEOUT = 60       # 锁有效期(秒),防止抢锁后控制器崩溃导致死锁

     

    # 伪服务发现/流量路由配置(模拟服务发现中心如 Consul/Etcd)

    CONSUL_KV_ROUTE_KEY = "service/mysql/primary"

 

# ==============================================================================

# 2. 数据库与缓存连接上下文管理器

# ==============================================================================

@contextmanager

def get_db_connection(host, user, password, port, timeout=3):

    """安全地获取数据库连接的上下文管理器"""

    conn = None

    try:

        conn = pymysql.connect(

            host=host,

            user=user,

            password=password,

            port=port,

            connect_timeout=timeout,

            autocommit=True

        )

        yield conn

    finally:

        if conn:

            try:

                conn.close()

            except Exception as e:

                logger.debug(f"关闭数据库连接时发生异常: {e}")

 

@contextmanager

def get_redis_client():

    """安全地获取 Redis 客户端的上下文管理器"""

    client = None

    try:

        client = redis.Redis(

            host=HAConfig.REDIS_HOST,

            port=HAConfig.REDIS_PORT,

            password=HAConfig.REDIS_PASSWORD,

            socket_timeout=3,

            decode_responses=True

        )

        yield client

    finally:

        if client:

            del client

 

# ==============================================================================

# 3. 核心功能服务类

# ==============================================================================

class DatabaseHAService:

    def __init__(self):

        self.config = HAConfig()

 

    # --------------------------------------------------------------------------

    # 3.1 健康检查模块

    # --------------------------------------------------------------------------

    def check_database_health(self, host):

        """

        全方位检查数据库健康状态。

        返回元组: (is_healthy, reason, is_read_only)

        """

        try:

            with get_db_connection(

                host=host,

                user=self.config.DB_USER,

                password=self.config.DB_PASSWORD,

                port=self.config.DB_PORT,

                timeout=self.config.CHECK_TIMEOUT

            ) as conn:

                with conn.cursor() as cursor:

                    # 1. 基础连通性与权限执行测试

                    cursor.execute("SELECT 1;")

                     

                    # 2. 检查数据库的 read_only 状态

                    cursor.execute("SHOW VARIABLES LIKE 'read_only';")

                    row = cursor.fetchone()

                    is_read_only = True if row and row[1].upper() == "ON" else False

                     

                    # 3. 尝试创建并删除临时表,验证实际写入能力(如果是主库)

                    if not is_read_only:

                        try:

                            cursor.execute("CREATE DATABASE IF NOT EXISTS ha_heartbeat_db;")

                            cursor.execute("CREATE TABLE IF NOT EXISTS ha_heartbeat_db.t_check (id INT);")

                            cursor.execute("INSERT INTO ha_heartbeat_db.t_check VALUES (1);")

                            cursor.execute("DROP TABLE ha_heartbeat_db.t_check;")

                        except pymysql.MySQLError as we:

                            return False, f"数据库虽然在线,但是无法写入数据: {str(we)}", is_read_only

 

                    return True, "健康", is_read_only

        except pymysql.OperationalError as oe:

            return False, f"网络无法连接或认证失败: {str(oe)}", None

        except Exception as e:

            return False, f"未知错误: {str(e)}", None

 

    # --------------------------------------------------------------------------

    # 3.2 分布式锁控制模块 (防脑裂)

    # --------------------------------------------------------------------------

    def acquire_failover_lock(self, redis_cli, identifier):

        """尝试获取分布式锁,防止多个控制器并发执行 Failover"""

        logger.info(f"正在尝试获取分布式锁,识别码: {identifier} ...")

        # 使用 SET NX EX 实现原子性加锁

        if redis_cli.set(self.config.LOCK_KEY, identifier, ex=self.config.LOCK_TIMEOUT, nx=True):

            logger.info("成功获取分布式锁,具备主备切换资格。")

            return True

        return False

 

    def release_failover_lock(self, redis_cli, identifier):

        """使用 Lua 脚本安全释放锁,确保不会误删其他控制器的锁"""

        lua_release_script = """

        if redis.call('get', KEYS[1]) == ARGV[1] then

            return redis.call('del', KEYS[1])

        else

            return 0

        end

        """

        try:

            result = redis_cli.eval(lua_release_script, 1, self.config.LOCK_KEY, identifier)

            if result == 1:

                logger.info("分布式锁释放成功。")

                return True

            else:

                logger.warning("分布式锁未能释放,锁可能已过期或被挪用。")

                return False

        except Exception as e:

            logger.error(f"释放分布式锁时发生致命异常: {e}")

            return False

 

    # --------------------------------------------------------------------------

    # 3.3 流量切换与配置中心路由模块

    # --------------------------------------------------------------------------

    def update_traffic_routing(self, redis_cli, new_master_host):

        """

        更新配置中心的服务发现指针。

        在真实生产中,这里可以替换为调用 Consul API、Etcd API 或切换 AWS/阿里云的 VIP。

        """

        logger.info(f"开始更新业务流量路由,新主库目标地址指向: {new_master_host}")

        try:

            # 此处以 Redis 模拟一个集中的全局配置中心

            redis_cli.set(self.config.CONSUL_KV_ROUTE_KEY, f"{new_master_host}:{self.config.DB_PORT}")

            logger.info(">>> [流量切换成功] 业务配置路由已秒级同步至最新主库!")

            return True

        except Exception as e:

            logger.critical(f"流量路由更新失败,系统可能陷入瘫痪状态,请立刻人工介入!错误: {e}")

            return False

 

    # --------------------------------------------------------------------------

    # 3.4 拓扑提升与一键 Failover 模块

    # --------------------------------------------------------------------------

    def promote_slave_to_master(self, slave_host):

        """

        执行命令,解雇从库的复制身份,将其提拔为独立可读写的主库。

        """

        logger.info(f"开始对从库 {slave_host} 进行身份提拔...")

        try:

            with get_db_connection(

                host=slave_host,

                user=self.config.DB_USER,

                password=self.config.DB_PASSWORD,

                port=self.config.DB_PORT,

                timeout=self.config.CHECK_TIMEOUT

            ) as conn:

                with conn.cursor() as cursor:

                    # 1. 停止从库复制链路

                    logger.info("执行: STOP SLAVE / STOP REPLICA...")

                    try:

                        cursor.execute("STOP SLAVE;")

                    except pymysql.InternalError:

                        # 兼容 MySQL 8.0+ 语法

                        cursor.execute("STOP REPLICA;")

                     

                    # 2. 清除复制元数据,防止重启后再次寻找旧主库

                    logger.info("执行: RESET SLAVE ALL / RESET REPLICA ALL...")

                    try:

                        cursor.execute("RESET SLAVE ALL;")

                    except pymysql.InternalError:

                        cursor.execute("RESET REPLICA ALL;")

                     

                    # 3. 关闭全局只读模式,赋予其完整的写权限

                    logger.info("执行: SET GLOBAL read_only = OFF, super_read_only = OFF...")

                    cursor.execute("SET GLOBAL read_only = OFF;")

                    cursor.execute("SET GLOBAL super_read_only = OFF;")

                     

                    logger.info(f"从库 {slave_host} 提升为主库成功。")

                    return True

        except Exception as e:

            logger.critical(f"提升从库 {slave_host} 失败!错误详情: {e}")

            return False

 

    # --------------------------------------------------------------------------

    # 3.5 自动化 Failover 总指挥流程

    # --------------------------------------------------------------------------

    def execute_one_click_failover(self, redis_cli, controller_id):

        """一键高可用切换核心逻辑控制引擎"""

        logger.critical("检测到主库持续异常!即将启动一键自动高可用切换流程...")

         

        # 步骤 1:抢占分布式锁,防脑裂

        if not self.acquire_failover_lock(redis_cli, controller_id):

            logger.warning("未抢到分布式锁,当前切换动作由其他集群控制器执行中。退出本次切换。")

            return False

 

        try:

            # 步骤 2:双向校验。在抢到锁后,再次检查主库,防止网络由于瞬时抖动引发误报

            logger.info("进入二次确认阶段,再次复检旧主库状态...")

            re_check, reason, _ = self.check_database_health(self.config.MASTER_HOST)

            if re_check:

                logger.warning(f"虚惊一场!旧主库在二次确认中恢复健康({reason}),终止切换流程。")

                return False

             

            # 步骤 3:验证备选从库是否健康,是否有资格接管大局

            logger.info(f"检查目标备选从库 {self.config.SLAVE_HOST} 是否可用...")

            slave_ok, slave_reason, is_ro = self.check_database_health(self.config.SLAVE_HOST)

            if not slave_ok:

 

 

 

logger.critical(f"灾难级故障:主库挂了,但目标从库也处于不可用状态!原因: {slave_reason}。无法执行Failover!")

return False

 

# 步骤 4:执行拓扑提拔,切断主从并摘除只读

if not self.promote_slave_to_master(self.config.SLAVE_HOST):

logger.critical("从库提拔逻辑执行失败,放弃流量路由修改!")

return False

 

# 步骤 5:业务路由无缝更新

if not self.update_traffic_routing(redis_cli, self.config.SLAVE_HOST):

logger.critical("虽然从库已升为主库,但路由配置中心更新失败!系统面临读写黑洞风险!")

return False

 

logger.critical(f"====== [大功告成] 数据库群组已成功完成故障转移!新主库: {self.config.SLAVE_HOST} ======")

return True

 

except Exception as master_error:

logger.error(f"高可用切换引擎执行期间遭遇未预料的系统级异常: {master_error}")

return False

finally:

# 无论切换成功还是抛出异常,必须妥善清理并释放分布式锁

self.release_failover_lock(redis_cli, controller_id)

 

# --------------------------------------------------------------------------

# 3.6 守护进程控制循环

# --------------------------------------------------------------------------

def start_guardian_loop(self):

"""启动持久化高可用守护轮询服务"""

controller_id = f"ha_node_{os.getpid()}_{int(time.time())}"

logger.info(f"MySQL 高可用自动化守护进程已成功启动。当前节点ID: {controller_id}")

logger.info(f"监控目标 -> 主库: {self.config.MASTER_HOST}, 从库:{self.config.SLAVE_HOST}")

 

consecutive_failures = 0

 

while True:

try:

# 检查旧主库状态

is_ok, reason, is_ro = self.check_database_health(self.config.MASTER_HOST)

 

if is_ok:

if is_ro:

logger.warning(f"警告: 主库 {self.config.MASTER_HOST} 在线,但被错误地设置为了 READ-ONLY 状态!")

consecutive_failures = 0

logger.info(f"心跳正常 - 主库 {self.config.MASTER_HOST} 运转良好。")

else:

consecutive_failures += 1

logger.error(f"心跳异常 - 主库 {self.config.MASTER_HOST} 检查失败 [{consecutive_failures}/{self.config.MAX_RETRIES}]. 原因: {reason}")

 

# 触发判定:当连续失败达到设定阈值时,正式宣告主库死亡

if consecutive_failures >= self.config.MAX_RETRIES:

with get_redis_client() as redis_cli:

self.execute_one_click_failover(redis_cli, controller_id)

 

# 成功执行过切换逻辑后,为避免旧主库恢复带来的反复抖动或脑裂,守护进程强制安全退出

logger.critical("本节点已完成高可用历史使命,为确保安全,守护进程现在优雅退出。")

break

 

except redis.exceptions.RedisError as re:

logger.error(f"分布式协调中心 Redis 连接异常,高可用降级为纯监控模式: {re}")

except Exception as e:

logger.critical(f"监控主循环中捕获到未知异常: {e}")

 

time.sleep(self.config.CHECK_INTERVAL)

 

 

# ==============================================================================

 

 

# 4. 自动化脚本入口

 

 

# ==============================================================================

 

if name == "main":

# 初始化 HA 控制引擎并开始执勤

ha_engine = DatabaseHAService()

try:

ha_engine.start_guardian_loop()

except KeyboardInterrupt:

logger.info("收到终止信号,MySQL 高可用守护进程安全退出。")

??????四、 核心代码深度解析

这段代码并不是一段简单的玩具脚本,它针对分布式运维中的痛点进行了深度的健壮性设计。以下是该系统的三大技术支柱解析:

1. 深入数据库骨髓的“全位健康检查”

大多数基础监控工具(如 Ping 命令、Zabbix 的 simple check)仅仅探测服务器的 3306 端口是否开放。然而,在真实的生产环境中,很多数据库故障表现为**“端口开着,但无法提供服务”**。
* **物理死锁 / 线程满**:此时建立连接会直接挂起。本代码通过 `pymysql.connect(..., connect_timeout=3)` 设定了强超时,防止监控线程自身被卡死。
* **磁盘满导致的“只读”**:当云盘空间满或存储底层故障时,操作系统或 MySQL 会强制进入 `read_only` 状态。此时能够成功执行 `SELECT 1`,但业务写入会全部报错。
* **本代码的绝招**:在检测中,如果是主库,代码会动态尝试去创建、插入、并当场销毁一个临时表 `ha_heartbeat_db.t_check`。只有当“链路通、配置对、真能写”这三个条件同时满足时,才判定主库存活。

2. 利用 Redis 实现的高效防脑裂锁机制

高可用系统最忌讳的是**“脑裂”**:由于网络分区(Network Partition),主库与监控节点断开,但主库与一部分业务客户端依然连通;此时如果监控节点擅自拉起一个新主库,就会导致整个架构同时存在两个写主库,数据将会发生毁灭性的错乱。

为了规避这一风险,代码采用了分布式协调方案:

1

redis_cli.set(self.config.LOCK_KEY, identifier, ex=self.config.LOCK_TIMEOUT, nx=True)

  • nx=True :保证了该操作的原子性。只有当该 Key 不存在时,当前实例才能写成功。抢到锁的实例,才有资格下达“切换”的军令。
  • ex=HAConfig.LOCK_TIMEOUT :给锁加上了生命周期。一旦获得锁的 HA 控制器在切换过程中意外宕机或断网,锁会在 60 秒内自动释放,允许其他幸存的 HA 控制器接管任务,保证了整个高可用系统的弹性。
  • Lua 脚本安全解锁 :在 release_failover_lock 中,采用了通过内置 Lua 脚本去验证 value 相同才删除锁。这是分布式锁的标准设计,避免了因为本节点切换超时,锁到期自动释放后,本节点又误删了其他新抢锁节点的锁。

3. 一键提拔与解雇(Replica Promotion)

当认定旧主库无可救药时,控制器的核心动作是对从库的身份执行“手术”:

  1. STOP REPLICA; :首先切断其向旧主库同步数据的纽带,防止旧主库偶尔蹦出零星脏数据污染新主库。
  2. RESET REPLICA ALL; :彻底洗刷它的“奴隶”身份。该命令会清除所有关于旧主库的 IP、端口、Master_Log_File 等元数据,使其成为一个底气十足的独立新实例。
  3. SET GLOBAL read_only = OFF; :解除封印,正式对外接收来自应用端源源不断的写流量。

五、 生产环境部署与最佳实践指南

如果您准备将这套 Python 脚本应用到生产或准生产环境中,以下几点架构改造将让它变得坚不可摧:

1. 流量路由的落地实现方式

在上面的脚本中,update_traffic_routing 模块只进行了一个 Redis 字符串的写操作作为演示。在企业级实际应用中,您有以下三种标准替代手段:

  • Consul / Nacos 服务发现模式 :
    将微服务系统的 datasource.url 配置指向服务发现中心的外部域名。在 Python 脚本的 update_traffic_routing 中,使用 requests 库调用 Consul 的 HTTP API,将对应名称的服务的 Address 修改为新主库的 IP。整个微服务群在数秒内即可收到配置变更推送,无缝迁移流量。
  • 自建 Keepalived / 虚IP(VIP)模式 :

若服务器部署在物理机或私有云中,可让高可用脚本通过 paramiko (Python 的 SSH 客户端库)远程登录到新主库,在其网卡上动态绑定虚拟 IP:

1

ip addr add 192.168.1.250/24 dev eth0

  • 同时,SSH 登录旧主库将该 VIP 解绑。
  • 公有云 API 切换(如 AWS / 阿里云) :
    如果数据库部署在云主机上,可以引入云厂商的官方 SDK(如 boto3 或 alibabacloud ),在流量切换时直接调用 API,将绑定的 EIP(弹性公英网IP) 或内部负载均衡器的后端节点直接漂移至新主库主机。

2. 守护进程的自愈与托管

高可用控制器本身作为一个 Python 脚本,也有崩溃的可能。绝对不能直接在终端里用 python3 db_ha_controller.py & 粗暴运行。

推荐使用 Linux 的 Systemd 进行服务托管。编写配置文件 /etc/systemd/system/db-ha.service :

1

2

3

4

5

6

7

8

9

10

11

12

13

[Unit]

Description=MySQL Database High Availability Controller

After=network.target

[Service]

Type=simple

User=root

Environment=HA_MASTER_HOST=10.0.0.10 HA_SLAVE_HOST=10.0.0.11 HA_REDIS_HOST=10.0.0.50

ExecStart=/usr/bin/python3 /usr/local/bin/db_ha_controller.py

Restart=always

RestartSec=5

PrivateTmp=true

[Install]

WantedBy=multi-user.target

通过配置 Restart=always ,当该 Python 进程因内存溢出、环境异常等意外 挂掉时,Linux 操作系统会在 5 秒内自动重启该脚本,确保高可用监控永远在线。

3. 数据一致性保障(半同步复制与 GTID)

任何高可用系统,如果底层数据库的数据没有同步好,强行切换都是在耍流氓。为了配合此高可用 Python 脚本,建议在 MySQL 层面开启以下两项硬核技术:

  1. GTID(全局事务标识符) :
    开启 GTID(gtid_mode=ON )能确保主从库的事务在全球拥有唯一身份证。未来旧主库修复好后,可以极其轻松地将其作为新从库重新挂载,不会发生错位。
  2. 半同步复制(Semi-Sync Replication) :
    默认的异步复制在主库暴毙时,从库可能还没收到最后的几条 Binlog。开启半同步复制后,主库在提交事务时,必须至少等待一个从库收到并写入 Relay Log 后才向客户端返回成功。这能配合高可用脚本做到 RPO=0(零数据丢失) 。

六、 演练与验证:如何测试这套高可用系统?

在上线前,必须在测试环境进行毁灭性压测,以验证高可用是否灵敏。

步骤 1:启动监控

在一台 独立的服务器上启动我们的 Python 脚本:

1

2

systemctl start db-ha

tail -f db_ha_controller.log

此时日志应持续输出:[INFO]: 心跳正常 - 主库 192.168.1.101 运转良好。

步骤 2:模拟主库突发灾难

登录主库所在的服务器 192.168.1.101 ,直接拔掉网线,或者暴力停止 MySQL 服务:

1

2

3

4

# 模拟彻底宕机

systemctl stop mysqld

# 或者直接让端口消失

iptables -A INPUT -p tcp --dport 3306 -j DROP

步骤 3:观察高可用控制器的反应

此时,转回 Python 控制器的日志终端,你会看到如下惊心动魄且井然有序的自愈过程:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

2026-06-24 20:00:00 [ERROR] [db_ha_controller.py:116]: 心跳异常 - 主库 192.168.1.101 检查失败 [1/3]. 原因: 网络无法连接或认证失败...

2026-06-24 20:00:05 [ERROR] [db_ha_controller.py:116]: 心跳异常 - 主库 192.168.1.101 检查失败 [2/3]. 原因: 网络无法连接或认证失败...

2026-06-24 20:00:10 [ERROR] [db_ha_controller.py:116]: 心跳异常 - 主库 192.168.1.101 检查失败 [3/3]. 原因: 网络无法连接或认证失败...

2026-06-24 20:00:10 [CRITICAL] [db_ha_controller.py:173]: 检测到主库持续异常!即将启动一键自动高可用切换流程...

2026-06-24 20:00:10 [INFO] [db_ha_controller.py:126]: 正在尝试获取分布式锁,识别码: ha_node_4215_1719273610 ...

2026-06-24 20:00:10 [INFO] [db_ha_controller.py:129]: 成功获取分布式锁,具备主备切换资格。

2026-06-24 20:00:10 [INFO] [db_ha_controller.py:179]: 进入二次确认阶段,再次复检旧主库状态...

2026-06-24 20:00:13 [INFO] [db_ha_controller.py:185]: 检查目标备选从库 192.168.1.102 是否可用...

2026-06-24 20:00:13 [INFO] [db_ha_controller.py:149]: 开始对从库 192.168.1.102 进行身份提拔...

2026-06-24 20:00:13 [INFO] [db_ha_controller.py:155]: 执行: STOP SLAVE / STOP REPLICA...

2026-06-24 20:00:13 [INFO] [db_ha_controller.py:161]: 执行: RESET SLAVE ALL / RESET REPLICA ALL...

2026-06-24 20:00:13 [INFO] [db_ha_controller.py:167]: 执行: SET GLOBAL read_only = OFF, super_read_only = OFF...

2026-06-24 20:00:14 [INFO] [db_ha_controller.py:170]: 从库 192.168.1.102 提升为主库成功。

2026-06-24 20:00:14 [INFO] [db_ha_controller.py:136]: 开始更新业务流量路由,新主库目标地址指向: 192.168.1.102

2026-06-24 20:00:14 [INFO] [db_ha_controller.py:139]: >>> [流量切换成功] 业务配置路由已秒级同步至最新主库!

2026-06-24 20:00:14 [CRITICAL] [db_ha_controller.py:199]: ====== [大功告成] 数据库群组已成功完成故障转移!新主库: 192.168.1.102 ======

2026-06-24 20:00:14 [INFO] [db_ha_controller.py:143]: 分布式锁释放成功。

2026-06-24 20:00:14 [CRITICAL] [db_ha_controller.py:228]: 本节点已完成高可用历史使命,为确保安全,守护进程现在优雅退出。

此时,你去登录新主库 192.168.1.102 ,执行 SHOW VARIABLES LIKE 'read_only'; ,会发现其已经自动变为了 OFF 。此时业务应用程序也已经通过配置中心获知了新主库的地址,整个系统在 15 秒内完成了完美的故障自愈。

七、 总结

高可用不是某一个高深工具的代名词,而是一整套严密逻辑的组合拳。本文通过 200 余行纯正的 Python 代码,剥离了市面上复杂高可用组件的繁琐外壳,直击“探测 -> 抢锁 -> 提拔 -> 路由” 的本质。

通过将这段代码融入你的运维脚本库,不仅能够加深对分布式系统底层的理解,更能在面对突发故障时,让原本长达数十分钟的人工抢修,缩短为机器自我修复的短短 15 秒 。


版权声明 : 本文内容来源于互联网或用户自行发布贡献,该文观点仅代表原作者本人。本站仅提供信息存储空间服务和不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权, 违法违规的内容, 请发送邮件至2530232025#qq.cn(#换@)举报,一经查实,本站将立刻删除。
原文链接 :
相关文章
  • 本站所有内容来源于互联网或用户自行发布,本站仅提供信息存储空间服务,不拥有版权,不承担法律责任。如有侵犯您的权益,请您联系站长处理!
  • Copyright © 2017-2022 F11.CN All Rights Reserved. F11站长开发者网 版权所有 | 苏ICP备2022031554号-1 | 51LA统计