112 lines
2.7 KiB
Python
112 lines
2.7 KiB
Python
from typing import List, Optional
|
|
from functools import reduce
|
|
import datetime
|
|
|
|
from fastapi import HTTPException
|
|
from sqlalchemy import func, True_
|
|
from sqlalchemy.orm import Session
|
|
from sqlalchemy.sql import operators
|
|
from sqlalchemy.sql.expression import BinaryExpression
|
|
|
|
from . import models, schemas
|
|
|
|
|
|
def create_record(db: Session, record: schemas.Record):
|
|
db_record = models.Record(
|
|
region=record.region,
|
|
area=record.area,
|
|
town=record.town,
|
|
street=record.street,
|
|
start=record.start,
|
|
finish=record.finish,
|
|
branch=record.branch,
|
|
res=record.res,
|
|
comment=record.comment,
|
|
building_id=record.building_id,
|
|
lat=record.lat,
|
|
lng=record.lng,
|
|
)
|
|
db.add(db_record)
|
|
db.commit()
|
|
db.refresh(db_record)
|
|
|
|
return db_record
|
|
|
|
|
|
def contains_lower(name, val):
|
|
if type(val) == str:
|
|
return getattr(models.Record, name).icontains(val)
|
|
else:
|
|
return getattr(models.Record, name) == val
|
|
|
|
|
|
def and_if_can(a: BinaryExpression, b: Optional[BinaryExpression]):
|
|
if b is not None:
|
|
return a & b
|
|
else:
|
|
return a
|
|
|
|
|
|
def search_each(db: Session, filters: schemas.RecordRequest) -> List[schemas.Record]:
|
|
query = None
|
|
|
|
if filters.start:
|
|
query = (models.Record.start <= filters.start)
|
|
if filters.finish:
|
|
query = and_if_can(models.Record.finish >= filters.finish, query)
|
|
|
|
filters = list(
|
|
filter(lambda x: x[1] is not None and x[0] not in ('start, finish'), filters))
|
|
|
|
query = reduce(lambda acc, ftr: and_if_can(
|
|
contains_lower(*ftr), acc), filters, query)
|
|
|
|
if query is None:
|
|
res = db.query(models.Record).all()
|
|
|
|
res = db.query(models.Record).filter(query).all()
|
|
|
|
return res
|
|
|
|
|
|
def search_all(db: Session, prompt: str) -> List[schemas.Record]:
|
|
prompt = prompt.strip()
|
|
|
|
query = reduce(lambda acc, name: acc | contains_lower(name, prompt), (
|
|
'region',
|
|
'area',
|
|
'town',
|
|
'street',
|
|
'branch',
|
|
'res'
|
|
), contains_lower('comment', prompt))
|
|
|
|
res = db.query(models.Record).filter(query).all()
|
|
|
|
return res
|
|
|
|
|
|
def check_outage(db: Session, building_id: int) -> schemas.CheckResponse:
|
|
building_query = db.query(models.Record).filter(
|
|
(models.Record.building_id == building_id))
|
|
|
|
if building_query.count() == 0:
|
|
raise HTTPException(404, 'No such building')
|
|
|
|
now = datetime.datetime.now()
|
|
|
|
res = building_query.filter(
|
|
(models.Record.start <= now) &
|
|
(now <= models.Record.finish)
|
|
).first()
|
|
|
|
if res is None:
|
|
return {
|
|
'is_outage': False
|
|
}
|
|
|
|
return {
|
|
'is_outage': True,
|
|
'when_finish': res.finish
|
|
}
|