"""FastAPI server exposing SSE streaming endpoints for the Lianqi AI
planning agents: spatial analysis (zipped shapefile + question),
plain Q&A, and clarification flows, optionally routed to GPT-4 Turbo.
"""

import asyncio
import os
import shutil
import sys
import tempfile
from zipfile import ZipFile

import uvicorn
from fastapi import FastAPI, File, Form, UploadFile
from fastapi.middleware.cors import CORSMiddleware
from sse_starlette.sse import EventSourceResponse

from qwen_agent.gis.utils.base_class import Geometry
from qwen_agent.gis.utils.geometry_parser import GeometryParser

# Workaround for libraries that bundle their own OpenMP runtime.
os.environ['KMP_DUPLICATE_LIB_OK'] = 'True'

# Make the project root importable when this file is run as a script.
parent_dir = os.path.join(os.path.dirname(os.path.realpath(__file__)), "..")
sys.path.append(parent_dir)

from qwen_agent.planning.plan_executor import PlanExecutor
from qwen_agent.planning.plan_continue_executor import PlanContinueExecutor
from qwen_agent.llm.llm_client import LLMClient, LLMAsyncClient
from agent_config import LLMDict_Qwen_72B_1211, LLMDict_GPT4_TURBO
from agent_messages import BaseRequest
from xuanzhi_query import router as xz_router

prompt_lan = "CN"
llm_name = "qwen-plus"
llm_turbo_name = "gpt-4-turbo"
max_ref_token = 4000
model_server = "http://ac.zjugis.com:8511/v1"
api_key = ""
server_host = "0.0.0.0"
server_port = 8511

app = FastAPI()
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
app.include_router(xz_router)

# Response headers used for every Server-Sent Events stream.
rspHeaders = {
    "Cache-Control": "no-cache",
    "Connection": "keep-alive",
    "Content-Type": "text/event-stream",
    "Transfer-Encoding": "chunked",
}

# Infer the LLM backend kind from the configured endpoint URL.
if model_server.startswith("http"):
    source = "local"
elif model_server.startswith("dashscope"):
    source = "dashscope"


def rm_file(file_path):
    """Delete *file_path* if it exists; silently do nothing otherwise."""
    if os.path.exists(file_path):
        os.remove(file_path)


@app.post("/")
def index():
    """Health-check / landing endpoint."""
    return "Welcome to Lianqi AI"


# Spatial analysis assistant endpoint.
@app.post("/subscribe/spatial")
async def upload_file(
    question: str = Form(...),
    history: list = Form(...),
    file: UploadFile = File(...),
):
    """Accept a zipped shapefile plus a question, parse the geometries and
    stream the agent's answer as SSE.

    Returns an error payload (not an SSE stream) when the upload is not a
    ZIP or when the archive contains no ``.shp`` file.
    """
    print(f'history: {history}')
    # Only ZIP uploads are accepted.
    if file.content_type != "application/zip":
        return {"detail": "请上传ZIP文件"}

    # Stage the upload inside a per-request temporary directory; it is
    # removed automatically when `temp_dir` is garbage-collected.
    upload_root = os.path.join(os.path.dirname(__file__), "upload_temps")
    os.makedirs(upload_root, exist_ok=True)  # dir=... requires it to exist
    temp_dir = tempfile.TemporaryDirectory(dir=upload_root)

    zip_path = os.path.join(temp_dir.name, file.filename)
    with open(zip_path, "wb") as buffer:
        shutil.copyfileobj(file.file, buffer)
    with ZipFile(zip_path, "r") as zip_ref:
        zip_ref.extractall(temp_dir.name)

    # Locate the first .shp file in the extracted tree.
    # NOTE: loop variable renamed so it no longer shadows the `file` param.
    shp_file_path = ''
    for root, _dirs, names in os.walk(temp_dir.name):
        for name in names:
            if name.endswith('.shp'):
                shp_file_path = os.path.join(root, name)
                break
        if shp_file_path:
            break
    if not shp_file_path:
        return {"detail": "ZIP文件中未找到.shp文件"}

    geoms = GeometryParser.parse_geom_shp_file(shp_file_path)
    return EventSourceResponse(
        call_with_stream(
            f"原问题为:{question},上传的图形信息为: {Geometry.dumps_json(geoms)}"
        ),
        media_type="text/event-stream",
        headers=rspHeaders,
    )


# Multi-turn conversation endpoint.
@app.post("/subscribe/history", response_model=str)
async def subscribe_with_history(request: BaseRequest):
    """Stream an answer that takes the supplied conversation history into account."""
    print(request)
    return EventSourceResponse(
        call_with_stream(request.data, request.history),
        media_type="text/event-stream",
        headers=rspHeaders,
    )


@app.post("/subscribe/", response_model=str)
async def subscribe(request: BaseRequest):
    """Stream an answer for a single request (history passed through as-is)."""
    print(request)
    return EventSourceResponse(
        call_with_stream(request.data, request.history),
        media_type="text/event-stream",
        headers=rspHeaders,
    )


@app.get("/subscribe/{question}", response_model=str)
async def subscribe_get(question: str):
    """GET variant of /subscribe/ for a bare question string."""
    return EventSourceResponse(
        call_with_stream(question),
        media_type="text/event-stream",
        headers=rspHeaders,
    )


@app.post("/subscribeByTurbo/", response_model=str)
async def subscribeByTurbo(question: BaseRequest):
    """Stream an answer generated with the GPT-4 Turbo model config."""
    # BUGFIX: the model dict must be passed as `llm_dict`; the old positional
    # call put `False` into `history` and the dict into `isClarification`.
    return EventSourceResponse(
        call_with_stream(question.data, llm_dict=LLMDict_GPT4_TURBO),
        media_type="text/event-stream",
        headers=rspHeaders,
    )


@app.get("/subscribeByTurbo/{question}", response_model=str)
async def subscribeByTurbo_get(question: str):
    """GET variant of /subscribeByTurbo/ for a bare question string."""
    return EventSourceResponse(
        call_with_stream(question, llm_dict=LLMDict_GPT4_TURBO),
        media_type="text/event-stream",
        headers=rspHeaders,
    )


@app.post("/clarification/", response_model=str)
async def clarification(request: BaseRequest):
    """Continue a plan that is waiting for user clarification."""
    print("clarification: ", request)
    # BUGFIX: `True` must bind to `isClarification`, not `history`
    # (the old positional call crashed in enumerate(history)).
    return EventSourceResponse(
        call_with_stream(request.data, isClarification=True),
        media_type="text/event-stream",
        headers=rspHeaders,
    )


@app.post("/clarificationByTurbo/", response_model=str)
async def clarificationByTurbo(request: BaseRequest):
    """Clarification flow routed to the GPT-4 Turbo model config."""
    print("clarificationByTurbo: ", request)
    return EventSourceResponse(
        call_with_stream(
            request.data, isClarification=True, llm_dict=LLMDict_GPT4_TURBO
        ),
        media_type="text/event-stream",
        headers=rspHeaders,
    )


llm_client = LLMClient(model=llm_name, model_server=model_server)
llm_client_async = LLMAsyncClient(model=llm_name, model_server=model_server)


async def call_with_stream(
    question,
    history=None,
    isClarification=False,
    llm_dict=LLMDict_Qwen_72B_1211,
):
    """Run the planning executor and yield SSE chunks for *question*.

    :param question: the user prompt (possibly enriched with geometry JSON).
    :param history: optional prior messages; each item carries ``type`` and
        ``data`` fields and is normalized to ``{'role', 'content', 'history'}``.
        ``None``/empty means a fresh conversation.  (Was a mutable default
        ``[]``; the caller's list is no longer mutated in place.)
    :param isClarification: choose the continue-executor for clarification turns.
    :param llm_dict: model configuration dict handed to the executor.
    """
    normalized = []
    for raw in (history or []):
        item = dict(raw)  # copy so the caller's message objects stay intact
        if item['type'].value == 1:
            item['role'] = 'user'
            item['content'] = item['data']
        else:
            item['role'] = 'assistant'
            data = item['data']
            # Non-string assistant payloads carry the result under 'exec_res'.
            item['content'] = data if isinstance(data, str) else dict(data)['exec_res'][0]
        item['history'] = True
        del item['data']
        del item['type']
        normalized.append(item)

    if isClarification:
        executor = PlanContinueExecutor(
            llm_dict=llm_dict, llm=llm_client_async, stream=True
        )
    else:
        executor = PlanExecutor(llm_dict=llm_dict, llm=llm_client_async, stream=True)

    async for rsp in executor.run(question, normalized):
        if not rsp:
            continue
        # BUGFIX: time.sleep() blocked the event loop between chunks.
        await asyncio.sleep(0.1)
        yield rsp
    yield "[DONE]"
    yield "[FINISH]"


if __name__ == "__main__":
    # NOTE: passing an app object (not an import string) means uvicorn can
    # only run a single worker; use "module:app" to scale workers.
    uvicorn.run(app, host=server_host, port=server_port, workers=1)