Compare commits

6 Commits
dev...main

SHA1        Message                                                      Date
d653810dcb  Update README.md                                             2023-10-30 10:04:09 +03:00
c735ca2ee5  Added readme                                                 2023-10-29 17:09:54 +03:00
2efb4d4846  Merge branch 'dev'                                           2023-10-29 16:27:41 +03:00
430f36619a  Added saving null responses setting and column name config   2023-10-29 15:36:18 +03:00
367abfc325  Added error handling for geocoder HTTP error                 2023-10-29 15:34:42 +03:00
68b92b8bd2  Removed .vscode                                              2023-10-29 14:36:45 +03:00
12 changed files with 109 additions and 42 deletions

View File

@@ -1,5 +1,9 @@
 REFETCH_PERIOD_H=6
+STORE_NULL_BID=True
 POSTGRES_USER=lenenergo
 POSTGRES_PASSWORD=lenenergo
 POSTGRES_DB=lenenergo
 POSTGRES_HOST=db
+
+# or
+DB_URL=postgresql://lenenergo:lenenergo@localhost:5432

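Aside: `runner/database.py` is not part of this diff, so how `DB_URL` interacts with the discrete `POSTGRES_*` settings is an assumption. A minimal sketch of a `db_credentials` that prefers `DB_URL` when it is set:

```python
# Hypothetical sketch of runner/database.py (not shown in this diff):
# prefer the full DB_URL when provided, otherwise assemble a DSN from
# the discrete POSTGRES_* settings.
from .config import (
    DB_URL,
    POSTGRES_DB,
    POSTGRES_HOST,
    POSTGRES_PASSWORD,
    POSTGRES_PORT,
    POSTGRES_USER,
)

db_credentials = DB_URL or (
    f"postgresql://{POSTGRES_USER}:{POSTGRES_PASSWORD}"
    f"@{POSTGRES_HOST}:{POSTGRES_PORT}/{POSTGRES_DB}"
)
```
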
.gitignore vendored (2 changes)
View File

@@ -5,3 +5,5 @@ __pycache__
 data*.csv
 .idea/
 .ipynb_checkpoints
+.vscode/
+*.odb

.vscode/launch.json vendored (15 changes)
View File

@ -1,15 +0,0 @@
{
// Use IntelliSense to learn about possible attributes.
// Hover to view descriptions of existing attributes.
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"name": "Python: Module",
"type": "python",
"request": "launch",
"module": "parser",
"justMyCode": true,
}
]
}

README.md Normal file (37 changes)
View File

@@ -0,0 +1,37 @@
+# Lenenergo Parser
+
+## DB columns settings
+
+Append to `runner/config.py`
+
+```python
+DB_COLUMNS_MAP["<COL_NS key>"] = "<corresponding db column name>"
+```
+
+## Running instructions
+
+```bash
+docker build . -t lenenergo_parser
+docker run -d \
+    [-e REFETCH_PERIOD_H=4] \ # Refetch period
+    [-e STORE_NULL_BID=False] \ # Store rows with null building_id
+    # DB auth variants
+    [-e POSTGRES_USER=lenenergo] \
+    [-e POSTGRES_PASSWORD=lenenergo] \
+    [-e POSTGRES_DB=lenenergo] \
+    [-e POSTGRES_HOST=localhost] \
+    [-e POSTGRES_PORT=5432] \
+    # or
+    [-e DB_URL=postgresql://lenenergo:lenenergo@localhost:5432/lenenergo] \
+    lenenergo_parser
+```
+
+## Dev instructions
+
+```bash
+python -m venv .venv
+source .venv/bin/activate
+pip install -r requirements.txt
+python -m runner
+```

View File

@@ -31,12 +31,18 @@ def split_addresses(df: pd.DataFrame) -> pd.DataFrame
 ```
 - `get_building_id`:
 ```python
-def get_building_id(street: str) -> Tuple[Optional[int], Optional[float], Optional[float]]
+def get_building_id(street: str) -> GeoTupleType
 ```
 - `fetch_builing_ids`:
 ```python
 def fetch_builing_ids(df: pd.DataFrame) -> pd.DataFrame
 ```
+- `async_fetch_building_id`:
+```python
+async def async_fetch_building_id(
+    session: aiohttp.ClientSession, street: str
+) -> GeoTupleType
+```
 - `async_fetch_building_ids`:
 ```python
 async def async_fetch_building_ids(df: pd.DataFrame) -> pd.DataFrame
@@ -51,11 +57,15 @@ def preprocess_df(df: pd.DataFrame) -> pd.DataFrame
 ```
 - `COL_NS`:
 ```python
-COL_NS: Dict[str, str]
+COL_NS: dict[str, str]
 ```
 - `ICOL_NS`:
 ```python
-ICOL_NS: Dict[str, str]
+ICOL_NS: dict[str, str]
 ```
+- `PR_COL_NS`:
+```python
+PR_COL_NS: tuple[str]
+```
 - `preprocess_read_df`:
 ```python
@@ -82,4 +92,4 @@ pip install -r requirements.txt
 python -m parser [<Period in hours>]
 ```
-Format of saved files: `data_%d-%m-%y_%H:%M.csv`
+Format of saved files: `data_%d-%m-%y_%H.%M.csv`

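The replaced signature spells out exactly what `GeoTupleType` now abbreviates, so a plausible alias definition in the parser code (the alias itself is not shown in this diff) would be:

```python
from typing import Optional, Tuple

# Assumed definition matching the old signature of get_building_id:
# (building_id, latitude, longitude), any of which may be missing.
GeoTupleType = Tuple[Optional[int], Optional[float], Optional[float]]
```
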
View File

@@ -1,5 +1,6 @@
 from .address import split_addresses
 from .building_id import (
+    GeoTupleType,
     async_fetch_building_id,
     async_fetch_building_ids,
     concurrent_fetch_builing_ids,
@@ -11,20 +12,23 @@ from .pipeline import pipeline
 from .preprocess import (
     COL_NS,
     ICOL_NS,
+    PR_COL_NS,
     group_by_index,
     preprocess_df,
     preprocess_read_df,
 )

 __all__ = (
+    "COL_NS",
+    "GeoTupleType",
+    "ICOL_NS",
+    "PR_COL_NS",
     "async_fetch_building_id",
     "async_fetch_building_ids",
-    "COL_NS",
     "concurrent_fetch_builing_ids",
     "fetch_builing_ids",
     "get_building_id",
     "group_by_index",
-    "ICOL_NS",
     "LenenergoParser",
     "pipeline",
     "preprocess_df",

View File

@@ -45,7 +45,10 @@ async def async_fetch_building_id(
     async with session.get(
         "https://geocode.gate.petersburg.ru/parse/eas", params={"street": street}
     ) as r:
-        res = await r.json()
+        try:
+            res = await r.json()
+        except aiohttp.client_exceptions.ContentTypeError:
+            res = "error"

         if "error" in res:
             return None, None, None

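Note why the `res = "error"` sentinel works: `in` is a substring test on strings and a key test on dicts, so both the sentinel and a genuine geocoder error payload take the same early-return path.

```python
# Both forms satisfy the subsequent `if "error" in res` check:
"error" in "error"             # True: substring test on str
"error" in {"error": "404"}    # True: key test on dict
```
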
View File

@@ -21,6 +21,23 @@ COL_NS = {
     "lng": "Долгота",
 }

+PR_COL_NS = (
+    "index",
+    "region",
+    "area",
+    "town",
+    "street",
+    "branch",
+    "res",
+    "comment",
+    "building_id",
+    "lat",
+    "lng",
+    "start",
+    "finish",
+)
+
 ICOL_NS = dict(map(reversed, COL_NS.items()))

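For clarity, `dict(map(reversed, COL_NS.items()))` flips each `(key, value)` pair, so `ICOL_NS` maps the Russian source-data headers back to the internal column names. A toy run using the one pair visible in this hunk:

```python
# Excerpt of COL_NS for illustration; the full dict has more entries.
COL_NS = {"lng": "Долгота"}
ICOL_NS = dict(map(reversed, COL_NS.items()))
print(ICOL_NS)  # {'Долгота': 'lng'}
```
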
View File

@@ -1,11 +1,17 @@
 import schedule

-from .config import REFETCH_PERIOD_H
+from .config import REFETCH_PERIOD_H, STORE_NULL_BID
 from .job import job
 from .scheduler import run_continuously

 schedule.every(REFETCH_PERIOD_H).hours.do(job)
 stop_run_continuously = run_continuously()

+print(
+    f"Scheduled to run every {REFETCH_PERIOD_H} hour and "
+    + ("" if STORE_NULL_BID else "not ")
+    + "to store NULL building_id"
+)
+
 # First run
 job()

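`runner/scheduler.py` is not shown in this diff; `run_continuously` is presumably the standard background-thread recipe from the `schedule` library documentation, which polls pending jobs until the returned event is set:

```python
# Sketch following the schedule docs recipe; the repository's actual
# scheduler.py may differ.
import threading
import time

import schedule


def run_continuously(interval: int = 1) -> threading.Event:
    cease_continuous_run = threading.Event()

    class ScheduleThread(threading.Thread):
        @classmethod
        def run(cls):
            while not cease_continuous_run.is_set():
                schedule.run_pending()
                time.sleep(interval)

    continuous_thread = ScheduleThread()
    continuous_thread.daemon = True
    continuous_thread.start()
    return cease_continuous_run
```
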
View File

@@ -1,6 +1,8 @@
 import os
+from parser import PR_COL_NS

 REFETCH_PERIOD_H = int(os.environ.get("REFETCH_PERIOD_H", "4"))
+STORE_NULL_BID = os.environ.get("STORE_NULL_BID", "False") == "True"

 POSTGRES_USER = os.environ.get("POSTGRES_USER", "lenenergo")
 POSTGRES_PASSWORD = os.environ.get("POSTGRES_PASSWORD", "lenenergo")
@@ -9,3 +11,9 @@ POSTGRES_HOST = os.environ.get("POSTGRES_HOST", "localhost")
 POSTGRES_PORT = int(os.environ.get("POSTGRES_PORT", "5432"))
 DB_URL = os.environ.get("DB_URL", None)
+
+DB_COLUMNS_MAP = dict(zip(PR_COL_NS, PR_COL_NS))
+"""
+Feel free to rewrite mapping like
+DB_COLUMNS_MAP["<COL_NS key>"] = "<corresponding db column name>"
+"""

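The docstring invites overriding individual entries after the identity map is built, for example (the DB column name here is purely illustrative):

```python
# Map the parser's "building_id" key to a differently named DB column.
# "building_uuid" is a hypothetical column name, not from this repo.
DB_COLUMNS_MAP["building_id"] = "building_uuid"
```
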
View File

@@ -4,23 +4,12 @@ from parser import pipeline

 import pandas as pd
 import psycopg

+from .config import DB_COLUMNS_MAP, STORE_NULL_BID
 from .database import db_credentials

-sql_statement = """COPY records (
-    index,
-    region,
-    area,
-    town,
-    street,
-    branch,
-    res,
-    comment,
-    building_id,
-    lat,
-    lng,
-    start,
-    finish
-) FROM STDIN"""
+sql_statement = "".join(
+    ("COPY records (", ", ".join(DB_COLUMNS_MAP.values()), ") FROM STDIN")
+)


 def job():
@@ -36,6 +25,8 @@ def job():
         with cursor.copy(sql_statement) as copy:
             for _, row in parser.df.iterrows():
                 row = row.where((pd.notnull(row)), None)
-                copy.write_row(row.to_list())
+                if row["building_id"] is not None or STORE_NULL_BID:
+                    db_row = row.rename(DB_COLUMNS_MAP)
+                    copy.write_row(db_row.to_list())

     print(f"Fetched in {datetime.now() - fetch_start}\n{parser}")