diff --git a/runner/__main__.py b/runner/__main__.py index 4dc4f37..40752e4 100644 --- a/runner/__main__.py +++ b/runner/__main__.py @@ -1,13 +1,9 @@ import schedule -from . import models from .config import REFETCH_PERIOD_H -from .database import engine from .job import job from .scheduler import run_continuously -models.Base.metadata.create_all(bind=engine) - schedule.every(REFETCH_PERIOD_H).hours.do(job) stop_run_continuously = run_continuously() diff --git a/runner/config.py b/runner/config.py index d9b9cd6..2e94e42 100644 --- a/runner/config.py +++ b/runner/config.py @@ -7,3 +7,5 @@ POSTGRES_PASSWORD = os.environ.get("POSTGRES_PASSWORD", "lenenergo") POSTGRES_DB = os.environ.get("POSTGRES_DB", "lenenergo") POSTGRES_HOST = os.environ.get("POSTGRES_HOST", "localhost") POSTGRES_PORT = int(os.environ.get("POSTGRES_PORT", "5432")) + +DB_URL = os.environ.get("DB_URL", None) diff --git a/runner/controller.py b/runner/controller.py deleted file mode 100644 index 1976959..0000000 --- a/runner/controller.py +++ /dev/null @@ -1,25 +0,0 @@ -from sqlalchemy.orm import Session - -from . import models, schemas - - -def create_record(db: Session, record: schemas.Record): - db_record = models.Record( - region=record.region, - area=record.area, - town=record.town, - street=record.street, - start=record.start, - finish=record.finish, - branch=record.branch, - res=record.res, - comment=record.comment, - building_id=record.building_id, - lat=record.lat, - lng=record.lng, - ) - db.add(db_record) - db.commit() - db.refresh(db_record) - - return db_record diff --git a/runner/database.py b/runner/database.py index f438836..0da4e76 100644 --- a/runner/database.py +++ b/runner/database.py @@ -1,37 +1,19 @@ -from typing import Generator - -from sqlalchemy import URL, create_engine -from sqlalchemy.ext.declarative import declarative_base -from sqlalchemy.orm import Session, sessionmaker - from .config import ( POSTGRES_DB, POSTGRES_HOST, POSTGRES_PASSWORD, POSTGRES_PORT, POSTGRES_USER, + DB_URL, ) -engine = create_engine( - URL.create( - "postgresql+psycopg", - username=POSTGRES_USER, - password=POSTGRES_PASSWORD, - host=POSTGRES_HOST, - port=POSTGRES_PORT, - database=POSTGRES_DB, - ), - client_encoding="utf8", -) -SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) +db_credentials = {"conninfo": DB_URL} -Base = declarative_base() - - -# Dependency -def get_db() -> Generator[Session, None, None]: - db = SessionLocal() - try: - yield db - finally: - db.close() +if not DB_URL: + db_credentials = { + "host": POSTGRES_HOST, + "port": POSTGRES_PORT, + "dbname": POSTGRES_DB, + "user": POSTGRES_USER, + "password": POSTGRES_PASSWORD, + } diff --git a/runner/job.py b/runner/job.py index 77a7ebc..e4060f3 100644 --- a/runner/job.py +++ b/runner/job.py @@ -1,28 +1,41 @@ from datetime import datetime - -import pandas as pd from parser import pipeline -from . import models -from .database import get_db +import pandas as pd +import psycopg + +from .database import db_credentials + +sql_statement = """COPY records ( + index, + region, + area, + town, + street, + branch, + res, + comment, + building_id, + lat, + lng, + start, + finish +) FROM STDIN""" def job(): fetch_start = datetime.now() print("Starting refetch job: " + fetch_start.isoformat()) - db = next(get_db()) - parser = pipeline() - db.query(models.Record).delete() - db.commit() - print("Rewriting db: " + datetime.now().isoformat()) - for i, row in parser.df.iterrows(): - row = row.where((pd.notnull(row)), None) - db.add(models.Record(**row.to_dict())) - db.commit() + with psycopg.connect(**db_credentials) as connection: + with connection.cursor() as cursor: + with cursor.copy(sql_statement) as copy: + for _, row in parser.df.iterrows(): + row = row.where((pd.notnull(row)), None) + copy.write_row(row.to_list()) print(f"Fetched in {datetime.now() - fetch_start}\n{parser}") diff --git a/runner/models.py b/runner/models.py deleted file mode 100644 index e27d5ed..0000000 --- a/runner/models.py +++ /dev/null @@ -1,22 +0,0 @@ -from sqlalchemy import Column, DateTime, Float, Integer, String - -from .database import Base - - -class Record(Base): - __tablename__ = "records" - - id = Column(Integer, primary_key=True, index=True) - index = Column(Integer) - region = Column(String, nullable=True) - area = Column(String, nullable=True) - town = Column(String, nullable=True) - street = Column(String, nullable=True) - start = Column(DateTime) - finish = Column(DateTime) - branch = Column(String, nullable=True) - res = Column(String, nullable=True) - comment = Column(String, nullable=True) - building_id = Column(Integer, nullable=True) - lat = Column(Float, nullable=True) - lng = Column(Float, nullable=True)