diff --git a/parser/__init__.py b/parser/__init__.py index cd4b8b3..0f3b9e1 100644 --- a/parser/__init__.py +++ b/parser/__init__.py @@ -10,6 +10,7 @@ from .lenenergo import LenenergoParser from .preprocess import ( COL_NS, ICOL_NS, + PR_COL_NS, group_by_index, preprocess_df, preprocess_read_df, @@ -20,6 +21,7 @@ __all__ = ( "async_fetch_building_id", "async_fetch_building_ids", "COL_NS", + "PR_COL_NS", "concurrent_fetch_builing_ids", "fetch_builing_ids", "get_building_id", diff --git a/parser/preprocess.py b/parser/preprocess.py index 5de5c98..cd84da3 100644 --- a/parser/preprocess.py +++ b/parser/preprocess.py @@ -21,6 +21,23 @@ COL_NS = { "lng": "Долгота", } +PR_COL_NS = ( + "index", + "region", + "area", + "town", + "street", + "branch", + "res", + "comment", + "building_id", + "lat", + "lng", + "start", + "finish", +) + + ICOL_NS = dict(map(reversed, COL_NS.items())) diff --git a/runner/__main__.py b/runner/__main__.py index 40752e4..e3089fe 100644 --- a/runner/__main__.py +++ b/runner/__main__.py @@ -1,11 +1,17 @@ import schedule -from .config import REFETCH_PERIOD_H +from .config import REFETCH_PERIOD_H, STORE_NULL_BID from .job import job from .scheduler import run_continuously schedule.every(REFETCH_PERIOD_H).hours.do(job) stop_run_continuously = run_continuously() +print( + f"Scheduled to run every {REFETCH_PERIOD_H} hour(s) and " + + ("" if STORE_NULL_BID else "not ") + + "to store NULL building_id" +) + # First run job() diff --git a/runner/config.py b/runner/config.py index 2e94e42..add0d99 100644 --- a/runner/config.py +++ b/runner/config.py @@ -1,6 +1,9 @@ import os +from parser import PR_COL_NS + REFETCH_PERIOD_H = int(os.environ.get("REFETCH_PERIOD_H", "4")) +STORE_NULL_BID = os.environ.get("STORE_NULL_BID", "False") == "True" POSTGRES_USER = os.environ.get("POSTGRES_USER", "lenenergo") POSTGRES_PASSWORD = os.environ.get("POSTGRES_PASSWORD", "lenenergo") @@ -9,3 +12,9 @@ POSTGRES_HOST = os.environ.get("POSTGRES_HOST", "localhost") POSTGRES_PORT = 
int(os.environ.get("POSTGRES_PORT", "5432")) DB_URL = os.environ.get("DB_URL", None) + +DB_COLUMNS_MAP = dict(zip(PR_COL_NS, PR_COL_NS)) +""" +Feel free to rewrite mapping like +DB_COLUMNS_MAP[""] = "" +""" diff --git a/runner/job.py b/runner/job.py index e4060f3..a2af302 100644 --- a/runner/job.py +++ b/runner/job.py @@ -4,23 +4,13 @@ from parser import pipeline import pandas as pd import psycopg +from .config import DB_COLUMNS_MAP, STORE_NULL_BID from .database import db_credentials -sql_statement = """COPY records ( - index, - region, - area, - town, - street, - branch, - res, - comment, - building_id, - lat, - lng, - start, - finish -) FROM STDIN""" + +sql_statement = "".join( + ("COPY records (", ", ".join(DB_COLUMNS_MAP.values()), ") FROM STDIN") +) def job(): @@ -36,6 +26,8 @@ def job(): with cursor.copy(sql_statement) as copy: for _, row in parser.df.iterrows(): row = row.where((pd.notnull(row)), None) - copy.write_row(row.to_list()) + if row["building_id"] is not None or STORE_NULL_BID: + db_row = row.rename(DB_COLUMNS_MAP) + copy.write_row(db_row.to_list()) print(f"Fetched in {datetime.now() - fetch_start}\n{parser}")