import asyncio
import json
from typing import Any, Dict, List

import asyncpg

# Database configuration, keyed by database type.
# NOTE(review): connection credentials are hard-coded here; consider loading
# them from the environment or a secrets store instead of source control.
db_list: dict[str, dict[Any, Any] | dict[str, str]] = {
    "mysql": {
        # MySQL settings are intentionally left empty; to be added later.
    },
    "pg": {
        "host": "10.10.9.243",
        "port": "5432",
        "database": "sde",
        "user": "sde",
        "password": "sde",
    },
}


class Database:
    """Small async helper around a lazily created asyncpg connection pool.

    The pool is created on first use (or by an explicit ``connect()`` call)
    from the ``db_list`` entry selected by ``db_type``.
    """

    def __init__(self, db_type: str = "pg"):
        """Select the configuration for ``db_type``.

        Raises:
            ValueError: if ``db_type`` is not a key of ``db_list``.
        """
        self.pool = None
        if db_type not in db_list:
            raise ValueError(f"Unsupported database type: {db_type}")
        self.config = db_list[db_type]

    async def connect(self):
        """Create the connection pool if it does not exist yet.

        Raises:
            KeyError: if the selected configuration lacks a required key
                (for example the empty ``"mysql"`` placeholder entry).
        """
        if self.pool is None:
            cfg = self.config
            self.pool = await asyncpg.create_pool(
                host=cfg["host"],
                # The config stores the port as a string; pass a real integer.
                port=int(cfg["port"]),
                user=cfg["user"],
                password=cfg["password"],
                database=cfg["database"],
                min_size=1,
                max_size=10,
            )

    async def close(self):
        """Close the connection pool, if any, and forget it."""
        if self.pool is not None:
            await self.pool.close()
            self.pool = None

    @staticmethod
    def _serialize_value(value: Any) -> Any:
        """Return ``value`` unchanged, except dict/list which become JSON text."""
        if isinstance(value, (dict, list)):
            return json.dumps(value, ensure_ascii=False)
        return value

    async def execute_query(self, sql: str, *args: Any) -> List[Dict[str, Any]]:
        """Run a query and return its rows as a list of plain dictionaries.

        Args:
            sql: the SQL text; may contain ``$1``, ``$2``, ... placeholders.
            *args: optional values bound to the placeholders. Prefer these
                over string-building SQL from untrusted input.

        Returns:
            One dict per row, keyed by column name. Values that are ``dict``
            or ``list`` instances are serialized to JSON strings.

        Raises:
            Exception: any error raised while acquiring a connection or
                running the query is printed and then re-raised.
        """
        if self.pool is None:
            await self.connect()

        try:
            async with self.pool.acquire() as conn:
                rows = await conn.fetch(sql, *args)
                return [
                    {
                        column: self._serialize_value(value)
                        for column, value in row.items()
                    }
                    for row in rows
                ]
        except Exception as e:
            print(f"Database error: {e}")
            raise

    async def execute_transaction(self, sql_list: List[str]) -> bool:
        """Execute all statements of ``sql_list`` inside one transaction.

        Returns:
            ``True`` when every statement ran and the transaction block
            exited normally; ``False`` (after printing the error) when
            acquiring the connection or any statement failed.

        Note:
            Errors raised while creating the pool happen before the guarded
            section and therefore propagate to the caller.
        """
        if self.pool is None:
            await self.connect()

        try:
            async with self.pool.acquire() as conn:
                async with conn.transaction():
                    for statement in sql_list:
                        await conn.execute(statement)
            return True
        except Exception as e:
            print(f"Transaction error: {e}")
            return False

    async def test_connection(self) -> bool:
        """Check connectivity by running ``SELECT 1``.

        Returns:
            ``True`` on success; ``False`` (after printing the error) if the
            pool could not be created or the probe statement failed.
        """
        try:
            if self.pool is None:
                await self.connect()
            async with self.pool.acquire() as conn:
                await conn.execute('SELECT 1')
            return True
        except Exception as e:
            print(f"Connection test failed: {e}")
            return False


async def _main() -> None:
    """Run a one-off connectivity check and always release the pool."""
    db = Database()
    try:
        await db.test_connection()
    finally:
        # Close the pool before asyncio.run() tears down the event loop so
        # no connections are left open at interpreter exit.
        await db.close()


if __name__ == "__main__":
    asyncio.run(_main())