123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261 |
- import shutil
- import tempfile
- import time
- import sys
- from typing import List
- from zipfile import ZipFile
- import json
- import fiona
- from pydantic import BaseModel
- from shapely.geometry import shape
- from sse_starlette.sse import EventSourceResponse
- from fastapi import FastAPI, UploadFile, File, Form
- from fastapi.middleware.cors import CORSMiddleware
- import uvicorn
- import os
- from qwen_agent.gis.utils.base_class import Geometry
- from qwen_agent.gis.utils.geometry_parser import GeometryParser
- os.environ['KMP_DUPLICATE_LIB_OK'] = 'True'
- parent_dir = os.path.join(os.path.dirname(os.path.realpath(__file__)), "..")
- sys.path.append(parent_dir)
- from qwen_agent.planning.plan_executor import PlanExecutor
- from qwen_agent.planning.plan_continue_executor import PlanContinueExecutor
- from qwen_agent.llm.llm_client import LLMClient, LLMAsyncClient
- from agent_config import LLMDict_Qwen_72B_1211, LLMDict_GPT4_TURBO
- from agent_messages import BaseRequest
- from qwen_agent.tools.tools import async_xzdb, async_db
- from qwen_agent.tools.gis.spatial_analysis.geo_analysis import intersect_kfq, intersect_gyyd, xzfx, sqsx, ghfx
# ---- runtime configuration -------------------------------------------------
prompt_lan = "CN"                # prompt language ("CN" = Chinese)
llm_name = "qwen-plus"           # model name served by model_server
llm_turbo_name = "gpt-4-turbo"   # alternative turbo model name
max_ref_token = 4000             # max tokens of reference material per request

model_server = "http://ac.zjugis.com:8511/v1"
api_key = ""
server_host = "0.0.0.0"
server_port = 8511

app = FastAPI()
# Wide-open CORS: the service is consumed by browser front-ends on other hosts.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Response headers shared by every SSE endpoint below.
rspHeaders = {
    "Cache-Control": "no-cache",
    "Connection": "keep-alive",
    "Content-Type": "text/event-stream",
    "Transfer-Encoding": "chunked",
}

# Derive the backend kind from the server URL.  The original left `source`
# unbound for an unrecognized prefix, deferring the failure to first use;
# fail fast at import time instead.
if model_server.startswith("http"):
    source = "local"
elif model_server.startswith("dashscope"):
    source = "dashscope"
else:
    raise ValueError(f"unrecognized model_server prefix: {model_server!r}")
def rm_file(file_path):
    """Delete *file_path*; silently ignore a file that does not exist.

    EAFP form: the original `os.path.exists` guard had a check-then-remove
    race (the file could disappear between the two calls).
    """
    try:
        os.remove(file_path)
    except FileNotFoundError:
        pass
- @app.post("/")
- def index():
- return "Welcome to Lianqi AI"
# Spatial-analysis assistant endpoint: accepts a zipped shapefile plus a
# question, parses the geometries and streams the LLM answer back as SSE.
@app.post("/subscribe/spatial")
async def upload_file(question: str = Form(...), history: list = Form(...), file: UploadFile = File(...)):
    """Answer *question* about the geometries inside the uploaded ZIP.

    The ZIP is unpacked into a scratch directory, a ``.shp`` member is
    located and parsed, and the geometry JSON is appended to the question
    before handing it to the streaming executor.
    """
    print(f'history: {history}')
    # Only ZIP uploads are accepted.
    if file.content_type != "application/zip":
        return {"detail": "请上传ZIP文件"}

    # Persist the upload, then unpack it in place.
    # NOTE(review): temp_dir is cleaned up only when the object is
    # garbage-collected — confirm that is acceptable for this service.
    temp_dir = tempfile.TemporaryDirectory(dir=os.path.dirname(__file__) + "/upload_temps")
    zip_path = os.path.join(temp_dir.name, file.filename)
    with open(zip_path, "wb") as buffer:
        shutil.copyfileobj(file.file, buffer)
    with ZipFile(zip_path, "r") as zip_ref:
        zip_ref.extractall(temp_dir.name)

    # Locate a .shp member.  BUG FIX: the original loop variable `file`
    # shadowed the UploadFile parameter.
    shp_file_path = ''
    for root, dirs, names in os.walk(temp_dir.name):
        for name in names:
            if name.endswith('.shp'):
                shp_file_path = os.path.join(root, name)
    # BUG FIX: a ZIP without any .shp previously crashed the parser with
    # an empty path; report it to the caller instead.
    if not shp_file_path:
        return {"detail": "ZIP中未找到.shp文件"}

    geoms = GeometryParser.parse_geom_shp_file(shp_file_path)
    return EventSourceResponse(
        call_with_stream(f"原问题为:{question},上传的图形信息为: {Geometry.dumps_json(geoms)}"),
        media_type="text/event-stream",
        headers=rspHeaders,
    )
# Multi-turn conversation endpoint: streams the answer, carrying history.
@app.post("/subscribe/history", response_model=str)
async def subscribe_with_history(request: BaseRequest):
    """Stream an answer for ``request.data`` using the supplied history."""
    print(request)
    stream = call_with_stream(request.data, request.history)
    return EventSourceResponse(
        stream,
        media_type="text/event-stream",
        headers=rspHeaders,
    )
- @app.post("/subscribe/", response_model=str)
- async def subscribe(request: BaseRequest):
- print(request)
- return EventSourceResponse(
- call_with_stream(request.data, request.history),
- media_type="text/event-stream",
- headers=rspHeaders,
- )
- @app.get("/subscribe/{question}", response_model=str)
- async def subscribe(question: str):
- return EventSourceResponse(
- call_with_stream(question),
- media_type="text/event-stream",
- headers=rspHeaders,
- )
- @app.post("/subscribeByTurbo/", response_model=str)
- async def subscribeByTurbo(question: BaseRequest):
- return EventSourceResponse(
- call_with_stream(question.data, False, LLMDict_GPT4_TURBO),
- media_type="text/event-stream",
- headers=rspHeaders,
- )
- @app.get("/subscribeByTurbo/{question}", response_model=str)
- async def subscribeByTurbo(question: str):
- return EventSourceResponse(
- call_with_stream(question, False, LLMDict_GPT4_TURBO),
- media_type="text/event-stream",
- headers=rspHeaders,
- )
- @app.post("/clarification/", response_model=str)
- async def clarification(request: BaseRequest):
- print("clarification: ", request)
- return EventSourceResponse(
- call_with_stream(request.data, True),
- media_type="text/event-stream",
- headers=rspHeaders,
- )
- @app.post("/clarificationByTurbo/", response_model=str)
- async def clarificationByTurbo(request: BaseRequest):
- print("clarificationByTurbo: ", request)
- return EventSourceResponse(
- call_with_stream(request.data, True, LLMDict_GPT4_TURBO),
- media_type="text/event-stream",
- headers=rspHeaders,
- )
- @app.get("/kgQuery")
- async def kgQuery(id: str):
- sql = f'select id, xzqmc, xzqdm, dymc, yddm, ydxz, ydmj, rjlsx, rjlxx, jzmdsx, jzmdxx, jzgdsx, jzgdxx, ldlsx, ldlxx, pfwh, pfsj, st_area(shape::geography) as pfmarea,st_astext(shape) as geom, st_astext(st_centroid(shape)) as center_wkt from sde.kzxxxgh where id in ({id})'
- res_tuples = await async_db.run(sql)
- result, success = res_tuples
- return json.loads(result)
- @app.get("/klyzyQuery")
- async def klyzyQuery(id: str):
- sql = f'select *, st_astext(shape) as geom, st_astext(st_centroid(shape)) as center_wkt from sde.ecgap_klyzy where id in ({id})'
- res_tuples = await async_db.run(sql)
- result, success = res_tuples
- return json.loads(result)
- @app.get("/yjjbntQuery")
- async def yjjbntQuery(id: str):
- sql = f'select *,st_astext(shape) as geom from ddd.gcs330000g2001_yjjbnt_gx_xsb where objectid in ({id})'
- res_tuples = await async_xzdb.run(sql)
- result, success = res_tuples
- return json.loads(result)
- @app.get("/kfqintersect")
- async def kfqintersect(wkt: str):
- result = await intersect_kfq(
- wkt)
- return result
- @app.get("/gyydintersect")
- async def gyydintersect(wkt: str):
- result = await intersect_gyyd(wkt)
- return result
# Shared module-level LLM clients; the async one feeds the SSE executors.
llm_client = LLMClient(model=llm_name, model_server=model_server)
llm_client_async = LLMAsyncClient(model=llm_name, model_server=model_server)
async def call_with_stream(
    question,
    history=None,
    isClarification=False,
    llm_dict=LLMDict_Qwen_72B_1211,
):
    """Run *question* through a plan executor and yield SSE chunks.

    Args:
        question: user question text (possibly with geometry context).
        history: prior conversation turns; each item is a dict (or
            dict-convertible message) with ``type`` and ``data`` fields.
            BUG FIX: the original used a mutable default ``[]`` shared
            across all calls.
        isClarification: when True, use the continue-executor to resume a
            clarification dialogue.
        llm_dict: model configuration handed to the executor.

    Yields:
        Non-empty response chunks, then the "[DONE]" and "[FINISH]"
        sentinels expected by the SSE clients.
    """
    import asyncio  # local import: keeps the file's top-level imports untouched

    # Tolerate falsy history values (None / False / []) from legacy
    # positional callers.
    if not history:
        history = []

    # Normalize each entry into a {'role', 'content', 'history'} dict.
    for i, msg in enumerate(history):
        if not isinstance(msg, dict):
            msg = dict(msg)
        if msg['type'].value == 1:
            msg['role'] = 'user'
            msg['content'] = msg['data']
        else:
            msg['role'] = 'assistant'
            if isinstance(msg['data'], str):
                msg['content'] = msg['data']
            else:
                # Non-string assistant payload: take the first exec result.
                msg['content'] = dict(msg['data'])['exec_res'][0]
        msg['history'] = True
        del msg['data']
        del msg['type']
        history[i] = msg

    if isClarification:
        executor = PlanContinueExecutor(
            llm_dict=llm_dict, llm=llm_client_async, stream=True
        )
    else:
        executor = PlanExecutor(llm_dict=llm_dict, llm=llm_client_async, stream=True)

    async for rsp in executor.run(question, history):
        if not rsp:
            continue
        # BUG FIX: time.sleep() blocked the event loop inside this async
        # generator; asyncio.sleep pauses without stalling other requests.
        await asyncio.sleep(0.1)
        yield rsp
    yield "[DONE]"
    yield "[FINISH]"
- if __name__ == "__main__":
- # uvicorn.run("run_server_async:app", host=server_host, port=server_port, workers=5)
- uvicorn.run(app, host=server_host, port=server_port, workers=1)
|