obsolete filter fixed

This commit is contained in:
DmitryGantimurov 2023-08-10 19:51:11 +03:00
parent 02db525e8b
commit 53f91567e2
5 changed files with 300 additions and 273 deletions

259
back/api.py Normal file
View File

@ -0,0 +1,259 @@
#подключение библиотек
from fastapi import FastAPI, Response, Path, Depends, Body, Form, Query, status, HTTPException, APIRouter, UploadFile, File
from fastapi.responses import HTMLResponse, FileResponse, JSONResponse, RedirectResponse
from fastapi.staticfiles import StaticFiles
from fastapi.security import OAuth2PasswordRequestForm, OAuth2PasswordBearer
from fastapi.templating import Jinja2Templates
from fastapi.requests import Request
from pydantic import json
from typing import Any, Annotated, List, Union
from starlette.staticfiles import StaticFiles
import requests
from uuid import uuid4
import random
import datetime
import asyncio
import ast
import pathlib
import shutil
import os
from .db import Base, engine, SessionLocal, database, Session
from .service import add_poems_to_db, generate_poem, check_obsolete, get_query_results
from .scheduler import app as app_rocketry
from . import schemas, models, utils
Base.metadata.create_all(bind=engine)
app = FastAPI()
templates = Jinja2Templates(directory="./front/dist")
app.mount("/static", StaticFiles(directory = "./front/dist"))
if not os.path.exists("./uploads"):
os.mkdir("./uploads")
app.mount("/uploads", StaticFiles(directory = "./uploads"))
# Записываем стихи в базу данных, если их еще нет (запускать только если стихов в базе нет).
# add_poems_to_db(database)
@app.get("/api/announcements", response_model=List[schemas.Announcement])#адрес объявлений
def announcements_list(obsolete: Union[bool, None] = False, user_id: Union[int, None] = None, metro: Union[str, None] = None,
category: Union[str, None] = None):
# параметры для сортировки (схема pydantic schemas.SortAnnouncements)
params_to_sort = schemas.SortAnnouncements(obsolete=obsolete, user_id=user_id, metro=metro, category=category)
# получаем результат
result = get_query_results(db=database, schema=params_to_sort)
return result
@app.get("/api/announcement")#адрес объявления
def single_announcement(user_id:int):
# Считываем данные из Body и отображаем их на странице.
# В последствии будем вставлять данные в html-форму
try:
announcement = database.get(models.Announcement, user_id)
return {"id": announcement.id, "user_id": announcement.user_id, "name": announcement.name,
"category": announcement.category, "best_by": announcement.best_by, "address": announcement.address,
"description": announcement.description, "metro": announcement.metro, "latitude": announcement.latitude,
"longtitude":announcement.longtitude, "trashId": announcement.trashId, "src":announcement.src,
"booked_by":announcement.booked_by}
except:
return {"Answer" : False} #если неуданый доступ, то сообщаем об этом
# Занести объявление в базу данных
@app.put("/api/announcement")#адрес объявлений
def put_in_db(name: Annotated[str, Form()], category: Annotated[str, Form()], bestBy: Annotated[datetime.date, Form()],
address: Annotated[str, Form()], longtitude: Annotated[float, Form()], latitude: Annotated[float, Form()],
description: Annotated[str, Form()], metro: Annotated[str, Form()], current_user: Annotated[schemas.User, Depends(utils.get_current_active_user)],
src: Union[UploadFile, None] = None, trashId: Annotated[int, Form()] = None):
try:
uploaded_name = ""
if src:
f = src.file
f.seek(0, os.SEEK_END)
if f.tell() > 0:
f.seek(0)
destination = pathlib.Path("./uploads/" + str(hash(f)) + pathlib.Path(src.filename).suffix.lower())
with destination.open('wb') as buffer:
shutil.copyfileobj(f, buffer)
uploaded_name = "/uploads/"+destination.name
temp_ancmt = models.Announcement(user_id=current_user.id, name=name, category=category, best_by=bestBy,
address=address, longtitude=longtitude, latitude=latitude, description=description, metro=metro,
trashId=trashId, booked_by=0)
database.add(temp_ancmt) # добавляем в бд
database.commit() # сохраняем изменения
database.refresh(temp_ancmt) # обновляем состояние объекта
return {"Answer" : True}
except:
return {"Answer" : False}
# Удалить объявления из базы
@app.delete("/api/announcement") #адрес объявления
def delete_from_db(announcement: schemas.DelAnnouncement): # функция удаления объекта из БД
try:
to_delete = database.query(models.Announcement).filter(models.Announcement.id==announcement.id).first()
database.delete(to_delete) # удаление из БД
database.commit() # сохраняем изменения
return {"Answer" : True}
except:
return {"Answer" : False}
# Забронировать объявление
@app.post("/api/book")
def change_book_status(data: schemas.Book):
try:
# Находим объявление по данному id
announcement_to_change = database.query(models.Announcement).filter(Announcement.id == data.id).first()
# Изменяем поле booked_status на полученный id
announcement_to_change.booked_status += 1
return {"Success": True}
except:
return {"Success": False}
# reginstration
@app.post("/api/signup")
def create_user(nickname: Annotated[str, Form()], password: Annotated[str, Form()], name: Annotated[str, Form()]=None,
surname: Annotated[str, Form()]=None, avatar: Annotated[UploadFile, Form()]=None):
if database.query(models.User).filter(models.User.nickname == nickname).first() == None:
new_user = models.User(nickname=nickname, hashed_password=utils.get_password_hash(password),
name=name, surname=surname, reg_date=datetime.date.today())
database.add(new_user)
database.commit()
database.refresh(new_user) # обновляем состояние объекта
return {"Success": True}
return {"Success": False, "Message": "Пользователь с таким email уже зарегестрирован"}
@app.post("/api/token", response_model=schemas.Token)
async def login_for_access_token(
form_data: Annotated[OAuth2PasswordRequestForm, Depends()]
):
user = utils.authenticate_user(database, form_data.username, form_data.password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = utils.timedelta(minutes=utils.ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = utils.create_access_token(
data={"user_id": user.id}, expires_delta=access_token_expires
)
return {"access_token":access_token}
@app.get("/api/users/me", response_model=schemas.User) #
async def read_users_me(current_user: Annotated[schemas.User, Depends(utils.get_current_active_user)]):
return current_user
@app.get("/api/users/me/items")
async def read_own_items(
current_user: Annotated[schemas.User, Depends(utils.get_current_active_user)]
):
return [{"Current user name": current_user.name, "Current user surname": current_user.surname}]
# изменяем рейтинг пользователя
@app.post("/api/user/rating")
def add_points(data: schemas.AddRating, current_user: Annotated[schemas.User, Depends(utils.get_current_user)]):
if current_user.id != data.user_id:
user = utils.get_user_by_id(db=database, user_id=data.user_id)
if not user:
raise HTTPException(status_code=404, detail="Item not found")
user.rating = (user.rating*user.num_of_ratings + data.rate)/(user.num_of_ratings + 1)
user.num_of_ratings += 1
database.commit()
database.refresh(user) # обновляем состояние объекта
return {"Success": True}
# получаем рейтинг пользователя
@app.get("/api/user/rating")
def add_points(user_id: int, db: Annotated[Session, Depends(utils.get_db)]):
user = utils.get_user_by_id(db, user_id=user_id)
if not user:
raise HTTPException(status_code=404, detail="Item not found")
return {"rating": user.rating}
# Отправляем стихи
@app.get("/api/user/poem") # пока не работает
def poems_to_front(): # db: Annotated[Session, Depends(utils.get_db)]
kolvo_stixov = database.query(models.Poems).count() # пока количество стихотворений = 101
if kolvo_stixov > 1:
rand_id = random.randint(1, kolvo_stixov) # номер стихотворения
poem_json = dict()
poem = database.query(models.Poems).filter(models.Poems.id == rand_id).first()
poem_json = {"id": rand_id, "title": poem.title, "text": poem.text, "author": poem.author}
return poem_json
else:
raise HTTPException(status_code=404, detail="Poems not found")
@app.get("/api/trashbox", response_model=List[schemas.TrashboxResponse])
def get_trashboxes(data: schemas.TrashboxRequest):#крутая функция для работы с api
BASE_URL='https://geointelect2.gate.petersburg.ru'#адрес сайта и мой токин
my_token='eyJhbGciOiJSUzI1NiIsInR5cCIgOiAiSldUIiwia2lkIiA6ICJhU1RaZm42bHpTdURYcUttRkg1SzN5UDFhT0FxUkhTNm9OendMUExaTXhFIn0.eyJleHAiOjE3ODM3ODk4NjgsImlhdCI6MTY4OTA5NTQ2OCwianRpIjoiNDUzNjQzZTgtYTkyMi00NTI4LWIzYmMtYWJiYTNmYjkyNTkxIiwiaXNzIjoiaHR0cHM6Ly9rYy5wZXRlcnNidXJnLnJ1L3JlYWxtcy9lZ3MtYXBpIiwiYXVkIjoiYWNjb3VudCIsInN1YiI6ImJjYjQ2NzljLTU3ZGItNDU5ZC1iNWUxLWRlOGI4Yzg5MTMwMyIsInR5cCI6IkJlYXJlciIsImF6cCI6ImFkbWluLXJlc3QtY2xpZW50Iiwic2Vzc2lvbl9zdGF0ZSI6ImM2ZDJiOTZhLWMxNjMtNDAxZS05ZjMzLTI0MmE0NDcxMDY5OCIsImFjciI6IjEiLCJhbGxvd2VkLW9yaWdpbnMiOlsiLyoiXSwicmVhbG1fYWNjZXNzIjp7InJvbGVzIjpbImRlZmF1bHQtcm9sZXMtZWdzLWFwaSIsIm9mZmxpbmVfYWNjZXNzIiwidW1hX2F1dGhvcml6YXRpb24iXX0sInJlc291cmNlX2FjY2VzcyI6eyJhY2NvdW50Ijp7InJvbGVzIjpbIm1hbmFnZS1hY2NvdW50IiwibWFuYWdlLWFjY291bnQtbGlua3MiLCJ2aWV3LXByb2ZpbGUiXX19LCJzY29wZSI6ImVtYWlsIHByb2ZpbGUiLCJzaWQiOiJjNmQyYjk2YS1jMTYzLTQwMWUtOWYzMy0yNDJhNDQ3MTA2OTgiLCJlbWFpbF92ZXJpZmllZCI6ZmFsc2UsIm5hbWUiOiLQktC70LDQtNC40LzQuNGAINCv0LrQvtCy0LvQtdCyIiwicHJlZmVycmVkX3VzZXJuYW1lIjoiZTBmYzc2OGRhOTA4MjNiODgwZGQzOGVhMDJjMmQ5NTciLCJnaXZlbl9uYW1lIjoi0JLQu9Cw0LTQuNC80LjRgCIsImZhbWlseV9uYW1lIjoi0K_QutC-0LLQu9C10LIifQ.E2bW0B-c6W5Lj63eP_G8eI453NlDMnW05l11TZT0GSsAtGayXGaolHtWrmI90D5Yxz7v9FGkkCmcUZYy1ywAdO9dDt_XrtFEJWFpG-3csavuMjXmqfQQ9SmPwDw-3toO64NuZVv6qVqoUlPPj57sLx4bLtVbB4pdqgyJYcrDHg7sgwz4d1Z3tAeUfSpum9s5ZfELequfpLoZMXn6CaYZhePaoK-CxeU3KPBPTPOVPKZZ19s7QY10VdkxLULknqf9opdvLs4j8NMimtwoIiHNBFlgQz10Cr7bhDKWugfvSRsICouniIiBJo76wrj5T92s-ztf1FShJuqnQcKE_QLd2A'
head = {'Authorization': 'Bearer {}'.format(my_token)}
my_data={
'x' : f"{data.Lng}",
'y' : f"{data.Lat}",
'limit' : '1'
}
response = requests.post(f"{BASE_URL}/nearest_recycling/get", headers=head, data=my_data)
infos = response.json()
trashboxes = []
for trashbox in infos["results"]:
temp_dict = {}
for obj in trashbox["Objects"]:
coord_list = obj["geometry"]
temp_dict["Lat"] = coord_list["coordinates"][1]
temp_dict["Lng"] = coord_list["coordinates"][0]
properties = obj["properties"]
temp_dict["Name"] = properties["title"]
temp_dict["Address"] = properties["address"]
temp_dict["Categories"] = properties["content_text"].split(',')
trashboxes.append(temp_dict)
uniq_trashboxes = [schemas.TrashboxResponse(**ast.literal_eval(el1)) for el1 in set([str(el2) for el2 in trashboxes])]
return uniq_trashboxes
@app.get("/{rest_of_path:path}")
async def react_app(req: Request, rest_of_path: str):
return templates.TemplateResponse('index.html', { 'request': req })
@app.post("/api/announcement/dispose")
def dispose(data: schemas.DisposeRequest, current_user_schema: Annotated[schemas.User, Depends(utils.get_current_user)],
db: Annotated[Session, Depends(utils.get_db)]):
# Находим в бд текущего юзера
current_user = utils.get_user_by_id(database, current_user_schema.id)
# Начисляем баллы пользователю за утилизацию
current_user.points += 60
# В полученном json переходим к данным мусорки
data_trashbox = data.trashbox
new_trashox = models.Trashbox(user_id=current_user.id, date_of_choice=datetime.date.today(), name=data_trashbox.Name,
latitude=data_trashbox.Lat, longtitude=data_trashbox.Lng, address=data_trashbox.Address, categories=data_trashbox.Category)
db.add(new_trashox)
db.commit()
db.refresh(new_trashox) # обновляем состояние объекта

View File

@ -1,259 +1,27 @@
#подключение библиотек import asyncio
from fastapi import FastAPI, Response, Path, Depends, Body, Form, Query, status, HTTPException, APIRouter, UploadFile, File import uvicorn
from fastapi.responses import HTMLResponse, FileResponse, JSONResponse, RedirectResponse
from fastapi.staticfiles import StaticFiles
from fastapi.security import OAuth2PasswordRequestForm, OAuth2PasswordBearer
from fastapi.templating import Jinja2Templates
from fastapi.requests import Request
from pydantic import json from .api import app as app_fastapi
from typing import Any, Annotated, List, Union
from starlette.staticfiles import StaticFiles
import requests
from uuid import uuid4
import random
import datetime
import ast
import pathlib
import shutil
import os
from .db import Base, engine, SessionLocal, database
from .service import add_poems_to_db, generate_poem, check_obsolete, get_query_results
from .scheduler import app as app_rocketry from .scheduler import app as app_rocketry
from . import schemas, models, utils
Base.metadata.create_all(bind=engine) class Server(uvicorn.Server):
"""Customized uvicorn.Server
app = FastAPI() Uvicorn server overrides signals and we need to include
Rocketry to the signals."""
templates = Jinja2Templates(directory="./front/dist") def handle_exit(self, sig: int, frame) -> None:
app_rocketry.session.shut_down()
app.mount("/static", StaticFiles(directory = "./front/dist")) return super().handle_exit(sig, frame)
if not os.path.exists("./uploads"):
os.mkdir("./uploads")
app.mount("/uploads", StaticFiles(directory = "./uploads"))
# Записываем стихи в базу данных, если их еще нет (запускать только если стихов в базе нет).
# add_poems_to_db(database)
# Каждый день в 00.00 проверяем все объявления (сравниваем текущую дату и срок годности) async def main():
# session_rocketry = app_rocketry.session "Run scheduler and the API"
server = Server(config=uvicorn.Config(app_fastapi, workers=1, loop="asyncio"))
api = asyncio.create_task(server.serve())
sched = asyncio.create_task(app_rocketry.serve())
@app.get("/api/announcements", response_model=List[schemas.Announcement])#адрес объявлений await asyncio.wait([sched, api])
def announcements_list(obsolete: Union[bool, None] = False, user_id: Union[int, None] = None, metro: Union[str, None] = None,
category: Union[str, None] = None):
# Считываем данные из Body и отображаем их на странице.
# В последствии будем вставлять данные в html-форму
params_to_sort = schemas.SortAnnouncements(obsolete=obsolete, user_id=user_id, metro=metro, category=category)
# Фильтруем по сроку годности
# not_expired = check_obsolete(current_date=datetime.date.today())
# Фильтруем по другим параметрам и делаем пересечение с not_expired
# result = not_expired.intersect(get_query_results(params_to_sort))
result = get_query_results(db=database, schema=params_to_sort)
return result if __name__ == "__main__":
asyncio.run(main())
@app.get("/api/announcement")#адрес объявления
def single_announcement(user_id:int):
# Считываем данные из Body и отображаем их на странице.
# В последствии будем вставлять данные в html-форму
try:
announcement = database.get(models.Announcement, user_id)
return {"id": announcement.id, "user_id": announcement.user_id, "name": announcement.name,
"category": announcement.category, "best_by": announcement.best_by, "address": announcement.address,
"description": announcement.description, "metro": announcement.metro, "latitude": announcement.latitude,
"longtitude":announcement.longtitude, "trashId": announcement.trashId, "src":announcement.src,
"booked_by":announcement.booked_by}
except:
return {"Answer" : False} #если неуданый доступ, то сообщаем об этом
# Занести объявление в базу данных
@app.put("/api/announcement")#адрес объявлений
def put_in_db(name: Annotated[str, Form()], category: Annotated[str, Form()], bestBy: Annotated[datetime.date, Form()],
address: Annotated[str, Form()], longtitude: Annotated[float, Form()], latitude: Annotated[float, Form()],
description: Annotated[str, Form()], metro: Annotated[str, Form()], current_user: Annotated[schemas.User, Depends(utils.get_current_active_user)],
src: Union[UploadFile, None] = None, trashId: Annotated[int, Form()] = None):
try:
uploaded_name = ""
if src:
f = src.file
f.seek(0, os.SEEK_END)
if f.tell() > 0:
f.seek(0)
destination = pathlib.Path("./uploads/" + str(hash(f)) + pathlib.Path(src.filename).suffix.lower())
with destination.open('wb') as buffer:
shutil.copyfileobj(f, buffer)
uploaded_name = "/uploads/"+destination.name
temp_ancmt = models.Announcement(user_id=current_user.id, name=name, category=category, best_by=bestBy,
address=address, longtitude=longtitude, latitude=latitude, description=description, metro=metro,
trashId=trashId, booked_by=0)
database.add(temp_ancmt) # добавляем в бд
database.commit() # сохраняем изменения
database.refresh(temp_ancmt) # обновляем состояние объекта
return {"Answer" : True}
except:
return {"Answer" : False}
# Удалить объявления из базы
@app.delete("/api/announcement") #адрес объявления
def delete_from_db(announcement: schemas.DelAnnouncement): # функция удаления объекта из БД
try:
to_delete = database.query(models.Announcement).filter(models.Announcement.id==announcement.id).first()
database.delete(to_delete) # удаление из БД
database.commit() # сохраняем изменения
return {"Answer" : True}
except:
return {"Answer" : False}
# Забронировать объявление
@app.post("/api/book")
def change_book_status(data: schemas.Book):
try:
# Находим объявление по данному id
announcement_to_change = database.query(models.Announcement).filter(Announcement.id == data.id).first()
# Изменяем поле booked_status на полученный id
announcement_to_change.booked_status += 1
return {"Success": True}
except:
return {"Success": False}
# reginstration
@app.post("/api/signup")
def create_user(nickname: Annotated[str, Form()], password: Annotated[str, Form()], name: Annotated[str, Form()]=None,
surname: Annotated[str, Form()]=None, avatar: Annotated[UploadFile, Form()]=None):
if database.query(models.User).filter(models.User.nickname == nickname).first() == None:
new_user = models.User(nickname=nickname, hashed_password=utils.get_password_hash(password),
name=name, surname=surname, reg_date=datetime.date.today())
database.add(new_user)
database.commit()
database.refresh(new_user) # обновляем состояние объекта
return {"Success": True}
return {"Success": False, "Message": "Пользователь с таким email уже зарегестрирован"}
@app.post("/api/token", response_model=schemas.Token)
async def login_for_access_token(
form_data: Annotated[OAuth2PasswordRequestForm, Depends()]
):
user = utils.authenticate_user(database, form_data.username, form_data.password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = utils.timedelta(minutes=utils.ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = utils.create_access_token(
data={"user_id": user.id}, expires_delta=access_token_expires
)
return {"access_token":access_token}
@app.get("/api/users/me", response_model=schemas.User) #
async def read_users_me(current_user: Annotated[schemas.User, Depends(utils.get_current_active_user)]):
return current_user
@app.get("/api/users/me/items")
async def read_own_items(
current_user: Annotated[schemas.User, Depends(utils.get_current_active_user)]
):
return [{"Current user name": current_user.name, "Current user surname": current_user.surname}]
# изменяем рейтинг пользователя
@app.post("/api/user/rating")
def add_points(data: schemas.AddRating, current_user: Annotated[schemas.User, Depends(utils.get_current_user)]):
if current_user.id != data.user_id:
user = utils.get_user_by_id(db=database, user_id=data.user_id)
if not user:
raise HTTPException(status_code=404, detail="Item not found")
user.rating = (user.rating*user.num_of_ratings + data.rate)/(user.num_of_ratings + 1)
user.num_of_ratings += 1
database.commit()
database.refresh(user) # обновляем состояние объекта
return {"Success": True}
# получаем рейтинг пользователя
@app.get("/api/user/rating")
def add_points(user_id: int):
user = utils.get_user_by_id(db=database, user_id=user_id)
if not user:
raise HTTPException(status_code=404, detail="Item not found")
return {"rating": user.rating}
# Отправляем стихи
@app.get("/api/user/poem") # пока не работает
def poems_to_front(): # db: Annotated[Session, Depends(utils.get_db)]
kolvo_stixov = database.query(models.Poems).count() # пока количество стихотворений = 101
if kolvo_stixov > 1:
rand_id = random.randint(1, kolvo_stixov) # номер стихотворения
poem_json = dict()
poem = database.query(models.Poems).filter(models.Poems.id == rand_id).first()
poem_json = {"id": rand_id, "title": poem.title, "text": poem.text, "author": poem.author}
return poem_json
else:
raise HTTPException(status_code=404, detail="Poems not found")
@app.get("/api/trashbox", response_model=List[schemas.TrashboxResponse])
def get_trashboxes(lat:float, lng:float):#крутая функция для работы с api
BASE_URL='https://geointelect2.gate.petersburg.ru'#адрес сайта и мой токин
my_token='eyJhbGciOiJSUzI1NiIsInR5cCIgOiAiSldUIiwia2lkIiA6ICJhU1RaZm42bHpTdURYcUttRkg1SzN5UDFhT0FxUkhTNm9OendMUExaTXhFIn0.eyJleHAiOjE3ODM3ODk4NjgsImlhdCI6MTY4OTA5NTQ2OCwianRpIjoiNDUzNjQzZTgtYTkyMi00NTI4LWIzYmMtYWJiYTNmYjkyNTkxIiwiaXNzIjoiaHR0cHM6Ly9rYy5wZXRlcnNidXJnLnJ1L3JlYWxtcy9lZ3MtYXBpIiwiYXVkIjoiYWNjb3VudCIsInN1YiI6ImJjYjQ2NzljLTU3ZGItNDU5ZC1iNWUxLWRlOGI4Yzg5MTMwMyIsInR5cCI6IkJlYXJlciIsImF6cCI6ImFkbWluLXJlc3QtY2xpZW50Iiwic2Vzc2lvbl9zdGF0ZSI6ImM2ZDJiOTZhLWMxNjMtNDAxZS05ZjMzLTI0MmE0NDcxMDY5OCIsImFjciI6IjEiLCJhbGxvd2VkLW9yaWdpbnMiOlsiLyoiXSwicmVhbG1fYWNjZXNzIjp7InJvbGVzIjpbImRlZmF1bHQtcm9sZXMtZWdzLWFwaSIsIm9mZmxpbmVfYWNjZXNzIiwidW1hX2F1dGhvcml6YXRpb24iXX0sInJlc291cmNlX2FjY2VzcyI6eyJhY2NvdW50Ijp7InJvbGVzIjpbIm1hbmFnZS1hY2NvdW50IiwibWFuYWdlLWFjY291bnQtbGlua3MiLCJ2aWV3LXByb2ZpbGUiXX19LCJzY29wZSI6ImVtYWlsIHByb2ZpbGUiLCJzaWQiOiJjNmQyYjk2YS1jMTYzLTQwMWUtOWYzMy0yNDJhNDQ3MTA2OTgiLCJlbWFpbF92ZXJpZmllZCI6ZmFsc2UsIm5hbWUiOiLQktC70LDQtNC40LzQuNGAINCv0LrQvtCy0LvQtdCyIiwicHJlZmVycmVkX3VzZXJuYW1lIjoiZTBmYzc2OGRhOTA4MjNiODgwZGQzOGVhMDJjMmQ5NTciLCJnaXZlbl9uYW1lIjoi0JLQu9Cw0LTQuNC80LjRgCIsImZhbWlseV9uYW1lIjoi0K_QutC-0LLQu9C10LIifQ.E2bW0B-c6W5Lj63eP_G8eI453NlDMnW05l11TZT0GSsAtGayXGaolHtWrmI90D5Yxz7v9FGkkCmcUZYy1ywAdO9dDt_XrtFEJWFpG-3csavuMjXmqfQQ9SmPwDw-3toO64NuZVv6qVqoUlPPj57sLx4bLtVbB4pdqgyJYcrDHg7sgwz4d1Z3tAeUfSpum9s5ZfELequfpLoZMXn6CaYZhePaoK-CxeU3KPBPTPOVPKZZ19s7QY10VdkxLULknqf9opdvLs4j8NMimtwoIiHNBFlgQz10Cr7bhDKWugfvSRsICouniIiBJo76wrj5T92s-ztf1FShJuqnQcKE_QLd2A'
head = {'Authorization': 'Bearer {}'.format(my_token)}
my_data={
'x' : f"{lng}",
'y' : f"{lat}",
'limit' : '1'
}
response = requests.post(f"{BASE_URL}/nearest_recycling/get", headers=head, data=my_data)
infos = response.json()
trashboxes = []
for trashbox in infos["results"]:
temp_dict = {}
for obj in trashbox["Objects"]:
coord_list = obj["geometry"]
temp_dict["Lat"] = coord_list["coordinates"][1]
temp_dict["Lng"] = coord_list["coordinates"][0]
properties = obj["properties"]
temp_dict["Name"] = properties["title"]
temp_dict["Address"] = properties["address"]
temp_dict["Categories"] = properties["content_text"].split(',')
trashboxes.append(temp_dict)
uniq_trashboxes = [ast.literal_eval(el1) for el1 in set([str(el2) for el2 in trashboxes])]
return JSONResponse(uniq_trashboxes)
@app.get("/{rest_of_path:path}")
async def react_app(req: Request, rest_of_path: str):
return templates.TemplateResponse('index.html', { 'request': req })
@app.post("/api/announcement/dispose")
def dispose(data: schemas.DisposeRequest, current_user_schema: Annotated[schemas.User, Depends(utils.get_current_user)]):
current_user = utils.get_user_by_id(database, current_user_schema.id)
current_user.points += 60
new_trashox = models.Trashbox(user_id=current_user.id, date_of_choice=datetime.date.today(), name=data.Name,
latitude=data.Lat, longtitude=data.Lng, address=data.Address, categories=data.Categories)
database.add(new_trashox)
database.commit()
database.refresh(new_trashox) # обновляем состояние объекта

View File

@ -2,12 +2,13 @@ from . import service
from rocketry import Rocketry from rocketry import Rocketry
from rocketry.conds import daily from rocketry.conds import daily
import datetime import datetime
from .db import database
app = Rocketry() app = Rocketry(execution="async")
# Create some tasks: # Create task:
@app.task('daily')
@app.task("daily") async def daily_check():
def daily_check(): # Фильтруем по сроку годности
service.check_obsolete(current_date=datetime.date.today()) service.check_obsolete(database, current_date=datetime.date.today())

View File

@ -22,29 +22,16 @@ class Announcement(BaseModel):
longtitude: float longtitude: float
latitude: float latitude: float
description: str description: str
# src: Union[UploadFile, None] = None #изображение продукта в объявлении
src: Union[str, None] = None #изображение продукта в объявлении src: Union[str, None] = None #изображение продукта в объявлении
metro: str #ближайщее метро от адреса нахождения продукта metro: str #ближайщее метро от адреса нахождения продукта
trashId: Union[int, None] = None trashId: Union[int, None] = None
booked_by: Union[int, None] = 0 #статус бронирования (либо 0, либо айди бронирующего) booked_by: Union[int, None] = 0 #статус бронирования (либо 0, либо айди бронирующего)
obsolete: bool
class Config: class Config:
orm_mode = True orm_mode = True
arbitrary_types_allowed=True arbitrary_types_allowed=True
# Не используется
class PutAnnInDB(BaseModel):
name: Annotated[str, Form()]
category: Annotated[str, Form()]
best_by: Annotated[date, Form()]
address: Annotated[str, Form()]
longtitude: Annotated[float, Form()]
latitude: Annotated[float, Form()]
description: Annotated[str, Form()]
src: Annotated[Union[UploadFile, None], Form()] = None #изображение продукта в объявлении
metro: Annotated[str, Form()] #ближайщее метро от адреса нахождения продукта
class Token(BaseModel): class Token(BaseModel):
access_token: str access_token: str
@ -74,6 +61,7 @@ class Poem(BaseModel):
text: str text: str
author: str author: str
# Для "/api/trashbox"
class TrashboxBase(BaseModel): class TrashboxBase(BaseModel):
Lat: float Lat: float
Lng: float Lng: float
@ -86,10 +74,21 @@ class TrashboxResponse(TrashboxBase):
class TrashboxRequest(TrashboxBase): class TrashboxRequest(TrashboxBase):
Category: str Category: str
class DisposeRequest(TrashboxResponse):
# Для /api/announcement/dispose
class TrashboxSelected(BaseModel):
Lat: float
Lng: float
Name: str
Address: str
Category: str
class DisposeRequest(BaseModel):
ann_id: int ann_id: int
trashbox: TrashboxSelected
# схема для передачи параметров, по которым ведется фильтрация
class SortAnnouncements(BaseModel): class SortAnnouncements(BaseModel):
obsolete: Union[int, None] = False obsolete: Union[int, None] = False
user_id: Union[int, None] = None user_id: Union[int, None] = None

View File

@ -73,8 +73,8 @@ def check_obsolete(db: Annotated[Session, Depends(utils.get_db)], current_date:
announcements = db.query(models.Announcement).all() announcements = db.query(models.Announcement).all()
for ann in announcements: for ann in announcements:
if ann.best_by < current_date: if ann.best_by < current_date:
ann.obsolete = False
else:
ann.obsolete = True ann.obsolete = True
db.commit()
db.refresh(ann) # обновляем состояние объекта