import asyncio import os from dotenv import load_dotenv import asyncpg from typing import List, Dict, Any import json # 加载config.env文件 load_dotenv("config.env") # 数据库配置 DB_CONFIG = { "host": os.getenv("DB_HOST"), "port": os.getenv("DB_PORT"), "database": os.getenv("DB_NAME"), "user": os.getenv("DB_USER"), "password": os.getenv("DB_PASSWORD") } class Database: def __init__(self): self.pool = None self.config = DB_CONFIG async def connect(self): """创建数据库连接池""" if not self.pool: self.pool = await asyncpg.create_pool( host=self.config["host"], port=self.config["port"], user=self.config["user"], password=self.config["password"], database=self.config["database"], min_size=1, max_size=10 ) async def close(self): """关闭数据库连接池""" if self.pool: await self.pool.close() self.pool = None async def execute_query(self, sql: str) -> List[Dict[str, Any]]: """ 执行SQL查询并返回结果 """ if not self.pool: await self.connect() try: async with self.pool.acquire() as conn: # 执行查询 rows = await conn.fetch(sql) # 将结果转换为字典列表 result = [] for row in rows: # 处理每个字段的值 row_dict = {} for key, value in row.items(): # 处理特殊类型 if isinstance(value, (dict, list)): row_dict[key] = json.dumps(value, ensure_ascii=False) else: row_dict[key] = value result.append(row_dict) return result except Exception as e: print(f"Database error: {str(e)}") raise async def execute_transaction(self, sql_list: List[str]) -> bool: """ 执行事务 """ if not self.pool: await self.connect() try: async with self.pool.acquire() as conn: async with conn.transaction(): for sql in sql_list: await conn.execute(sql) return True except Exception as e: print(f"Transaction error: {str(e)}") return False async def test_connection(self) -> bool: """ 测试数据库连接 """ try: if not self.pool: await self.connect() async with self.pool.acquire() as conn: await conn.execute('SELECT 1') return True except Exception as e: print(f"Connection test failed: {str(e)}") return False if __name__ == "__main__": db = Database() asyncio.run(db.test_connection())