Switched from sqlalchemy to pure psycopg
This commit is contained in:
parent
f3d24fe701
commit
5fddedb709
@ -1,13 +1,9 @@
|
||||
import schedule
|
||||
|
||||
from . import models
|
||||
from .config import REFETCH_PERIOD_H
|
||||
from .database import engine
|
||||
from .job import job
|
||||
from .scheduler import run_continuously
|
||||
|
||||
models.Base.metadata.create_all(bind=engine)
|
||||
|
||||
schedule.every(REFETCH_PERIOD_H).hours.do(job)
|
||||
stop_run_continuously = run_continuously()
|
||||
|
||||
|
@ -7,3 +7,5 @@ POSTGRES_PASSWORD = os.environ.get("POSTGRES_PASSWORD", "lenenergo")
|
||||
POSTGRES_DB = os.environ.get("POSTGRES_DB", "lenenergo")
|
||||
POSTGRES_HOST = os.environ.get("POSTGRES_HOST", "localhost")
|
||||
POSTGRES_PORT = int(os.environ.get("POSTGRES_PORT", "5432"))
|
||||
|
||||
DB_URL = os.environ.get("DB_URL", None)
|
||||
|
@ -1,25 +0,0 @@
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from . import models, schemas
|
||||
|
||||
|
||||
def create_record(db: Session, record: schemas.Record):
|
||||
db_record = models.Record(
|
||||
region=record.region,
|
||||
area=record.area,
|
||||
town=record.town,
|
||||
street=record.street,
|
||||
start=record.start,
|
||||
finish=record.finish,
|
||||
branch=record.branch,
|
||||
res=record.res,
|
||||
comment=record.comment,
|
||||
building_id=record.building_id,
|
||||
lat=record.lat,
|
||||
lng=record.lng,
|
||||
)
|
||||
db.add(db_record)
|
||||
db.commit()
|
||||
db.refresh(db_record)
|
||||
|
||||
return db_record
|
@ -1,37 +1,19 @@
|
||||
from typing import Generator
|
||||
|
||||
from sqlalchemy import URL, create_engine
|
||||
from sqlalchemy.ext.declarative import declarative_base
|
||||
from sqlalchemy.orm import Session, sessionmaker
|
||||
|
||||
from .config import (
|
||||
POSTGRES_DB,
|
||||
POSTGRES_HOST,
|
||||
POSTGRES_PASSWORD,
|
||||
POSTGRES_PORT,
|
||||
POSTGRES_USER,
|
||||
DB_URL,
|
||||
)
|
||||
|
||||
engine = create_engine(
|
||||
URL.create(
|
||||
"postgresql+psycopg",
|
||||
username=POSTGRES_USER,
|
||||
password=POSTGRES_PASSWORD,
|
||||
host=POSTGRES_HOST,
|
||||
port=POSTGRES_PORT,
|
||||
database=POSTGRES_DB,
|
||||
),
|
||||
client_encoding="utf8",
|
||||
)
|
||||
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
|
||||
db_credentials = {"conninfo": DB_URL}
|
||||
|
||||
Base = declarative_base()
|
||||
|
||||
|
||||
# Dependency
|
||||
def get_db() -> Generator[Session, None, None]:
|
||||
db = SessionLocal()
|
||||
try:
|
||||
yield db
|
||||
finally:
|
||||
db.close()
|
||||
if not DB_URL:
|
||||
db_credentials = {
|
||||
"host": POSTGRES_HOST,
|
||||
"port": POSTGRES_PORT,
|
||||
"dbname": POSTGRES_DB,
|
||||
"user": POSTGRES_USER,
|
||||
"password": POSTGRES_PASSWORD,
|
||||
}
|
||||
|
@ -1,28 +1,41 @@
|
||||
from datetime import datetime
|
||||
|
||||
import pandas as pd
|
||||
from parser import pipeline
|
||||
|
||||
from . import models
|
||||
from .database import get_db
|
||||
import pandas as pd
|
||||
import psycopg
|
||||
|
||||
from .database import db_credentials
|
||||
|
||||
sql_statement = """COPY records (
|
||||
index,
|
||||
region,
|
||||
area,
|
||||
town,
|
||||
street,
|
||||
branch,
|
||||
res,
|
||||
comment,
|
||||
building_id,
|
||||
lat,
|
||||
lng,
|
||||
start,
|
||||
finish
|
||||
) FROM STDIN"""
|
||||
|
||||
|
||||
def job():
|
||||
fetch_start = datetime.now()
|
||||
print("Starting refetch job: " + fetch_start.isoformat())
|
||||
|
||||
db = next(get_db())
|
||||
|
||||
parser = pipeline()
|
||||
|
||||
db.query(models.Record).delete()
|
||||
db.commit()
|
||||
|
||||
print("Rewriting db: " + datetime.now().isoformat())
|
||||
|
||||
for i, row in parser.df.iterrows():
|
||||
row = row.where((pd.notnull(row)), None)
|
||||
db.add(models.Record(**row.to_dict()))
|
||||
db.commit()
|
||||
with psycopg.connect(**db_credentials) as connection:
|
||||
with connection.cursor() as cursor:
|
||||
with cursor.copy(sql_statement) as copy:
|
||||
for _, row in parser.df.iterrows():
|
||||
row = row.where((pd.notnull(row)), None)
|
||||
copy.write_row(row.to_list())
|
||||
|
||||
print(f"Fetched in {datetime.now() - fetch_start}\n{parser}")
|
||||
|
@ -1,22 +0,0 @@
|
||||
from sqlalchemy import Column, DateTime, Float, Integer, String
|
||||
|
||||
from .database import Base
|
||||
|
||||
|
||||
class Record(Base):
|
||||
__tablename__ = "records"
|
||||
|
||||
id = Column(Integer, primary_key=True, index=True)
|
||||
index = Column(Integer)
|
||||
region = Column(String, nullable=True)
|
||||
area = Column(String, nullable=True)
|
||||
town = Column(String, nullable=True)
|
||||
street = Column(String, nullable=True)
|
||||
start = Column(DateTime)
|
||||
finish = Column(DateTime)
|
||||
branch = Column(String, nullable=True)
|
||||
res = Column(String, nullable=True)
|
||||
comment = Column(String, nullable=True)
|
||||
building_id = Column(Integer, nullable=True)
|
||||
lat = Column(Float, nullable=True)
|
||||
lng = Column(Float, nullable=True)
|
Loading…
x
Reference in New Issue
Block a user