|
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
模块名称: db_ha_controller.py
功能描述: 生产级 MySQL 数据库一键高可用与故障自动转移控制器
"""
import os
import sys
import time
import logging
import pymysql
import redis
from contextlib import contextmanager
# ==============================================================================
# 1. 日志与全局配置模块
# ==============================================================================
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s [%(levelname)s] [%(filename)s:%(lineno)d]: %(message)s',
handlers=[
logging.StreamHandler(sys.stdout),
logging.FileHandler("db_ha_controller.log", encoding="utf-8")
]
)
logger = logging.getLogger("DB-HA")
class HAConfig:
# 数据库群组配置
MASTER_HOST = os.getenv("HA_MASTER_HOST", "192.168.1.101")
SLAVE_HOST = os.getenv("HA_SLAVE_HOST", "192.168.1.102")
DB_PORT = int(os.getenv("HA_DB_PORT", 3306))
DB_USER = os.getenv("HA_DB_USER", "ha_admin")
DB_PASSWORD = os.getenv("HA_DB_PASSWORD", "SecureP@ss123")
# 探测参数
CHECK_TIMEOUT = 3 # 数据库连接超时(秒)
MAX_RETRIES = 3 # 判断为主库故障的连续失败次数
CHECK_INTERVAL = 5 # 健康检查周期(秒)
# 分布式锁配置(用于防脑裂)
REDIS_HOST = os.getenv("HA_REDIS_HOST", "192.168.1.200")
REDIS_PORT = int(os.getenv("HA_REDIS_PORT", 6379))
REDIS_PASSWORD = os.getenv("HA_REDIS_PASSWORD", None)
LOCK_KEY = "db_ha_failover_lock"
LOCK_TIMEOUT = 60 # 锁有效期(秒),防止抢锁后控制器崩溃导致死锁
# 伪服务发现/流量路由配置(模拟服务发现中心如 Consul/Etcd)
CONSUL_KV_ROUTE_KEY = "service/mysql/primary"
# ==============================================================================
# 2. 数据库与缓存连接上下文管理器
# ==============================================================================
@contextmanager
def get_db_connection(host, user, password, port, timeout=3):
"""安全地获取数据库连接的上下文管理器"""
conn = None
try:
conn = pymysql.connect(
host=host,
user=user,
password=password,
port=port,
connect_timeout=timeout,
autocommit=True
)
yield conn
finally:
if conn:
try:
conn.close()
except Exception as e:
logger.debug(f"关闭数据库连接时发生异常: {e}")
@contextmanager
def get_redis_client():
"""安全地获取 Redis 客户端的上下文管理器"""
client = None
try:
client = redis.Redis(
host=HAConfig.REDIS_HOST,
port=HAConfig.REDIS_PORT,
password=HAConfig.REDIS_PASSWORD,
socket_timeout=3,
decode_responses=True
)
yield client
finally:
if client:
del client
# ==============================================================================
# 3. 核心功能服务类
# ==============================================================================
class DatabaseHAService:
def __init__(self):
self.config = HAConfig()
# --------------------------------------------------------------------------
# 3.1 健康检查模块
# --------------------------------------------------------------------------
def check_database_health(self, host):
"""
全方位检查数据库健康状态。
返回元组: (is_healthy, reason, is_read_only)
"""
try:
with get_db_connection(
host=host,
user=self.config.DB_USER,
password=self.config.DB_PASSWORD,
port=self.config.DB_PORT,
timeout=self.config.CHECK_TIMEOUT
) as conn:
with conn.cursor() as cursor:
# 1. 基础连通性与权限执行测试
cursor.execute("SELECT 1;")
# 2. 检查数据库的 read_only 状态
cursor.execute("SHOW VARIABLES LIKE 'read_only';")
row = cursor.fetchone()
is_read_only = True if row and row[1].upper() == "ON" else False
# 3. 尝试创建并删除临时表,验证实际写入能力(如果是主库)
if not is_read_only:
try:
cursor.execute("CREATE DATABASE IF NOT EXISTS ha_heartbeat_db;")
cursor.execute("CREATE TABLE IF NOT EXISTS ha_heartbeat_db.t_check (id INT);")
cursor.execute("INSERT INTO ha_heartbeat_db.t_check VALUES (1);")
cursor.execute("DROP TABLE ha_heartbeat_db.t_check;")
except pymysql.MySQLError as we:
return False, f"数据库虽然在线,但是无法写入数据: {str(we)}", is_read_only
return True, "健康", is_read_only
except pymysql.OperationalError as oe:
return False, f"网络无法连接或认证失败: {str(oe)}", None
except Exception as e:
return False, f"未知错误: {str(e)}", None
# --------------------------------------------------------------------------
# 3.2 分布式锁控制模块 (防脑裂)
# --------------------------------------------------------------------------
def acquire_failover_lock(self, redis_cli, identifier):
"""尝试获取分布式锁,防止多个控制器并发执行 Failover"""
logger.info(f"正在尝试获取分布式锁,识别码: {identifier} ...")
# 使用 SET NX EX 实现原子性加锁
if redis_cli.set(self.config.LOCK_KEY, identifier, ex=self.config.LOCK_TIMEOUT, nx=True):
logger.info("成功获取分布式锁,具备主备切换资格。")
return True
return False
def release_failover_lock(self, redis_cli, identifier):
"""使用 Lua 脚本安全释放锁,确保不会误删其他控制器的锁"""
lua_release_script = """
if redis.call('get', KEYS[1]) == ARGV[1] then
return redis.call('del', KEYS[1])
else
return 0
end
"""
try:
result = redis_cli.eval(lua_release_script, 1, self.config.LOCK_KEY, identifier)
if result == 1:
logger.info("分布式锁释放成功。")
return True
else:
logger.warning("分布式锁未能释放,锁可能已过期或被挪用。")
return False
except Exception as e:
logger.error(f"释放分布式锁时发生致命异常: {e}")
return False
# --------------------------------------------------------------------------
# 3.3 流量切换与配置中心路由模块
# --------------------------------------------------------------------------
def update_traffic_routing(self, redis_cli, new_master_host):
"""
更新配置中心的服务发现指针。
在真实生产中,这里可以替换为调用 Consul API、Etcd API 或切换 AWS/阿里云的 VIP。
"""
logger.info(f"开始更新业务流量路由,新主库目标地址指向: {new_master_host}")
try:
# 此处以 Redis 模拟一个集中的全局配置中心
redis_cli.set(self.config.CONSUL_KV_ROUTE_KEY, f"{new_master_host}:{self.config.DB_PORT}")
logger.info(">>> [流量切换成功] 业务配置路由已秒级同步至最新主库!")
return True
except Exception as e:
logger.critical(f"流量路由更新失败,系统可能陷入瘫痪状态,请立刻人工介入!错误: {e}")
return False
# --------------------------------------------------------------------------
# 3.4 拓扑提升与一键 Failover 模块
# --------------------------------------------------------------------------
def promote_slave_to_master(self, slave_host):
"""
执行命令,解雇从库的复制身份,将其提拔为独立可读写的主库。
"""
logger.info(f"开始对从库 {slave_host} 进行身份提拔...")
try:
with get_db_connection(
host=slave_host,
user=self.config.DB_USER,
password=self.config.DB_PASSWORD,
port=self.config.DB_PORT,
timeout=self.config.CHECK_TIMEOUT
) as conn:
with conn.cursor() as cursor:
# 1. 停止从库复制链路
logger.info("执行: STOP SLAVE / STOP REPLICA...")
try:
cursor.execute("STOP SLAVE;")
except pymysql.InternalError:
# 兼容 MySQL 8.0+ 语法
cursor.execute("STOP REPLICA;")
# 2. 清除复制元数据,防止重启后再次寻找旧主库
logger.info("执行: RESET SLAVE ALL / RESET REPLICA ALL...")
try:
cursor.execute("RESET SLAVE ALL;")
except pymysql.InternalError:
cursor.execute("RESET REPLICA ALL;")
# 3. 关闭全局只读模式,赋予其完整的写权限
logger.info("执行: SET GLOBAL read_only = OFF, super_read_only = OFF...")
cursor.execute("SET GLOBAL read_only = OFF;")
cursor.execute("SET GLOBAL super_read_only = OFF;")
logger.info(f"从库 {slave_host} 提升为主库成功。")
return True
except Exception as e:
logger.critical(f"提升从库 {slave_host} 失败!错误详情: {e}")
return False
# --------------------------------------------------------------------------
# 3.5 自动化 Failover 总指挥流程
# --------------------------------------------------------------------------
def execute_one_click_failover(self, redis_cli, controller_id):
"""一键高可用切换核心逻辑控制引擎"""
logger.critical("检测到主库持续异常!即将启动一键自动高可用切换流程...")
# 步骤 1:抢占分布式锁,防脑裂
if not self.acquire_failover_lock(redis_cli, controller_id):
logger.warning("未抢到分布式锁,当前切换动作由其他集群控制器执行中。退出本次切换。")
return False
try:
# 步骤 2:双向校验。在抢到锁后,再次检查主库,防止网络由于瞬时抖动引发误报
logger.info("进入二次确认阶段,再次复检旧主库状态...")
re_check, reason, _ = self.check_database_health(self.config.MASTER_HOST)
if re_check:
logger.warning(f"虚惊一场!旧主库在二次确认中恢复健康({reason}),终止切换流程。")
return False
# 步骤 3:验证备选从库是否健康,是否有资格接管大局
logger.info(f"检查目标备选从库 {self.config.SLAVE_HOST} 是否可用...")
slave_ok, slave_reason, is_ro = self.check_database_health(self.config.SLAVE_HOST)
if not slave_ok:
logger.critical(f"灾难级故障:主库挂了,但目标从库也处于不可用状态!原因: {slave_reason}。无法执行Failover!")
return False
# 步骤 4:执行拓扑提拔,切断主从并摘除只读
if not self.promote_slave_to_master(self.config.SLAVE_HOST):
logger.critical("从库提拔逻辑执行失败,放弃流量路由修改!")
return False
# 步骤 5:业务路由无缝更新
if not self.update_traffic_routing(redis_cli, self.config.SLAVE_HOST):
logger.critical("虽然从库已升为主库,但路由配置中心更新失败!系统面临读写黑洞风险!")
return False
logger.critical(f"====== [大功告成] 数据库群组已成功完成故障转移!新主库: {self.config.SLAVE_HOST} ======")
return True
except Exception as master_error:
logger.error(f"高可用切换引擎执行期间遭遇未预料的系统级异常: {master_error}")
return False
finally:
# 无论切换成功还是抛出异常,必须妥善清理并释放分布式锁
self.release_failover_lock(redis_cli, controller_id)
# --------------------------------------------------------------------------
# 3.6 守护进程控制循环
# --------------------------------------------------------------------------
def start_guardian_loop(self):
"""启动持久化高可用守护轮询服务"""
controller_id = f"ha_node_{os.getpid()}_{int(time.time())}"
logger.info(f"MySQL 高可用自动化守护进程已成功启动。当前节点ID: {controller_id}")
logger.info(f"监控目标 -> 主库: {self.config.MASTER_HOST}, 从库:{self.config.SLAVE_HOST}")
consecutive_failures = 0
while True:
try:
# 检查旧主库状态
is_ok, reason, is_ro = self.check_database_health(self.config.MASTER_HOST)
if is_ok:
if is_ro:
logger.warning(f"警告: 主库 {self.config.MASTER_HOST} 在线,但被错误地设置为了 READ-ONLY 状态!")
consecutive_failures = 0
logger.info(f"心跳正常 - 主库 {self.config.MASTER_HOST} 运转良好。")
else:
consecutive_failures += 1
logger.error(f"心跳异常 - 主库 {self.config.MASTER_HOST} 检查失败 [{consecutive_failures}/{self.config.MAX_RETRIES}]. 原因: {reason}")
# 触发判定:当连续失败达到设定阈值时,正式宣告主库死亡
if consecutive_failures >= self.config.MAX_RETRIES:
with get_redis_client() as redis_cli:
self.execute_one_click_failover(redis_cli, controller_id)
# 成功执行过切换逻辑后,为避免旧主库恢复带来的反复抖动或脑裂,守护进程强制安全退出
logger.critical("本节点已完成高可用历史使命,为确保安全,守护进程现在优雅退出。")
break
except redis.exceptions.RedisError as re:
logger.error(f"分布式协调中心 Redis 连接异常,高可用降级为纯监控模式: {re}")
except Exception as e:
logger.critical(f"监控主循环中捕获到未知异常: {e}")
time.sleep(self.config.CHECK_INTERVAL)
# ==============================================================================
# 4. 自动化脚本入口
# ==============================================================================
if name == "main":
# 初始化 HA 控制引擎并开始执勤
ha_engine = DatabaseHAService()
try:
ha_engine.start_guardian_loop()
except KeyboardInterrupt:
logger.info("收到终止信号,MySQL 高可用守护进程安全退出。")
|