334 lines
17 KiB
Python
334 lines
17 KiB
Python
#подключение библиотек
|
||
from fastapi import FastAPI, Depends, Form, status, HTTPException, APIRouter, UploadFile
|
||
from fastapi.responses import HTMLResponse, FileResponse, JSONResponse, RedirectResponse
|
||
from fastapi.staticfiles import StaticFiles
|
||
from fastapi.security import OAuth2PasswordRequestForm
|
||
from fastapi.templating import Jinja2Templates
|
||
from fastapi.requests import Request
|
||
|
||
|
||
from typing import Any, Annotated, List, Union
|
||
from starlette.staticfiles import StaticFiles
|
||
from sqlalchemy.orm import Session
|
||
from sqlalchemy import select
|
||
|
||
import requests
|
||
from uuid import uuid4
|
||
import random
|
||
import datetime
|
||
import asyncio
|
||
|
||
import ast
|
||
import pathlib
|
||
import shutil
|
||
import os
|
||
|
||
from . import add_poems_and_filters, auth_utils, orm_models, pydantic_schemas
|
||
|
||
from .config import TRASHBOXES_BASE_URL, TRASHBOXES_TOKEN
|
||
|
||
# создаем приложение Fastapi
|
||
app = FastAPI()
|
||
|
||
# Jinja2 - шаблоны
|
||
templates = Jinja2Templates(directory="./front/dist")
|
||
|
||
# хранение картинок для стихов
|
||
app.mount("/poem_pic", StaticFiles(directory = "./poem_pic"))
|
||
# создаем эндпоинт для хранения статических файлов
|
||
app.mount("/static", StaticFiles(directory = "./front/dist"))
|
||
# проверяем, что папка uploads еще не создана
|
||
if not os.path.exists("./uploads"):
|
||
os.mkdir("./uploads")
|
||
# создаем эндпоинт для хранения файлов пользователя
|
||
app.mount("/uploads", StaticFiles(directory = "./uploads"))
|
||
|
||
# эндпоинт для возвращения согласия в pdf
|
||
@app.get("/privacy_policy.pdf")
|
||
async def privacy_policy():
|
||
return FileResponse("./privacy_policy.pdf")
|
||
|
||
# получение списка объявлений
|
||
@app.get("/api/announcements", response_model=List[pydantic_schemas.Announcement])#адрес объявлений
|
||
async def announcements_list(db: Annotated[Session, Depends(auth_utils.get_session)], obsolete: Union[bool, None] = False, user_id: Union[int, None] = None,
|
||
metro: Union[str, None] = None,category: Union[str, None] = None):
|
||
# параметры для сортировки (схема pydantic schemas.SortAnnouncements)
|
||
params_to_sort = pydantic_schemas.SortAnnouncements(obsolete=obsolete, user_id=user_id, metro=metro, category=category)
|
||
# получаем результат
|
||
result = await add_poems_and_filters.filter_ann(db=db, schema=params_to_sort)
|
||
|
||
return result
|
||
|
||
|
||
# получаем данные одного объявления
|
||
@app.get("/api/announcement", response_model=pydantic_schemas.AnnResponce)
|
||
async def single_announcement(ann_id:int, db: Annotated[Session, Depends(auth_utils.get_session)]): # передаем индекс обявления
|
||
# Считываем данные из Body и отображаем их на странице.
|
||
# В последствии будем вставлять данные в html-форму
|
||
announcement = await db.get(orm_models.Announcement, ann_id)
|
||
#announcement = await db.execute(select(orm_models.Announcement)).scalars().all()
|
||
if not announcement:
|
||
raise HTTPException(status_code=404, detail="Item not found")
|
||
return announcement
|
||
|
||
|
||
# Занести объявление в базу данных
|
||
@app.put("/api/announcement")
|
||
async def put_in_db(name: Annotated[str, Form()], category: Annotated[str, Form()], bestBy: Annotated[datetime.date, Form()],
|
||
address: Annotated[str, Form()], longtitude: Annotated[float, Form()], latitude: Annotated[float, Form()],
|
||
description: Annotated[str, Form()], metro: Annotated[str, Form()], current_user: Annotated[pydantic_schemas.User, Depends(auth_utils.get_current_active_user)],
|
||
db: Annotated[Session, Depends(auth_utils.get_session)], src: Union[UploadFile, None] = None, trashId: Annotated[int, Form()] = None):
|
||
|
||
# имя загруженного файла по умолчанию - пустая строка
|
||
uploaded_name = ""
|
||
# если пользователь загрузил картинку
|
||
if src:
|
||
# процесс сохранения картинки
|
||
f = src.file
|
||
f.seek(0, os.SEEK_END)
|
||
if f.tell() > 0:
|
||
f.seek(0)
|
||
destination = pathlib.Path("./uploads/" + str(hash(f)) + pathlib.Path(src.filename).suffix.lower())
|
||
with destination.open('wb') as buffer:
|
||
shutil.copyfileobj(f, buffer)
|
||
|
||
# изменяем название директории загруженного файла
|
||
uploaded_name = "/uploads/" + destination.name
|
||
|
||
# создаем объект Announcement
|
||
temp_ancmt = orm_models.Announcement(user_id=current_user.id, name=name, category=category, best_by=bestBy,
|
||
address=address, longtitude=longtitude, latitude=latitude, description=description, metro=metro,
|
||
trashId=trashId, src=uploaded_name, booked_by=0)
|
||
try:
|
||
db.add(temp_ancmt) # добавляем в бд
|
||
await db.commit() # сохраняем изменения
|
||
await db.refresh(temp_ancmt) # обновляем состояние объекта
|
||
|
||
return {"Success": True}
|
||
except:
|
||
raise HTTPException(status_code=500, detail="problem with adding object to db")
|
||
|
||
|
||
# Удалить объявления из базы
|
||
@app.delete("/api/announcement") #адрес объявления
|
||
async def delete_from_db(announcement: pydantic_schemas.DelAnnouncement, db: Annotated[Session, Depends(auth_utils.get_session)]): # функция удаления объекта из БД
|
||
# находим объект с заданным id в бд
|
||
#to_delete = db.query(orm_models.Announcement).filter(orm_models.Announcement.id==announcement.id).first()
|
||
query = await db.execute(select(orm_models.Announcement).where(orm_models.Announcement.id==announcement.id))
|
||
to_delete = query.scalars().first()
|
||
if not to_delete:
|
||
raise HTTPException(status_code=404, detail="Item not found. Can't delete")
|
||
try:
|
||
await db.delete(to_delete) # удаление из БД
|
||
await db.commit() # сохраняем изменения
|
||
|
||
return {"Success": True}
|
||
except:
|
||
raise HTTPException(status_code=500, detail="Problem with adding to database")
|
||
|
||
|
||
# Забронировать объявление
|
||
@app.post("/api/book")
|
||
async def change_book_status(data: pydantic_schemas.Book, current_user: Annotated[pydantic_schemas.User, Depends(auth_utils.get_current_user)],
|
||
db: Annotated[Session, Depends(auth_utils.get_session)]):
|
||
# Находим объявление по данному id
|
||
#announcement_to_change = db.query(orm_models.Announcement).filter(orm_models.Announcement.id == data.id).first()
|
||
query = await db.execute(select(orm_models.Announcement).where(orm_models.Announcement.id == data.id))
|
||
announcement_to_change = query.scalars().first()
|
||
# Проверяем, что объявление с данным id существует
|
||
if not announcement_to_change:
|
||
raise HTTPException(status_code=404, detail="Item not found")
|
||
# Проверяем, что объявление бронирует не владелец
|
||
if current_user.id == announcement_to_change.user_id:
|
||
raise HTTPException(status_code=403, detail="A user can't book his announcement")
|
||
else:
|
||
# Инкрементируем поле booked_by на 1
|
||
announcement_to_change.booked_by += 1
|
||
# фиксируем изменения в бд
|
||
await db.commit()
|
||
await db.refresh(announcement_to_change)
|
||
return {"Success": True}
|
||
|
||
|
||
# reginstration
|
||
@app.post("/api/signup")
|
||
async def create_user(nickname: Annotated[str, Form()], password: Annotated[str, Form()], db: Annotated[Session, Depends(auth_utils.get_session)],
|
||
name: Annotated[str, Form()]=None, surname: Annotated[str, Form()]=None, avatar: Annotated[UploadFile, Form()]=None):
|
||
|
||
# проверяем, что юзера с введенным никнеймом не существует в бд
|
||
#if db.query(orm_models.User).filter(orm_models.User.nickname == nickname).first() == None:
|
||
query_user = await db.execute(select(orm_models.User).where(orm_models.User.nickname == nickname))
|
||
user_with_entered_nick = query_user.scalars().first()
|
||
if user_with_entered_nick == None:
|
||
# создаем нового юзера
|
||
new_user = orm_models.User(nickname=nickname, hashed_password=auth_utils.get_password_hash(password),
|
||
name=name, surname=surname, reg_date=datetime.date.today())
|
||
# добавляем в бд
|
||
db.add(new_user)
|
||
await db.commit()
|
||
await db.refresh(new_user) # обновляем состояние объекта
|
||
return {"Success": True}
|
||
return {"Success": False, "Message": "Пользователь с таким email уже зарегестрирован"}
|
||
|
||
|
||
# функция для генерации токена после успешного входа пользователя
|
||
@app.post("/api/token", response_model=pydantic_schemas.Token)
|
||
async def login_for_access_token(
|
||
form_data: Annotated[OAuth2PasswordRequestForm, Depends()], db: Annotated[Session, Depends(auth_utils.get_session)]
|
||
):
|
||
# пробуем найти юзера в бд по введенным паролю и никнейму
|
||
user = await auth_utils.authenticate_user(db, form_data.username, form_data.password)
|
||
# если не нашли - кидаем ошибку
|
||
if not user:
|
||
raise HTTPException(
|
||
status_code=status.HTTP_401_UNAUTHORIZED,
|
||
detail="Incorrect username or password",
|
||
headers={"WWW-Authenticate": "Bearer"},
|
||
)
|
||
# задаем временной интервал, в течение которого токен можно использовать
|
||
access_token_expires = auth_utils.timedelta(minutes=auth_utils.ACCESS_TOKEN_EXPIRE_MINUTES)
|
||
# создаем токен
|
||
access_token = auth_utils.create_access_token(
|
||
data={"user_id": user.id}, expires_delta=access_token_expires
|
||
)
|
||
return {"access_token":access_token}
|
||
|
||
|
||
# получаем данные успешно вошедшего пользователя
|
||
@app.get("/api/users/me", response_model=pydantic_schemas.User) #
|
||
def read_users_me(current_user: Annotated[pydantic_schemas.User, Depends(auth_utils.get_current_active_user)]):
|
||
return current_user
|
||
|
||
|
||
# изменяем рейтинг пользователя
|
||
@app.post("/api/user/rating")
|
||
async def add_points(data: pydantic_schemas.AddRating, current_user: Annotated[pydantic_schemas.User, Depends(auth_utils.get_current_user)], db: Annotated[Session, Depends(auth_utils.get_session)]):
|
||
# проверяем,
|
||
if current_user.id != data.user_id:
|
||
user = await auth_utils.get_user_by_id(db, data.user_id)
|
||
if not user:
|
||
raise HTTPException(status_code=404, detail="Item not found")
|
||
user.rating = (user.rating*user.num_of_ratings + data.rate)/(user.num_of_ratings + 1)
|
||
user.num_of_ratings += 1
|
||
await db.commit()
|
||
await db.refresh(user) # обновляем состояние объекта
|
||
return {"Success": True}
|
||
|
||
|
||
# получаем рейтинг пользователя
|
||
@app.get("/api/user/rating")
|
||
async def add_points(user_id: int, db: Annotated[Session, Depends(auth_utils.get_session)]):
|
||
user = await auth_utils.get_user_by_id(db, user_id=user_id)
|
||
if not user:
|
||
raise HTTPException(status_code=404, detail="Item not found")
|
||
return {"rating": user.rating}
|
||
|
||
|
||
# Отправляем стихи
|
||
@app.get("/api/user/poem", response_model=pydantic_schemas.Poem)
|
||
async def poems_to_front(db: Annotated[Session, Depends(auth_utils.get_session)]):
|
||
#num_of_poems = db.query(orm_models.Poems).count() # определяем кол-во стихов в бд
|
||
query = await db.execute(select(orm_models.Poems)) # определяем кол-во стихов в бд
|
||
num_of_poems = len(query.scalars().all())
|
||
# если стихов в бд нет
|
||
if num_of_poems < 1:
|
||
await add_poems_and_filters.add_poems_to_db(db) # добавляем поэмы в базу данных
|
||
# после добавления стихов снова определяем кол-во стихов в бд
|
||
query = await db.execute(select(orm_models.Poems))
|
||
num_of_poems = len(query.scalars().all())
|
||
rand_id = random.randint(1, num_of_poems) # генерируем номер стихотворения
|
||
#poem = db.query(orm_models.Poems).filter(orm_models.Poems.id == rand_id).first() # находим стих в бд
|
||
query_poem = await db.execute(select(orm_models.Poems).where(orm_models.Poems.id == rand_id)) # находим стих в бд
|
||
poem = query_poem.scalars().first()
|
||
if not poem:
|
||
raise HTTPException(status_code=404, detail="Poem not found")
|
||
return poem
|
||
|
||
trashboxes_category = {
|
||
"PORRIDGE": ["Опасные отходы", "Иное"],
|
||
"conspects": ["Бумага"],
|
||
"milk": ["Стекло", "Тетра Пак", "Иное"],
|
||
"bred": ["Пластик", "Иное"],
|
||
"wathing": ["Пластик", "Опасные отходы", "Иное"],
|
||
"cloth": ["Одежда"],
|
||
"fruits_vegatables": ["Иное"],
|
||
"other_things": ["Металл", "Бумага", "Стекло", "Иное", "Тетра Пак", "Батарейки", "Крышечки", "Шины",
|
||
"Опасные отходы", "Лампочки", "Пластик"]
|
||
}
|
||
|
||
@app.get("/api/trashbox", response_model=List[pydantic_schemas.TrashboxResponse])
|
||
async def get_trashboxes(data: pydantic_schemas.TrashboxRequest = Depends()): #крутая функция для работы с api
|
||
# json, передаваемый стороннему API
|
||
head = {'Authorization': 'Bearer ' + TRASHBOXES_TOKEN}
|
||
# Данные пользователя (местоположение, количество мусорок, которое пользователь хочет видеть)
|
||
my_data={
|
||
'x' : f"{data.Lng}",
|
||
'y' : f"{data.Lat}",
|
||
'limit' : '1'
|
||
}
|
||
# Перевод категории с фронта на категорию с сайта
|
||
try:
|
||
list_of_category = trashboxes_category[data.Category]
|
||
except:
|
||
list_of_category = trashboxes_category['other_things']
|
||
|
||
# Получение ответа от стороннего апи
|
||
response = requests.post(TRASHBOXES_BASE_URL + "/nearest_recycling/get", headers=head, data=my_data, timeout=10)
|
||
infos = response.json()
|
||
|
||
if 'error' in infos and infos['error_description'] == 'Invalid bearer token':
|
||
raise HTTPException(status_code=502, detail="Invalid trashboxes token")
|
||
|
||
# Чтение ответа
|
||
trashboxes = []
|
||
for trashbox in infos["results"]:
|
||
temp_dict = {}
|
||
for obj in trashbox["Objects"]:
|
||
coord_list = obj["geometry"]
|
||
temp_dict["Lat"] = coord_list["coordinates"][1]
|
||
temp_dict["Lng"] = coord_list["coordinates"][0]
|
||
|
||
properties = obj["properties"]
|
||
temp_dict["Name"] = properties["title"]
|
||
temp_dict["Address"] = properties["address"]
|
||
temp_dict["Categories"] = properties["content_text"].split(',')
|
||
for a in list_of_category:
|
||
if a in temp_dict["Categories"] and temp_dict not in trashboxes:
|
||
trashboxes.append(temp_dict)
|
||
uniq_trashboxes = [pydantic_schemas.TrashboxResponse(**ast.literal_eval(el1)) for el1 in set([str(el2) for el2 in trashboxes])]
|
||
return uniq_trashboxes
|
||
|
||
|
||
@app.get("/{rest_of_path:path}")
|
||
async def react_app(req: Request, rest_of_path: str):
|
||
return templates.TemplateResponse('index.html', { 'request': req })
|
||
|
||
|
||
@app.post("/api/announcement/dispose")
|
||
async def dispose(data: pydantic_schemas.DisposeRequest, current_user_schema: Annotated[pydantic_schemas.User, Depends(auth_utils.get_current_user)],
|
||
db: Annotated[Session, Depends(auth_utils.get_session)]):
|
||
# Находим в бд текущего юзера
|
||
current_user = await auth_utils.get_user_by_id(db, current_user_schema.id)
|
||
# Начисляем баллы пользователю за утилизацию
|
||
current_user.points += 60
|
||
# В полученном json переходим к данным мусорки
|
||
data_trashbox = data.trashbox
|
||
# создаем запись models.Trashbox
|
||
new_trashox = orm_models.Trashbox(user_id=current_user.id, date_of_choice=datetime.date.today(), name=data_trashbox.Name,
|
||
latitude=data_trashbox.Lat, longtitude=data_trashbox.Lng, address=data_trashbox.Address, category=data_trashbox.Category)
|
||
# добавляем в бд
|
||
db.add(new_trashox)
|
||
# в соответствии с логикой api, после утилизации объявление пользователя удаляется
|
||
# находим объявление с айди data.ann_id
|
||
#ann_to_del = db.query(orm_models.Announcement).filter(orm_models.Announcement.id == data.ann_id).first() #
|
||
query_ann = await db.execute(select(orm_models.Announcement).where(orm_models.Announcement.id == data.ann_id)) # находим объявление в бд
|
||
ann_to_del = query_ann.scalars().first()
|
||
if not ann_to_del:
|
||
raise HTTPException(status_code=404, detail="Announcement not found")
|
||
# удаляем объявление из бд
|
||
await db.delete(ann_to_del)
|
||
await db.commit()
|
||
await db.refresh(new_trashox) # обновляем состояние объекта
|
||
return {"Success": True}
|
||
|