python flask-sqlalchemy 项目切换数据库临时查询
1. 基础配置说明
# 首先这个是默认配置
MYSQL_USERNAME = "test"
MYSQL_PASSWORD = "test"
MYSQL_HOST = "192.168.1.111"
MYSQL_PORT = 3306
MYSQL_DATABASE = "tet"
SQLALCHEMY_DATABASE_URI = f"mysql+pymysql://{MYSQL_USERNAME}:{urlquote(MYSQL_PASSWORD)}@{MYSQL_HOST}:{MYSQL_PORT}/{MYSQL_DATABASE}?charset=utf8mb4"
# 下方是创建副数据库连接,如果主数据库要动态切换数据库也可以模仿下方做字典方式, 下方变量与上方有差异可以自己定义,下方只是例句
SQLALCHEMY_BINDS = {
'secondary_db': f"mysql+pymysql://{MYSQL_USERNAMES}:{urlquote(MYSQL_PASSWORDS)}@{MYSQL_HOSTS}:{MYSQL_PORTS}/{MYSQL_DATABASES}?charset=utf8mb4",
'sndv1': f"mysql+pymysql://{MYSQL_USERNAMES}:{urlquote(MYSQL_PASSWORDS)}@{MYSQL_HOSTS}:{MYSQL_PORTS}/test2?charset=utf8mb4"
}
2.sqlalchemy 默认正常初始化后再定义表类型需要注意使用__bind_key__ 绑定副数据库
# 为了完整展示,我就提供一下sqlalchemy 初始化,我是蓝图反向注册,如果不是使用这个方法请不要全抄袭,按照自己的模式进行修改
from flask_sqlalchemy import SQLAlchemy
from flask_marshmallow import Marshmallow
db = SQLAlchemy(query_class=Query)
ma = Marshmallow()
def init_databases(app: Flask):
db.init_app(app)
ma.init_app(app)
if os.environ.get('WERKZEUG_RUN_MAIN') == 'true':
with app.app_context():
try:
# 尝试连接所有配置的数据库
db.engine.connect()
app.logger.info("主数据库连接成功")
# 手动连接副数据库
secondary_engine = db.get_engine(app, bind='secondary_db')
secondary_engine.connect()
app.logger.info("副数据库连接成功")
except Exception as e:
exit(f"数据库连接失败: {e}")
# 下方是定义表,默认会使用secondary_db的数据库,就是上方定义的,假设你引用的这张表别的库也有,也可以在引入操作的时候手动切换
from applications.extensions import db
class UnitProfiler(db.Model):
__bind_key__ = 'secondary_db' # 绑定副数据库
__tablename__ = 'user' # 替换为你的表名
id = db.Column(db.BigInteger, primary_key=True, autoincrement=True, nullable=False) # bigint, 主键
timestamp_col = db.Column(db.TIMESTAMP, primary_key=True, nullable=False) # timestamp, 主键
user_id = db.Column(db.BigInteger, nullable=False, index=True) # bigint, 索引
# 引入表与db
from applications.models import user
from applications.extensions import db
@bp.get('/bpnftorders')
def bpnftorders():
start_date = request.args.get('start_date') # 这是接口获取,只是从程序拷贝出来的
end_date = request.args.get('end_date')
user = request.args.get('user')
order_id = request.args.get('order_id')
request_id = request.args.get('request_id')
time_type = request.args.get('time_type')
data_base = request.args.get('data_bases')
# 根据 data_base 值选择不同的数据库绑定
if data_base == "1":
bind_key = 'sndv1' # 使用 sndv1 数据库
else:
bind_key = 'secondary_db' # 使用默认 secondary_db 数据库
# 获取数据库连接
engine = db.get_engine(bind=bind_key)
Session = sessionmaker(bind=engine)
session = Session()
try:
query_bpn = session.query(user)
...... 这下方就不展示了,全是sqlalchemy的语法,只是展示了使用副数据库,与副数据库切换数据库的功能展示,这是根据后台前端页面传入的data_base 按钮值判断使用的数据库
顺便说一下sqlalchemy 也支持使用sql语句的方式查询,只是不符合DB的orm方式,下方就展示一下
# 直接引入sqlalchemy的db初始化 from applications.extensions import db def customquerys(): data = request.get_json() sql_query = data.get('query') limit = data.get('defaultDisplay') env_id = data.get('environment') if not sql_query: return jsonify({'error': '查询语句不能为空'}), 400 # 检查是否需要添加 LIMIT 子句 if limit == 1: if not re.search(r'\blimit\b', sql_query, re.IGNORECASE): sql_query = f"{sql_query.rstrip(';')} LIMIT 1000" # 根据传入的环境 ID 选择不同的数据库 if env_id == '0': bind_key = 'secondary_db' elif env_id == '1': bind_key = 'sndv1' else: return jsonify({'error': '无效的环境ID'}), 400 # 获取数据库连接 engine = db.get_engine(bind=bind_key) connection = engine.connect() try: # 执行用户提供的 SQL 查询 result = connection.execute(text(sql_query)) rows = result.fetchall() # 获取列名并转换为列表 column_names = list(result.keys()) # 使用列名和结果行创建字典列表 result_list = [dict(zip(column_names, row)) for row in rows] # 格式化 timestamp_col for item in result_list: timestamp_value = item.get('timestamp_col') # 检查类型是否是 datetime if isinstance(timestamp_value, (datetime.datetime, str)): item['timestamp_col'] = timestamp_value.strftime('%Y-%m-%d %H:%M:%S') # 返回列名和查询结果 return jsonify({ 'fields': column_names, # 返回字段名顺序 'data': result_list # 返回数据 }) except Exception as e: return jsonify({'error': str(e)}), 500 finally: connection.close()