Compare commits
No commits in common. "main" and "dev" have entirely different histories.
@ -1,9 +1,5 @@
|
|||||||
REFETCH_PERIOD_H=6
|
REFETCH_PERIOD_H=6
|
||||||
STORE_NULL_BID=True
|
|
||||||
|
|
||||||
POSTGRES_USER=lenenergo
|
POSTGRES_USER=lenenergo
|
||||||
POSTGRES_PASSWORD=lenenergo
|
POSTGRES_PASSWORD=lenenergo
|
||||||
POSTGRES_DB=lenenergo
|
POSTGRES_DB=lenenergo
|
||||||
POSTGRES_HOST=db
|
POSTGRES_HOST=db
|
||||||
# or
|
|
||||||
DB_URL=postgresql://lenenergo:lenenergo@localhost:5432
|
|
||||||
|
2
.gitignore
vendored
2
.gitignore
vendored
@ -5,5 +5,3 @@ __pycache__
|
|||||||
data*.csv
|
data*.csv
|
||||||
.idea/
|
.idea/
|
||||||
.ipynb_checkpoints
|
.ipynb_checkpoints
|
||||||
.vscode/
|
|
||||||
*.odb
|
|
||||||
|
15
.vscode/launch.json
vendored
Normal file
15
.vscode/launch.json
vendored
Normal file
@ -0,0 +1,15 @@
|
|||||||
|
{
|
||||||
|
// Use IntelliSense to learn about possible attributes.
|
||||||
|
// Hover to view descriptions of existing attributes.
|
||||||
|
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
|
||||||
|
"version": "0.2.0",
|
||||||
|
"configurations": [
|
||||||
|
{
|
||||||
|
"name": "Python: Module",
|
||||||
|
"type": "python",
|
||||||
|
"request": "launch",
|
||||||
|
"module": "parser",
|
||||||
|
"justMyCode": true,
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
37
README.md
37
README.md
@ -1,37 +0,0 @@
|
|||||||
# Lenenergo Parser
|
|
||||||
|
|
||||||
## DB columns settings
|
|
||||||
|
|
||||||
Append to `runner/config.py`
|
|
||||||
|
|
||||||
```python
|
|
||||||
DB_COLUMNS_MAP["<COL_NS key>"] = "<corresponding db column name>"
|
|
||||||
```
|
|
||||||
|
|
||||||
## Running instructions
|
|
||||||
|
|
||||||
```bash
|
|
||||||
docker build . -t lenenergo_parser
|
|
||||||
docker run -d \
|
|
||||||
[-e REFETCH_PERIOD_H=4] \ # Refetch period
|
|
||||||
[-e STORE_NULL_BID=False] \ # Store rows with null building_id
|
|
||||||
# DB auth variants
|
|
||||||
[-e POSTGRES_USER=lenenergo] \
|
|
||||||
[-e POSTGRES_PASSWORD=lenenergo] \
|
|
||||||
[-e POSTGRES_DB=lenenergo] \
|
|
||||||
[-e POSTGRES_HOST=localhost] \
|
|
||||||
[-e POSTGRES_PORT=5432] \
|
|
||||||
# or
|
|
||||||
[-e DB_URL=postgresql://lenenergo:lenenergo@localhost:5432/lenenergo] \
|
|
||||||
lenenergo_parser
|
|
||||||
```
|
|
||||||
|
|
||||||
## Dev instructions
|
|
||||||
|
|
||||||
```bash
|
|
||||||
python -m venv .venv
|
|
||||||
|
|
||||||
pip install -r requirements.txt
|
|
||||||
|
|
||||||
python -m runner
|
|
||||||
```
|
|
@ -31,18 +31,12 @@ def split_addresses(df: pd.DataFrame) -> pd.DataFrame
|
|||||||
```
|
```
|
||||||
- `get_building_id`:
|
- `get_building_id`:
|
||||||
```python
|
```python
|
||||||
def get_building_id(street: str) -> GeoTupleType
|
def get_building_id(street: str) -> Tuple[Optional[int], Optional[float], Optional[float]]
|
||||||
```
|
```
|
||||||
- `fetch_builing_ids`:
|
- `fetch_builing_ids`:
|
||||||
```python
|
```python
|
||||||
def fetch_builing_ids(df: pd.DataFrame) -> pd.DataFrame
|
def fetch_builing_ids(df: pd.DataFrame) -> pd.DataFrame
|
||||||
```
|
```
|
||||||
- `async_fetch_building_id`:
|
|
||||||
```python
|
|
||||||
async def async_fetch_building_id(
|
|
||||||
session: aiohttp.ClientSession, street: str
|
|
||||||
) -> GeoTupleType
|
|
||||||
```
|
|
||||||
- `async_fetch_building_ids`:
|
- `async_fetch_building_ids`:
|
||||||
```python
|
```python
|
||||||
async def async_fetch_building_ids(df: pd.DataFrame) -> pd.DataFrame
|
async def async_fetch_building_ids(df: pd.DataFrame) -> pd.DataFrame
|
||||||
@ -57,15 +51,11 @@ def preprocess_df(df: pd.DataFrame) -> pd.DataFrame
|
|||||||
```
|
```
|
||||||
- `COL_NS`:
|
- `COL_NS`:
|
||||||
```python
|
```python
|
||||||
COL_NS: dict[str, str]
|
COL_NS: Dict[str, str]
|
||||||
```
|
```
|
||||||
- `ICOL_NS`:
|
- `ICOL_NS`:
|
||||||
```python
|
```python
|
||||||
ICOL_NS: dict[str, str]
|
ICOL_NS: Dict[str, str]
|
||||||
```
|
|
||||||
- `PR_COL_NS`:
|
|
||||||
```python
|
|
||||||
PR_COL_NS: tuple[str]
|
|
||||||
```
|
```
|
||||||
- `preprocess_read_df`:
|
- `preprocess_read_df`:
|
||||||
```python
|
```python
|
||||||
@ -92,4 +82,4 @@ pip install -r requirements.txt
|
|||||||
python -m parser [<Период в часах>]
|
python -m parser [<Период в часах>]
|
||||||
```
|
```
|
||||||
|
|
||||||
Формат сохраняемых файлов: `data_%d-%m-%y_%H.%M.csv`
|
Формат сохраняемых файлов: `data_%d-%m-%y_%H:%M.csv`
|
||||||
|
@ -1,6 +1,5 @@
|
|||||||
from .address import split_addresses
|
from .address import split_addresses
|
||||||
from .building_id import (
|
from .building_id import (
|
||||||
GeoTupleType,
|
|
||||||
async_fetch_building_id,
|
async_fetch_building_id,
|
||||||
async_fetch_building_ids,
|
async_fetch_building_ids,
|
||||||
concurrent_fetch_builing_ids,
|
concurrent_fetch_builing_ids,
|
||||||
@ -12,23 +11,20 @@ from .pipeline import pipeline
|
|||||||
from .preprocess import (
|
from .preprocess import (
|
||||||
COL_NS,
|
COL_NS,
|
||||||
ICOL_NS,
|
ICOL_NS,
|
||||||
PR_COL_NS,
|
|
||||||
group_by_index,
|
group_by_index,
|
||||||
preprocess_df,
|
preprocess_df,
|
||||||
preprocess_read_df,
|
preprocess_read_df,
|
||||||
)
|
)
|
||||||
|
|
||||||
__all__ = (
|
__all__ = (
|
||||||
"COL_NS",
|
|
||||||
"GeoTupleType",
|
|
||||||
"ICOL_NS",
|
|
||||||
"PR_COL_NS",
|
|
||||||
"async_fetch_building_id",
|
"async_fetch_building_id",
|
||||||
"async_fetch_building_ids",
|
"async_fetch_building_ids",
|
||||||
|
"COL_NS",
|
||||||
"concurrent_fetch_builing_ids",
|
"concurrent_fetch_builing_ids",
|
||||||
"fetch_builing_ids",
|
"fetch_builing_ids",
|
||||||
"get_building_id",
|
"get_building_id",
|
||||||
"group_by_index",
|
"group_by_index",
|
||||||
|
"ICOL_NS",
|
||||||
"LenenergoParser",
|
"LenenergoParser",
|
||||||
"pipeline",
|
"pipeline",
|
||||||
"preprocess_df",
|
"preprocess_df",
|
||||||
|
@ -45,10 +45,7 @@ async def async_fetch_building_id(
|
|||||||
async with session.get(
|
async with session.get(
|
||||||
"https://geocode.gate.petersburg.ru/parse/eas", params={"street": street}
|
"https://geocode.gate.petersburg.ru/parse/eas", params={"street": street}
|
||||||
) as r:
|
) as r:
|
||||||
try:
|
res = await r.json()
|
||||||
res = await r.json()
|
|
||||||
except aiohttp.client_exceptions.ContentTypeError:
|
|
||||||
res = "error"
|
|
||||||
|
|
||||||
if "error" in res:
|
if "error" in res:
|
||||||
return None, None, None
|
return None, None, None
|
||||||
|
@ -21,23 +21,6 @@ COL_NS = {
|
|||||||
"lng": "Долгота",
|
"lng": "Долгота",
|
||||||
}
|
}
|
||||||
|
|
||||||
PR_COL_NS = (
|
|
||||||
"index",
|
|
||||||
"region",
|
|
||||||
"area",
|
|
||||||
"town",
|
|
||||||
"street",
|
|
||||||
"branch",
|
|
||||||
"res",
|
|
||||||
"comment",
|
|
||||||
"building_id",
|
|
||||||
"lat",
|
|
||||||
"lng",
|
|
||||||
"start",
|
|
||||||
"finish",
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
ICOL_NS = dict(map(reversed, COL_NS.items()))
|
ICOL_NS = dict(map(reversed, COL_NS.items()))
|
||||||
|
|
||||||
|
|
||||||
|
@ -1,17 +1,11 @@
|
|||||||
import schedule
|
import schedule
|
||||||
|
|
||||||
from .config import REFETCH_PERIOD_H, STORE_NULL_BID
|
from .config import REFETCH_PERIOD_H
|
||||||
from .job import job
|
from .job import job
|
||||||
from .scheduler import run_continuously
|
from .scheduler import run_continuously
|
||||||
|
|
||||||
schedule.every(REFETCH_PERIOD_H).hours.do(job)
|
schedule.every(REFETCH_PERIOD_H).hours.do(job)
|
||||||
stop_run_continuously = run_continuously()
|
stop_run_continuously = run_continuously()
|
||||||
|
|
||||||
print(
|
|
||||||
f"Scheduled to run every {REFETCH_PERIOD_H} hour and "
|
|
||||||
+ ("" if STORE_NULL_BID else "not ")
|
|
||||||
+ "to store NULL building_id"
|
|
||||||
)
|
|
||||||
|
|
||||||
# First run
|
# First run
|
||||||
job()
|
job()
|
||||||
|
@ -1,8 +1,6 @@
|
|||||||
import os
|
import os
|
||||||
from parser import PR_COL_NS
|
|
||||||
|
|
||||||
REFETCH_PERIOD_H = int(os.environ.get("REFETCH_PERIOD_H", "4"))
|
REFETCH_PERIOD_H = int(os.environ.get("REFETCH_PERIOD_H", "4"))
|
||||||
STORE_NULL_BID = os.environ.get("STORE_NULL_BID", "False") == "True"
|
|
||||||
|
|
||||||
POSTGRES_USER = os.environ.get("POSTGRES_USER", "lenenergo")
|
POSTGRES_USER = os.environ.get("POSTGRES_USER", "lenenergo")
|
||||||
POSTGRES_PASSWORD = os.environ.get("POSTGRES_PASSWORD", "lenenergo")
|
POSTGRES_PASSWORD = os.environ.get("POSTGRES_PASSWORD", "lenenergo")
|
||||||
@ -11,9 +9,3 @@ POSTGRES_HOST = os.environ.get("POSTGRES_HOST", "localhost")
|
|||||||
POSTGRES_PORT = int(os.environ.get("POSTGRES_PORT", "5432"))
|
POSTGRES_PORT = int(os.environ.get("POSTGRES_PORT", "5432"))
|
||||||
|
|
||||||
DB_URL = os.environ.get("DB_URL", None)
|
DB_URL = os.environ.get("DB_URL", None)
|
||||||
|
|
||||||
DB_COLUMNS_MAP = dict(zip(PR_COL_NS, PR_COL_NS))
|
|
||||||
"""
|
|
||||||
Feel free to rewrite mapping like
|
|
||||||
DB_COLUMNS_MAP["<COL_NS key>"] = "<corresponding db column name>"
|
|
||||||
"""
|
|
||||||
|
@ -4,12 +4,23 @@ from parser import pipeline
|
|||||||
import pandas as pd
|
import pandas as pd
|
||||||
import psycopg
|
import psycopg
|
||||||
|
|
||||||
from .config import DB_COLUMNS_MAP, STORE_NULL_BID
|
|
||||||
from .database import db_credentials
|
from .database import db_credentials
|
||||||
|
|
||||||
sql_statement = "".join(
|
sql_statement = """COPY records (
|
||||||
("COPY records (", ", ".join(DB_COLUMNS_MAP.values()), ") FROM STDIN")
|
index,
|
||||||
)
|
region,
|
||||||
|
area,
|
||||||
|
town,
|
||||||
|
street,
|
||||||
|
branch,
|
||||||
|
res,
|
||||||
|
comment,
|
||||||
|
building_id,
|
||||||
|
lat,
|
||||||
|
lng,
|
||||||
|
start,
|
||||||
|
finish
|
||||||
|
) FROM STDIN"""
|
||||||
|
|
||||||
|
|
||||||
def job():
|
def job():
|
||||||
@ -25,8 +36,6 @@ def job():
|
|||||||
with cursor.copy(sql_statement) as copy:
|
with cursor.copy(sql_statement) as copy:
|
||||||
for _, row in parser.df.iterrows():
|
for _, row in parser.df.iterrows():
|
||||||
row = row.where((pd.notnull(row)), None)
|
row = row.where((pd.notnull(row)), None)
|
||||||
if row["building_id"] is not None or STORE_NULL_BID:
|
copy.write_row(row.to_list())
|
||||||
db_row = row.rename(DB_COLUMNS_MAP)
|
|
||||||
copy.write_row(db_row.to_list())
|
|
||||||
|
|
||||||
print(f"Fetched in {datetime.now() - fetch_start}\n{parser}")
|
print(f"Fetched in {datetime.now() - fetch_start}\n{parser}")
|
||||||
|
Loading…
x
Reference in New Issue
Block a user