123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200 |
- import asyncio
- import json
- import aiohttp
- import urllib.parse
- import random
- from database import Database
- class HighwayDistanceCalculator:
- def __init__(self):
- self.db = Database()
- self.tianditu_key = "3b2d9954565a494ffab9cfd26d7fc522"
- self.base_url = "https://api.tianditu.gov.cn/v2/search"
- self.keywords = ["高速", "互通"]
- # 添加常见的User-Agent列表
- self.user_agents = [
- "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
- "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/92.0.4515.107 Safari/537.36",
- "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:90.0) Gecko/20100101 Firefox/90.0",
- "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.1.1 Safari/605.1.15"
- ]
- def convert_distance_to_meters(self, distance_str):
- """将距离字符串转换为米"""
- try:
- if 'km' in distance_str:
- # 如果是千米,转换为米
- return int(float(distance_str.replace('km', '')) * 1000)
- else:
- # 如果是米,直接转换
- return int(distance_str.replace('m', ''))
- except Exception as e:
- print(f"距离转换错误: {distance_str}, {str(e)}")
- return None
- async def get_nearest_distance(self, point_lonlat):
- """获取最近的距离"""
- all_pois = []
-
- for keyword in self.keywords:
- post_str = {
- "keyWord": keyword,
- "level": 12,
- "queryRadius": 5000,
- "pointLonlat": point_lonlat,
- "queryType": 3
- }
-
- # 120.38920002653107,30.221665152310052
- print(f"查询关键词: {keyword}, 坐标: {point_lonlat}")
-
- # 将postStr参数进行URL编码
- encoded_post_str = urllib.parse.quote(json.dumps(post_str))
-
- # 构建完整的URL
- url = f"{self.base_url}?postStr={encoded_post_str}&type=query&tk={self.tianditu_key}"
- # 随机选择一个User-Agent
- headers = {
- "User-Agent": random.choice(self.user_agents),
- "Accept": "application/json, text/plain, */*",
- "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
- "Accept-Encoding": "gzip, deflate, br",
- "Connection": "keep-alive",
- "Referer": "https://map.tianditu.gov.cn/",
- "Origin": "https://map.tianditu.gov.cn"
- }
- async with aiohttp.ClientSession() as session:
- try:
- async with session.get(url, headers=headers) as response:
- if response.status == 200:
- data = await response.json()
- if data.get("pois"):
- for poi in data["pois"]:
- distance_meters = self.convert_distance_to_meters(poi["distance"])
- if distance_meters is not None:
- all_pois.append({
- "distance": distance_meters,
- "name": poi["name"],
- "keyword": keyword
- })
- else:
- print(f"请求失败,状态码: {response.status}")
- print(f"URL: {url}")
- print(f"响应内容: {await response.text()}")
- except Exception as e:
- print(f"请求异常: {str(e)}")
-
- # 添加随机延迟,避免请求过于频繁
- # await asyncio.sleep(random.uniform(1, 3))
- # 按距离排序
- if all_pois:
- all_pois.sort(key=lambda x: x["distance"])
- print(f"找到最近的点: {all_pois[0]['name']} ({all_pois[0]['keyword']}), 距离: {all_pois[0]['distance']}米")
- return all_pois[0]["distance"]
- return None
- async def process_ecgap_klyzy(self):
- """处理ecgap_klyzy表数据"""
- sql = """
- SELECT
- id,
- ST_X(ST_Centroid(shape::geometry)) as center_x,
- ST_Y(ST_Centroid(shape::geometry)) as center_y
- FROM ecgap_klyzy
- WHERE shape IS NOT NULL
- """
-
- try:
- parcels = await self.db.execute_query(sql)
-
- for parcel in parcels:
- id = parcel["id"]
- center_x = parcel["center_x"]
- center_y = parcel["center_y"]
-
- # 构建中心点坐标字符串
- center_point = f"{center_x},{center_y}"
-
- # 获取最近距离
- distance = await self.get_nearest_distance(center_point)
- if distance is not None:
- # 更新数据库
- update_sql = f"""
- UPDATE ecgap_klyzy
- SET gsjl = {distance}
- WHERE id = '{id}'
- """
- await self.db.execute_transaction([update_sql])
- print(f"更新ecgap_klyzy表地块 ID: {id} 的最近距离为 {distance}米")
-
- # 添加随机延迟,避免请求过于频繁
- await asyncio.sleep(random.uniform(1, 3))
-
- except Exception as e:
- print(f"处理ecgap_klyzy表数据错误: {str(e)}")
- async def process_kzxxxgh(self):
- """处理kzxxxgh表数据"""
- sql = """
- SELECT
- id,
- ST_X(ST_Centroid(shape::geometry)) as center_x,
- ST_Y(ST_Centroid(shape::geometry)) as center_y
- FROM kzxxxgh
- WHERE shape IS NOT NULL
- """
-
- try:
- parcels = await self.db.execute_query(sql)
-
- for parcel in parcels:
- id = parcel["id"]
- center_x = parcel["center_x"]
- center_y = parcel["center_y"]
-
- # 构建中心点坐标字符串
- center_point = f"{center_x},{center_y}"
-
- # 获取最近距离
- distance = await self.get_nearest_distance(center_point)
- if distance is not None:
- # 更新数据库
- update_sql = f"""
- UPDATE kzxxxgh
- SET gsjl = {distance}
- WHERE id = '{id}'
- """
- await self.db.execute_transaction([update_sql])
- print(f"更新kzxxxgh表地块 ID: {id} 的最近距离为 {distance}米")
-
- # 添加随机延迟,避免请求过于频繁
- await asyncio.sleep(random.uniform(1, 3))
-
- except Exception as e:
- print(f"处理kzxxxgh表数据错误: {str(e)}")
- async def process_all_parcels(self):
- """处理所有地块数据"""
- try:
- await self.db.connect()
-
- # print("开始处理ecgap_klyzy表...")
- # await self.process_ecgap_klyzy()
-
- print("开始处理kzxxxgh表...")
- await self.process_kzxxxgh()
-
- except Exception as e:
- print(f"处理地块数据错误: {str(e)}")
- finally:
- await self.db.close()
- async def main():
- calculator = HighwayDistanceCalculator()
- await calculator.process_all_parcels()
- if __name__ == "__main__":
- asyncio.run(main())
|