import shutil import tempfile import time import sys from typing import List from zipfile import ZipFile import json import fiona from pydantic import BaseModel from shapely.geometry import shape from sse_starlette.sse import EventSourceResponse from fastapi import FastAPI, UploadFile, File, Form from fastapi.middleware.cors import CORSMiddleware import uvicorn import os from qwen_agent.gis.utils.base_class import Geometry from qwen_agent.gis.utils.geometry_parser import GeometryParser os.environ['KMP_DUPLICATE_LIB_OK'] = 'True' parent_dir = os.path.join(os.path.dirname(os.path.realpath(__file__)), "..") sys.path.append(parent_dir) from qwen_agent.planning.plan_executor import PlanExecutor from qwen_agent.planning.plan_continue_executor import PlanContinueExecutor from qwen_agent.llm.llm_client import LLMClient, LLMAsyncClient from agent_config import LLMDict_Qwen_72B_1211, LLMDict_GPT4_TURBO from agent_messages import BaseRequest from qwen_agent.tools.tools import async_xzdb, async_db from qwen_agent.tools.gis.spatial_analysis.geo_analysis import intersect_kfq, intersect_gyyd, xzfx, sqsx, ghfx prompt_lan = "CN" llm_name = "qwen-plus" llm_turbo_name = "gpt-4-turbo" max_ref_token = 4000 # model_server = "http://10.10.0.10:7907/v1" # model_server = "http://lq.lianqiai.cn:7905/v1" # model_server = "http://172.20.28.16:20331/v1" model_server = "http://ac.zjugis.com:8511/v1" api_key = "" server_host = "0.0.0.0" server_port = 8511 app = FastAPI() app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) rspHeaders = { "Cache-Control": "no-cache", "Connection": "keep-alive", "Content-Type": "text/event-stream", "Transfer-Encoding": "chunked", } if model_server.startswith("http"): source = "local" elif model_server.startswith("dashscope"): source = "dashscope" def rm_file(file_path): if os.path.exists(file_path): os.remove(file_path) @app.post("/") def index(): return "Welcome to Lianqi AI" # 空间分析助手接口 @app.post("/subscribe/spatial") async def upload_file(question: str = Form(...), history: list = Form(...), file: UploadFile = File(...)): print(f'history: {history}') # 确保上传的文件是ZIP类型 if file.content_type != "application/zip": return {"detail": "请上传ZIP文件"} # 将文件写入临时文件 temp_dir = tempfile.TemporaryDirectory(dir=os.path.dirname(__file__) + "/upload_temps") with open(f"{temp_dir.name}/{file.filename}", "wb") as buffer: shutil.copyfileobj(file.file, buffer) print(buffer) # 解压缩 with ZipFile(f"{temp_dir.name}/{file.filename}", "r") as zip_ref: zip_ref.extractall(temp_dir.name) shp_file_path = '' for root, dirs, files in os.walk(temp_dir.name): for file in files: if file.endswith('.shp'): shp_file_path = os.path.join(root, file) geoms = GeometryParser.parse_geom_shp_file(shp_file_path) return EventSourceResponse( call_with_stream(f"原问题为:{question},上传的图形信息为: {Geometry.dumps_json(geoms)}"), media_type="text/event-stream", headers=rspHeaders, ) # 连续对话 @app.post("/subscribe/history", response_model=str) async def subscribe_with_history(request: BaseRequest): print(request) return EventSourceResponse( call_with_stream(request.data, request.history), media_type="text/event-stream", headers=rspHeaders, ) @app.post("/subscribe/", response_model=str) async def subscribe(request: BaseRequest): print(request) return EventSourceResponse( call_with_stream(request.data, request.history), media_type="text/event-stream", headers=rspHeaders, ) @app.get("/subscribe/{question}", response_model=str) async def subscribe(question: str): return EventSourceResponse( call_with_stream(question), media_type="text/event-stream", headers=rspHeaders, ) @app.post("/subscribeByTurbo/", response_model=str) async def subscribeByTurbo(question: BaseRequest): return EventSourceResponse( call_with_stream(question.data, False, LLMDict_GPT4_TURBO), media_type="text/event-stream", headers=rspHeaders, ) @app.get("/subscribeByTurbo/{question}", response_model=str) async def subscribeByTurbo(question: str): return EventSourceResponse( call_with_stream(question, False, LLMDict_GPT4_TURBO), media_type="text/event-stream", headers=rspHeaders, ) @app.post("/clarification/", response_model=str) async def clarification(request: BaseRequest): print("clarification: ", request) return EventSourceResponse( call_with_stream(request.data, True), media_type="text/event-stream", headers=rspHeaders, ) @app.post("/clarificationByTurbo/", response_model=str) async def clarificationByTurbo(request: BaseRequest): print("clarificationByTurbo: ", request) return EventSourceResponse( call_with_stream(request.data, True, LLMDict_GPT4_TURBO), media_type="text/event-stream", headers=rspHeaders, ) @app.get("/kgQuery") async def kgQuery(id: str): sql = f'select id, xzqmc, xzqdm, dymc, yddm, ydxz, ydmj, rjlsx, rjlxx, jzmdsx, jzmdxx, jzgdsx, jzgdxx, ldlsx, ldlxx, pfwh, pfsj, st_area(shape::geography) as pfmarea,st_astext(shape) as geom, st_astext(st_centroid(shape)) as center_wkt from sde.kzxxxgh where id in ({id})' res_tuples = await async_db.run(sql) result, success = res_tuples return json.loads(result) @app.get("/klyzyQuery") async def klyzyQuery(id: str): sql = f'select *, st_astext(shape) as geom, st_astext(st_centroid(shape)) as center_wkt from sde.ecgap_klyzy where id in ({id})' res_tuples = await async_db.run(sql) result, success = res_tuples return json.loads(result) @app.get("/yjjbntQuery") async def yjjbntQuery(id: str): sql = f'select *,st_astext(shape) as geom from ddd.gcs330000g2001_yjjbnt_gx_xsb where objectid in ({id})' res_tuples = await async_xzdb.run(sql) result, success = res_tuples return json.loads(result) @app.get("/kfqintersect") async def kfqintersect(wkt: str): result = await intersect_kfq( wkt) return result @app.get("/gyydintersect") async def gyydintersect(wkt: str): result = await intersect_gyyd(wkt) return result llm_client = LLMClient(model=llm_name, model_server=model_server) llm_client_async = LLMAsyncClient(model=llm_name, model_server=model_server) async def call_with_stream( question, history=[], isClarification=False, llm_dict=LLMDict_Qwen_72B_1211, ): for i, msg in enumerate(history): if not isinstance(msg, dict): msg = dict(msg) if msg['type'].value == 1: msg['role'] = 'user' msg['content'] = msg['data'] else: msg['role'] = 'assistant' if isinstance(msg['data'], str): msg['content'] = msg['data'] else: msg['content'] = dict(msg['data'])['exec_res'][0] msg['history'] = True del msg['data'] del msg['type'] history[i] = msg if isClarification: executor = PlanContinueExecutor( llm_dict=llm_dict, llm=llm_client_async, stream=True ) else: executor = PlanExecutor(llm_dict=llm_dict, llm=llm_client_async, stream=True) async for rsp in executor.run(question, history): if not rsp: continue else: time.sleep(0.1) yield rsp yield "[DONE]" yield "[FINISH]" if __name__ == "__main__": # uvicorn.run("run_server_async:app", host=server_host, port=server_port, workers=5) uvicorn.run(app, host=server_host, port=server_port, workers=1)