1. 使用mysql-connector-python(官方驱动)
安装
pip install mysql-connector-python
基本示例
import mysql.connector
from mysql.connector import Error
def call_stored_procedure():
"""调用存储过程的基本示例"""
try:
# 连接数据库
connection = mysql.connector.connect(
host='localhost',
database='your_database',
user='your_user',
password='your_password'
)
if connection.is_connected():
cursor = connection.cursor()
# 示例1:调用无参数的存储过程
cursor.callproc('get_all_users')
# 获取结果
for result in cursor.stored_results():
rows = result.fetchall()
for row in rows:
print(row)
# 示例2:调用带输入参数的存储过程
args = [1001] # 参数值
cursor.callproc('get_user_by_id', args)
# 示例3:调用带INOUT参数的存储过程
cursor.callproc('calculate_discount', [100, 0.1, 0]) # 第三个参数是INOUT
# 获取INOUT参数值
results = []
for result in cursor.stored_results():
results.append(result.fetchall())
# 获取OUT参数
cursor.execute("SELECT @_calculate_discount_2") # 格式:@_存储过程名_参数索引
out_value = cursor.fetchone()[0]
print(f"折扣后价格: {out_value}")
cursor.close()
connection.close()
except Error as e:
print(f"数据库错误: {e}")
# 调用示例
call_stored_procedure()
2. 使用PyMySQL
安装
pip install pymysql
示例代码
import pymysql
from pymysql import cursors
def call_procedure_with_pymysql():
"""使用PyMySQL调用存储过程"""
try:
connection = pymysql.connect(
host='localhost',
user='your_user',
password='your_password',
database='your_database',
charset='utf8mb4',
cursorclass=pymysql.cursors.DictCursor # 返回字典格式
)
with connection.cursor() as cursor:
# 调用存储过程
cursor.callproc('get_user_details', [1001])
# 获取所有结果集
results = cursor.fetchall()
for row in results:
print(row)
# 或者遍历多个结果集
while True:
result = cursor.fetchone()
if result is None:
break
print(result)
# 提交事务(如果需要)
connection.commit()
except pymysql.MySQLError as e:
print(f"MySQL错误: {e}")
finally:
if connection:
connection.close()
3. 完整示例:带各种参数的存储过程
MySQL存储过程示例
-- 1. 带输入参数的存储过程
DELIMITER $$
CREATE PROCEDURE get_employee_by_dept(IN dept_id INT)
BEGIN
SELECT * FROM employees WHERE department_id = dept_id;
END$$
DELIMITER ;
-- 2. 带OUT参数的存储过程
DELIMITER $$
CREATE PROCEDURE get_employee_count(
IN dept_id INT,
OUT emp_count INT
)
BEGIN
SELECT COUNT(*) INTO emp_count
FROM employees
WHERE department_id = dept_id;
END$$
DELIMITER ;
-- 3. 带INOUT参数的存储过程
DELIMITER $$
CREATE PROCEDURE calculate_bonus(
INOUT salary DECIMAL(10,2),
IN bonus_rate DECIMAL(5,2)
)
BEGIN
SET salary = salary * (1 + bonus_rate);
END$$
DELIMITER ;
Python调用代码
import mysql.connector
class MySQLProcedureCaller:
def __init__(self, config):
self.config = config
self.connection = None
def connect(self):
"""建立数据库连接"""
self.connection = mysql.connector.connect(**self.config)
return self.connection
def call_input_procedure(self, emp_id):
"""调用带输入参数的存储过程"""
cursor = self.connection.cursor(dictionary=True)
try:
cursor.callproc('get_employee_by_dept', [emp_id])
# 处理结果集
for result in cursor.stored_results():
employees = result.fetchall()
for emp in employees:
print(f"员工: {emp['name']}, 部门: {emp['department']}")
finally:
cursor.close()
def call_output_procedure(self, dept_id):
"""调用带OUT参数的存储过程"""
cursor = self.connection.cursor()
try:
# 调用存储过程
args = [dept_id, 0] # 0是OUT参数的占位符
cursor.callproc('get_employee_count', args)
# 获取OUT参数值
cursor.execute("SELECT @_get_employee_count_1")
emp_count = cursor.fetchone()[0]
print(f"部门 {dept_id} 的员工数量: {emp_count}")
finally:
cursor.close()
def call_inout_procedure(self):
"""调用带INOUT参数的存储过程"""
cursor = self.connection.cursor()
try:
# 初始工资
salary = 5000.00
# 调用存储过程
cursor.callproc('calculate_bonus', [salary, 0.15])
# 获取INOUT参数结果
cursor.execute("SELECT @_calculate_bonus_0")
new_salary = cursor.fetchone()[0]
print(f"原工资: {salary}, 加薪后: {new_salary}")
finally:
cursor.close()
def call_transaction_procedure(self):
"""在事务中调用存储过程"""
cursor = self.connection.cursor()
try:
# 开始事务
self.connection.start_transaction()
# 调用多个存储过程
cursor.callproc('update_salary', [1001, 5500])
cursor.callproc('update_bonus', [1001, 500])
# 提交事务
self.connection.commit()
print("事务提交成功")
except Exception as e:
# 回滚事务
self.connection.rollback()
print(f"事务回滚: {e}")
finally:
cursor.close()
def close(self):
"""关闭连接"""
if self.connection and self.connection.is_connected():
self.connection.close()
# 使用示例
if __name__ == "__main__":
db_config = {
'host': 'localhost',
'database': 'company_db',
'user': 'root',
'password': 'your_password',
'port': 3306
}
caller = MySQLProcedureCaller(db_config)
try:
caller.connect()
# 调用各种存储过程
caller.call_input_procedure(101)
caller.call_output_procedure(2)
caller.call_inout_procedure()
caller.call_transaction_procedure()
finally:
caller.close()
4. 使用上下文管理器(推荐)
from contextlib import contextmanager
import mysql.connector
@contextmanager
def get_db_connection(config):
"""数据库连接上下文管理器"""
connection = None
try:
connection = mysql.connector.connect(**config)
yield connection
except mysql.connector.Error as e:
print(f"数据库错误: {e}")
raise
finally:
if connection and connection.is_connected():
connection.close()
def call_procedure_safely():
"""安全调用存储过程"""
config = {
'host': 'localhost',
'database': 'test_db',
'user': 'user',
'password': 'password'
}
with get_db_connection(config) as conn:
cursor = conn.cursor()
try:
# 调用存储过程
cursor.callproc('your_procedure_name', [param1, param2])
# 处理结果
for result in cursor.stored_results():
data = result.fetchall()
# 处理数据...
finally:
cursor.close()
5. 使用SQLAlchemy(ORM方式)
from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker
def call_procedure_with_sqlalchemy():
"""使用SQLAlchemy调用存储过程"""
# 创建引擎
engine = create_engine(
'mysql+mysqlconnector://user:password@localhost/database'
)
Session = sessionmaker(bind=engine)
session = Session()
try:
# 调用存储过程
result = session.execute(
text("CALL get_employee_by_dept(:dept_id)"),
{'dept_id': 101}
)
# 获取结果
for row in result:
print(row)
session.commit()
except Exception as e:
session.rollback()
print(f"错误: {e}")
finally:
session.close()
注意事项
参数顺序:
cursor.callproc()的参数顺序必须与存储过程定义一致
结果集处理:存储过程可能返回多个结果集,使用
cursor.stored_results()遍历
OUT参数:通过
@_procedurename_N格式获取(N从0开始)
事务管理:确保在适当的时候提交或回滚事务
错误处理:始终使用try-except处理数据库操作
连接池:生产环境考虑使用连接池管理连接
选择哪种方法取决于你的项目需求:
- 简单项目:使用mysql-connector-python或PyMySQL
- 复杂ORM需求:使用SQLAlchemy
- 需要连接池:考虑使用DBUtils或SQLAlchemy的连接池功能