Added a setting that controls whether records with a NULL building_id are stored, and a configurable mapping of DB column names
This commit is contained in:
parent
367abfc325
commit
430f36619a
@ -10,6 +10,7 @@ from .lenenergo import LenenergoParser
|
|||||||
from .preprocess import (
|
from .preprocess import (
|
||||||
COL_NS,
|
COL_NS,
|
||||||
ICOL_NS,
|
ICOL_NS,
|
||||||
|
PR_COL_NS,
|
||||||
group_by_index,
|
group_by_index,
|
||||||
preprocess_df,
|
preprocess_df,
|
||||||
preprocess_read_df,
|
preprocess_read_df,
|
||||||
@ -20,6 +21,7 @@ __all__ = (
|
|||||||
"async_fetch_building_id",
|
"async_fetch_building_id",
|
||||||
"async_fetch_building_ids",
|
"async_fetch_building_ids",
|
||||||
"COL_NS",
|
"COL_NS",
|
||||||
|
"PR_COL_NS",
|
||||||
"concurrent_fetch_builing_ids",
|
"concurrent_fetch_builing_ids",
|
||||||
"fetch_builing_ids",
|
"fetch_builing_ids",
|
||||||
"get_building_id",
|
"get_building_id",
|
||||||
|
@ -21,6 +21,23 @@ COL_NS = {
|
|||||||
"lng": "Долгота",
|
"lng": "Долгота",
|
||||||
}
|
}
|
||||||
|
|
||||||
|
PR_COL_NS = (
|
||||||
|
"index",
|
||||||
|
"region",
|
||||||
|
"area",
|
||||||
|
"town",
|
||||||
|
"street",
|
||||||
|
"branch",
|
||||||
|
"res",
|
||||||
|
"comment",
|
||||||
|
"building_id",
|
||||||
|
"lat",
|
||||||
|
"lng",
|
||||||
|
"start",
|
||||||
|
"finish",
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
ICOL_NS = dict(map(reversed, COL_NS.items()))
|
ICOL_NS = dict(map(reversed, COL_NS.items()))
|
||||||
|
|
||||||
|
|
||||||
|
@ -1,11 +1,17 @@
|
|||||||
import schedule
|
import schedule
|
||||||
|
|
||||||
from .config import REFETCH_PERIOD_H
|
from .config import REFETCH_PERIOD_H, STORE_NULL_BID
|
||||||
from .job import job
|
from .job import job
|
||||||
from .scheduler import run_continuously
|
from .scheduler import run_continuously
|
||||||
|
|
||||||
schedule.every(REFETCH_PERIOD_H).hours.do(job)
|
schedule.every(REFETCH_PERIOD_H).hours.do(job)
|
||||||
stop_run_continuously = run_continuously()
|
stop_run_continuously = run_continuously()
|
||||||
|
|
||||||
|
print(
|
||||||
|
f"Scheduled to run every {REFETCH_PERIOD_H} hour and "
|
||||||
|
+ ("" if STORE_NULL_BID else "not ")
|
||||||
|
+ "to store NULL building_id"
|
||||||
|
)
|
||||||
|
|
||||||
# First run
|
# First run
|
||||||
job()
|
job()
|
||||||
|
@ -1,6 +1,9 @@
|
|||||||
import os
|
import os
|
||||||
|
|
||||||
|
from parser import PR_COL_NS
|
||||||
|
|
||||||
REFETCH_PERIOD_H = int(os.environ.get("REFETCH_PERIOD_H", "4"))
|
REFETCH_PERIOD_H = int(os.environ.get("REFETCH_PERIOD_H", "4"))
|
||||||
|
STORE_NULL_BID = os.environ.get("STORE_NULL_BID", "False") == "True"
|
||||||
|
|
||||||
POSTGRES_USER = os.environ.get("POSTGRES_USER", "lenenergo")
|
POSTGRES_USER = os.environ.get("POSTGRES_USER", "lenenergo")
|
||||||
POSTGRES_PASSWORD = os.environ.get("POSTGRES_PASSWORD", "lenenergo")
|
POSTGRES_PASSWORD = os.environ.get("POSTGRES_PASSWORD", "lenenergo")
|
||||||
@ -9,3 +12,9 @@ POSTGRES_HOST = os.environ.get("POSTGRES_HOST", "localhost")
|
|||||||
POSTGRES_PORT = int(os.environ.get("POSTGRES_PORT", "5432"))
|
POSTGRES_PORT = int(os.environ.get("POSTGRES_PORT", "5432"))
|
||||||
|
|
||||||
DB_URL = os.environ.get("DB_URL", None)
|
DB_URL = os.environ.get("DB_URL", None)
|
||||||
|
|
||||||
|
DB_COLUMNS_MAP = dict(zip(PR_COL_NS, PR_COL_NS))
|
||||||
|
"""
|
||||||
|
Feel free to override individual entries of the mapping, for example:
|
||||||
|
DB_COLUMNS_MAP["<PR_COL_NS name>"] = "<corresponding db column name>"
|
||||||
|
"""
|
||||||
|
@ -4,23 +4,13 @@ from parser import pipeline
|
|||||||
import pandas as pd
|
import pandas as pd
|
||||||
import psycopg
|
import psycopg
|
||||||
|
|
||||||
|
from .config import DB_COLUMNS_MAP, STORE_NULL_BID
|
||||||
from .database import db_credentials
|
from .database import db_credentials
|
||||||
|
|
||||||
sql_statement = """COPY records (
|
|
||||||
index,
|
sql_statement = "".join(
|
||||||
region,
|
("COPY records (", ", ".join(DB_COLUMNS_MAP.values()), ") FROM STDIN")
|
||||||
area,
|
)
|
||||||
town,
|
|
||||||
street,
|
|
||||||
branch,
|
|
||||||
res,
|
|
||||||
comment,
|
|
||||||
building_id,
|
|
||||||
lat,
|
|
||||||
lng,
|
|
||||||
start,
|
|
||||||
finish
|
|
||||||
) FROM STDIN"""
|
|
||||||
|
|
||||||
|
|
||||||
def job():
|
def job():
|
||||||
@ -36,6 +26,8 @@ def job():
|
|||||||
with cursor.copy(sql_statement) as copy:
|
with cursor.copy(sql_statement) as copy:
|
||||||
for _, row in parser.df.iterrows():
|
for _, row in parser.df.iterrows():
|
||||||
row = row.where((pd.notnull(row)), None)
|
row = row.where((pd.notnull(row)), None)
|
||||||
copy.write_row(row.to_list())
|
if row["building_id"] is not None or STORE_NULL_BID:
|
||||||
|
db_row = row.rename(DB_COLUMNS_MAP)
|
||||||
|
copy.write_row(db_row.to_list())
|
||||||
|
|
||||||
print(f"Fetched in {datetime.now() - fetch_start}\n{parser}")
|
print(f"Fetched in {datetime.now() - fetch_start}\n{parser}")
|
||||||
|
Loading…
x
Reference in New Issue
Block a user