porridger/back/api.py

280 lines
15 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#подключение библиотек
from fastapi import FastAPI, Response, Path, Depends, Body, Form, Query, status, HTTPException, APIRouter, UploadFile, File
from fastapi.responses import HTMLResponse, FileResponse, JSONResponse, RedirectResponse
from fastapi.staticfiles import StaticFiles
from fastapi.security import OAuth2PasswordRequestForm, OAuth2PasswordBearer
from fastapi.templating import Jinja2Templates
from fastapi.requests import Request
from pydantic import json
from typing import Any, Annotated, List, Union
from starlette.staticfiles import StaticFiles
import requests
from uuid import uuid4
import random
import datetime
import asyncio
import ast
import pathlib
import shutil
import os
from .db import Base, engine, SessionLocal, database, Session
from .scheduler import app as app_rocketry
from . import schemas, models, utils, service
Base.metadata.create_all(bind=engine)
app = FastAPI()
templates = Jinja2Templates(directory="./front/dist")
app.mount("/static", StaticFiles(directory = "./front/dist"))
if not os.path.exists("./uploads"):
os.mkdir("./uploads")
app.mount("/uploads", StaticFiles(directory = "./uploads"))
# Записываем стихи в базу данных, если их еще нет (запускать только если стихов в базе нет).
# add_poems_to_db(database)
@app.get("/api/announcements", response_model=List[schemas.Announcement])#адрес объявлений
def announcements_list(db: Annotated[Session, Depends(utils.get_db)], obsolete: Union[bool, None] = False, user_id: Union[int, None] = None,
metro: Union[str, None] = None,category: Union[str, None] = None):
# параметры для сортировки (схема pydantic schemas.SortAnnouncements)
params_to_sort = schemas.SortAnnouncements(obsolete=obsolete, user_id=user_id, metro=metro, category=category)
# получаем результат
result = get_query_results(db=db, schema=params_to_sort)
return result
@app.get("/api/announcement") #
def single_announcement(ann_id:int, db: Annotated[Session, Depends(utils.get_db)]): # передаем индекс обявления
# Считываем данные из Body и отображаем их на странице.
# В последствии будем вставлять данные в html-форму
try:
announcement = db.get(models.Announcement, ann_id)
return {"id": announcement.id, "user_id": announcement.user_id, "name": announcement.name,
"category": announcement.category, "best_by": announcement.best_by, "address": announcement.address,
"description": announcement.description, "metro": announcement.metro, "latitude": announcement.latitude,
"longtitude":announcement.longtitude, "trashId": announcement.trashId, "src":announcement.src,
"booked_by":announcement.booked_by}
except:
return {"Answer" : False} #если неуданый доступ, то сообщаем об этом
# Занести объявление в базу данных
@app.put("/api/announcement")#адрес объявлений
def put_in_db(name: Annotated[str, Form()], category: Annotated[str, Form()], bestBy: Annotated[datetime.date, Form()],
address: Annotated[str, Form()], longtitude: Annotated[float, Form()], latitude: Annotated[float, Form()],
description: Annotated[str, Form()], metro: Annotated[str, Form()], current_user: Annotated[schemas.User, Depends(utils.get_current_active_user)],
db: Annotated[Session, Depends(utils.get_db)], src: Union[UploadFile, None] = None, trashId: Annotated[int, Form()] = None):
try:
uploaded_name = ""
if src:
f = src.file
f.seek(0, os.SEEK_END)
if f.tell() > 0:
f.seek(0)
destination = pathlib.Path("./uploads/" + str(hash(f)) + pathlib.Path(src.filename).suffix.lower())
with destination.open('wb') as buffer:
shutil.copyfileobj(f, buffer)
uploaded_name = "/uploads/"+destination.name
temp_ancmt = models.Announcement(user_id=current_user.id, name=name, category=category, best_by=bestBy,
address=address, longtitude=longtitude, latitude=latitude, description=description, metro=metro,
trashId=trashId, src=uploaded_name, booked_by=0)
db.add(temp_ancmt) # добавляем в бд
db.commit() # сохраняем изменения
db.refresh(temp_ancmt) # обновляем состояние объекта
return {"Answer" : True}
except:
return {"Answer" : False}
# Удалить объявления из базы
@app.delete("/api/announcement") #адрес объявления
def delete_from_db(announcement: schemas.DelAnnouncement, db: Annotated[Session, Depends(utils.get_db)]): # функция удаления объекта из БД
try:
to_delete = db.query(models.Announcement).filter(models.Announcement.id==announcement.id).first()
db.delete(to_delete) # удаление из БД
db.commit() # сохраняем изменения
return {"Answer" : True}
except:
return {"Answer" : False}
# Забронировать объявление
@app.post("/api/book")
def change_book_status(data: schemas.Book, current_user: Annotated[schemas.User, Depends(utils.get_current_user)],
db: Annotated[Session, Depends(utils.get_db)]):
# Находим объявление по данному id
announcement_to_change = db.query(models.Announcement).filter(models.Announcement.id == data.id).first()
# Проверяем, что объявление с данным id существует
if not announcement_to_change:
raise HTTPException(status_code=404, detail="Item not found")
# Проверяем, что объявление бронирует не владелец
if current_user.id == announcement_to_change.user_id:
raise HTTPException(status_code=403, detail="A user can't book his announcement")
else:
# Инкрементируем поле booked_by на 1
announcement_to_change.booked_by += 1
db.commit()
db.refresh(announcement_to_change)
return {"Success": True}
# reginstration
@app.post("/api/signup")
def create_user(nickname: Annotated[str, Form()], password: Annotated[str, Form()], db: Annotated[Session, Depends(utils.get_db)],
name: Annotated[str, Form()]=None, surname: Annotated[str, Form()]=None, avatar: Annotated[UploadFile, Form()]=None):
if db.query(models.User).filter(models.User.nickname == nickname).first() == None:
new_user = models.User(nickname=nickname, hashed_password=utils.get_password_hash(password),
name=name, surname=surname, reg_date=datetime.date.today())
db.add(new_user)
db.commit()
db.refresh(new_user) # обновляем состояние объекта
return {"Success": True}
return {"Success": False, "Message": "Пользователь с таким email уже зарегестрирован"}
@app.post("/api/token", response_model=schemas.Token)
async def login_for_access_token(
form_data: Annotated[OAuth2PasswordRequestForm, Depends()], db: Annotated[Session, Depends(utils.get_db)]
):
user = utils.authenticate_user(db, form_data.username, form_data.password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = utils.timedelta(minutes=utils.ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = utils.create_access_token(
data={"user_id": user.id}, expires_delta=access_token_expires
)
return {"access_token":access_token}
@app.get("/api/users/me", response_model=schemas.User) #
async def read_users_me(current_user: Annotated[schemas.User, Depends(utils.get_current_active_user)]):
return current_user
# изменяем рейтинг пользователя
@app.post("/api/user/rating")
def add_points(data: schemas.AddRating, current_user: Annotated[schemas.User, Depends(utils.get_current_user)], db: Annotated[Session, Depends(utils.get_db)]):
if current_user.id != data.user_id:
user = utils.get_user_by_id(db, data.user_id)
if not user:
raise HTTPException(status_code=404, detail="Item not found")
user.rating = (user.rating*user.num_of_ratings + data.rate)/(user.num_of_ratings + 1)
user.num_of_ratings += 1
db.commit()
db.refresh(user) # обновляем состояние объекта
return {"Success": True}
# получаем рейтинг пользователя
@app.get("/api/user/rating")
def add_points(user_id: int, db: Annotated[Session, Depends(utils.get_db)]):
user = utils.get_user_by_id(db, user_id=user_id)
if not user:
raise HTTPException(status_code=404, detail="Item not found")
return {"rating": user.rating}
# Отправляем стихи
@app.get("/api/user/poem") # пока не работает
def poems_to_front(db: Annotated[Session, Depends(utils.get_db)]): # db: Annotated[Session, Depends(utils.get_db)]
kolvo_stixov = db.query(models.Poems).count() # пока количество стихотворений = 101
if kolvo_stixov > 1:
rand_id = random.randint(1, kolvo_stixov) # номер стихотворения
poem_json = dict()
poem = db.query(models.Poems).filter(models.Poems.id == rand_id).first()
poem_json = {"id": rand_id, "title": poem.title, "text": poem.text, "author": poem.author}
return poem_json
else:
raise HTTPException(status_code=404, detail="Poems not found")
@app.get("/api/trashbox", response_model=List[schemas.TrashboxResponse])
def get_trashboxes(Lat:float, Lng:float, Category:str):#крутая функция для работы с api
# json, передаваемый стороннему API
BASE_URL= "https://geointelect2.gate.petersburg.ru"
my_token="eyJhbGciOiJSUzI1NiIsInR5cCIgOiAiSldUIiwia2lkIiA6ICJhU1RaZm42bHpTdURYcUttRkg1SzN5UDFhT0FxUkhTNm9OendMUExaTXhFIn0.eyJleHAiOjE3ODYyMjUzMzMsImlhdCI6MTY5MTUzMDkzMywianRpIjoiYjU0MmU3MTQtYzJkMS00NTY2LWJkY2MtYmQ5NzA0ODY1ZjgzIiwiaXNzIjoiaHR0cHM6Ly9rYy5wZXRlcnNidXJnLnJ1L3JlYWxtcy9lZ3MtYXBpIiwiYXVkIjoiYWNjb3VudCIsInN1YiI6ImJjYjQ2NzljLTU3ZGItNDU5ZC1iNWUxLWRlOGI4Yzg5MTMwMyIsInR5cCI6IkJlYXJlciIsImF6cCI6ImFkbWluLXJlc3QtY2xpZW50Iiwic2Vzc2lvbl9zdGF0ZSI6IjJhOTgwMzUyLTY1M2QtNGZlZC1iMDI1LWQ1N2U0NDRjZmM3NiIsImFjciI6IjEiLCJhbGxvd2VkLW9yaWdpbnMiOlsiLyoiXSwicmVhbG1fYWNjZXNzIjp7InJvbGVzIjpbImRlZmF1bHQtcm9sZXMtZWdzLWFwaSIsIm9mZmxpbmVfYWNjZXNzIiwidW1hX2F1dGhvcml6YXRpb24iXX0sInJlc291cmNlX2FjY2VzcyI6eyJhY2NvdW50Ijp7InJvbGVzIjpbIm1hbmFnZS1hY2NvdW50IiwibWFuYWdlLWFjY291bnQtbGlua3MiLCJ2aWV3LXByb2ZpbGUiXX19LCJzY29wZSI6ImVtYWlsIHByb2ZpbGUiLCJzaWQiOiIyYTk4MDM1Mi02NTNkLTRmZWQtYjAyNS1kNTdlNDQ0Y2ZjNzYiLCJlbWFpbF92ZXJpZmllZCI6ZmFsc2UsIm5hbWUiOiLQktC70LDQtNC40LzQuNGAINCv0LrQvtCy0LvQtdCyIiwicHJlZmVycmVkX3VzZXJuYW1lIjoiZTBmYzc2OGRhOTA4MjNiODgwZGQzOGVhMDJjMmQ5NTciLCJnaXZlbl9uYW1lIjoi0JLQu9Cw0LTQuNC80LjRgCIsImZhbWlseV9uYW1lIjoi0K_QutC-0LLQu9C10LIifQ.FTKiC1hpWcOkmSW9QZpC-RY7Ko50jw1mDMfXIWYxlQ-zehLm2CLmOnHvYoOoI39k2OzeCIAB9ZdRrrGZc6G9Z1eFELUjNGEqKxSC1Phj9ATemKgbOKEttk-OGc-rFr9VPA8_SnfvLts6wTI2YK33YBIxCF5nCbnr4Qj3LeEQ0d6Hy8PO4ATrBF5EOeuAZRprvIEjXe_f8N9ONKckCPB-xFB4P2pZlVXGoCNoewGEcY3zXH4khezN6zcVr6tpc6G8dBv9EqT_v92IDSg-aXQk6ysA0cO0-6x5w1-_qU0iHGIAPsLNV9IKBoFbjc0JH6cWabldPRH12NP1trvYfqKDGQ"
head = {'Authorization': 'Bearer {}'.format(my_token)}
my_data={
'x' : f"{Lng}",
'y' : f"{Lat}",
'limit' : '1'
}
match Category:
case "PORRIDGE":
list_of_category = ["Опасные отходы", "Иное"]
case "Конспекты":
list_of_category = ["Бумага"]
case "Молочные продукты":
list_of_category = ["Стекло", "Тетра Пак", "Иное"]
case "Хлебобулочные изделия":
list_of_category = ["Пластик", "Иное"]
case "Моющие средства":
list_of_category = ["Пластик", "Опасные отходы", "Иное"]
case "Одежда":
list_of_category = ["Одежда"]
case "Фрукты и овощи":
list_of_category = ["Иное"]
case "Всякая всячина":
list_of_category = ["Металл", "Бумага", "Стекло", "Иное", "Тетра Пак", "Батарейки", "Крышечки", "Шины",
"Опасные отходы", "Лампочки", "Пластик"]
response = requests.post(f"{BASE_URL}/nearest_recycling/get", headers=head, data=my_data)
infos = response.json()
trashboxes = []
for trashbox in infos["results"]:
temp_dict = {}
for obj in trashbox["Objects"]:
coord_list = obj["geometry"]
temp_dict["Lat"] = coord_list["coordinates"][1]
temp_dict["Lng"] = coord_list["coordinates"][0]
properties = obj["properties"]
temp_dict["Name"] = properties["title"]
temp_dict["Address"] = properties["address"]
temp_dict["Categories"] = properties["content_text"].split(',')
for a in list_of_category:
if a in temp_dict["Categories"] and temp_dict not in trashboxes:
trashboxes.append(temp_dict)
uniq_trashboxes = [ast.literal_eval(el1) for el1 in set([str(el2) for el2 in trashboxes])]
return JSONResponse(uniq_trashboxes)
@app.get("/{rest_of_path:path}")
async def react_app(req: Request, rest_of_path: str):
return templates.TemplateResponse('index.html', { 'request': req })
@app.post("/api/announcement/dispose")
def dispose(data: schemas.DisposeRequest, current_user_schema: Annotated[schemas.User, Depends(utils.get_current_user)],
db: Annotated[Session, Depends(utils.get_db)]):
# Находим в бд текущего юзера
current_user = utils.get_user_by_id(db, current_user_schema.id)
# Начисляем баллы пользователю за утилизацию
current_user.points += 60
# В полученном json переходим к данным мусорки
data_trashbox = data.trashbox
new_trashox = models.Trashbox(user_id=current_user.id, date_of_choice=datetime.date.today(), name=data_trashbox.Name,
latitude=data_trashbox.Lat, longtitude=data_trashbox.Lng, address=data_trashbox.Address, categories=data_trashbox.Category)
db.add(new_trashox)
db.commit()
db.refresh(new_trashox) # обновляем состояние объекта