import asyncio import json import aiohttp import urllib.parse import random from database import Database class HighwayDistanceCalculator: def __init__(self): self.db = Database() self.tianditu_key = "3b2d9954565a494ffab9cfd26d7fc522" self.base_url = "https://api.tianditu.gov.cn/v2/search" self.keywords = ["高速", "互通"] # 添加常见的User-Agent列表 self.user_agents = [ "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36", "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/92.0.4515.107 Safari/537.36", "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:90.0) Gecko/20100101 Firefox/90.0", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.1.1 Safari/605.1.15" ] def convert_distance_to_meters(self, distance_str): """将距离字符串转换为米""" try: if 'km' in distance_str: # 如果是千米,转换为米 return int(float(distance_str.replace('km', '')) * 1000) else: # 如果是米,直接转换 return int(distance_str.replace('m', '')) except Exception as e: print(f"距离转换错误: {distance_str}, {str(e)}") return None async def get_nearest_distance(self, point_lonlat): """获取最近的距离""" all_pois = [] for keyword in self.keywords: post_str = { "keyWord": keyword, "level": 12, "queryRadius": 5000, "pointLonlat": point_lonlat, "queryType": 3 } # 120.38920002653107,30.221665152310052 print(f"查询关键词: {keyword}, 坐标: {point_lonlat}") # 将postStr参数进行URL编码 encoded_post_str = urllib.parse.quote(json.dumps(post_str)) # 构建完整的URL url = f"{self.base_url}?postStr={encoded_post_str}&type=query&tk={self.tianditu_key}" # 随机选择一个User-Agent headers = { "User-Agent": random.choice(self.user_agents), "Accept": "application/json, text/plain, */*", "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8", "Accept-Encoding": "gzip, deflate, br", "Connection": "keep-alive", "Referer": "https://map.tianditu.gov.cn/", "Origin": "https://map.tianditu.gov.cn" } async with aiohttp.ClientSession() as session: try: async with session.get(url, headers=headers) as response: if response.status == 200: data = await response.json() if data.get("pois"): for poi in data["pois"]: distance_meters = self.convert_distance_to_meters(poi["distance"]) if distance_meters is not None: all_pois.append({ "distance": distance_meters, "name": poi["name"], "keyword": keyword }) else: print(f"请求失败,状态码: {response.status}") print(f"URL: {url}") print(f"响应内容: {await response.text()}") except Exception as e: print(f"请求异常: {str(e)}") # 添加随机延迟,避免请求过于频繁 # await asyncio.sleep(random.uniform(1, 3)) # 按距离排序 if all_pois: all_pois.sort(key=lambda x: x["distance"]) print(f"找到最近的点: {all_pois[0]['name']} ({all_pois[0]['keyword']}), 距离: {all_pois[0]['distance']}米") return all_pois[0]["distance"] return None async def process_ecgap_klyzy(self): """处理ecgap_klyzy表数据""" sql = """ SELECT id, ST_X(ST_Centroid(shape::geometry)) as center_x, ST_Y(ST_Centroid(shape::geometry)) as center_y FROM ecgap_klyzy WHERE shape IS NOT NULL """ try: parcels = await self.db.execute_query(sql) for parcel in parcels: id = parcel["id"] center_x = parcel["center_x"] center_y = parcel["center_y"] # 构建中心点坐标字符串 center_point = f"{center_x},{center_y}" # 获取最近距离 distance = await self.get_nearest_distance(center_point) if distance is not None: # 更新数据库 update_sql = f""" UPDATE ecgap_klyzy SET gsjl = {distance} WHERE id = '{id}' """ await self.db.execute_transaction([update_sql]) print(f"更新ecgap_klyzy表地块 ID: {id} 的最近距离为 {distance}米") # 添加随机延迟,避免请求过于频繁 await asyncio.sleep(random.uniform(1, 3)) except Exception as e: print(f"处理ecgap_klyzy表数据错误: {str(e)}") async def process_kzxxxgh(self): """处理kzxxxgh表数据""" sql = """ SELECT id, ST_X(ST_Centroid(shape::geometry)) as center_x, ST_Y(ST_Centroid(shape::geometry)) as center_y FROM kzxxxgh WHERE shape IS NOT NULL """ try: parcels = await self.db.execute_query(sql) for parcel in parcels: id = parcel["id"] center_x = parcel["center_x"] center_y = parcel["center_y"] # 构建中心点坐标字符串 center_point = f"{center_x},{center_y}" # 获取最近距离 distance = await self.get_nearest_distance(center_point) if distance is not None: # 更新数据库 update_sql = f""" UPDATE kzxxxgh SET gsjl = {distance} WHERE id = '{id}' """ await self.db.execute_transaction([update_sql]) print(f"更新kzxxxgh表地块 ID: {id} 的最近距离为 {distance}米") # 添加随机延迟,避免请求过于频繁 await asyncio.sleep(random.uniform(1, 3)) except Exception as e: print(f"处理kzxxxgh表数据错误: {str(e)}") async def process_all_parcels(self): """处理所有地块数据""" try: await self.db.connect() # print("开始处理ecgap_klyzy表...") # await self.process_ecgap_klyzy() print("开始处理kzxxxgh表...") await self.process_kzxxxgh() except Exception as e: print(f"处理地块数据错误: {str(e)}") finally: await self.db.close() async def main(): calculator = HighwayDistanceCalculator() await calculator.process_all_parcels() if __name__ == "__main__": asyncio.run(main())