#подключение библиотек from fastapi import FastAPI, Response, Path, Depends, Body, Form, Query, status, HTTPException, APIRouter, UploadFile, File from fastapi.responses import HTMLResponse, FileResponse, JSONResponse, RedirectResponse from fastapi.staticfiles import StaticFiles from fastapi.security import OAuth2PasswordRequestForm, OAuth2PasswordBearer from fastapi.templating import Jinja2Templates from fastapi.requests import Request from pydantic import json from typing import Any, Annotated, List, Union from starlette.staticfiles import StaticFiles from sqlalchemy.orm import Session import requests from uuid import uuid4 import random import datetime import asyncio import ast import pathlib import shutil import os from . import schemas, models, utils, service app = FastAPI() templates = Jinja2Templates(directory="./front/dist") app.mount("/static", StaticFiles(directory = "./front/dist")) if not os.path.exists("./uploads"): os.mkdir("./uploads") app.mount("/uploads", StaticFiles(directory = "./uploads")) # Записываем стихи в базу данных, если их еще нет (запускать только если стихов в базе нет). # add_poems_to_db(database) @app.get("/api/announcements", response_model=List[schemas.Announcement])#адрес объявлений def announcements_list(db: Annotated[Session, Depends(utils.get_db)], obsolete: Union[bool, None] = False, user_id: Union[int, None] = None, metro: Union[str, None] = None,category: Union[str, None] = None): # параметры для сортировки (схема pydantic schemas.SortAnnouncements) params_to_sort = schemas.SortAnnouncements(obsolete=obsolete, user_id=user_id, metro=metro, category=category) # получаем результат result = service.filter_ann(db=db, schema=params_to_sort) return result @app.get("/api/announcement") # def single_announcement(ann_id:int, db: Annotated[Session, Depends(utils.get_db)]): # передаем индекс обявления # Считываем данные из Body и отображаем их на странице. # В последствии будем вставлять данные в html-форму try: announcement = db.get(models.Announcement, ann_id) return {"id": announcement.id, "user_id": announcement.user_id, "name": announcement.name, "category": announcement.category, "best_by": announcement.best_by, "address": announcement.address, "description": announcement.description, "metro": announcement.metro, "latitude": announcement.latitude, "longtitude":announcement.longtitude, "trashId": announcement.trashId, "src":announcement.src, "booked_by":announcement.booked_by} except: return {"Answer" : False} #если неуданый доступ, то сообщаем об этом # Занести объявление в базу данных @app.put("/api/announcement")#адрес объявлений def put_in_db(name: Annotated[str, Form()], category: Annotated[str, Form()], bestBy: Annotated[datetime.date, Form()], address: Annotated[str, Form()], longtitude: Annotated[float, Form()], latitude: Annotated[float, Form()], description: Annotated[str, Form()], metro: Annotated[str, Form()], current_user: Annotated[schemas.User, Depends(utils.get_current_active_user)], db: Annotated[Session, Depends(utils.get_db)], src: Union[UploadFile, None] = None, trashId: Annotated[int, Form()] = None): try: uploaded_name = "" if src: f = src.file f.seek(0, os.SEEK_END) if f.tell() > 0: f.seek(0) destination = pathlib.Path("./uploads/" + str(hash(f)) + pathlib.Path(src.filename).suffix.lower()) with destination.open('wb') as buffer: shutil.copyfileobj(f, buffer) uploaded_name = "/uploads/"+destination.name temp_ancmt = models.Announcement(user_id=current_user.id, name=name, category=category, best_by=bestBy, address=address, longtitude=longtitude, latitude=latitude, description=description, metro=metro, trashId=trashId, src=uploaded_name, booked_by=0) db.add(temp_ancmt) # добавляем в бд db.commit() # сохраняем изменения db.refresh(temp_ancmt) # обновляем состояние объекта return {"Answer" : True} except: return {"Answer" : False} # Удалить объявления из базы @app.delete("/api/announcement") #адрес объявления def delete_from_db(announcement: schemas.DelAnnouncement, db: Annotated[Session, Depends(utils.get_db)]): # функция удаления объекта из БД try: to_delete = db.query(models.Announcement).filter(models.Announcement.id==announcement.id).first() db.delete(to_delete) # удаление из БД db.commit() # сохраняем изменения return {"Answer" : True} except: return {"Answer" : False} # Забронировать объявление @app.post("/api/book") def change_book_status(data: schemas.Book, current_user: Annotated[schemas.User, Depends(utils.get_current_user)], db: Annotated[Session, Depends(utils.get_db)]): # Находим объявление по данному id announcement_to_change = db.query(models.Announcement).filter(models.Announcement.id == data.id).first() # Проверяем, что объявление с данным id существует if not announcement_to_change: raise HTTPException(status_code=404, detail="Item not found") # Проверяем, что объявление бронирует не владелец if current_user.id == announcement_to_change.user_id: raise HTTPException(status_code=403, detail="A user can't book his announcement") else: # Инкрементируем поле booked_by на 1 announcement_to_change.booked_by += 1 db.commit() db.refresh(announcement_to_change) return {"Success": True} # reginstration @app.post("/api/signup") def create_user(nickname: Annotated[str, Form()], password: Annotated[str, Form()], db: Annotated[Session, Depends(utils.get_db)], name: Annotated[str, Form()]=None, surname: Annotated[str, Form()]=None, avatar: Annotated[UploadFile, Form()]=None): if db.query(models.User).filter(models.User.nickname == nickname).first() == None: new_user = models.User(nickname=nickname, hashed_password=utils.get_password_hash(password), name=name, surname=surname, reg_date=datetime.date.today()) db.add(new_user) db.commit() db.refresh(new_user) # обновляем состояние объекта return {"Success": True} return {"Success": False, "Message": "Пользователь с таким email уже зарегестрирован"} @app.post("/api/token", response_model=schemas.Token) async def login_for_access_token( form_data: Annotated[OAuth2PasswordRequestForm, Depends()], db: Annotated[Session, Depends(utils.get_db)] ): user = utils.authenticate_user(db, form_data.username, form_data.password) if not user: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Incorrect username or password", headers={"WWW-Authenticate": "Bearer"}, ) access_token_expires = utils.timedelta(minutes=utils.ACCESS_TOKEN_EXPIRE_MINUTES) access_token = utils.create_access_token( data={"user_id": user.id}, expires_delta=access_token_expires ) return {"access_token":access_token} @app.get("/api/users/me", response_model=schemas.User) # async def read_users_me(current_user: Annotated[schemas.User, Depends(utils.get_current_active_user)]): return current_user # изменяем рейтинг пользователя @app.post("/api/user/rating") def add_points(data: schemas.AddRating, current_user: Annotated[schemas.User, Depends(utils.get_current_user)], db: Annotated[Session, Depends(utils.get_db)]): if current_user.id != data.user_id: user = utils.get_user_by_id(db, data.user_id) if not user: raise HTTPException(status_code=404, detail="Item not found") user.rating = (user.rating*user.num_of_ratings + data.rate)/(user.num_of_ratings + 1) user.num_of_ratings += 1 db.commit() db.refresh(user) # обновляем состояние объекта return {"Success": True} # получаем рейтинг пользователя @app.get("/api/user/rating") def add_points(user_id: int, db: Annotated[Session, Depends(utils.get_db)]): user = utils.get_user_by_id(db, user_id=user_id) if not user: raise HTTPException(status_code=404, detail="Item not found") return {"rating": user.rating} # Отправляем стихи @app.get("/api/user/poem", response_model=schemas.Poem) # пока не работает def poems_to_front(db: Annotated[Session, Depends(utils.get_db)]): # db: Annotated[Session, Depends(utils.get_db)] num_of_poems = db.query(models.Poems).count() # определяем кол-во стихов в бд rand_id = random.randint(1, num_of_poems) # генерируем номер стихотворения poem = db.query(models.Poems).filter(models.Poems.id == rand_id).first() # находим стих в бд if not poem: raise HTTPException(status_code=404, detail="Poem not found") return poem @app.get("/api/trashbox", response_model=List[schemas.TrashboxResponse]) def get_trashboxes(data: schemas.TrashboxRequest = Depends()):#крутая функция для работы с api # json, передаваемый стороннему API BASE_URL= "https://geointelect2.gate.petersburg.ru" my_token="eyJhbGciOiJSUzI1NiIsInR5cCIgOiAiSldUIiwia2lkIiA6ICJhU1RaZm42bHpTdURYcUttRkg1SzN5UDFhT0FxUkhTNm9OendMUExaTXhFIn0.eyJleHAiOjE3ODYyMjUzMzMsImlhdCI6MTY5MTUzMDkzMywianRpIjoiYjU0MmU3MTQtYzJkMS00NTY2LWJkY2MtYmQ5NzA0ODY1ZjgzIiwiaXNzIjoiaHR0cHM6Ly9rYy5wZXRlcnNidXJnLnJ1L3JlYWxtcy9lZ3MtYXBpIiwiYXVkIjoiYWNjb3VudCIsInN1YiI6ImJjYjQ2NzljLTU3ZGItNDU5ZC1iNWUxLWRlOGI4Yzg5MTMwMyIsInR5cCI6IkJlYXJlciIsImF6cCI6ImFkbWluLXJlc3QtY2xpZW50Iiwic2Vzc2lvbl9zdGF0ZSI6IjJhOTgwMzUyLTY1M2QtNGZlZC1iMDI1LWQ1N2U0NDRjZmM3NiIsImFjciI6IjEiLCJhbGxvd2VkLW9yaWdpbnMiOlsiLyoiXSwicmVhbG1fYWNjZXNzIjp7InJvbGVzIjpbImRlZmF1bHQtcm9sZXMtZWdzLWFwaSIsIm9mZmxpbmVfYWNjZXNzIiwidW1hX2F1dGhvcml6YXRpb24iXX0sInJlc291cmNlX2FjY2VzcyI6eyJhY2NvdW50Ijp7InJvbGVzIjpbIm1hbmFnZS1hY2NvdW50IiwibWFuYWdlLWFjY291bnQtbGlua3MiLCJ2aWV3LXByb2ZpbGUiXX19LCJzY29wZSI6ImVtYWlsIHByb2ZpbGUiLCJzaWQiOiIyYTk4MDM1Mi02NTNkLTRmZWQtYjAyNS1kNTdlNDQ0Y2ZjNzYiLCJlbWFpbF92ZXJpZmllZCI6ZmFsc2UsIm5hbWUiOiLQktC70LDQtNC40LzQuNGAINCv0LrQvtCy0LvQtdCyIiwicHJlZmVycmVkX3VzZXJuYW1lIjoiZTBmYzc2OGRhOTA4MjNiODgwZGQzOGVhMDJjMmQ5NTciLCJnaXZlbl9uYW1lIjoi0JLQu9Cw0LTQuNC80LjRgCIsImZhbWlseV9uYW1lIjoi0K_QutC-0LLQu9C10LIifQ.FTKiC1hpWcOkmSW9QZpC-RY7Ko50jw1mDMfXIWYxlQ-zehLm2CLmOnHvYoOoI39k2OzeCIAB9ZdRrrGZc6G9Z1eFELUjNGEqKxSC1Phj9ATemKgbOKEttk-OGc-rFr9VPA8_SnfvLts6wTI2YK33YBIxCF5nCbnr4Qj3LeEQ0d6Hy8PO4ATrBF5EOeuAZRprvIEjXe_f8N9ONKckCPB-xFB4P2pZlVXGoCNoewGEcY3zXH4khezN6zcVr6tpc6G8dBv9EqT_v92IDSg-aXQk6ysA0cO0-6x5w1-_qU0iHGIAPsLNV9IKBoFbjc0JH6cWabldPRH12NP1trvYfqKDGQ" head = {'Authorization': 'Bearer {}'.format(my_token)} # Данные пользователя (местоположение, количество мусорок, которое пользователь хочет видеть) my_data={ 'x' : f"{data.Lng}", 'y' : f"{data.Lat}", 'limit' : '1' } # Перевод категории с фронта на категорию с сайта match data.Category: case "PORRIDGE": list_of_category = ["Опасные отходы", "Иное"] case "conspects": list_of_category = ["Бумага"] case "milk": list_of_category = ["Стекло", "Тетра Пак", "Иное"] case "bred": list_of_category = ["Пластик", "Иное"] case "wathing": list_of_category = ["Пластик", "Опасные отходы", "Иное"] case "cloth": list_of_category = ["Одежда"] case "fruits_vegatables": list_of_category = ["Иное"] case "other_things": list_of_category = ["Металл", "Бумага", "Стекло", "Иное", "Тетра Пак", "Батарейки", "Крышечки", "Шины", "Опасные отходы", "Лампочки", "Пластик"] # Получение ответа от стороннего апи response = requests.post(f"{BASE_URL}/nearest_recycling/get", headers=head, data=my_data, timeout=10) infos = response.json() # Чтение ответа trashboxes = [] for trashbox in infos["results"]: temp_dict = {} for obj in trashbox["Objects"]: coord_list = obj["geometry"] temp_dict["Lat"] = coord_list["coordinates"][1] temp_dict["Lng"] = coord_list["coordinates"][0] properties = obj["properties"] temp_dict["Name"] = properties["title"] temp_dict["Address"] = properties["address"] temp_dict["Categories"] = properties["content_text"].split(',') for a in list_of_category: if a in temp_dict["Categories"] and temp_dict not in trashboxes: trashboxes.append(temp_dict) uniq_trashboxes = [schemas.TrashboxResponse(**ast.literal_eval(el1)) for el1 in set([str(el2) for el2 in trashboxes])] return uniq_trashboxes @app.get("/{rest_of_path:path}") async def react_app(req: Request, rest_of_path: str): return templates.TemplateResponse('index.html', { 'request': req }) @app.post("/api/announcement/dispose") def dispose(data: schemas.DisposeRequest, current_user_schema: Annotated[schemas.User, Depends(utils.get_current_user)], db: Annotated[Session, Depends(utils.get_db)]): # Находим в бд текущего юзера current_user = utils.get_user_by_id(db, current_user_schema.id) # Начисляем баллы пользователю за утилизацию current_user.points += 60 # В полученном json переходим к данным мусорки data_trashbox = data.trashbox # создаем запись models.Trashbox new_trashox = models.Trashbox(user_id=current_user.id, date_of_choice=datetime.date.today(), name=data_trashbox.Name, latitude=data_trashbox.Lat, longtitude=data_trashbox.Lng, address=data_trashbox.Address, category=data_trashbox.Category) # добавляем в бд db.add(new_trashox) # в соответствии с логикой api, после утилизации объявление пользователя удаляется # находим объявление с айди data.ann_id ann_to_del = db.query(models.Announcement).filter(models.Announcement.id == data.ann_id).first() # находим стих в бд if not ann_to_del: raise HTTPException(status_code=404, detail="Announcement not found") # удаляем объявление из бд db.delete(ann_to_del) db.commit() db.refresh(new_trashox) # обновляем состояние объекта return {"Success": True}