Compare commits


2 Commits

SHA1        Message                                              Date
5fddedb709  Switched from sqlalchemy to pure psycopg             2023-10-11 23:14:51 +03:00
f3d24fe701  Sorted parser lib exports, fixed preprocess types    2023-10-11 23:13:40 +03:00
8 changed files with 51 additions and 101 deletions

View File

@@ -1,10 +1,12 @@
 from .address import split_addresses
 from .building_id import (
+    async_fetch_building_id,
     async_fetch_building_ids,
     concurrent_fetch_builing_ids,
     fetch_builing_ids,
     get_building_id,
 )
+from .lenenergo import LenenergoParser
 from .preprocess import (
     COL_NS,
     ICOL_NS,
@@ -12,20 +14,20 @@ from .preprocess import (
     preprocess_df,
     preprocess_read_df,
 )
-from .lenenergo import LenenergoParser
 from .util import pipeline
 
 __all__ = (
-    "LenenergoParser",
-    "split_addresses",
-    "get_building_id",
-    "fetch_builing_ids",
+    "async_fetch_building_id",
     "async_fetch_building_ids",
-    "concurrent_fetch_builing_ids",
-    "preprocess_df",
     "COL_NS",
-    "ICOL_NS",
-    "preprocess_read_df",
+    "concurrent_fetch_builing_ids",
+    "fetch_builing_ids",
+    "get_building_id",
     "group_by_index",
+    "ICOL_NS",
+    "LenenergoParser",
     "pipeline",
+    "preprocess_df",
+    "preprocess_read_df",
+    "split_addresses",
 )

View File

@@ -34,6 +34,8 @@ def preprocess_df(df: pd.DataFrame) -> pd.DataFrame:
         )
         df.drop(columns=[f"{a}_date", f"{a}_time"], inplace=True)
 
+    df = df.convert_dtypes()
+
     return df
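The added convert_dtypes() call is presumably the "fixed preprocess types" part of the second commit: it switches columns to pandas' nullable extension dtypes, so missing values become pd.NA instead of float NaN and integer-like columns stop being forced to float64. A minimal sketch with made-up values:

import pandas as pd

df = pd.DataFrame({"town": ["Pushkin", None], "building_id": [17.0, None]})
print(df.dtypes)                 # town: object, building_id: float64
df = df.convert_dtypes()
print(df.dtypes)                 # town: string, building_id: Int64
print(df.loc[0, "building_id"])  # 17, no longer the float 17.0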

View File

@@ -1,13 +1,9 @@
 import schedule
 
-from . import models
 from .config import REFETCH_PERIOD_H
-from .database import engine
 from .job import job
 from .scheduler import run_continuously
 
-models.Base.metadata.create_all(bind=engine)
-
 schedule.every(REFETCH_PERIOD_H).hours.do(job)
 stop_run_continuously = run_continuously()
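run_continuously comes from the local .scheduler module, which is not touched in this compare. For context, it is presumably a thin wrapper around the standard background-runner recipe from the schedule documentation, roughly:

import threading
import time

import schedule


def run_continuously(interval: int = 1) -> threading.Event:
    # Run schedule.run_pending() in a background thread until the caller
    # sets the returned Event (the stop_run_continuously handle above).
    cease_continuous_run = threading.Event()

    class ScheduleThread(threading.Thread):
        @classmethod
        def run(cls):
            while not cease_continuous_run.is_set():
                schedule.run_pending()
                time.sleep(interval)

    ScheduleThread().start()
    return cease_continuous_run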

View File

@@ -7,3 +7,5 @@ POSTGRES_PASSWORD = os.environ.get("POSTGRES_PASSWORD", "lenenergo")
 POSTGRES_DB = os.environ.get("POSTGRES_DB", "lenenergo")
 POSTGRES_HOST = os.environ.get("POSTGRES_HOST", "localhost")
 POSTGRES_PORT = int(os.environ.get("POSTGRES_PORT", "5432"))
+
+DB_URL = os.environ.get("DB_URL", None)
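DB_URL ends up as the conninfo argument to psycopg (see the database module below), so any libpq connection string should work. An illustrative value, assuming the default credentials above; a key=value libpq string would do equally well:

import psycopg

# Hypothetical value built from the defaults in this config module.
DB_URL = "postgresql://lenenergo:lenenergo@localhost:5432/lenenergo"

with psycopg.connect(conninfo=DB_URL) as conn:
    print(conn.execute("SELECT 1").fetchone())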

View File

@ -1,25 +0,0 @@
from sqlalchemy.orm import Session
from . import models, schemas
def create_record(db: Session, record: schemas.Record):
db_record = models.Record(
region=record.region,
area=record.area,
town=record.town,
street=record.street,
start=record.start,
finish=record.finish,
branch=record.branch,
res=record.res,
comment=record.comment,
building_id=record.building_id,
lat=record.lat,
lng=record.lng,
)
db.add(db_record)
db.commit()
db.refresh(db_record)
return db_record

View File

@@ -1,37 +1,19 @@
-from typing import Generator
-
-from sqlalchemy import URL, create_engine
-from sqlalchemy.ext.declarative import declarative_base
-from sqlalchemy.orm import Session, sessionmaker
-
 from .config import (
     POSTGRES_DB,
     POSTGRES_HOST,
     POSTGRES_PASSWORD,
     POSTGRES_PORT,
     POSTGRES_USER,
+    DB_URL,
 )
 
-engine = create_engine(
-    URL.create(
-        "postgresql+psycopg",
-        username=POSTGRES_USER,
-        password=POSTGRES_PASSWORD,
-        host=POSTGRES_HOST,
-        port=POSTGRES_PORT,
-        database=POSTGRES_DB,
-    ),
-    client_encoding="utf8",
-)
-SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
-Base = declarative_base()
-
-
-# Dependency
-def get_db() -> Generator[Session, None, None]:
-    db = SessionLocal()
-    try:
-        yield db
-    finally:
-        db.close()
+db_credentials = {"conninfo": DB_URL}
+
+if not DB_URL:
+    db_credentials = {
+        "host": POSTGRES_HOST,
+        "port": POSTGRES_PORT,
+        "dbname": POSTGRES_DB,
+        "user": POSTGRES_USER,
+        "password": POSTGRES_PASSWORD,
+    }
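Either way, db_credentials unpacks into the same psycopg.connect(**db_credentials) call the new job module makes below: with DB_URL set it is {"conninfo": DB_URL}, otherwise the individual libpq keyword parameters. A quick check of the fallback form, with the assumed defaults from the config module:

import psycopg

db_credentials = {"host": "localhost", "port": 5432, "dbname": "lenenergo",
                  "user": "lenenergo", "password": "lenenergo"}  # fallback form

with psycopg.connect(**db_credentials) as conn:
    conn.execute("SELECT 1")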

View File

@@ -1,28 +1,41 @@
 from datetime import datetime
 
-import pandas as pd
-
 from parser import pipeline
 
-from . import models
-from .database import get_db
+import pandas as pd
+import psycopg
+
+from .database import db_credentials
+
+sql_statement = """COPY records (
+    index,
+    region,
+    area,
+    town,
+    street,
+    branch,
+    res,
+    comment,
+    building_id,
+    lat,
+    lng,
+    start,
+    finish
+) FROM STDIN"""
 
 
 def job():
     fetch_start = datetime.now()
     print("Starting refetch job: " + fetch_start.isoformat())
-    db = next(get_db())
     parser = pipeline()
-    db.query(models.Record).delete()
-    db.commit()
     print("Rewriting db: " + datetime.now().isoformat())
-    for i, row in parser.df.iterrows():
-        row = row.where((pd.notnull(row)), None)
-        db.add(models.Record(**row.to_dict()))
-    db.commit()
+    with psycopg.connect(**db_credentials) as connection:
+        with connection.cursor() as cursor:
+            with cursor.copy(sql_statement) as copy:
+                for _, row in parser.df.iterrows():
+                    row = row.where((pd.notnull(row)), None)
+                    copy.write_row(row.to_list())
     print(f"Fetched in {datetime.now() - fetch_start}\n{parser}")

View File

@ -1,22 +0,0 @@
from sqlalchemy import Column, DateTime, Float, Integer, String
from .database import Base
class Record(Base):
__tablename__ = "records"
id = Column(Integer, primary_key=True, index=True)
index = Column(Integer)
region = Column(String, nullable=True)
area = Column(String, nullable=True)
town = Column(String, nullable=True)
street = Column(String, nullable=True)
start = Column(DateTime)
finish = Column(DateTime)
branch = Column(String, nullable=True)
res = Column(String, nullable=True)
comment = Column(String, nullable=True)
building_id = Column(Integer, nullable=True)
lat = Column(Float, nullable=True)
lng = Column(Float, nullable=True)
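With the Record model deleted and the create_all() call removed from the scheduler entrypoint earlier in this compare, nothing here creates the records table any more, so the table targeted by the COPY statement has to exist beforehand. A possible DDL, inferred from the deleted model's columns; the types, nullability, and DSN are assumptions, not part of this change set:

import psycopg

CREATE_RECORDS = """
CREATE TABLE IF NOT EXISTS records (
    id          serial PRIMARY KEY,
    index       integer,
    region      text,
    area        text,
    town        text,
    street      text,
    start       timestamp,
    finish      timestamp,
    branch      text,
    res         text,
    comment     text,
    building_id integer,
    lat         double precision,
    lng         double precision
)
"""

with psycopg.connect("postgresql://localhost/lenenergo") as conn:  # hypothetical DSN
    conn.execute(CREATE_RECORDS)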