86 lines
2.8 KiB
Python
86 lines
2.8 KiB
Python
# -*- coding: utf-8 -*-
|
|
from loguru import logger
|
|
from typing import List, Dict, Any, Optional, Tuple, Union
|
|
|
|
from db.connection import DBConnectionManager
|
|
|
|
|
|
class BaseDBOperator:
|
|
"""基础数据库操作类"""
|
|
|
|
def __init__(self, db_manager: DBConnectionManager):
|
|
self.db_manager = db_manager
|
|
self.LOG = logger
|
|
|
|
def execute_query(self, sql: str, params: Optional[tuple] = None, fetch_one: bool = False) -> Union[
|
|
List[Dict], Dict, None]:
|
|
"""执行查询SQL"""
|
|
conn = self.db_manager.get_mysql_connection()
|
|
try:
|
|
with conn.cursor(dictionary=True) as cursor:
|
|
cursor.execute(sql, params or ())
|
|
if fetch_one:
|
|
return cursor.fetchone()
|
|
return cursor.fetchall()
|
|
except Exception as e:
|
|
self.LOG.error(
|
|
f"执行更新SQL出错: {e}, SQL: {sql}, 参数: {str(params)[:200] + '...' if len(str(params)) > 200 else params}"
|
|
)
|
|
return None
|
|
finally:
|
|
conn.close()
|
|
|
|
def execute_update(self, sql: str, params: Optional[tuple] = None) -> bool:
|
|
"""执行更新SQL"""
|
|
conn = self.db_manager.get_mysql_connection()
|
|
try:
|
|
with conn.cursor() as cursor:
|
|
cursor.execute(sql, params or ())
|
|
conn.commit()
|
|
return True
|
|
except Exception as e:
|
|
self.LOG.error(
|
|
f"执行更新SQL出错: {e}, SQL: {sql}, 参数: {str(params)[:200] + '...' if len(str(params)) > 200 else params}"
|
|
)
|
|
conn.rollback()
|
|
return False
|
|
finally:
|
|
conn.close()
|
|
|
|
def execute_batch(self, sql: str, params_list: List[tuple]) -> bool:
|
|
"""批量执行SQL"""
|
|
if not params_list:
|
|
return True
|
|
|
|
conn = self.db_manager.get_mysql_connection()
|
|
try:
|
|
with conn.cursor() as cursor:
|
|
cursor.executemany(sql, params_list)
|
|
conn.commit()
|
|
return True
|
|
except Exception as e:
|
|
self.LOG.error(f"批量执行SQL出错: {e}, SQL: {sql}, 参数数量: {len(params_list)}")
|
|
conn.rollback()
|
|
return False
|
|
finally:
|
|
conn.close()
|
|
|
|
def execute_transaction(self, operations: List[Tuple[str, tuple]]) -> bool:
|
|
"""执行事务"""
|
|
if not operations:
|
|
return True
|
|
|
|
conn = self.db_manager.get_mysql_connection()
|
|
try:
|
|
with conn.cursor() as cursor:
|
|
for sql, params in operations:
|
|
cursor.execute(sql, params)
|
|
conn.commit()
|
|
return True
|
|
except Exception as e:
|
|
self.LOG.error(f"执行事务出错: {e}, 操作数量: {len(operations)}")
|
|
conn.rollback()
|
|
return False
|
|
finally:
|
|
conn.close()
|