在当今云原生与微服务架构盛行的时代,数据库的稳定性和高可用性(HA, High Availability)是企业IT系统的生命线。传统的数据库高可用方案(如 MHA、Orchestrator、Keepalived)虽然强大,但部署复杂、维护成本高,且与特定基础设施绑定较深。
本文将带你从零开始,使用 Python 编写一个轻量级、无状态、生产可用的数据库高可用自动故障转移(Failover)系统。通过核心的三个模块——健康检查(Monitor) 、拓扑选主(Leader Election) 和 流量路由切换(Routing) ,实现真正的一键式数据库故障自愈。
在深入代码之前,我们先明确为什么用 Python 编写高可用控制器(Controller):
核心设计原则
本方案设计的标准拓扑为 一主一从(Master-Slave) 架构:
高可用控制器的核心工作流如下图所示:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 |
+-------------------+
| 定时健康检查循环 | +---------+---------+ | v [主库是否异常?] / \ (否) (是) / \ +--------+ +-----------------------+
| 继续监控 | | 抢占分布式锁 (防脑裂) | +--------+ +-----------+-----------+ | [是否抢锁成功?] / \ (否) (是) / \ +------------+ +---------------------------+
| 放弃切换, | | 1. 再次确认主库真的挂了 | | 退出当前循环| | 2. 将从库提升为主库(Read/Write) +------------+ | 3. 更新流量路由(Consul/VIP)| +---------------------------+ |
请谨慎使用此类代码。
为了保证代码的可读性与生产实用性,我们将所有逻辑封装在一个完整的 Python 文件中。代码中包含了详细的异常处理、日志记录以及上下文管理器。
在运行代码前,请确保安装了以下依赖库:
|
1 |
pip install pymysql redis |
完整程序代码:db_ha_controller.py
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 |
#!/usr/bin/env python3 # -*- coding: utf-8 -*-
""" 模块名称: db_ha_controller.py 功能描述: 生产级 MySQL 数据库一键高可用与故障自动转移控制器 """
import os import sys import time import logging import pymysql import redis from contextlib import contextmanager
# ============================================================================== # 1. 日志与全局配置模块 # ============================================================================== logging.basicConfig( level=logging.INFO, format='%(asctime)s [%(levelname)s] [%(filename)s:%(lineno)d]: %(message)s', handlers=[ logging.StreamHandler(sys.stdout), logging.FileHandler("db_ha_controller.log", encoding="utf-8") ] ) logger = logging.getLogger("DB-HA")
class HAConfig: # 数据库群组配置 MASTER_HOST = os.getenv("HA_MASTER_HOST", "192.168.1.101") SLAVE_HOST = os.getenv("HA_SLAVE_HOST", "192.168.1.102") DB_PORT = int(os.getenv("HA_DB_PORT", 3306)) DB_USER = os.getenv("HA_DB_USER", "ha_admin") DB_PASSWORD = os.getenv("HA_DB_PASSWORD", "SecureP@ss123")
# 探测参数 CHECK_TIMEOUT = 3 # 数据库连接超时(秒) MAX_RETRIES = 3 # 判断为主库故障的连续失败次数 CHECK_INTERVAL = 5 # 健康检查周期(秒)
# 分布式锁配置(用于防脑裂) REDIS_HOST = os.getenv("HA_REDIS_HOST", "192.168.1.200") REDIS_PORT = int(os.getenv("HA_REDIS_PORT", 6379)) REDIS_PASSWORD = os.getenv("HA_REDIS_PASSWORD", None) LOCK_KEY = "db_ha_failover_lock" LOCK_TIMEOUT = 60 # 锁有效期(秒),防止抢锁后控制器崩溃导致死锁
# 伪服务发现/流量路由配置(模拟服务发现中心如 Consul/Etcd) CONSUL_KV_ROUTE_KEY = "service/mysql/primary"
# ============================================================================== # 2. 数据库与缓存连接上下文管理器 # ============================================================================== @contextmanager def get_db_connection(host, user, password, port, timeout=3): """安全地获取数据库连接的上下文管理器""" conn = None try: conn = pymysql.connect( host=host, user=user, password=password, port=port, connect_timeout=timeout, autocommit=True ) yield conn finally: if conn: try: conn.close() except Exception as e: logger.debug(f"关闭数据库连接时发生异常: {e}")
@contextmanager def get_redis_client(): """安全地获取 Redis 客户端的上下文管理器""" client = None try: client = redis.Redis( host=HAConfig.REDIS_HOST, port=HAConfig.REDIS_PORT, password=HAConfig.REDIS_PASSWORD, socket_timeout=3, decode_responses=True ) yield client finally: if client: del client
# ============================================================================== # 3. 核心功能服务类 # ============================================================================== class DatabaseHAService: def __init__(self): self.config = HAConfig()
# -------------------------------------------------------------------------- # 3.1 健康检查模块 # -------------------------------------------------------------------------- def check_database_health(self, host): """ 全方位检查数据库健康状态。 返回元组: (is_healthy, reason, is_read_only) """ try: with get_db_connection( host=host, user=self.config.DB_USER, password=self.config.DB_PASSWORD, port=self.config.DB_PORT, timeout=self.config.CHECK_TIMEOUT ) as conn: with conn.cursor() as cursor: # 1. 基础连通性与权限执行测试 cursor.execute("SELECT 1;")
# 2. 检查数据库的 read_only 状态 cursor.execute("SHOW VARIABLES LIKE 'read_only';") row = cursor.fetchone() is_read_only = True if row and row[1].upper() == "ON" else False
# 3. 尝试创建并删除临时表,验证实际写入能力(如果是主库) if not is_read_only: try: cursor.execute("CREATE DATABASE IF NOT EXISTS ha_heartbeat_db;") cursor.execute("CREATE TABLE IF NOT EXISTS ha_heartbeat_db.t_check (id INT);") cursor.execute("INSERT INTO ha_heartbeat_db.t_check VALUES (1);") cursor.execute("DROP TABLE ha_heartbeat_db.t_check;") except pymysql.MySQLError as we: return False, f"数据库虽然在线,但是无法写入数据: {str(we)}", is_read_only
return True, "健康", is_read_only except pymysql.OperationalError as oe: return False, f"网络无法连接或认证失败: {str(oe)}", None except Exception as e: return False, f"未知错误: {str(e)}", None
# -------------------------------------------------------------------------- # 3.2 分布式锁控制模块 (防脑裂) # -------------------------------------------------------------------------- def acquire_failover_lock(self, redis_cli, identifier): """尝试获取分布式锁,防止多个控制器并发执行 Failover""" logger.info(f"正在尝试获取分布式锁,识别码: {identifier} ...") # 使用 SET NX EX 实现原子性加锁 if redis_cli.set(self.config.LOCK_KEY, identifier, ex=self.config.LOCK_TIMEOUT, nx=True): logger.info("成功获取分布式锁,具备主备切换资格。") return True return False
def release_failover_lock(self, redis_cli, identifier): """使用 Lua 脚本安全释放锁,确保不会误删其他控制器的锁""" lua_release_script = """ if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end """ try: result = redis_cli.eval(lua_release_script, 1, self.config.LOCK_KEY, identifier) if result == 1: logger.info("分布式锁释放成功。") return True else: logger.warning("分布式锁未能释放,锁可能已过期或被挪用。") return False except Exception as e: logger.error(f"释放分布式锁时发生致命异常: {e}") return False
# -------------------------------------------------------------------------- # 3.3 流量切换与配置中心路由模块 # -------------------------------------------------------------------------- def update_traffic_routing(self, redis_cli, new_master_host): """ 更新配置中心的服务发现指针。 在真实生产中,这里可以替换为调用 Consul API、Etcd API 或切换 AWS/阿里云的 VIP。 """ logger.info(f"开始更新业务流量路由,新主库目标地址指向: {new_master_host}") try: # 此处以 Redis 模拟一个集中的全局配置中心 redis_cli.set(self.config.CONSUL_KV_ROUTE_KEY, f"{new_master_host}:{self.config.DB_PORT}") logger.info(">>> [流量切换成功] 业务配置路由已秒级同步至最新主库!") return True except Exception as e: logger.critical(f"流量路由更新失败,系统可能陷入瘫痪状态,请立刻人工介入!错误: {e}") return False
# -------------------------------------------------------------------------- # 3.4 拓扑提升与一键 Failover 模块 # -------------------------------------------------------------------------- def promote_slave_to_master(self, slave_host): """ 执行命令,解雇从库的复制身份,将其提拔为独立可读写的主库。 """ logger.info(f"开始对从库 {slave_host} 进行身份提拔...") try: with get_db_connection( host=slave_host, user=self.config.DB_USER, password=self.config.DB_PASSWORD, port=self.config.DB_PORT, timeout=self.config.CHECK_TIMEOUT ) as conn: with conn.cursor() as cursor: # 1. 停止从库复制链路 logger.info("执行: STOP SLAVE / STOP REPLICA...") try: cursor.execute("STOP SLAVE;") except pymysql.InternalError: # 兼容 MySQL 8.0+ 语法 cursor.execute("STOP REPLICA;")
# 2. 清除复制元数据,防止重启后再次寻找旧主库 logger.info("执行: RESET SLAVE ALL / RESET REPLICA ALL...") try: cursor.execute("RESET SLAVE ALL;") except pymysql.InternalError: cursor.execute("RESET REPLICA ALL;")
# 3. 关闭全局只读模式,赋予其完整的写权限 logger.info("执行: SET GLOBAL read_only = OFF, super_read_only = OFF...") cursor.execute("SET GLOBAL read_only = OFF;") cursor.execute("SET GLOBAL super_read_only = OFF;")
logger.info(f"从库 {slave_host} 提升为主库成功。") return True except Exception as e: logger.critical(f"提升从库 {slave_host} 失败!错误详情: {e}") return False
# -------------------------------------------------------------------------- # 3.5 自动化 Failover 总指挥流程 # -------------------------------------------------------------------------- def execute_one_click_failover(self, redis_cli, controller_id): """一键高可用切换核心逻辑控制引擎""" logger.critical("检测到主库持续异常!即将启动一键自动高可用切换流程...")
# 步骤 1:抢占分布式锁,防脑裂 if not self.acquire_failover_lock(redis_cli, controller_id): logger.warning("未抢到分布式锁,当前切换动作由其他集群控制器执行中。退出本次切换。") return False
try: # 步骤 2:双向校验。在抢到锁后,再次检查主库,防止网络由于瞬时抖动引发误报 logger.info("进入二次确认阶段,再次复检旧主库状态...") re_check, reason, _ = self.check_database_health(self.config.MASTER_HOST) if re_check: logger.warning(f"虚惊一场!旧主库在二次确认中恢复健康({reason}),终止切换流程。") return False
# 步骤 3:验证备选从库是否健康,是否有资格接管大局 logger.info(f"检查目标备选从库 {self.config.SLAVE_HOST} 是否可用...") slave_ok, slave_reason, is_ro = self.check_database_health(self.config.SLAVE_HOST) if not slave_ok:
logger.critical(f"灾难级故障:主库挂了,但目标从库也处于不可用状态!原因: {slave_reason}。无法执行Failover!") return False
# 步骤 4:执行拓扑提拔,切断主从并摘除只读 if not self.promote_slave_to_master(self.config.SLAVE_HOST): logger.critical("从库提拔逻辑执行失败,放弃流量路由修改!") return False
# 步骤 5:业务路由无缝更新 if not self.update_traffic_routing(redis_cli, self.config.SLAVE_HOST): logger.critical("虽然从库已升为主库,但路由配置中心更新失败!系统面临读写黑洞风险!") return False
logger.critical(f"====== [大功告成] 数据库群组已成功完成故障转移!新主库: {self.config.SLAVE_HOST} ======") return True
except Exception as master_error: logger.error(f"高可用切换引擎执行期间遭遇未预料的系统级异常: {master_error}") return False finally: # 无论切换成功还是抛出异常,必须妥善清理并释放分布式锁 self.release_failover_lock(redis_cli, controller_id)
# -------------------------------------------------------------------------- # 3.6 守护进程控制循环 # -------------------------------------------------------------------------- def start_guardian_loop(self): """启动持久化高可用守护轮询服务""" controller_id = f"ha_node_{os.getpid()}_{int(time.time())}" logger.info(f"MySQL 高可用自动化守护进程已成功启动。当前节点ID: {controller_id}") logger.info(f"监控目标 -> 主库: {self.config.MASTER_HOST}, 从库:{self.config.SLAVE_HOST}")
consecutive_failures = 0
while True: try: # 检查旧主库状态 is_ok, reason, is_ro = self.check_database_health(self.config.MASTER_HOST)
if is_ok: if is_ro: logger.warning(f"警告: 主库 {self.config.MASTER_HOST} 在线,但被错误地设置为了 READ-ONLY 状态!") consecutive_failures = 0 logger.info(f"心跳正常 - 主库 {self.config.MASTER_HOST} 运转良好。") else: consecutive_failures += 1 logger.error(f"心跳异常 - 主库 {self.config.MASTER_HOST} 检查失败 [{consecutive_failures}/{self.config.MAX_RETRIES}]. 原因: {reason}")
# 触发判定:当连续失败达到设定阈值时,正式宣告主库死亡 if consecutive_failures >= self.config.MAX_RETRIES: with get_redis_client() as redis_cli: self.execute_one_click_failover(redis_cli, controller_id)
# 成功执行过切换逻辑后,为避免旧主库恢复带来的反复抖动或脑裂,守护进程强制安全退出 logger.critical("本节点已完成高可用历史使命,为确保安全,守护进程现在优雅退出。") break
except redis.exceptions.RedisError as re: logger.error(f"分布式协调中心 Redis 连接异常,高可用降级为纯监控模式: {re}") except Exception as e: logger.critical(f"监控主循环中捕获到未知异常: {e}")
time.sleep(self.config.CHECK_INTERVAL)
# ==============================================================================
# 4. 自动化脚本入口
# ==============================================================================
if name == "main": # 初始化 HA 控制引擎并开始执勤 ha_engine = DatabaseHAService() try: ha_engine.start_guardian_loop() except KeyboardInterrupt: logger.info("收到终止信号,MySQL 高可用守护进程安全退出。") |
这段代码并不是一段简单的玩具脚本,它针对分布式运维中的痛点进行了深度的健壮性设计。以下是该系统的三大技术支柱解析:
大多数基础监控工具(如 Ping 命令、Zabbix 的 simple check)仅仅探测服务器的 3306 端口是否开放。然而,在真实的生产环境中,很多数据库故障表现为**“端口开着,但无法提供服务”**。
* **物理死锁 / 线程满**:此时建立连接会直接挂起。本代码通过 `pymysql.connect(..., connect_timeout=3)` 设定了强超时,防止监控线程自身被卡死。
* **磁盘满导致的“只读”**:当云盘空间满或存储底层故障时,操作系统或 MySQL 会强制进入 `read_only` 状态。此时能够成功执行 `SELECT 1`,但业务写入会全部报错。
* **本代码的绝招**:在检测中,如果是主库,代码会动态尝试去创建、插入、并当场销毁一个临时表 `ha_heartbeat_db.t_check`。只有当“链路通、配置对、真能写”这三个条件同时满足时,才判定主库存活。
高可用系统最忌讳的是**“脑裂”**:由于网络分区(Network Partition),主库与监控节点断开,但主库与一部分业务客户端依然连通;此时如果监控节点擅自拉起一个新主库,就会导致整个架构同时存在两个写主库,数据将会发生毁灭性的错乱。
为了规避这一风险,代码采用了分布式协调方案:
|
1 |
redis_cli.set(self.config.LOCK_KEY, identifier, ex=self.config.LOCK_TIMEOUT, nx=True) |
当认定旧主库无可救药时,控制器的核心动作是对从库的身份执行“手术”:
如果您准备将这套 Python 脚本应用到生产或准生产环境中,以下几点架构改造将让它变得坚不可摧:
在上面的脚本中,update_traffic_routing 模块只进行了一个 Redis 字符串的写操作作为演示。在企业级实际应用中,您有以下三种标准替代手段:
若服务器部署在物理机或私有云中,可让高可用脚本通过 paramiko (Python 的 SSH 客户端库)远程登录到新主库,在其网卡上动态绑定虚拟 IP:
|
1 |
ip addr add 192.168.1.250/24 dev eth0 |
高可用控制器本身作为一个 Python 脚本,也有崩溃的可能。绝对不能直接在终端里用 python3 db_ha_controller.py & 粗暴运行。
推荐使用 Linux 的 Systemd 进行服务托管。编写配置文件 /etc/systemd/system/db-ha.service :
|
1 2 3 4 5 6 7 8 9 10 11 12 13 |
[Unit] Description=MySQL Database High Availability Controller After=network.target [Service] Type=simple User=root Environment=HA_MASTER_HOST=10.0.0.10 HA_SLAVE_HOST=10.0.0.11 HA_REDIS_HOST=10.0.0.50 ExecStart=/usr/bin/python3 /usr/local/bin/db_ha_controller.py Restart=always RestartSec=5 PrivateTmp=true [Install] WantedBy=multi-user.target |
通过配置 Restart=always ,当该 Python 进程因内存溢出、环境异常等意外 挂掉时,Linux 操作系统会在 5 秒内自动重启该脚本,确保高可用监控永远在线。
任何高可用系统,如果底层数据库的数据没有同步好,强行切换都是在耍流氓。为了配合此高可用 Python 脚本,建议在 MySQL 层面开启以下两项硬核技术:
在上线前,必须在测试环境进行毁灭性压测,以验证高可用是否灵敏。
在一台 独立的服务器上启动我们的 Python 脚本:
|
1 2 |
systemctl start db-ha tail -f db_ha_controller.log |
此时日志应持续输出:[INFO]: 心跳正常 - 主库 192.168.1.101 运转良好。
登录主库所在的服务器 192.168.1.101 ,直接拔掉网线,或者暴力停止 MySQL 服务:
|
1 2 3 4 |
# 模拟彻底宕机 systemctl stop mysqld # 或者直接让端口消失 iptables -A INPUT -p tcp --dport 3306 -j DROP |
此时,转回 Python 控制器的日志终端,你会看到如下惊心动魄且井然有序的自愈过程:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
2026-06-24 20:00:00 [ERROR] [db_ha_controller.py:116]: 心跳异常 - 主库 192.168.1.101 检查失败 [1/3]. 原因: 网络无法连接或认证失败... 2026-06-24 20:00:05 [ERROR] [db_ha_controller.py:116]: 心跳异常 - 主库 192.168.1.101 检查失败 [2/3]. 原因: 网络无法连接或认证失败... 2026-06-24 20:00:10 [ERROR] [db_ha_controller.py:116]: 心跳异常 - 主库 192.168.1.101 检查失败 [3/3]. 原因: 网络无法连接或认证失败... 2026-06-24 20:00:10 [CRITICAL] [db_ha_controller.py:173]: 检测到主库持续异常!即将启动一键自动高可用切换流程... 2026-06-24 20:00:10 [INFO] [db_ha_controller.py:126]: 正在尝试获取分布式锁,识别码: ha_node_4215_1719273610 ... 2026-06-24 20:00:10 [INFO] [db_ha_controller.py:129]: 成功获取分布式锁,具备主备切换资格。 2026-06-24 20:00:10 [INFO] [db_ha_controller.py:179]: 进入二次确认阶段,再次复检旧主库状态... 2026-06-24 20:00:13 [INFO] [db_ha_controller.py:185]: 检查目标备选从库 192.168.1.102 是否可用... 2026-06-24 20:00:13 [INFO] [db_ha_controller.py:149]: 开始对从库 192.168.1.102 进行身份提拔... 2026-06-24 20:00:13 [INFO] [db_ha_controller.py:155]: 执行: STOP SLAVE / STOP REPLICA... 2026-06-24 20:00:13 [INFO] [db_ha_controller.py:161]: 执行: RESET SLAVE ALL / RESET REPLICA ALL... 2026-06-24 20:00:13 [INFO] [db_ha_controller.py:167]: 执行: SET GLOBAL read_only = OFF, super_read_only = OFF... 2026-06-24 20:00:14 [INFO] [db_ha_controller.py:170]: 从库 192.168.1.102 提升为主库成功。 2026-06-24 20:00:14 [INFO] [db_ha_controller.py:136]: 开始更新业务流量路由,新主库目标地址指向: 192.168.1.102 2026-06-24 20:00:14 [INFO] [db_ha_controller.py:139]: >>> [流量切换成功] 业务配置路由已秒级同步至最新主库! 2026-06-24 20:00:14 [CRITICAL] [db_ha_controller.py:199]: ====== [大功告成] 数据库群组已成功完成故障转移!新主库: 192.168.1.102 ====== 2026-06-24 20:00:14 [INFO] [db_ha_controller.py:143]: 分布式锁释放成功。 2026-06-24 20:00:14 [CRITICAL] [db_ha_controller.py:228]: 本节点已完成高可用历史使命,为确保安全,守护进程现在优雅退出。 |
此时,你去登录新主库 192.168.1.102 ,执行 SHOW VARIABLES LIKE 'read_only'; ,会发现其已经自动变为了 OFF 。此时业务应用程序也已经通过配置中心获知了新主库的地址,整个系统在 15 秒内完成了完美的故障自愈。
高可用不是某一个高深工具的代名词,而是一整套严密逻辑的组合拳。本文通过 200 余行纯正的 Python 代码,剥离了市面上复杂高可用组件的繁琐外壳,直击“探测 -> 抢锁 -> 提拔 -> 路由” 的本质。
通过将这段代码融入你的运维脚本库,不仅能够加深对分布式系统底层的理解,更能在面对突发故障时,让原本长达数十分钟的人工抢修,缩短为机器自我修复的短短 15 秒 。