事务(Transaction)是数据库操作的最小逻辑单元,遵循 ACID 原则:
Python操作MySQL时,默认是自动提交(autocommit) 模式(执行单条SQL会立即生效),而事务处理需要先关闭自动提交,手动控制提交/回滚。
以「转账场景」为例(经典的事务应用场景:A账户扣钱、B账户加钱,必须同时成功/失败):
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 |
import mysql.connector from mysql.connector import Error def transfer_money(from_id, to_id, amount): """ 模拟转账的事务处理 :param from_id: 转出账户ID :param to_id: 转入账户ID :param amount: 转账金额 """ connection = None try: # 1. 建立数据库连接 connection = mysql.connector.connect( host='localhost', user='root', password='123456', database='test_db' ) # 2. 关闭自动提交,开启手动事务控制 connection.autocommit = False cursor = connection.cursor() # 3. 执行事务内的多个SQL操作 # 步骤1:扣除转出账户金额 deduct_sql = "UPDATE account SET balance = balance - %s WHERE id = %s" cursor.execute(deduct_sql, (amount, from_id)) # 步骤2:增加转入账户金额 add_sql = "UPDATE account SET balance = balance + %s WHERE id = %s" cursor.execute(add_sql, (amount, to_id)) # 模拟异常(可取消注释测试回滚效果) # raise Error("模拟转账异常,触发回滚") # 4. 所有操作执行成功,提交事务 connection.commit() print("转账成功!事务已提交") except Error as e: # 5. 发生异常,回滚事务(撤销所有已执行的SQL操作) if connection: connection.rollback() print(f"转账失败,事务已回滚!错误信息:{e}") finally: # 6. 释放资源(关闭游标和连接) if connection and connection.is_connected(): cursor.close() # 恢复自动提交(可选,不影响,但规范) connection.autocommit = True connection.close() print("数据库连接已关闭") # ==================== 测试前准备 ==================== # 先在MySQL中创建测试表和数据: # CREATE DATABASE IF NOT EXISTS test_db; # USE test_db; # CREATE TABLE IF NOT EXISTS account ( # id INT PRIMARY KEY, # name VARCHAR(50), # balance DECIMAL(10,2) # ); # INSERT INTO account (id, name, balance) VALUES (1, '张三', 1000.00), (2, '李四', 500.00); # ==================== 执行转账测试 ==================== # 测试正常转账(张三给李四转200元) transfer_money(from_id=1, to_id=2, amount=200.00) |
如果需要批量插入/更新多条数据,事务同样适用,示例如下:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 |
def batch_insert_users(users): """批量插入用户,使用事务保证全部成功/失败""" connection = None try: connection = mysql.connector.connect( host='localhost', user='root', password='123456', database='test_db' ) connection.autocommit = False cursor = connection.cursor() insert_sql = "INSERT INTO user (name, age) VALUES (%s, %s)" # 批量执行SQL(效率更高) cursor.executemany(insert_sql, users) connection.commit() print(f"批量插入 {cursor.rowcount} 条数据成功") except Error as e: if connection: connection.rollback() print(f"批量插入失败,事务回滚:{e}") finally: if connection and connection.is_connected(): cursor.close() connection.autocommit = True connection.close() # 测试批量插入 user_list = [("王五", 30), ("赵六", 28), ("孙七", 35)] batch_insert_users(user_list) |