refactoring - 1

This commit is contained in:
2023-08-23 23:46:51 +03:00
parent fb3763b910
commit 6742c963ab
11 changed files with 215 additions and 185 deletions

View File

@ -1,12 +1,12 @@
#подключение библиотек
from fastapi import FastAPI, Response, Path, Depends, Body, Form, Query, status, HTTPException, APIRouter, UploadFile, File
from fastapi import FastAPI, Depends, Form, status, HTTPException, APIRouter, UploadFile
from fastapi.responses import HTMLResponse, FileResponse, JSONResponse, RedirectResponse
from fastapi.staticfiles import StaticFiles
from fastapi.security import OAuth2PasswordRequestForm, OAuth2PasswordBearer
from fastapi.security import OAuth2PasswordRequestForm
from fastapi.templating import Jinja2Templates
from fastapi.requests import Request
from pydantic import json
from typing import Any, Annotated, List, Union
from starlette.staticfiles import StaticFiles
from sqlalchemy.orm import Session
@ -22,56 +22,60 @@ import pathlib
import shutil
import os
from . import schemas, models, utils, service
from .db import database
from . import add_poems_and_filters, auth_utils, orm_models, pydantic_schemas
# создаем приложение Fastapi
app = FastAPI()
# Jinja2 - шаблоны
templates = Jinja2Templates(directory="./front/dist")
# создаем эндпоинт для хранения статических файлов
app.mount("/static", StaticFiles(directory = "./front/dist"))
# проверяем, что папка uploads еще не создана
if not os.path.exists("./uploads"):
os.mkdir("./uploads")
# создаем эндпоинт для хранения файлов пользователя
app.mount("/uploads", StaticFiles(directory = "./uploads"))
# Записываем стихи в базу данных, если их еще нет (запускать только если стихов в базе нет).
from .db import database
@app.get("/api/announcements", response_model=List[schemas.Announcement])#адрес объявлений
def announcements_list(db: Annotated[Session, Depends(utils.get_db)], obsolete: Union[bool, None] = False, user_id: Union[int, None] = None,
# получение списка объявлений
@app.get("/api/announcements", response_model=List[pydantic_schemas.Announcement])#адрес объявлений
def announcements_list(db: Annotated[Session, Depends(auth_utils.get_db)], obsolete: Union[bool, None] = False, user_id: Union[int, None] = None,
metro: Union[str, None] = None,category: Union[str, None] = None):
# параметры для сортировки (схема pydantic schemas.SortAnnouncements)
params_to_sort = schemas.SortAnnouncements(obsolete=obsolete, user_id=user_id, metro=metro, category=category)
params_to_sort = pydantic_schemas.SortAnnouncements(obsolete=obsolete, user_id=user_id, metro=metro, category=category)
# получаем результат
result = service.filter_ann(db=db, schema=params_to_sort)
result = add_poems_and_filters.filter_ann(db=db, schema=params_to_sort)
return result
@app.get("/api/announcement") #
def single_announcement(ann_id:int, db: Annotated[Session, Depends(utils.get_db)]): # передаем индекс обявления
# получаем данные одного объявления
@app.get("/api/announcement", response_model=pydantic_schemas.AnnResponce)
def single_announcement(ann_id:int, db: Annotated[Session, Depends(auth_utils.get_db)]): # передаем индекс обявления
# Считываем данные из Body и отображаем их на странице.
# В последствии будем вставлять данные в html-форму
try:
announcement = db.get(models.Announcement, ann_id)
return {"id": announcement.id, "user_id": announcement.user_id, "name": announcement.name,
"category": announcement.category, "best_by": announcement.best_by, "address": announcement.address,
"description": announcement.description, "metro": announcement.metro, "latitude": announcement.latitude,
"longtitude":announcement.longtitude, "trashId": announcement.trashId, "src":announcement.src,
"booked_by":announcement.booked_by}
announcement = db.get(orm_models.Announcement, ann_id)
return announcement
except:
return {"Answer" : False} #если неуданый доступ, то сообщаем об этом
# Занести объявление в базу данных
@app.put("/api/announcement")#адрес объявлений
@app.put("/api/announcement")
def put_in_db(name: Annotated[str, Form()], category: Annotated[str, Form()], bestBy: Annotated[datetime.date, Form()],
address: Annotated[str, Form()], longtitude: Annotated[float, Form()], latitude: Annotated[float, Form()],
description: Annotated[str, Form()], metro: Annotated[str, Form()], current_user: Annotated[schemas.User, Depends(utils.get_current_active_user)],
db: Annotated[Session, Depends(utils.get_db)], src: Union[UploadFile, None] = None, trashId: Annotated[int, Form()] = None):
description: Annotated[str, Form()], metro: Annotated[str, Form()], current_user: Annotated[pydantic_schemas.User, Depends(auth_utils.get_current_active_user)],
db: Annotated[Session, Depends(auth_utils.get_db)], src: Union[UploadFile, None] = None, trashId: Annotated[int, Form()] = None):
try:
# имя загруженного файла по умолчанию - пустая строка
uploaded_name = ""
# если пользователь загрузил картинку
if src:
# процесс сохранения картинки
f = src.file
f.seek(0, os.SEEK_END)
if f.tell() > 0:
@ -80,9 +84,11 @@ def put_in_db(name: Annotated[str, Form()], category: Annotated[str, Form()], be
with destination.open('wb') as buffer:
shutil.copyfileobj(f, buffer)
uploaded_name = "/uploads/"+destination.name
# изменяем название директории загруженного файла
uploaded_name = "/uploads/" + destination.name
temp_ancmt = models.Announcement(user_id=current_user.id, name=name, category=category, best_by=bestBy,
# создаем объект Announcement
temp_ancmt = orm_models.Announcement(user_id=current_user.id, name=name, category=category, best_by=bestBy,
address=address, longtitude=longtitude, latitude=latitude, description=description, metro=metro,
trashId=trashId, src=uploaded_name, booked_by=0)
db.add(temp_ancmt) # добавляем в бд
@ -95,9 +101,10 @@ def put_in_db(name: Annotated[str, Form()], category: Annotated[str, Form()], be
# Удалить объявления из базы
@app.delete("/api/announcement") #адрес объявления
def delete_from_db(announcement: schemas.DelAnnouncement, db: Annotated[Session, Depends(utils.get_db)]): # функция удаления объекта из БД
def delete_from_db(announcement: pydantic_schemas.DelAnnouncement, db: Annotated[Session, Depends(auth_utils.get_db)]): # функция удаления объекта из БД
try:
to_delete = db.query(models.Announcement).filter(models.Announcement.id==announcement.id).first()
# находим объект с заданным id в бд
to_delete = db.query(orm_models.Announcement).filter(orm_models.Announcement.id==announcement.id).first()
db.delete(to_delete) # удаление из БД
db.commit() # сохраняем изменения
return {"Answer" : True}
@ -107,10 +114,10 @@ def delete_from_db(announcement: schemas.DelAnnouncement, db: Annotated[Session,
# Забронировать объявление
@app.post("/api/book")
def change_book_status(data: schemas.Book, current_user: Annotated[schemas.User, Depends(utils.get_current_user)],
db: Annotated[Session, Depends(utils.get_db)]):
def change_book_status(data: pydantic_schemas.Book, current_user: Annotated[pydantic_schemas.User, Depends(auth_utils.get_current_user)],
db: Annotated[Session, Depends(auth_utils.get_db)]):
# Находим объявление по данному id
announcement_to_change = db.query(models.Announcement).filter(models.Announcement.id == data.id).first()
announcement_to_change = db.query(orm_models.Announcement).filter(orm_models.Announcement.id == data.id).first()
# Проверяем, что объявление с данным id существует
if not announcement_to_change:
raise HTTPException(status_code=404, detail="Item not found")
@ -120,20 +127,23 @@ def change_book_status(data: schemas.Book, current_user: Annotated[schemas.User,
else:
# Инкрементируем поле booked_by на 1
announcement_to_change.booked_by += 1
# фиксируем изменения в бд
db.commit()
db.refresh(announcement_to_change)
return {"Success": True}
# reginstration
@app.post("/api/signup")
def create_user(nickname: Annotated[str, Form()], password: Annotated[str, Form()], db: Annotated[Session, Depends(utils.get_db)],
def create_user(nickname: Annotated[str, Form()], password: Annotated[str, Form()], db: Annotated[Session, Depends(auth_utils.get_db)],
name: Annotated[str, Form()]=None, surname: Annotated[str, Form()]=None, avatar: Annotated[UploadFile, Form()]=None):
if db.query(models.User).filter(models.User.nickname == nickname).first() == None:
new_user = models.User(nickname=nickname, hashed_password=utils.get_password_hash(password),
# проверяем, что юзера с введенным никнеймом не существует в бд
if db.query(orm_models.User).filter(orm_models.User.nickname == nickname).first() == None:
# создаем нового юзера
new_user = orm_models.User(nickname=nickname, hashed_password=auth_utils.get_password_hash(password),
name=name, surname=surname, reg_date=datetime.date.today())
# добавляем в бд
db.add(new_user)
db.commit()
db.refresh(new_user) # обновляем состояние объекта
@ -141,34 +151,41 @@ def create_user(nickname: Annotated[str, Form()], password: Annotated[str, Form(
return {"Success": False, "Message": "Пользователь с таким email уже зарегестрирован"}
@app.post("/api/token", response_model=schemas.Token)
# функция для генерации токена после успешного входа пользователя
@app.post("/api/token", response_model=pydantic_schemas.Token)
async def login_for_access_token(
form_data: Annotated[OAuth2PasswordRequestForm, Depends()], db: Annotated[Session, Depends(utils.get_db)]
form_data: Annotated[OAuth2PasswordRequestForm, Depends()], db: Annotated[Session, Depends(auth_utils.get_db)]
):
user = utils.authenticate_user(db, form_data.username, form_data.password)
# пробуем найти юзера в бд по введенным паролю и никнейму
user = auth_utils.authenticate_user(db, form_data.username, form_data.password)
# если не нашли - кидаем ошибку
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = utils.timedelta(minutes=utils.ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = utils.create_access_token(
# задаем временной интервал, в течение которого токен можно использовать
access_token_expires = auth_utils.timedelta(minutes=auth_utils.ACCESS_TOKEN_EXPIRE_MINUTES)
# создаем токен
access_token = auth_utils.create_access_token(
data={"user_id": user.id}, expires_delta=access_token_expires
)
return {"access_token":access_token}
@app.get("/api/users/me", response_model=schemas.User) #
async def read_users_me(current_user: Annotated[schemas.User, Depends(utils.get_current_active_user)]):
# получаем данные успешно вошедшего пользователя
@app.get("/api/users/me", response_model=pydantic_schemas.User) #
async def read_users_me(current_user: Annotated[pydantic_schemas.User, Depends(auth_utils.get_current_active_user)]):
return current_user
# изменяем рейтинг пользователя
@app.post("/api/user/rating")
def add_points(data: schemas.AddRating, current_user: Annotated[schemas.User, Depends(utils.get_current_user)], db: Annotated[Session, Depends(utils.get_db)]):
def add_points(data: pydantic_schemas.AddRating, current_user: Annotated[pydantic_schemas.User, Depends(auth_utils.get_current_user)], db: Annotated[Session, Depends(auth_utils.get_db)]):
# проверяем,
if current_user.id != data.user_id:
user = utils.get_user_by_id(db, data.user_id)
user = auth_utils.get_user_by_id(db, data.user_id)
if not user:
raise HTTPException(status_code=404, detail="Item not found")
user.rating = (user.rating*user.num_of_ratings + data.rate)/(user.num_of_ratings + 1)
@ -180,28 +197,28 @@ def add_points(data: schemas.AddRating, current_user: Annotated[schemas.User, De
# получаем рейтинг пользователя
@app.get("/api/user/rating")
def add_points(user_id: int, db: Annotated[Session, Depends(utils.get_db)]):
user = utils.get_user_by_id(db, user_id=user_id)
def add_points(user_id: int, db: Annotated[Session, Depends(auth_utils.get_db)]):
user = auth_utils.get_user_by_id(db, user_id=user_id)
if not user:
raise HTTPException(status_code=404, detail="Item not found")
return {"rating": user.rating}
# Отправляем стихи
@app.get("/api/user/poem", response_model=schemas.Poem) # пока не работает
def poems_to_front(db: Annotated[Session, Depends(utils.get_db)]): # db: Annotated[Session, Depends(utils.get_db)]
num_of_poems = db.query(models.Poems).count() # определяем кол-во стихов в бд
@app.get("/api/user/poem", response_model=pydantic_schemas.Poem) # пока не работает
def poems_to_front(db: Annotated[Session, Depends(auth_utils.get_db)]): # db: Annotated[Session, Depends(utils.get_db)]
num_of_poems = db.query(orm_models.Poems).count() # определяем кол-во стихов в бд
if num_of_poems < 1:
service.add_poems_to_db(database) # добавляем поэмы в базу данных
add_poems_and_filters.add_poems_to_db(database) # добавляем поэмы в базу данных
rand_id = random.randint(1, num_of_poems) # генерируем номер стихотворения
poem = db.query(models.Poems).filter(models.Poems.id == rand_id).first() # находим стих в бд
poem = db.query(orm_models.Poems).filter(orm_models.Poems.id == rand_id).first() # находим стих в бд
if not poem:
raise HTTPException(status_code=404, detail="Poem not found")
return poem
@app.get("/api/trashbox", response_model=List[schemas.TrashboxResponse])
def get_trashboxes(data: schemas.TrashboxRequest = Depends()):#крутая функция для работы с api
@app.get("/api/trashbox", response_model=List[pydantic_schemas.TrashboxResponse])
def get_trashboxes(data: pydantic_schemas.TrashboxRequest = Depends()):#крутая функция для работы с api
# json, передаваемый стороннему API
BASE_URL= "https://geointelect2.gate.petersburg.ru"
my_token="eyJhbGciOiJSUzI1NiIsInR5cCIgOiAiSldUIiwia2lkIiA6ICJhU1RaZm42bHpTdURYcUttRkg1SzN5UDFhT0FxUkhTNm9OendMUExaTXhFIn0.eyJleHAiOjE3ODYyMjUzMzMsImlhdCI6MTY5MTUzMDkzMywianRpIjoiYjU0MmU3MTQtYzJkMS00NTY2LWJkY2MtYmQ5NzA0ODY1ZjgzIiwiaXNzIjoiaHR0cHM6Ly9rYy5wZXRlcnNidXJnLnJ1L3JlYWxtcy9lZ3MtYXBpIiwiYXVkIjoiYWNjb3VudCIsInN1YiI6ImJjYjQ2NzljLTU3ZGItNDU5ZC1iNWUxLWRlOGI4Yzg5MTMwMyIsInR5cCI6IkJlYXJlciIsImF6cCI6ImFkbWluLXJlc3QtY2xpZW50Iiwic2Vzc2lvbl9zdGF0ZSI6IjJhOTgwMzUyLTY1M2QtNGZlZC1iMDI1LWQ1N2U0NDRjZmM3NiIsImFjciI6IjEiLCJhbGxvd2VkLW9yaWdpbnMiOlsiLyoiXSwicmVhbG1fYWNjZXNzIjp7InJvbGVzIjpbImRlZmF1bHQtcm9sZXMtZWdzLWFwaSIsIm9mZmxpbmVfYWNjZXNzIiwidW1hX2F1dGhvcml6YXRpb24iXX0sInJlc291cmNlX2FjY2VzcyI6eyJhY2NvdW50Ijp7InJvbGVzIjpbIm1hbmFnZS1hY2NvdW50IiwibWFuYWdlLWFjY291bnQtbGlua3MiLCJ2aWV3LXByb2ZpbGUiXX19LCJzY29wZSI6ImVtYWlsIHByb2ZpbGUiLCJzaWQiOiIyYTk4MDM1Mi02NTNkLTRmZWQtYjAyNS1kNTdlNDQ0Y2ZjNzYiLCJlbWFpbF92ZXJpZmllZCI6ZmFsc2UsIm5hbWUiOiLQktC70LDQtNC40LzQuNGAINCv0LrQvtCy0LvQtdCyIiwicHJlZmVycmVkX3VzZXJuYW1lIjoiZTBmYzc2OGRhOTA4MjNiODgwZGQzOGVhMDJjMmQ5NTciLCJnaXZlbl9uYW1lIjoi0JLQu9Cw0LTQuNC80LjRgCIsImZhbWlseV9uYW1lIjoi0K_QutC-0LLQu9C10LIifQ.FTKiC1hpWcOkmSW9QZpC-RY7Ko50jw1mDMfXIWYxlQ-zehLm2CLmOnHvYoOoI39k2OzeCIAB9ZdRrrGZc6G9Z1eFELUjNGEqKxSC1Phj9ATemKgbOKEttk-OGc-rFr9VPA8_SnfvLts6wTI2YK33YBIxCF5nCbnr4Qj3LeEQ0d6Hy8PO4ATrBF5EOeuAZRprvIEjXe_f8N9ONKckCPB-xFB4P2pZlVXGoCNoewGEcY3zXH4khezN6zcVr6tpc6G8dBv9EqT_v92IDSg-aXQk6ysA0cO0-6x5w1-_qU0iHGIAPsLNV9IKBoFbjc0JH6cWabldPRH12NP1trvYfqKDGQ"
@ -250,7 +267,7 @@ def get_trashboxes(data: schemas.TrashboxRequest = Depends()):#крутая фу
for a in list_of_category:
if a in temp_dict["Categories"] and temp_dict not in trashboxes:
trashboxes.append(temp_dict)
uniq_trashboxes = [schemas.TrashboxResponse(**ast.literal_eval(el1)) for el1 in set([str(el2) for el2 in trashboxes])]
uniq_trashboxes = [pydantic_schemas.TrashboxResponse(**ast.literal_eval(el1)) for el1 in set([str(el2) for el2 in trashboxes])]
return uniq_trashboxes
@ -260,22 +277,22 @@ async def react_app(req: Request, rest_of_path: str):
@app.post("/api/announcement/dispose")
def dispose(data: schemas.DisposeRequest, current_user_schema: Annotated[schemas.User, Depends(utils.get_current_user)],
db: Annotated[Session, Depends(utils.get_db)]):
def dispose(data: pydantic_schemas.DisposeRequest, current_user_schema: Annotated[pydantic_schemas.User, Depends(auth_utils.get_current_user)],
db: Annotated[Session, Depends(auth_utils.get_db)]):
# Находим в бд текущего юзера
current_user = utils.get_user_by_id(db, current_user_schema.id)
current_user = auth_utils.get_user_by_id(db, current_user_schema.id)
# Начисляем баллы пользователю за утилизацию
current_user.points += 60
# В полученном json переходим к данным мусорки
data_trashbox = data.trashbox
# создаем запись models.Trashbox
new_trashox = models.Trashbox(user_id=current_user.id, date_of_choice=datetime.date.today(), name=data_trashbox.Name,
new_trashox = orm_models.Trashbox(user_id=current_user.id, date_of_choice=datetime.date.today(), name=data_trashbox.Name,
latitude=data_trashbox.Lat, longtitude=data_trashbox.Lng, address=data_trashbox.Address, category=data_trashbox.Category)
# добавляем в бд
db.add(new_trashox)
# в соответствии с логикой api, после утилизации объявление пользователя удаляется
# находим объявление с айди data.ann_id
ann_to_del = db.query(models.Announcement).filter(models.Announcement.id == data.ann_id).first() # находим стих в бд
ann_to_del = db.query(orm_models.Announcement).filter(orm_models.Announcement.id == data.ann_id).first() # находим стих в бд
if not ann_to_del:
raise HTTPException(status_code=404, detail="Announcement not found")
# удаляем объявление из бд