#подключение библиотек from fastapi import FastAPI, Response, Path, Depends, Body, Form, Query, status, HTTPException, APIRouter, UploadFile, File from fastapi.responses import HTMLResponse, FileResponse, JSONResponse, RedirectResponse from fastapi.staticfiles import StaticFiles from fastapi.security import OAuth2PasswordRequestForm, OAuth2PasswordBearer from fastapi.templating import Jinja2Templates from fastapi.requests import Request from pydantic import json from typing import Any, Annotated, List, Union from starlette.staticfiles import StaticFiles import requests from uuid import uuid4 import random import datetime import ast import pathlib import shutil import os from dotenv import load_dotenv load_dotenv() from .db import Base, engine, SessionLocal, database from .service import add_poems_to_db, generate_poem, check_obsolete, get_query_results from .scheduler import app as app_rocketry from . import schemas, models, utils Base.metadata.create_all(bind=engine) app = FastAPI() templates = Jinja2Templates(directory="./front/dist") app.mount("/static", StaticFiles(directory = "./front/dist")) if not os.path.exists("./uploads"): os.mkdir("./uploads") app.mount("/uploads", StaticFiles(directory = "./uploads")) # Записываем стихи в базу данных, если их еще нет (запускать только если стихов в базе нет). # add_poems_to_db(database) # Каждый день в 00.00 проверяем все объявления (сравниваем текущую дату и срок годности) # session_rocketry = app_rocketry.session @app.get("/api/announcements", response_model=List[schemas.Announcement])#адрес объявлений def announcements_list(obsolete: Union[bool, None] = False, user_id: Union[int, None] = None, metro: Union[str, None] = None, category: Union[str, None] = None): # Считываем данные из Body и отображаем их на странице. # В последствии будем вставлять данные в html-форму params_to_sort = schemas.SortAnnouncements(obsolete=obsolete, user_id=user_id, metro=metro, category=category) # Фильтруем по сроку годности # not_expired = check_obsolete(current_date=datetime.date.today()) # Фильтруем по другим параметрам и делаем пересечение с not_expired # result = not_expired.intersect(get_query_results(params_to_sort)) result = get_query_results(db=database, schema=params_to_sort) return result @app.get("/api/announcement")#адрес объявления def single_announcement(user_id:int): # Считываем данные из Body и отображаем их на странице. # В последствии будем вставлять данные в html-форму try: announcement = database.get(models.Announcement, user_id) return {"id": announcement.id, "user_id": announcement.user_id, "name": announcement.name, "category": announcement.category, "best_by": announcement.best_by, "address": announcement.address, "description": announcement.description, "metro": announcement.metro, "latitude": announcement.latitude, "longtitude":announcement.longtitude, "trashId": announcement.trashId, "src":announcement.src, "booked_by":announcement.booked_by} except: return {"Answer" : False} #если неуданый доступ, то сообщаем об этом # Занести объявление в базу данных @app.put("/api/announcement")#адрес объявлений def put_in_db(name: Annotated[str, Form()], category: Annotated[str, Form()], bestBy: Annotated[datetime.date, Form()], address: Annotated[str, Form()], longtitude: Annotated[float, Form()], latitude: Annotated[float, Form()], description: Annotated[str, Form()], metro: Annotated[str, Form()], current_user: Annotated[schemas.User, Depends(utils.get_current_active_user)], src: Union[UploadFile, None] = None, trashId: Annotated[int, Form()] = None): try: uploaded_name = "" if src: f = src.file f.seek(0, os.SEEK_END) if f.tell() > 0: f.seek(0) destination = pathlib.Path("./uploads/" + str(hash(f)) + pathlib.Path(src.filename).suffix.lower()) with destination.open('wb') as buffer: shutil.copyfileobj(f, buffer) uploaded_name = "/uploads/"+destination.name temp_ancmt = models.Announcement(user_id=current_user.id, name=name, category=category, best_by=bestBy, address=address, longtitude=longtitude, latitude=latitude, description=description, metro=metro, trashId=trashId, booked_by=0) database.add(temp_ancmt) # добавляем в бд database.commit() # сохраняем изменения database.refresh(temp_ancmt) # обновляем состояние объекта return {"Answer" : True} except: return {"Answer" : False} # Удалить объявления из базы @app.delete("/api/announcement") #адрес объявления def delete_from_db(announcement: schemas.DelAnnouncement): # функция удаления объекта из БД try: to_delete = database.query(models.Announcement).filter(models.Announcement.id==announcement.id).first() database.delete(to_delete) # удаление из БД database.commit() # сохраняем изменения return {"Answer" : True} except: return {"Answer" : False} # Забронировать объявление @app.post("/api/book") def change_book_status(data: schemas.Book): try: # Находим объявление по данному id announcement_to_change = database.query(models.Announcement).filter(Announcement.id == data.id).first() # Изменяем поле booked_status на полученный id announcement_to_change.booked_status += 1 return {"Success": True} except: return {"Success": False} # reginstration @app.post("/api/signup") def create_user(nickname: Annotated[str, Form()], password: Annotated[str, Form()], name: Annotated[str, Form()]=None, surname: Annotated[str, Form()]=None, avatar: Annotated[UploadFile, Form()]=None): if database.query(models.User).filter(models.User.nickname == nickname).first() == None: new_user = models.User(nickname=nickname, hashed_password=utils.get_password_hash(password), name=name, surname=surname, reg_date=datetime.date.today()) database.add(new_user) database.commit() database.refresh(new_user) # обновляем состояние объекта return {"Success": True} return {"Success": False, "Message": "Пользователь с таким email уже зарегестрирован"} @app.post("/api/token", response_model=schemas.Token) async def login_for_access_token( form_data: Annotated[OAuth2PasswordRequestForm, Depends()] ): user = utils.authenticate_user(database, form_data.username, form_data.password) if not user: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Incorrect username or password", headers={"WWW-Authenticate": "Bearer"}, ) access_token_expires = utils.timedelta(minutes=utils.ACCESS_TOKEN_EXPIRE_MINUTES) access_token = utils.create_access_token( data={"user_id": user.id}, expires_delta=access_token_expires ) return {"access_token":access_token} @app.get("/api/users/me", response_model=schemas.User) # async def read_users_me(current_user: Annotated[schemas.User, Depends(utils.get_current_active_user)]): return current_user @app.get("/api/users/me/items") async def read_own_items( current_user: Annotated[schemas.User, Depends(utils.get_current_active_user)] ): return [{"Current user name": current_user.name, "Current user surname": current_user.surname}] # изменяем рейтинг пользователя @app.post("/api/user/rating") def add_points(data: schemas.AddRating, current_user: Annotated[schemas.User, Depends(utils.get_current_user)]): if current_user.id != data.user_id: user = utils.get_user_by_id(db=database, user_id=data.user_id) if not user: raise HTTPException(status_code=404, detail="Item not found") user.rating = (user.rating*user.num_of_ratings + data.rate)/(user.num_of_ratings + 1) user.num_of_ratings += 1 database.commit() database.refresh(user) # обновляем состояние объекта return {"Success": True} # получаем рейтинг пользователя @app.get("/api/user/rating") def add_points(user_id: int): user = utils.get_user_by_id(db=database, user_id=user_id) if not user: raise HTTPException(status_code=404, detail="Item not found") return {"rating": user.rating} # Отправляем стихи @app.get("/api/user/poem") # пока не работает def poems_to_front(): # db: Annotated[Session, Depends(utils.get_db)] kolvo_stixov = database.query(models.Poems).count() # пока количество стихотворений = 101 if kolvo_stixov > 1: rand_id = random.randint(1, kolvo_stixov) # номер стихотворения poem_json = dict() poem = database.query(models.Poems).filter(models.Poems.id == rand_id).first() poem_json = {"id": rand_id, "title": poem.title, "text": poem.text, "author": poem.author} return poem_json else: raise HTTPException(status_code=404, detail="Poems not found") @app.get("/api/trashbox", response_model=List[schemas.TrashboxResponse]) def get_trashboxes(lat:float, lng:float):#крутая функция для работы с api #BASE_URL=os.getenv("DOMAIN") # адрес сайта #my_token=os.getenv("TOKEN") # токен BASE_URL= "https://geointelect2.gate.petersburg.ru" my_token="eyJhbGciOiJSUzI1NiIsInR5cCIgOiAiSldUIiwia2lkIiA6ICJhU1RaZm42bHpTdURYcUttRkg1SzN5UDFhT0FxUkhTNm9OendMUExaTXhFIn0.eyJleHAiOjE3ODYyMjUzMzMsImlhdCI6MTY5MTUzMDkzMywianRpIjoiYjU0MmU3MTQtYzJkMS00NTY2LWJkY2MtYmQ5NzA0ODY1ZjgzIiwiaXNzIjoiaHR0cHM6Ly9rYy5wZXRlcnNidXJnLnJ1L3JlYWxtcy9lZ3MtYXBpIiwiYXVkIjoiYWNjb3VudCIsInN1YiI6ImJjYjQ2NzljLTU3ZGItNDU5ZC1iNWUxLWRlOGI4Yzg5MTMwMyIsInR5cCI6IkJlYXJlciIsImF6cCI6ImFkbWluLXJlc3QtY2xpZW50Iiwic2Vzc2lvbl9zdGF0ZSI6IjJhOTgwMzUyLTY1M2QtNGZlZC1iMDI1LWQ1N2U0NDRjZmM3NiIsImFjciI6IjEiLCJhbGxvd2VkLW9yaWdpbnMiOlsiLyoiXSwicmVhbG1fYWNjZXNzIjp7InJvbGVzIjpbImRlZmF1bHQtcm9sZXMtZWdzLWFwaSIsIm9mZmxpbmVfYWNjZXNzIiwidW1hX2F1dGhvcml6YXRpb24iXX0sInJlc291cmNlX2FjY2VzcyI6eyJhY2NvdW50Ijp7InJvbGVzIjpbIm1hbmFnZS1hY2NvdW50IiwibWFuYWdlLWFjY291bnQtbGlua3MiLCJ2aWV3LXByb2ZpbGUiXX19LCJzY29wZSI6ImVtYWlsIHByb2ZpbGUiLCJzaWQiOiIyYTk4MDM1Mi02NTNkLTRmZWQtYjAyNS1kNTdlNDQ0Y2ZjNzYiLCJlbWFpbF92ZXJpZmllZCI6ZmFsc2UsIm5hbWUiOiLQktC70LDQtNC40LzQuNGAINCv0LrQvtCy0LvQtdCyIiwicHJlZmVycmVkX3VzZXJuYW1lIjoiZTBmYzc2OGRhOTA4MjNiODgwZGQzOGVhMDJjMmQ5NTciLCJnaXZlbl9uYW1lIjoi0JLQu9Cw0LTQuNC80LjRgCIsImZhbWlseV9uYW1lIjoi0K_QutC-0LLQu9C10LIifQ.FTKiC1hpWcOkmSW9QZpC-RY7Ko50jw1mDMfXIWYxlQ-zehLm2CLmOnHvYoOoI39k2OzeCIAB9ZdRrrGZc6G9Z1eFELUjNGEqKxSC1Phj9ATemKgbOKEttk-OGc-rFr9VPA8_SnfvLts6wTI2YK33YBIxCF5nCbnr4Qj3LeEQ0d6Hy8PO4ATrBF5EOeuAZRprvIEjXe_f8N9ONKckCPB-xFB4P2pZlVXGoCNoewGEcY3zXH4khezN6zcVr6tpc6G8dBv9EqT_v92IDSg-aXQk6ysA0cO0-6x5w1-_qU0iHGIAPsLNV9IKBoFbjc0JH6cWabldPRH12NP1trvYfqKDGQ" head = {'Authorization': 'Bearer {}'.format(my_token)} my_data={ 'x' : f"{lng}", 'y' : f"{lat}", 'limit' : '1' } response = requests.post(f"{BASE_URL}/nearest_recycling/get", headers=head, data=my_data) infos = response.json() trashboxes = [] for trashbox in infos["results"]: temp_dict = {} for obj in trashbox["Objects"]: coord_list = obj["geometry"] temp_dict["Lat"] = coord_list["coordinates"][1] temp_dict["Lng"] = coord_list["coordinates"][0] properties = obj["properties"] temp_dict["Name"] = properties["title"] temp_dict["Address"] = properties["address"] temp_dict["Categories"] = properties["content_text"].split(',') trashboxes.append(temp_dict) uniq_trashboxes = [ast.literal_eval(el1) for el1 in set([str(el2) for el2 in trashboxes])] return JSONResponse(uniq_trashboxes) @app.get("/{rest_of_path:path}") async def react_app(req: Request, rest_of_path: str): return templates.TemplateResponse('index.html', { 'request': req }) @app.post("/api/announcement/dispose") def dispose(data: schemas.DisposeRequest, current_user_schema: Annotated[schemas.User, Depends(utils.get_current_user)]): current_user = utils.get_user_by_id(database, current_user_schema.id) current_user.points += 60 new_trashox = models.Trashbox(user_id=current_user.id, date_of_choice=datetime.date.today(), name=data.Name, latitude=data.Lat, longtitude=data.Lng, address=data.Address, categories=data.Categories) database.add(new_trashox) database.commit() database.refresh(new_trashox) # обновляем состояние объекта