鹤壁市护送信息网

Python实现调用MySQL数据库的存储过程

2026-03-27 17:37:02 浏览次数:0
详细信息

1. 使用mysql-connector-python(官方驱动)

安装

pip install mysql-connector-python

基本示例

import mysql.connector
from mysql.connector import Error

def call_stored_procedure():
    """调用存储过程的基本示例"""
    try:
        # 连接数据库
        connection = mysql.connector.connect(
            host='localhost',
            database='your_database',
            user='your_user',
            password='your_password'
        )

        if connection.is_connected():
            cursor = connection.cursor()

            # 示例1:调用无参数的存储过程
            cursor.callproc('get_all_users')

            # 获取结果
            for result in cursor.stored_results():
                rows = result.fetchall()
                for row in rows:
                    print(row)

            # 示例2:调用带输入参数的存储过程
            args = [1001]  # 参数值
            cursor.callproc('get_user_by_id', args)

            # 示例3:调用带INOUT参数的存储过程
            cursor.callproc('calculate_discount', [100, 0.1, 0])  # 第三个参数是INOUT

            # 获取INOUT参数值
            results = []
            for result in cursor.stored_results():
                results.append(result.fetchall())

            # 获取OUT参数
            cursor.execute("SELECT @_calculate_discount_2")  # 格式:@_存储过程名_参数索引
            out_value = cursor.fetchone()[0]
            print(f"折扣后价格: {out_value}")

            cursor.close()
            connection.close()

    except Error as e:
        print(f"数据库错误: {e}")

# 调用示例
call_stored_procedure()

2. 使用PyMySQL

安装

pip install pymysql

示例代码

import pymysql
from pymysql import cursors

def call_procedure_with_pymysql():
    """使用PyMySQL调用存储过程"""
    try:
        connection = pymysql.connect(
            host='localhost',
            user='your_user',
            password='your_password',
            database='your_database',
            charset='utf8mb4',
            cursorclass=pymysql.cursors.DictCursor  # 返回字典格式
        )

        with connection.cursor() as cursor:
            # 调用存储过程
            cursor.callproc('get_user_details', [1001])

            # 获取所有结果集
            results = cursor.fetchall()
            for row in results:
                print(row)

            # 或者遍历多个结果集
            while True:
                result = cursor.fetchone()
                if result is None:
                    break
                print(result)

        # 提交事务(如果需要)
        connection.commit()

    except pymysql.MySQLError as e:
        print(f"MySQL错误: {e}")
    finally:
        if connection:
            connection.close()

3. 完整示例:带各种参数的存储过程

MySQL存储过程示例

-- 1. 带输入参数的存储过程
DELIMITER $$
CREATE PROCEDURE get_employee_by_dept(IN dept_id INT)
BEGIN
    SELECT * FROM employees WHERE department_id = dept_id;
END$$
DELIMITER ;

-- 2. 带OUT参数的存储过程
DELIMITER $$
CREATE PROCEDURE get_employee_count(
    IN dept_id INT,
    OUT emp_count INT
)
BEGIN
    SELECT COUNT(*) INTO emp_count 
    FROM employees 
    WHERE department_id = dept_id;
END$$
DELIMITER ;

-- 3. 带INOUT参数的存储过程
DELIMITER $$
CREATE PROCEDURE calculate_bonus(
    INOUT salary DECIMAL(10,2),
    IN bonus_rate DECIMAL(5,2)
)
BEGIN
    SET salary = salary * (1 + bonus_rate);
END$$
DELIMITER ;

Python调用代码

import mysql.connector

class MySQLProcedureCaller:
    def __init__(self, config):
        self.config = config
        self.connection = None

    def connect(self):
        """建立数据库连接"""
        self.connection = mysql.connector.connect(**self.config)
        return self.connection

    def call_input_procedure(self, emp_id):
        """调用带输入参数的存储过程"""
        cursor = self.connection.cursor(dictionary=True)
        try:
            cursor.callproc('get_employee_by_dept', [emp_id])

            # 处理结果集
            for result in cursor.stored_results():
                employees = result.fetchall()
                for emp in employees:
                    print(f"员工: {emp['name']}, 部门: {emp['department']}")

        finally:
            cursor.close()

    def call_output_procedure(self, dept_id):
        """调用带OUT参数的存储过程"""
        cursor = self.connection.cursor()
        try:
            # 调用存储过程
            args = [dept_id, 0]  # 0是OUT参数的占位符
            cursor.callproc('get_employee_count', args)

            # 获取OUT参数值
            cursor.execute("SELECT @_get_employee_count_1")
            emp_count = cursor.fetchone()[0]
            print(f"部门 {dept_id} 的员工数量: {emp_count}")

        finally:
            cursor.close()

    def call_inout_procedure(self):
        """调用带INOUT参数的存储过程"""
        cursor = self.connection.cursor()
        try:
            # 初始工资
            salary = 5000.00

            # 调用存储过程
            cursor.callproc('calculate_bonus', [salary, 0.15])

            # 获取INOUT参数结果
            cursor.execute("SELECT @_calculate_bonus_0")
            new_salary = cursor.fetchone()[0]
            print(f"原工资: {salary}, 加薪后: {new_salary}")

        finally:
            cursor.close()

    def call_transaction_procedure(self):
        """在事务中调用存储过程"""
        cursor = self.connection.cursor()
        try:
            # 开始事务
            self.connection.start_transaction()

            # 调用多个存储过程
            cursor.callproc('update_salary', [1001, 5500])
            cursor.callproc('update_bonus', [1001, 500])

            # 提交事务
            self.connection.commit()
            print("事务提交成功")

        except Exception as e:
            # 回滚事务
            self.connection.rollback()
            print(f"事务回滚: {e}")
        finally:
            cursor.close()

    def close(self):
        """关闭连接"""
        if self.connection and self.connection.is_connected():
            self.connection.close()

# 使用示例
if __name__ == "__main__":
    db_config = {
        'host': 'localhost',
        'database': 'company_db',
        'user': 'root',
        'password': 'your_password',
        'port': 3306
    }

    caller = MySQLProcedureCaller(db_config)

    try:
        caller.connect()

        # 调用各种存储过程
        caller.call_input_procedure(101)
        caller.call_output_procedure(2)
        caller.call_inout_procedure()
        caller.call_transaction_procedure()

    finally:
        caller.close()

4. 使用上下文管理器(推荐)

from contextlib import contextmanager
import mysql.connector

@contextmanager
def get_db_connection(config):
    """数据库连接上下文管理器"""
    connection = None
    try:
        connection = mysql.connector.connect(**config)
        yield connection
    except mysql.connector.Error as e:
        print(f"数据库错误: {e}")
        raise
    finally:
        if connection and connection.is_connected():
            connection.close()

def call_procedure_safely():
    """安全调用存储过程"""
    config = {
        'host': 'localhost',
        'database': 'test_db',
        'user': 'user',
        'password': 'password'
    }

    with get_db_connection(config) as conn:
        cursor = conn.cursor()

        try:
            # 调用存储过程
            cursor.callproc('your_procedure_name', [param1, param2])

            # 处理结果
            for result in cursor.stored_results():
                data = result.fetchall()
                # 处理数据...

        finally:
            cursor.close()

5. 使用SQLAlchemy(ORM方式)

from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker

def call_procedure_with_sqlalchemy():
    """使用SQLAlchemy调用存储过程"""

    # 创建引擎
    engine = create_engine(
        'mysql+mysqlconnector://user:password@localhost/database'
    )

    Session = sessionmaker(bind=engine)
    session = Session()

    try:
        # 调用存储过程
        result = session.execute(
            text("CALL get_employee_by_dept(:dept_id)"),
            {'dept_id': 101}
        )

        # 获取结果
        for row in result:
            print(row)

        session.commit()

    except Exception as e:
        session.rollback()
        print(f"错误: {e}")
    finally:
        session.close()

注意事项

参数顺序cursor.callproc()的参数顺序必须与存储过程定义一致 结果集处理:存储过程可能返回多个结果集,使用cursor.stored_results()遍历 OUT参数:通过@_procedurename_N格式获取(N从0开始) 事务管理:确保在适当的时候提交或回滚事务 错误处理:始终使用try-except处理数据库操作 连接池:生产环境考虑使用连接池管理连接

选择哪种方法取决于你的项目需求:

相关推荐