lenengro_parser/runner/controller.py

110 lines
2.8 KiB
Python

import datetime
from functools import reduce
from typing import List, Optional
from fastapi import HTTPException
from parser import get_building_id
from sqlalchemy.orm import Session
from sqlalchemy.sql.expression import BinaryExpression
from . import models, schemas
def create_record(db: Session, record: schemas.Record):
    """Store *record* as a new ``models.Record`` row and return that row.

    Args:
        db: Session used to add, commit and refresh the new row.
        record: Source object whose attributes are copied onto the row.

    Returns:
        The ``models.Record`` instance after it has been committed and
        refreshed.
    """
    copied_fields = (
        "region",
        "area",
        "town",
        "street",
        "start",
        "finish",
        "branch",
        "res",
        "comment",
        "building_id",
        "lat",
        "lng",
    )
    values = {field: getattr(record, field) for field in copied_fields}
    row = models.Record(**values)

    db.add(row)
    db.commit()
    # Reload the row's attributes from the database after the commit.
    db.refresh(row)
    return row
def contains_lower(name, val):
    """Build a filter condition on the ``models.Record`` attribute *name*.

    A string *val* produces a case-insensitive "contains" condition via
    ``icontains``; any other value produces an equality comparison.
    """
    column = getattr(models.Record, name)
    if not isinstance(val, str):
        return column == val
    return column.icontains(val)
def and_if_can(a: BinaryExpression, b: Optional[BinaryExpression]) -> BinaryExpression:
    """Return ``a & b``, or *a* unchanged when *b* is ``None``.

    Lets callers fold an optional, possibly not-yet-built condition into a
    mandatory one without checking for ``None`` themselves.
    """
    return a if b is None else a & b
def search_each(db: Session, filters: schemas.RecordRequest) -> List[schemas.Record]:
    """Return the records that satisfy every field set in *filters*.

    ``start`` and ``finish`` get dedicated comparisons: a record matches when
    its ``start`` is not later than ``filters.start`` and its ``finish`` is
    not earlier than ``filters.finish``. Every other field whose value is not
    ``None`` is turned into a condition by ``contains_lower``. All conditions
    are combined with AND; when none is set, every record is returned.

    Args:
        db: Session used to run the query.
        filters: Request object; iterating it must yield
            ``(field name, value)`` pairs.

    Returns:
        The list of matching ``models.Record`` rows.
    """
    # Must be a real tuple: the previous ``("start, finish")`` was a single
    # string, which turned the exclusion below into a substring test.
    time_fields = ("start", "finish")

    query = None
    if filters.start:
        query = models.Record.start <= filters.start
    if filters.finish:
        query = and_if_can(models.Record.finish >= filters.finish, query)

    for name, value in filters:
        # Unset fields impose no constraint; the time bounds are handled above.
        if value is None or name in time_fields:
            continue
        query = and_if_can(contains_lower(name, value), query)

    records = db.query(models.Record)
    # Compare against None explicitly: SQL expressions have no truth value.
    if query is not None:
        records = records.filter(query)
    return records.all()
def search_all(db: Session, prompt: str) -> List[schemas.Record]:
    """Return the records in which at least one searched field matches *prompt*.

    The stripped prompt is looked up (through ``contains_lower``) in the
    ``comment``, ``region``, ``area``, ``town``, ``street``, ``branch`` and
    ``res`` fields. When ``get_building_id`` yields a building id for the
    prompt, records with that ``building_id`` match as well. All conditions
    are combined with OR.
    """
    needle = prompt.strip()

    condition = contains_lower("comment", needle)
    for column in ("region", "area", "town", "street", "branch", "res"):
        condition = condition | contains_lower(column, needle)

    # Only the first element of the helper's result is used; the rest is ignored.
    building_id, *_ = get_building_id(needle)
    if building_id is not None:
        condition = condition | (models.Record.building_id == building_id)

    return db.query(models.Record).filter(condition).all()
def check_outage(db: Session, building_id: int) -> schemas.CheckResponse:
    """Report whether a record for *building_id* covers the current time.

    Args:
        db: Session used to run the queries.
        building_id: Value matched against ``models.Record.building_id``.

    Returns:
        ``{"is_outage": False}`` when no record of the building spans the
        current time, otherwise ``{"is_outage": True, "when_finish": ...}``
        carrying the ``finish`` of the first matching record.

    Raises:
        HTTPException: 404 when there is no record at all for *building_id*.
    """
    records_for_building = db.query(models.Record).filter(
        models.Record.building_id == building_id
    )
    if not records_for_building.count():
        raise HTTPException(404, "No such building")

    # NOTE(review): naive local time; presumably start/finish are stored the
    # same way. Confirm against the code that writes them.
    current_time = datetime.datetime.now()
    active = records_for_building.filter(
        models.Record.start <= current_time,
        models.Record.finish >= current_time,
    ).first()

    if active is not None:
        return {"is_outage": True, "when_finish": active.finish}
    return {"is_outage": False}