dm1sh a2b0b25233
Fixed unknown category trashboxes
Added specific error for trashboxes token expiration
2023-09-14 01:30:51 +03:00

334 lines
17 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#подключение библиотек
from fastapi import FastAPI, Depends, Form, status, HTTPException, APIRouter, UploadFile
from fastapi.responses import HTMLResponse, FileResponse, JSONResponse, RedirectResponse
from fastapi.staticfiles import StaticFiles
from fastapi.security import OAuth2PasswordRequestForm
from fastapi.templating import Jinja2Templates
from fastapi.requests import Request
from typing import Any, Annotated, List, Union
from starlette.staticfiles import StaticFiles
from sqlalchemy.orm import Session
from sqlalchemy import select
import requests
from uuid import uuid4
import random
import datetime
import asyncio
import ast
import pathlib
import shutil
import os
from . import add_poems_and_filters, auth_utils, orm_models, pydantic_schemas
from .config import TRASHBOXES_BASE_URL, TRASHBOXES_TOKEN
# создаем приложение Fastapi
app = FastAPI()
# Jinja2 - шаблоны
templates = Jinja2Templates(directory="./front/dist")
# хранение картинок для стихов
app.mount("/poem_pic", StaticFiles(directory = "./poem_pic"))
# создаем эндпоинт для хранения статических файлов
app.mount("/static", StaticFiles(directory = "./front/dist"))
# проверяем, что папка uploads еще не создана
if not os.path.exists("./uploads"):
os.mkdir("./uploads")
# создаем эндпоинт для хранения файлов пользователя
app.mount("/uploads", StaticFiles(directory = "./uploads"))
# эндпоинт для возвращения согласия в pdf
@app.get("/privacy_policy.pdf")
async def privacy_policy():
return FileResponse("./privacy_policy.pdf")
# получение списка объявлений
@app.get("/api/announcements", response_model=List[pydantic_schemas.Announcement])#адрес объявлений
async def announcements_list(db: Annotated[Session, Depends(auth_utils.get_session)], obsolete: Union[bool, None] = False, user_id: Union[int, None] = None,
metro: Union[str, None] = None,category: Union[str, None] = None):
# параметры для сортировки (схема pydantic schemas.SortAnnouncements)
params_to_sort = pydantic_schemas.SortAnnouncements(obsolete=obsolete, user_id=user_id, metro=metro, category=category)
# получаем результат
result = await add_poems_and_filters.filter_ann(db=db, schema=params_to_sort)
return result
# получаем данные одного объявления
@app.get("/api/announcement", response_model=pydantic_schemas.AnnResponce)
async def single_announcement(ann_id:int, db: Annotated[Session, Depends(auth_utils.get_session)]): # передаем индекс обявления
# Считываем данные из Body и отображаем их на странице.
# В последствии будем вставлять данные в html-форму
announcement = await db.get(orm_models.Announcement, ann_id)
#announcement = await db.execute(select(orm_models.Announcement)).scalars().all()
if not announcement:
raise HTTPException(status_code=404, detail="Item not found")
return announcement
# Занести объявление в базу данных
@app.put("/api/announcement")
async def put_in_db(name: Annotated[str, Form()], category: Annotated[str, Form()], bestBy: Annotated[datetime.date, Form()],
address: Annotated[str, Form()], longtitude: Annotated[float, Form()], latitude: Annotated[float, Form()],
description: Annotated[str, Form()], metro: Annotated[str, Form()], current_user: Annotated[pydantic_schemas.User, Depends(auth_utils.get_current_active_user)],
db: Annotated[Session, Depends(auth_utils.get_session)], src: Union[UploadFile, None] = None, trashId: Annotated[int, Form()] = None):
# имя загруженного файла по умолчанию - пустая строка
uploaded_name = ""
# если пользователь загрузил картинку
if src:
# процесс сохранения картинки
f = src.file
f.seek(0, os.SEEK_END)
if f.tell() > 0:
f.seek(0)
destination = pathlib.Path("./uploads/" + str(hash(f)) + pathlib.Path(src.filename).suffix.lower())
with destination.open('wb') as buffer:
shutil.copyfileobj(f, buffer)
# изменяем название директории загруженного файла
uploaded_name = "/uploads/" + destination.name
# создаем объект Announcement
temp_ancmt = orm_models.Announcement(user_id=current_user.id, name=name, category=category, best_by=bestBy,
address=address, longtitude=longtitude, latitude=latitude, description=description, metro=metro,
trashId=trashId, src=uploaded_name, booked_by=0)
try:
db.add(temp_ancmt) # добавляем в бд
await db.commit() # сохраняем изменения
await db.refresh(temp_ancmt) # обновляем состояние объекта
return {"Success": True}
except:
raise HTTPException(status_code=500, detail="problem with adding object to db")
# Удалить объявления из базы
@app.delete("/api/announcement") #адрес объявления
async def delete_from_db(announcement: pydantic_schemas.DelAnnouncement, db: Annotated[Session, Depends(auth_utils.get_session)]): # функция удаления объекта из БД
# находим объект с заданным id в бд
#to_delete = db.query(orm_models.Announcement).filter(orm_models.Announcement.id==announcement.id).first()
query = await db.execute(select(orm_models.Announcement).where(orm_models.Announcement.id==announcement.id))
to_delete = query.scalars().first()
if not to_delete:
raise HTTPException(status_code=404, detail="Item not found. Can't delete")
try:
await db.delete(to_delete) # удаление из БД
await db.commit() # сохраняем изменения
return {"Success": True}
except:
raise HTTPException(status_code=500, detail="Problem with adding to database")
# Забронировать объявление
@app.post("/api/book")
async def change_book_status(data: pydantic_schemas.Book, current_user: Annotated[pydantic_schemas.User, Depends(auth_utils.get_current_user)],
db: Annotated[Session, Depends(auth_utils.get_session)]):
# Находим объявление по данному id
#announcement_to_change = db.query(orm_models.Announcement).filter(orm_models.Announcement.id == data.id).first()
query = await db.execute(select(orm_models.Announcement).where(orm_models.Announcement.id == data.id))
announcement_to_change = query.scalars().first()
# Проверяем, что объявление с данным id существует
if not announcement_to_change:
raise HTTPException(status_code=404, detail="Item not found")
# Проверяем, что объявление бронирует не владелец
if current_user.id == announcement_to_change.user_id:
raise HTTPException(status_code=403, detail="A user can't book his announcement")
else:
# Инкрементируем поле booked_by на 1
announcement_to_change.booked_by += 1
# фиксируем изменения в бд
await db.commit()
await db.refresh(announcement_to_change)
return {"Success": True}
# reginstration
@app.post("/api/signup")
async def create_user(nickname: Annotated[str, Form()], password: Annotated[str, Form()], db: Annotated[Session, Depends(auth_utils.get_session)],
name: Annotated[str, Form()]=None, surname: Annotated[str, Form()]=None, avatar: Annotated[UploadFile, Form()]=None):
# проверяем, что юзера с введенным никнеймом не существует в бд
#if db.query(orm_models.User).filter(orm_models.User.nickname == nickname).first() == None:
query_user = await db.execute(select(orm_models.User).where(orm_models.User.nickname == nickname))
user_with_entered_nick = query_user.scalars().first()
if user_with_entered_nick == None:
# создаем нового юзера
new_user = orm_models.User(nickname=nickname, hashed_password=auth_utils.get_password_hash(password),
name=name, surname=surname, reg_date=datetime.date.today())
# добавляем в бд
db.add(new_user)
await db.commit()
await db.refresh(new_user) # обновляем состояние объекта
return {"Success": True}
return {"Success": False, "Message": "Пользователь с таким email уже зарегестрирован"}
# функция для генерации токена после успешного входа пользователя
@app.post("/api/token", response_model=pydantic_schemas.Token)
async def login_for_access_token(
form_data: Annotated[OAuth2PasswordRequestForm, Depends()], db: Annotated[Session, Depends(auth_utils.get_session)]
):
# пробуем найти юзера в бд по введенным паролю и никнейму
user = await auth_utils.authenticate_user(db, form_data.username, form_data.password)
# если не нашли - кидаем ошибку
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
# задаем временной интервал, в течение которого токен можно использовать
access_token_expires = auth_utils.timedelta(minutes=auth_utils.ACCESS_TOKEN_EXPIRE_MINUTES)
# создаем токен
access_token = auth_utils.create_access_token(
data={"user_id": user.id}, expires_delta=access_token_expires
)
return {"access_token":access_token}
# получаем данные успешно вошедшего пользователя
@app.get("/api/users/me", response_model=pydantic_schemas.User) #
def read_users_me(current_user: Annotated[pydantic_schemas.User, Depends(auth_utils.get_current_active_user)]):
return current_user
# изменяем рейтинг пользователя
@app.post("/api/user/rating")
async def add_points(data: pydantic_schemas.AddRating, current_user: Annotated[pydantic_schemas.User, Depends(auth_utils.get_current_user)], db: Annotated[Session, Depends(auth_utils.get_session)]):
# проверяем,
if current_user.id != data.user_id:
user = await auth_utils.get_user_by_id(db, data.user_id)
if not user:
raise HTTPException(status_code=404, detail="Item not found")
user.rating = (user.rating*user.num_of_ratings + data.rate)/(user.num_of_ratings + 1)
user.num_of_ratings += 1
await db.commit()
await db.refresh(user) # обновляем состояние объекта
return {"Success": True}
# получаем рейтинг пользователя
@app.get("/api/user/rating")
async def add_points(user_id: int, db: Annotated[Session, Depends(auth_utils.get_session)]):
user = await auth_utils.get_user_by_id(db, user_id=user_id)
if not user:
raise HTTPException(status_code=404, detail="Item not found")
return {"rating": user.rating}
# Отправляем стихи
@app.get("/api/user/poem", response_model=pydantic_schemas.Poem)
async def poems_to_front(db: Annotated[Session, Depends(auth_utils.get_session)]):
#num_of_poems = db.query(orm_models.Poems).count() # определяем кол-во стихов в бд
query = await db.execute(select(orm_models.Poems)) # определяем кол-во стихов в бд
num_of_poems = len(query.scalars().all())
# если стихов в бд нет
if num_of_poems < 1:
await add_poems_and_filters.add_poems_to_db(db) # добавляем поэмы в базу данных
# после добавления стихов снова определяем кол-во стихов в бд
query = await db.execute(select(orm_models.Poems))
num_of_poems = len(query.scalars().all())
rand_id = random.randint(1, num_of_poems) # генерируем номер стихотворения
#poem = db.query(orm_models.Poems).filter(orm_models.Poems.id == rand_id).first() # находим стих в бд
query_poem = await db.execute(select(orm_models.Poems).where(orm_models.Poems.id == rand_id)) # находим стих в бд
poem = query_poem.scalars().first()
if not poem:
raise HTTPException(status_code=404, detail="Poem not found")
return poem
trashboxes_category = {
"PORRIDGE": ["Опасные отходы", "Иное"],
"conspects": ["Бумага"],
"milk": ["Стекло", "Тетра Пак", "Иное"],
"bred": ["Пластик", "Иное"],
"wathing": ["Пластик", "Опасные отходы", "Иное"],
"cloth": ["Одежда"],
"fruits_vegatables": ["Иное"],
"other_things": ["Металл", "Бумага", "Стекло", "Иное", "Тетра Пак", "Батарейки", "Крышечки", "Шины",
"Опасные отходы", "Лампочки", "Пластик"]
}
@app.get("/api/trashbox", response_model=List[pydantic_schemas.TrashboxResponse])
async def get_trashboxes(data: pydantic_schemas.TrashboxRequest = Depends()): #крутая функция для работы с api
# json, передаваемый стороннему API
head = {'Authorization': 'Bearer ' + TRASHBOXES_TOKEN}
# Данные пользователя (местоположение, количество мусорок, которое пользователь хочет видеть)
my_data={
'x' : f"{data.Lng}",
'y' : f"{data.Lat}",
'limit' : '1'
}
# Перевод категории с фронта на категорию с сайта
try:
list_of_category = trashboxes_category[data.Category]
except:
list_of_category = trashboxes_category['other_things']
# Получение ответа от стороннего апи
response = requests.post(TRASHBOXES_BASE_URL + "/nearest_recycling/get", headers=head, data=my_data, timeout=10)
infos = response.json()
if 'error' in infos and infos['error_description'] == 'Invalid bearer token':
raise HTTPException(status_code=502, detail="Invalid trashboxes token")
# Чтение ответа
trashboxes = []
for trashbox in infos["results"]:
temp_dict = {}
for obj in trashbox["Objects"]:
coord_list = obj["geometry"]
temp_dict["Lat"] = coord_list["coordinates"][1]
temp_dict["Lng"] = coord_list["coordinates"][0]
properties = obj["properties"]
temp_dict["Name"] = properties["title"]
temp_dict["Address"] = properties["address"]
temp_dict["Categories"] = properties["content_text"].split(',')
for a in list_of_category:
if a in temp_dict["Categories"] and temp_dict not in trashboxes:
trashboxes.append(temp_dict)
uniq_trashboxes = [pydantic_schemas.TrashboxResponse(**ast.literal_eval(el1)) for el1 in set([str(el2) for el2 in trashboxes])]
return uniq_trashboxes
@app.get("/{rest_of_path:path}")
async def react_app(req: Request, rest_of_path: str):
return templates.TemplateResponse('index.html', { 'request': req })
@app.post("/api/announcement/dispose")
async def dispose(data: pydantic_schemas.DisposeRequest, current_user_schema: Annotated[pydantic_schemas.User, Depends(auth_utils.get_current_user)],
db: Annotated[Session, Depends(auth_utils.get_session)]):
# Находим в бд текущего юзера
current_user = await auth_utils.get_user_by_id(db, current_user_schema.id)
# Начисляем баллы пользователю за утилизацию
current_user.points += 60
# В полученном json переходим к данным мусорки
data_trashbox = data.trashbox
# создаем запись models.Trashbox
new_trashox = orm_models.Trashbox(user_id=current_user.id, date_of_choice=datetime.date.today(), name=data_trashbox.Name,
latitude=data_trashbox.Lat, longtitude=data_trashbox.Lng, address=data_trashbox.Address, category=data_trashbox.Category)
# добавляем в бд
db.add(new_trashox)
# в соответствии с логикой api, после утилизации объявление пользователя удаляется
# находим объявление с айди data.ann_id
#ann_to_del = db.query(orm_models.Announcement).filter(orm_models.Announcement.id == data.ann_id).first() #
query_ann = await db.execute(select(orm_models.Announcement).where(orm_models.Announcement.id == data.ann_id)) # находим объявление в бд
ann_to_del = query_ann.scalars().first()
if not ann_to_del:
raise HTTPException(status_code=404, detail="Announcement not found")
# удаляем объявление из бд
await db.delete(ann_to_del)
await db.commit()
await db.refresh(new_trashox) # обновляем состояние объекта
return {"Success": True}