Administrator
Published on 2024-10-21 / 47 Visits
0
0

python sqlalchemy 切换数据库与使用sql语句查询

python flask-sqlalchemy 项目切换数据库临时查询

1. 基础配置说明

 #  首先这个是默认配置 
    MYSQL_USERNAME = "test"
    MYSQL_PASSWORD = "test"
    MYSQL_HOST = "192.168.1.111"
    MYSQL_PORT = 3306
    MYSQL_DATABASE = "tet"
    SQLALCHEMY_DATABASE_URI = f"mysql+pymysql://{MYSQL_USERNAME}:{urlquote(MYSQL_PASSWORD)}@{MYSQL_HOST}:{MYSQL_PORT}/{MYSQL_DATABASE}?charset=utf8mb4"

#  下方是创建副数据库连接,如果主数据库要动态切换数据库也可以模仿下方做字典方式, 下方变量与上方有差异可以自己定义,下方只是例句
    SQLALCHEMY_BINDS = {
        'secondary_db': f"mysql+pymysql://{MYSQL_USERNAMES}:{urlquote(MYSQL_PASSWORDS)}@{MYSQL_HOSTS}:{MYSQL_PORTS}/{MYSQL_DATABASES}?charset=utf8mb4",
        'sndv1': f"mysql+pymysql://{MYSQL_USERNAMES}:{urlquote(MYSQL_PASSWORDS)}@{MYSQL_HOSTS}:{MYSQL_PORTS}/test2?charset=utf8mb4"
    }

2.sqlalchemy 默认正常初始化后再定义表类型需要注意使用__bind_key__ 绑定副数据库

# 为了完整展示,我就提供一下sqlalchemy 初始化,我是蓝图反向注册,如果不是使用这个方法请不要全抄袭,按照自己的模式进行修改
from flask_sqlalchemy import SQLAlchemy
from flask_marshmallow import Marshmallow

db = SQLAlchemy(query_class=Query)
ma = Marshmallow()

def init_databases(app: Flask):
    db.init_app(app)
    ma.init_app(app)
    if os.environ.get('WERKZEUG_RUN_MAIN') == 'true':
        with app.app_context():
            try:
                # 尝试连接所有配置的数据库
                db.engine.connect()
                app.logger.info("主数据库连接成功")

                # 手动连接副数据库
                secondary_engine = db.get_engine(app, bind='secondary_db')
                secondary_engine.connect()
                app.logger.info("副数据库连接成功")
            except Exception as e:
                exit(f"数据库连接失败: {e}")

#  下方是定义表,默认会使用secondary_db的数据库,就是上方定义的,假设你引用的这张表别的库也有,也可以在引入操作的时候手动切换
from applications.extensions import db

class UnitProfiler(db.Model):
    __bind_key__ = 'secondary_db'  # 绑定副数据库
    __tablename__ = 'user'  # 替换为你的表名

    id = db.Column(db.BigInteger, primary_key=True, autoincrement=True, nullable=False)  # bigint, 主键
    timestamp_col = db.Column(db.TIMESTAMP, primary_key=True, nullable=False)  # timestamp, 主键
    user_id = db.Column(db.BigInteger, nullable=False, index=True)  # bigint, 索引

# 引入表与db
from applications.models import user
from applications.extensions import db
@bp.get('/bpnftorders')
def bpnftorders():
    start_date = request.args.get('start_date')  # 这是接口获取,只是从程序拷贝出来的
    end_date = request.args.get('end_date')
    user = request.args.get('user')
    order_id = request.args.get('order_id')
    request_id = request.args.get('request_id')
    time_type = request.args.get('time_type')
    data_base = request.args.get('data_bases')

    # 根据 data_base 值选择不同的数据库绑定
    if data_base == "1":
        bind_key = 'sndv1'  # 使用 sndv1 数据库
    else:
        bind_key = 'secondary_db'  # 使用默认 secondary_db 数据库

    # 获取数据库连接
    engine = db.get_engine(bind=bind_key)
    Session = sessionmaker(bind=engine)
    session = Session()

    try:
        query_bpn = session.query(user)
        ...... 这下方就不展示了,全是sqlalchemy的语法,只是展示了使用副数据库,与副数据库切换数据库的功能展示,这是根据后台前端页面传入的data_base 按钮值判断使用的数据库
  1. 顺便说一下sqlalchemy 也支持使用sql语句的方式查询,只是不符合DB的orm方式,下方就展示一下

    # 直接引入sqlalchemy的db初始化
    from applications.extensions import db
    
    def customquerys():
        data = request.get_json()
        sql_query = data.get('query')
        limit = data.get('defaultDisplay')
        env_id = data.get('environment')
    
        if not sql_query:
            return jsonify({'error': '查询语句不能为空'}), 400
    
        # 检查是否需要添加 LIMIT 子句
        if limit == 1:
            if not re.search(r'\blimit\b', sql_query, re.IGNORECASE):
                sql_query = f"{sql_query.rstrip(';')} LIMIT 1000"
    
        # 根据传入的环境 ID 选择不同的数据库
        if env_id == '0':
            bind_key = 'secondary_db'
        elif env_id == '1':
            bind_key = 'sndv1'
        else:
            return jsonify({'error': '无效的环境ID'}), 400
    
        # 获取数据库连接
        engine = db.get_engine(bind=bind_key)
        connection = engine.connect()
    
        try:
            # 执行用户提供的 SQL 查询
            result = connection.execute(text(sql_query))
            rows = result.fetchall()
    
            # 获取列名并转换为列表
            column_names = list(result.keys())
    
            # 使用列名和结果行创建字典列表
            result_list = [dict(zip(column_names, row)) for row in rows]
    
            # 格式化 timestamp_col
            for item in result_list:
                timestamp_value = item.get('timestamp_col')
    
                # 检查类型是否是 datetime
                if isinstance(timestamp_value, (datetime.datetime, str)):
                    item['timestamp_col'] = timestamp_value.strftime('%Y-%m-%d %H:%M:%S')
    
            # 返回列名和查询结果
            return jsonify({
                'fields': column_names,  # 返回字段名顺序
                'data': result_list  # 返回数据
            })
    
        except Exception as e:
            return jsonify({'error': str(e)}), 500
    
        finally:
            connection.close()


Comment