Added async building_id fetching and defaulted it
This commit is contained in:
@@ -33,6 +33,14 @@ def split_addresses(df: pd.DataFrame) -> pd.DataFrame
|
||||
```python
|
||||
def fetch_builing_ids(df: pd.DataFrame) -> pd.DataFrame
|
||||
```
|
||||
- `async_fetch_building_ids`:
|
||||
```python
|
||||
async def async_fetch_building_ids(df: pd.DataFrame) -> pd.DataFrame
|
||||
```
|
||||
- `concurrent_fetch_builing_ids`:
|
||||
```python
|
||||
def concurrent_fetch_builing_ids(df: pd.Dataframe) -> pd.DataFrame
|
||||
```
|
||||
- `preprocess_df`:
|
||||
```python
|
||||
def preprocess_df(df: pd.DataFrame) -> pd.DataFrame
|
||||
|
@@ -1,5 +1,5 @@
|
||||
from .rosseti import RossetiParser
|
||||
from .address import split_addresses
|
||||
from .building_id import fetch_builing_ids
|
||||
from .building_id import fetch_builing_ids, async_fetch_building_ids, concurrent_fetch_builing_ids
|
||||
from .preprocess import preprocess_df, COL_NS, ICOL_NS, preprocess_read_df, group_by_index
|
||||
from .util import pipeline
|
||||
|
@@ -4,6 +4,8 @@ from typing import Optional, Tuple, Any, List
|
||||
import requests
|
||||
import pandas as pd
|
||||
import numpy as np
|
||||
import asyncio
|
||||
import aiohttp
|
||||
|
||||
GeoTupleType = Tuple[Optional[int], Optional[float], Optional[float]]
|
||||
|
||||
@@ -29,3 +31,42 @@ def fetch_builing_ids(df: pd.DataFrame) -> pd.DataFrame:
|
||||
lambda row: get_building_id(row['Улица']), axis=1, result_type='expand')
|
||||
|
||||
return df
|
||||
|
||||
|
||||
async def async_fetch_building_id(session: aiohttp.ClientSession, street: str) -> GeoTupleType:
|
||||
if pd.isnull(street):
|
||||
return None, None, None
|
||||
|
||||
async with session.get('https://geocode.gate.petersburg.ru/parse/eas', params={
|
||||
'street': street
|
||||
}) as r:
|
||||
res = await r.json()
|
||||
|
||||
if 'error' in res:
|
||||
return None, None, None
|
||||
|
||||
return res['Building_ID'], res['Latitude'], res['Longitude']
|
||||
|
||||
|
||||
async def async_fetch_building_ids(df: pd.DataFrame) -> pd.DataFrame:
|
||||
async with aiohttp.ClientSession() as session:
|
||||
tasks = []
|
||||
|
||||
for _, row in df.iterrows():
|
||||
tasks.append(
|
||||
asyncio.ensure_future(
|
||||
async_fetch_building_id(session, row['Улица'])
|
||||
)
|
||||
)
|
||||
|
||||
res = await asyncio.gather(*tasks)
|
||||
|
||||
df[['ID здания', 'Широта', 'Долгота']] = res
|
||||
|
||||
return df
|
||||
|
||||
|
||||
def concurrent_fetch_builing_ids(df: pd.Dataframe) -> pd.DataFrame:
|
||||
return asyncio.run(
|
||||
async_fetch_building_ids(df)
|
||||
)
|
||||
|
@@ -1,6 +1,6 @@
|
||||
from typing import Optional
|
||||
|
||||
from . import RossetiParser, split_addresses, fetch_builing_ids, preprocess_df
|
||||
from . import RossetiParser, split_addresses, concurrent_fetch_builing_ids, preprocess_df
|
||||
|
||||
|
||||
def pipeline(parser: Optional[RossetiParser] = None) -> RossetiParser:
|
||||
@@ -11,7 +11,7 @@ def pipeline(parser: Optional[RossetiParser] = None) -> RossetiParser:
|
||||
|
||||
parser.df = split_addresses(parser.df)
|
||||
|
||||
parser.df = fetch_builing_ids(parser.df)
|
||||
parser.df = concurrent_fetch_builing_ids(parser.df)
|
||||
|
||||
parser.df = preprocess_df(parser.df)
|
||||
|
||||
|
Reference in New Issue
Block a user