From 5fddedb7091ff14473b204486896776c5e1bbf01 Mon Sep 17 00:00:00 2001
From: dm1sh <me@dmitriy.icu>
Date: Wed, 11 Oct 2023 23:14:51 +0300
Subject: [PATCH] Switched from sqlalchemy to pure psycopg

---
 runner/__main__.py   |  4 ----
 runner/config.py     |  2 ++
 runner/controller.py | 25 -------------------------
 runner/database.py   | 38 ++++++++++----------------------------
 runner/job.py        | 39 ++++++++++++++++++++++++++-------------
 runner/models.py     | 22 ----------------------
 6 files changed, 38 insertions(+), 92 deletions(-)
 delete mode 100644 runner/controller.py
 delete mode 100644 runner/models.py

diff --git a/runner/__main__.py b/runner/__main__.py
index 4dc4f37..40752e4 100644
--- a/runner/__main__.py
+++ b/runner/__main__.py
@@ -1,13 +1,9 @@
 import schedule
 
-from . import models
 from .config import REFETCH_PERIOD_H
-from .database import engine
 from .job import job
 from .scheduler import run_continuously
 
-models.Base.metadata.create_all(bind=engine)
-
 schedule.every(REFETCH_PERIOD_H).hours.do(job)
 stop_run_continuously = run_continuously()
 
diff --git a/runner/config.py b/runner/config.py
index d9b9cd6..2e94e42 100644
--- a/runner/config.py
+++ b/runner/config.py
@@ -7,3 +7,7 @@ POSTGRES_PASSWORD = os.environ.get("POSTGRES_PASSWORD", "lenenergo")
 POSTGRES_DB = os.environ.get("POSTGRES_DB", "lenenergo")
 POSTGRES_HOST = os.environ.get("POSTGRES_HOST", "localhost")
 POSTGRES_PORT = int(os.environ.get("POSTGRES_PORT", "5432"))
+
+# Optional full connection string; None when the DB_URL environment variable
+# is not set.
+DB_URL = os.environ.get("DB_URL")
diff --git a/runner/controller.py b/runner/controller.py
deleted file mode 100644
index 1976959..0000000
--- a/runner/controller.py
+++ /dev/null
@@ -1,25 +0,0 @@
-from sqlalchemy.orm import Session
-
-from . import models, schemas
-
-
-def create_record(db: Session, record: schemas.Record):
-    db_record = models.Record(
-        region=record.region,
-        area=record.area,
-        town=record.town,
-        street=record.street,
-        start=record.start,
-        finish=record.finish,
-        branch=record.branch,
-        res=record.res,
-        comment=record.comment,
-        building_id=record.building_id,
-        lat=record.lat,
-        lng=record.lng,
-    )
-    db.add(db_record)
-    db.commit()
-    db.refresh(db_record)
-
-    return db_record
diff --git a/runner/database.py b/runner/database.py
index f438836..0da4e76 100644
--- a/runner/database.py
+++ b/runner/database.py
@@ -1,37 +1,21 @@
-from typing import Generator
-
-from sqlalchemy import URL, create_engine
-from sqlalchemy.ext.declarative import declarative_base
-from sqlalchemy.orm import Session, sessionmaker
-
 from .config import (
     POSTGRES_DB,
     POSTGRES_HOST,
     POSTGRES_PASSWORD,
     POSTGRES_PORT,
     POSTGRES_USER,
+    DB_URL,
 )
 
-engine = create_engine(
-    URL.create(
-        "postgresql+psycopg",
-        username=POSTGRES_USER,
-        password=POSTGRES_PASSWORD,
-        host=POSTGRES_HOST,
-        port=POSTGRES_PORT,
-        database=POSTGRES_DB,
-    ),
-    client_encoding="utf8",
-)
-SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
-
-Base = declarative_base()
-
-
-# Dependency
-def get_db() -> Generator[Session, None, None]:
-    db = SessionLocal()
-    try:
-        yield db
-    finally:
-        db.close()
+# Keyword arguments for psycopg.connect(): a single connection string when
+# DB_URL is set, otherwise the individual POSTGRES_* settings.
+if DB_URL:
+    db_credentials = {"conninfo": DB_URL}
+else:
+    db_credentials = {
+        "host": POSTGRES_HOST,
+        "port": POSTGRES_PORT,
+        "dbname": POSTGRES_DB,
+        "user": POSTGRES_USER,
+        "password": POSTGRES_PASSWORD,
+    }
diff --git a/runner/job.py b/runner/job.py
index 77a7ebc..e4060f3 100644
--- a/runner/job.py
+++ b/runner/job.py
@@ -1,28 +1,55 @@
 from datetime import datetime
-
-import pandas as pd
 from parser import pipeline
 
-from . import models
-from .database import get_db
+import pandas as pd
+import psycopg
+
+from .database import db_credentials
+
+# COPY statement used to bulk-load rows into the `records` table.
+# Values are written positionally, so this column list is assumed to match
+# the column order of parser.df -- TODO confirm against the parser pipeline.
+sql_statement = """COPY records (
+    index,
+    region,
+    area,
+    town,
+    street,
+    branch,
+    res,
+    comment,
+    building_id,
+    lat,
+    lng,
+    start,
+    finish
+) FROM STDIN"""
 
 
 def job():
+    """Refetch the data and replace the contents of the `records` table.
+
+    Runs the parser pipeline, then deletes the existing rows and bulk-loads
+    the freshly parsed ones with COPY inside a single transaction.
+    """
     fetch_start = datetime.now()
     print("Starting refetch job: " + fetch_start.isoformat())
 
-    db = next(get_db())
-
     parser = pipeline()
 
-    db.query(models.Record).delete()
-    db.commit()
-
     print("Rewriting db: " + datetime.now().isoformat())
 
-    for i, row in parser.df.iterrows():
-        row = row.where((pd.notnull(row)), None)
-        db.add(models.Record(**row.to_dict()))
-        db.commit()
+    # The connection context commits on success and rolls back on error, so
+    # the DELETE and the COPY are applied together or not at all.
+    with psycopg.connect(**db_credentials) as connection:
+        with connection.cursor() as cursor:
+            # Clear the previous snapshot first; without this every run would
+            # append a full duplicate set of rows.
+            cursor.execute("DELETE FROM records")
+            with cursor.copy(sql_statement) as copy:
+                for _, row in parser.df.iterrows():
+                    # Replace missing values with None so they load as NULL.
+                    row = row.where((pd.notnull(row)), None)
+                    copy.write_row(row.to_list())
 
     print(f"Fetched in {datetime.now() - fetch_start}\n{parser}")
diff --git a/runner/models.py b/runner/models.py
deleted file mode 100644
index e27d5ed..0000000
--- a/runner/models.py
+++ /dev/null
@@ -1,22 +0,0 @@
-from sqlalchemy import Column, DateTime, Float, Integer, String
-
-from .database import Base
-
-
-class Record(Base):
-    __tablename__ = "records"
-
-    id = Column(Integer, primary_key=True, index=True)
-    index = Column(Integer)
-    region = Column(String, nullable=True)
-    area = Column(String, nullable=True)
-    town = Column(String, nullable=True)
-    street = Column(String, nullable=True)
-    start = Column(DateTime)
-    finish = Column(DateTime)
-    branch = Column(String, nullable=True)
-    res = Column(String, nullable=True)
-    comment = Column(String, nullable=True)
-    building_id = Column(Integer, nullable=True)
-    lat = Column(Float, nullable=True)
-    lng = Column(Float, nullable=True)