|
import mysql.connector
from mysql.connector import Error
def transfer_money(from_id, to_id, amount):
"""
模拟转账的事务处理
:param from_id: 转出账户ID
:param to_id: 转入账户ID
:param amount: 转账金额
"""
connection = None
try:
# 1. 建立数据库连接
connection = mysql.connector.connect(
host='localhost',
user='root',
password='123456',
database='test_db'
)
# 2. 关闭自动提交,开启手动事务控制
connection.autocommit = False
cursor = connection.cursor()
# 3. 执行事务内的多个SQL操作
# 步骤1:扣除转出账户金额
deduct_sql = "UPDATE account SET balance = balance - %s WHERE id = %s"
cursor.execute(deduct_sql, (amount, from_id))
# 步骤2:增加转入账户金额
add_sql = "UPDATE account SET balance = balance + %s WHERE id = %s"
cursor.execute(add_sql, (amount, to_id))
# 模拟异常(可取消注释测试回滚效果)
# raise Error("模拟转账异常,触发回滚")
# 4. 所有操作执行成功,提交事务
connection.commit()
print("转账成功!事务已提交")
except Error as e:
# 5. 发生异常,回滚事务(撤销所有已执行的SQL操作)
if connection:
connection.rollback()
print(f"转账失败,事务已回滚!错误信息:{e}")
finally:
# 6. 释放资源(关闭游标和连接)
if connection and connection.is_connected():
cursor.close()
# 恢复自动提交(可选,不影响,但规范)
connection.autocommit = True
connection.close()
print("数据库连接已关闭")
# ==================== 测试前准备 ====================
# 先在MySQL中创建测试表和数据:
# CREATE DATABASE IF NOT EXISTS test_db;
# USE test_db;
# CREATE TABLE IF NOT EXISTS account (
# id INT PRIMARY KEY,
# name VARCHAR(50),
# balance DECIMAL(10,2)
# );
# INSERT INTO account (id, name, balance) VALUES (1, '张三', 1000.00), (2, '李四', 500.00);
# ==================== 执行转账测试 ====================
# 测试正常转账(张三给李四转200元)
transfer_money(from_id=1, to_id=2, amount=200.00)
|