run_server_async.py
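"""FastAPI server that streams agent answers over Server-Sent Events (SSE).

The endpoints wrap the qwen_agent plan executors: /subscribe/ and
/subscribeByTurbo/ stream plan execution, /clarification/ and
/clarificationByTurbo/ continue an existing plan, and /subscribe/spatial
accepts a zipped shapefile and folds the first feature's WKT into the
question before streaming the answer.
"""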

import asyncio
import shutil
import tempfile
import sys
from zipfile import ZipFile

import fiona
from shapely.geometry import shape
from sse_starlette.sse import EventSourceResponse
from fastapi import FastAPI, UploadFile, File, Form
from fastapi.middleware.cors import CORSMiddleware
import uvicorn

import os

# Allow duplicate OpenMP runtimes (works around libiomp conflicts between
# scientific packages).
os.environ['KMP_DUPLICATE_LIB_OK'] = 'True'

# Make the parent directory importable before importing the agent modules.
parent_dir = os.path.join(os.path.dirname(os.path.realpath(__file__)), "..")
sys.path.append(parent_dir)

from qwen_agent.planning.plan_executor import PlanExecutor
from qwen_agent.planning.plan_continue_executor import PlanContinueExecutor
from qwen_agent.llm.llm_client import LLMClient, LLMAsyncClient
from agent_config import LLMDict_Qwen_72B_1211, LLMDict_GPT4_TURBO
from agent_messages import BaseRequest
prompt_lan = "CN"
llm_name = "qwen-plus"
llm_turbo_name = "gpt-4-turbo"
max_ref_token = 4000
# model_server = "http://10.10.0.10:7907/v1"
model_server = "http://lq.lianqiai.cn:7905/v1"
api_key = ""
server_host = "0.0.0.0"
server_port = 8511

app = FastAPI()
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

rspHeaders = {
    "Cache-Control": "no-cache",
    "Connection": "keep-alive",
    "Content-Type": "text/event-stream",
    "Transfer-Encoding": "chunked",
}
if model_server.startswith("http"):
    source = "local"
elif model_server.startswith("dashscope"):
    source = "dashscope"


def rm_file(file_path):
    if os.path.exists(file_path):
        os.remove(file_path)
@app.post("/")
def index():
    return "Welcome to Lianqi AI"
@app.post("/subscribe/spatial")
async def upload_file(question: str = Form(...), file: UploadFile = File(...)):
    # Make sure the uploaded file is a ZIP archive
    if file.content_type != "application/zip":
        return {"detail": "Please upload a ZIP file"}

    # Write the upload into a temporary directory
    upload_root = os.path.join(os.path.dirname(__file__), "upload_temps")
    os.makedirs(upload_root, exist_ok=True)
    temp_dir = tempfile.TemporaryDirectory(dir=upload_root)
    with open(f"{temp_dir.name}/{file.filename}", "wb") as buffer:
        shutil.copyfileobj(file.file, buffer)

    # Unpack the archive
    with ZipFile(f"{temp_dir.name}/{file.filename}", "r") as zip_ref:
        zip_ref.extractall(temp_dir.name)

    # Find the first shapefile and stream an answer based on its first feature
    for root, dirs, files in os.walk(temp_dir.name):
        for name in files:
            if name.endswith('.shp'):
                shp_file_path = os.path.join(root, name)
                with fiona.open(shp_file_path) as shp:
                    for feature in shp:
                        geom = shape(feature['geometry'])
                        return EventSourceResponse(
                            call_with_stream(f"原问题为:{question},上传的图形wkt为: {geom.wkt}"),
                            media_type="text/event-stream",
                            headers=rspHeaders,
                        )
@app.post("/subscribe/", response_model=str)
async def subscribe(request: BaseRequest):
    print(request)
    return EventSourceResponse(
        call_with_stream(request.data, []),
        media_type="text/event-stream",
        headers=rspHeaders,
    )


@app.get("/subscribe/{question}", response_model=str)
async def subscribe_get(question: str):
    return EventSourceResponse(
        call_with_stream(question),
        media_type="text/event-stream",
        headers=rspHeaders,
    )
@app.post("/subscribeByTurbo/", response_model=str)
async def subscribeByTurbo(question: BaseRequest):
    return EventSourceResponse(
        call_with_stream(question.data, isClarification=False, llm_dict=LLMDict_GPT4_TURBO),
        media_type="text/event-stream",
        headers=rspHeaders,
    )


@app.get("/subscribeByTurbo/{question}", response_model=str)
async def subscribeByTurbo_get(question: str):
    return EventSourceResponse(
        call_with_stream(question, isClarification=False, llm_dict=LLMDict_GPT4_TURBO),
        media_type="text/event-stream",
        headers=rspHeaders,
    )
@app.post("/clarification/", response_model=str)
async def clarification(request: BaseRequest):
    print("clarification: ", request)
    return EventSourceResponse(
        call_with_stream(request.data, isClarification=True),
        media_type="text/event-stream",
        headers=rspHeaders,
    )


@app.post("/clarificationByTurbo/", response_model=str)
async def clarificationByTurbo(request: BaseRequest):
    print("clarificationByTurbo: ", request)
    return EventSourceResponse(
        call_with_stream(request.data, isClarification=True, llm_dict=LLMDict_GPT4_TURBO),
        media_type="text/event-stream",
        headers=rspHeaders,
    )
llm_client = LLMClient(model=llm_name, model_server=model_server)
llm_client_async = LLMAsyncClient(model=llm_name, model_server=model_server)


async def call_with_stream(
    question,
    history=[],
    isClarification=False,
    llm_dict=LLMDict_Qwen_72B_1211,
):
    if isClarification:
        executor = PlanContinueExecutor(
            llm_dict=llm_dict, llm=llm_client_async, stream=True
        )
    else:
        executor = PlanExecutor(llm_dict=llm_dict, llm=llm_client_async, stream=True)
    async for rsp in executor.run(question, history):
        if not rsp:
            continue
        # Small pacing delay between chunks; asyncio.sleep keeps the event loop free.
        await asyncio.sleep(0.1)
        yield rsp
    yield "[DONE]"
    yield "[FINISH]"
if __name__ == "__main__":
    uvicorn.run(app, host=server_host, port=server_port, workers=1)
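# Example client calls (illustrative sketch; assumes the server is reachable at
# http://localhost:8511 and that BaseRequest carries the `data` field used by
# the handlers above):
#
#   curl -N "http://localhost:8511/subscribe/<question>"
#
#   curl -N -X POST "http://localhost:8511/subscribe/" \
#        -H "Content-Type: application/json" \
#        -d '{"data": "<question>"}'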