Compare commits
No commits in common. "5fddedb7091ff14473b204486896776c5e1bbf01" and "9cd21f717dfc99a2c5e58e3cba59a085426c8f50" have entirely different histories.
5fddedb709...9cd21f717d
parser/__init__.py
@@ -1,12 +1,10 @@
 from .address import split_addresses
 from .building_id import (
-    async_fetch_building_id,
     async_fetch_building_ids,
     concurrent_fetch_builing_ids,
     fetch_builing_ids,
     get_building_id,
 )
-from .lenenergo import LenenergoParser
 from .preprocess import (
     COL_NS,
     ICOL_NS,
@@ -14,20 +12,20 @@ from .preprocess import (
     preprocess_df,
     preprocess_read_df,
 )
+from .lenenergo import LenenergoParser
 from .util import pipeline

 __all__ = (
-    "async_fetch_building_id",
-    "async_fetch_building_ids",
-    "COL_NS",
-    "concurrent_fetch_builing_ids",
-    "fetch_builing_ids",
-    "get_building_id",
-    "group_by_index",
-    "ICOL_NS",
     "LenenergoParser",
-    "pipeline",
-    "preprocess_df",
-    "preprocess_read_df",
     "split_addresses",
+    "get_building_id",
+    "fetch_builing_ids",
+    "async_fetch_building_ids",
+    "concurrent_fetch_builing_ids",
+    "preprocess_df",
+    "COL_NS",
+    "ICOL_NS",
+    "preprocess_read_df",
+    "group_by_index",
+    "pipeline",
 )
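Beyond dropping `async_fetch_building_id` from both the import block and `__all__`, this is a reordering: the `LenenergoParser` import moves below the preprocess imports and `__all__` switches from alphabetical order to rough definition order. A quick check of the resulting public surface (a sketch; it assumes the package is importable as `parser`, which is how runner/job.py below imports it):

import parser  # the local parser package

# After this change the package exports 12 names; async_fetch_building_id is no longer among them.
print(len(parser.__all__))
print("async_fetch_building_id" in parser.__all__)  # False on the right-hand revision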
parser/preprocess.py
@@ -34,8 +34,6 @@ def preprocess_df(df: pd.DataFrame) -> pd.DataFrame:
         )
         df.drop(columns=[f"{a}_date", f"{a}_time"], inplace=True)

-    df = df.convert_dtypes()
-
     return df

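For context on the dropped call: `DataFrame.convert_dtypes()` migrates columns to pandas' nullable extension dtypes, so without it the columns keep their inferred NumPy dtypes. A minimal standalone illustration (not project code):

import pandas as pd

df = pd.DataFrame({"comment": ["planned outage", None], "building_id": [123, None]})
print(df.dtypes)                   # object / float64 (the None forces float)
print(df.convert_dtypes().dtypes)  # string / Int64 (nullable extension dtypes)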
@@ -1,9 +1,13 @@
 import schedule

+from . import models
 from .config import REFETCH_PERIOD_H
+from .database import engine
 from .job import job
 from .scheduler import run_continuously

+models.Base.metadata.create_all(bind=engine)
+
 schedule.every(REFETCH_PERIOD_H).hours.do(job)
 stop_run_continuously = run_continuously()

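The entry point now creates the ORM tables before the scheduler starts, then hands the periodic refetch to `run_continuously()` from `.scheduler`. That helper is not part of this compare; a typical implementation, adapted from the `schedule` library's background-thread recipe, looks roughly like this (a sketch, not the project's actual code):

import threading
import time

import schedule


def run_continuously(interval: int = 1):
    """Run schedule.run_pending() in a daemon thread until the returned event is set."""
    cease_continuous_run = threading.Event()

    class ScheduleThread(threading.Thread):
        def run(self):
            while not cease_continuous_run.is_set():
                schedule.run_pending()
                time.sleep(interval)

    ScheduleThread(daemon=True).start()
    # The caller keeps this as stop_run_continuously; calling .set() on it stops the loop.
    return cease_continuous_run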
runner/config.py
@@ -7,5 +7,3 @@ POSTGRES_PASSWORD = os.environ.get("POSTGRES_PASSWORD", "lenenergo")
 POSTGRES_DB = os.environ.get("POSTGRES_DB", "lenenergo")
 POSTGRES_HOST = os.environ.get("POSTGRES_HOST", "localhost")
 POSTGRES_PORT = int(os.environ.get("POSTGRES_PORT", "5432"))
-
-DB_URL = os.environ.get("DB_URL", None)
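With `DB_URL` gone, the connection is configured only through the individual `POSTGRES_*` variables. A small sketch of overriding them (hypothetical values; the defaults fall back to localhost:5432 and "lenenergo"):

import os

os.environ.setdefault("POSTGRES_HOST", "db")
os.environ.setdefault("POSTGRES_PASSWORD", "s3cret")

from runner import config  # read after the environment is set

print(config.POSTGRES_HOST, config.POSTGRES_PORT, config.POSTGRES_DB)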
runner/controller.py (new file, +25 lines)
@@ -0,0 +1,25 @@
+from sqlalchemy.orm import Session
+
+from . import models, schemas
+
+
+def create_record(db: Session, record: schemas.Record):
+    db_record = models.Record(
+        region=record.region,
+        area=record.area,
+        town=record.town,
+        street=record.street,
+        start=record.start,
+        finish=record.finish,
+        branch=record.branch,
+        res=record.res,
+        comment=record.comment,
+        building_id=record.building_id,
+        lat=record.lat,
+        lng=record.lng,
+    )
+    db.add(db_record)
+    db.commit()
+    db.refresh(db_record)
+
+    return db_record
runner/database.py
@@ -1,19 +1,37 @@
+from typing import Generator
+
+from sqlalchemy import URL, create_engine
+from sqlalchemy.ext.declarative import declarative_base
+from sqlalchemy.orm import Session, sessionmaker
+
 from .config import (
     POSTGRES_DB,
     POSTGRES_HOST,
     POSTGRES_PASSWORD,
     POSTGRES_PORT,
     POSTGRES_USER,
-    DB_URL,
 )

-db_credentials = {"conninfo": DB_URL}
+engine = create_engine(
+    URL.create(
+        "postgresql+psycopg",
+        username=POSTGRES_USER,
+        password=POSTGRES_PASSWORD,
+        host=POSTGRES_HOST,
+        port=POSTGRES_PORT,
+        database=POSTGRES_DB,
+    ),
+    client_encoding="utf8",
+)
+SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

-if not DB_URL:
-    db_credentials = {
-        "host": POSTGRES_HOST,
-        "port": POSTGRES_PORT,
-        "dbname": POSTGRES_DB,
-        "user": POSTGRES_USER,
-        "password": POSTGRES_PASSWORD,
-    }
+Base = declarative_base()
+
+
+# Dependency
+def get_db() -> Generator[Session, None, None]:
+    db = SessionLocal()
+    try:
+        yield db
+    finally:
+        db.close()
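`get_db()` is the generator-style session dependency familiar from FastAPI examples; outside a framework it has to be driven by hand, which is exactly what the rewritten job below does with `next(get_db())`. A small usage sketch (module path `runner.*` inferred from the file names in this compare):

from contextlib import contextmanager

from runner import models
from runner.database import get_db

# One-off manual use, as runner/job.py does (the session is closed when the generator is finalized):
db = next(get_db())
print(db.query(models.Record).count())

# Or wrap the same generator so its finally/close block runs deterministically:
with contextmanager(get_db)() as db:
    print(db.query(models.Record).count())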
runner/job.py
@@ -1,41 +1,28 @@
 from datetime import datetime
-from parser import pipeline

 import pandas as pd
-import psycopg
+from parser import pipeline

-from .database import db_credentials
-
-sql_statement = """COPY records (
-    index,
-    region,
-    area,
-    town,
-    street,
-    branch,
-    res,
-    comment,
-    building_id,
-    lat,
-    lng,
-    start,
-    finish
-) FROM STDIN"""
+from . import models
+from .database import get_db


 def job():
     fetch_start = datetime.now()
     print("Starting refetch job: " + fetch_start.isoformat())

+    db = next(get_db())
+
     parser = pipeline()

+    db.query(models.Record).delete()
+    db.commit()
+
     print("Rewriting db: " + datetime.now().isoformat())

-    with psycopg.connect(**db_credentials) as connection:
-        with connection.cursor() as cursor:
-            with cursor.copy(sql_statement) as copy:
-                for _, row in parser.df.iterrows():
-                    row = row.where((pd.notnull(row)), None)
-                    copy.write_row(row.to_list())
+    for i, row in parser.df.iterrows():
+        row = row.where((pd.notnull(row)), None)
+        db.add(models.Record(**row.to_dict()))
+    db.commit()

     print(f"Fetched in {datetime.now() - fetch_start}\n{parser}")
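The job thus moves from streaming rows into Postgres with psycopg's `COPY ... FROM STDIN` to clearing and refilling the `records` table through the ORM: delete all `Record` rows, iterate over `parser.df`, add one `Record` per row, and commit. Running it once by hand would look roughly like this (package path assumed from the runner/ file names; the scheduled path goes through `schedule` as shown above):

from runner.job import job

# Drops every existing Record, refetches via parser.pipeline(), and rewrites the table.
job()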
runner/models.py (new file, +22 lines)
@@ -0,0 +1,22 @@
+from sqlalchemy import Column, DateTime, Float, Integer, String
+
+from .database import Base
+
+
+class Record(Base):
+    __tablename__ = "records"
+
+    id = Column(Integer, primary_key=True, index=True)
+    index = Column(Integer)
+    region = Column(String, nullable=True)
+    area = Column(String, nullable=True)
+    town = Column(String, nullable=True)
+    street = Column(String, nullable=True)
+    start = Column(DateTime)
+    finish = Column(DateTime)
+    branch = Column(String, nullable=True)
+    res = Column(String, nullable=True)
+    comment = Column(String, nullable=True)
+    building_id = Column(Integer, nullable=True)
+    lat = Column(Float, nullable=True)
+    lng = Column(Float, nullable=True)