123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112 |
- import asyncio
- import asyncpg
- from typing import List, Dict, Any
- import json
- # 数据库配置
- db_list: dict[str, dict[Any, Any] | dict[str, str]] = {
- "mysql": {
- # MySQL配置留空,等待后续添加
- },
- "pg": {
- "host": "10.10.9.243",
- "port": "5432",
- "database": "sde",
- "user": "sde",
- "password": "sde"
- }
- }
- class Database:
- def __init__(self, db_type: str = "pg"):
- self.pool = None
- if db_type not in db_list:
- raise ValueError(f"Unsupported database type: {db_type}")
- self.config = db_list[db_type]
- async def connect(self):
- """创建数据库连接池"""
- if not self.pool:
- self.pool = await asyncpg.create_pool(
- host=self.config["host"],
- port=self.config["port"],
- user=self.config["user"],
- password=self.config["password"],
- database=self.config["database"],
- min_size=1,
- max_size=10
- )
- async def close(self):
- """关闭数据库连接池"""
- if self.pool:
- await self.pool.close()
- self.pool = None
- async def execute_query(self, sql: str) -> List[Dict[str, Any]]:
- """
- 执行SQL查询并返回结果
- """
- if not self.pool:
- await self.connect()
- try:
- async with self.pool.acquire() as conn:
- # 执行查询
- rows = await conn.fetch(sql)
- # 将结果转换为字典列表
- result = []
- for row in rows:
- # 处理每个字段的值
- row_dict = {}
- for key, value in row.items():
- # 处理特殊类型
- if isinstance(value, (dict, list)):
- row_dict[key] = json.dumps(value, ensure_ascii=False)
- else:
- row_dict[key] = value
- result.append(row_dict)
- return result
- except Exception as e:
- print(f"Database error: {str(e)}")
- raise
- async def execute_transaction(self, sql_list: List[str]) -> bool:
- """
- 执行事务
- """
- if not self.pool:
- await self.connect()
- try:
- async with self.pool.acquire() as conn:
- async with conn.transaction():
- for sql in sql_list:
- await conn.execute(sql)
- return True
- except Exception as e:
- print(f"Transaction error: {str(e)}")
- return False
- async def test_connection(self) -> bool:
- """
- 测试数据库连接
- """
- try:
- if not self.pool:
- await self.connect()
- async with self.pool.acquire() as conn:
- await conn.execute('SELECT 1')
- return True
- except Exception as e:
- print(f"Connection test failed: {str(e)}")
- return False
- if __name__ == "__main__":
- db = Database()
- asyncio.run(db.test_connection())
|