porridger/back/main.py

236 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#подключение библиотек
from fastapi import FastAPI, Response, Path, Depends, Body, Form, Query, status, HTTPException, APIRouter, UploadFile, File
from fastapi.responses import HTMLResponse, FileResponse, JSONResponse, RedirectResponse
from fastapi.staticfiles import StaticFiles
from fastapi.security import OAuth2PasswordRequestForm, OAuth2PasswordBearer
from fastapi.templating import Jinja2Templates
from fastapi.requests import Request
from pydantic import json
from typing import Any, Annotated, List, Union
from starlette.staticfiles import StaticFiles
import requests
from uuid import uuid4
import random
import datetime
import ast
import pathlib
import shutil
import os
from .db import Base, engine, SessionLocal, database
from .service import add_poems_to_db, generate_poem, check_obsolete, get_query_results
from .scheduler import app as app_rocketry
from . import schemas, models, utils
Base.metadata.create_all(bind=engine)
app = FastAPI()
templates = Jinja2Templates(directory="./front/dist")
app.mount("/static", StaticFiles(directory = "./front/dist"))
if not os.path.exists("./uploads"):
os.mkdir("./uploads")
app.mount("/uploads", StaticFiles(directory = "./uploads"))
# Записываем стихи в базу данных, если их еще нет (запускать только если стихов в базе нет).
# add_poems_to_db(database)
# Каждый день в 00.00 проверяем все объявления (сравниваем текущую дату и срок годности)
# session_rocketry = app_rocketry.session
@app.get("/api/announcements", response_model=List[schemas.Announcement])#адрес объявлений
def annoncements_list(obsolete: Union[bool, None] = False, user_id: Union[int, None] = None, metro: Union[str, None] = None,
category: Union[str, None] = None):
# Считываем данные из Body и отображаем их на странице.
# В последствии будем вставлять данные в html-форму
params_to_sort = schemas.SortAnnouncements(obsolete=obsolete, user_id=user_id, metro=metro, category=category)
# Фильтруем по сроку годности
# not_expired = check_obsolete(current_date=datetime.date.today())
# Фильтруем по другим параметрам и делаем пересечение с not_expired
# result = not_expired.intersect(get_query_results(params_to_sort))
result = get_query_results(db=database, schema=params_to_sort)
return result
@app.get("/api/announcement")#адрес объявления
def single_annoncement(user_id:int):
# Считываем данные из Body и отображаем их на странице.
# В последствии будем вставлять данные в html-форму
try:
annoncement = database.get(models.Announcement, user_id)
return {"id": annoncement.id, "user_id": annoncement.user_id, "name": annoncement.name,
"category": annoncement.category, "best_by": annoncement.best_by, "address": annoncement.address,
"description": annoncement.description, "metro": annoncement.metro, "latitude": annoncement.latitude,
"longtitude":annoncement.longtitude, "trashId": annoncement.trashId, "src":annoncement.src,
"booked_by":annoncement.booked_by}
except:
return {"Answer" : False} #если неуданый доступ, то сообщаем об этом
# Занести объявление в базу данных
@app.put("/api/announcement")#адрес объявлений
def put_in_db(name: Annotated[str, Form()], category: Annotated[str, Form()], bestBy: Annotated[datetime.date, Form()],
address: Annotated[str, Form()], longtitude: Annotated[float, Form()], latitude: Annotated[float, Form()],
description: Annotated[str, Form()], src: UploadFile, metro: Annotated[str, Form()],
trashId: Annotated[int, Form()] = None):
# try:
userId = 1 # temporary
uploaded_name = ""
f = src.file
f.seek(0, os.SEEK_END)
if f.tell() > 0:
f.seek(0)
destination = pathlib.Path("./uploads/" + str(hash(src.file)) + pathlib.Path(src.filename).suffix.lower())
with destination.open('wb') as buffer:
shutil.copyfileobj(src.file, buffer)
uploaded_name = "/uploads/"+destination.name
temp_ancmt = models.Announcement(user_id=userId, name=name, category=category, best_by=bestBy, address=address, longtitude=longtitude, latitude=latitude, description=description, src=uploaded_name, metro=metro, trashId=trashId, booked_by=-1)
database.add(temp_ancmt) # добавляем в бд
database.commit() # сохраняем изменения
database.refresh(temp_ancmt) # обновляем состояние объекта
return {"Answer" : True}
# except:
# return {"Answer" : False}
# Удалить объявления из базы
@app.delete("/api/announcement") #адрес объявления
def delete_from_db(annoncement: schemas.DelAnnouncement): # функция удаления объекта из БД
try:
to_delete = database.query(models.Announcement).filter(models.Announcement.id==announcement.id).first()
database.delete(to_delete) # удаление из БД
database.commit() # сохраняем изменения
return {"Answer" : True}
except:
return {"Answer" : False}
# Забронировать объявление
@app.post("/api/book")
def change_book_status(data: schemas.Book):
try:
# Находим объявление по данному id
announcement_to_change = database.query(models.Announcement).filter(Announcement.id == data.id).first()
# Изменяем поле booked_status на полученный id
announcement_to_change.booked_status += 1
return {"Success": True}
except:
return {"Success": False}
# reginstration
# {"id":1, "email":"poopka@mail.ru", "password":"good", "name":"Vasya", "surname":"Poopkin"}
@app.post("/api/signup")
def create_user(data = Body()):
if database.query(models.User).filter(models.User.email == data["email"]).first() == None:
new_user = models.User(email=data["email"], hashed_password=get_password_hash(data["password"]),
name=data["name"], surname=data["surname"])
database.add(new_user)
database.commit()
database.refresh(new_user) # обновляем состояние объекта
return {"Success": True}
return {"Success": False, "Message": "Пользователь с таким email уже зарегестрирован"}
@app.post("/api/token", response_model=schemas.Token)
async def login_for_access_token(
form_data: Annotated[OAuth2PasswordRequestForm, Depends()]
):
user = utils.authenticate_user(database, form_data.username, form_data.password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = utils.timedelta(minutes=utils.ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = utils.create_access_token(
data={"user_id": user.id}, expires_delta=access_token_expires
)
return {"access_token":access_token}
@app.get("/api/users/me/", response_model=schemas.User) #
async def read_users_me(current_user: Annotated[schemas.User, Depends(utils.get_current_active_user)]):
return current_user
@app.get("/api/users/me/items/")
async def read_own_items(
current_user: Annotated[schemas.User, Depends(utils.get_current_active_user)]
):
return [{"Current user name": current_user.name, "Current user surname": current_user.surname}]
# начисляем баллы пользователю
@app.post("/api/user/rating")
def add_points(data: schemas.AddPoints, current_user: Annotated[schemas.User, Depends(utils.get_current_user)]):
if current_user.id != data.user_id:
user = utils.get_user(data.user_id)
if not user:
raise HTTPException(status_code=404, detail="Item not found")
user.num_of_ratings += 1
user.rating = (user.rating + data.rate)/user.num_of_ratings
return {"Success": True}
# получаем данные о баллах пользователя
@app.get("/api/user/rating")
def add_points(user_id: int):
user = utils.get_user(user_id)
if not user:
raise HTTPException(status_code=404, detail="Item not found")
return {"rating": user.rating}
@app.get("/api/trashbox", response_model=List[schemas.TrashboxResponse])
def get_trashboxes(lat:float, lng:float):#крутая функция для работы с api
BASE_URL='https://geointelect2.gate.petersburg.ru'#адрес сайта и мой токин
my_token='eyJhbGciOiJSUzI1NiIsInR5cCIgOiAiSldUIiwia2lkIiA6ICJhU1RaZm42bHpTdURYcUttRkg1SzN5UDFhT0FxUkhTNm9OendMUExaTXhFIn0.eyJleHAiOjE3ODM3ODk4NjgsImlhdCI6MTY4OTA5NTQ2OCwianRpIjoiNDUzNjQzZTgtYTkyMi00NTI4LWIzYmMtYWJiYTNmYjkyNTkxIiwiaXNzIjoiaHR0cHM6Ly9rYy5wZXRlcnNidXJnLnJ1L3JlYWxtcy9lZ3MtYXBpIiwiYXVkIjoiYWNjb3VudCIsInN1YiI6ImJjYjQ2NzljLTU3ZGItNDU5ZC1iNWUxLWRlOGI4Yzg5MTMwMyIsInR5cCI6IkJlYXJlciIsImF6cCI6ImFkbWluLXJlc3QtY2xpZW50Iiwic2Vzc2lvbl9zdGF0ZSI6ImM2ZDJiOTZhLWMxNjMtNDAxZS05ZjMzLTI0MmE0NDcxMDY5OCIsImFjciI6IjEiLCJhbGxvd2VkLW9yaWdpbnMiOlsiLyoiXSwicmVhbG1fYWNjZXNzIjp7InJvbGVzIjpbImRlZmF1bHQtcm9sZXMtZWdzLWFwaSIsIm9mZmxpbmVfYWNjZXNzIiwidW1hX2F1dGhvcml6YXRpb24iXX0sInJlc291cmNlX2FjY2VzcyI6eyJhY2NvdW50Ijp7InJvbGVzIjpbIm1hbmFnZS1hY2NvdW50IiwibWFuYWdlLWFjY291bnQtbGlua3MiLCJ2aWV3LXByb2ZpbGUiXX19LCJzY29wZSI6ImVtYWlsIHByb2ZpbGUiLCJzaWQiOiJjNmQyYjk2YS1jMTYzLTQwMWUtOWYzMy0yNDJhNDQ3MTA2OTgiLCJlbWFpbF92ZXJpZmllZCI6ZmFsc2UsIm5hbWUiOiLQktC70LDQtNC40LzQuNGAINCv0LrQvtCy0LvQtdCyIiwicHJlZmVycmVkX3VzZXJuYW1lIjoiZTBmYzc2OGRhOTA4MjNiODgwZGQzOGVhMDJjMmQ5NTciLCJnaXZlbl9uYW1lIjoi0JLQu9Cw0LTQuNC80LjRgCIsImZhbWlseV9uYW1lIjoi0K_QutC-0LLQu9C10LIifQ.E2bW0B-c6W5Lj63eP_G8eI453NlDMnW05l11TZT0GSsAtGayXGaolHtWrmI90D5Yxz7v9FGkkCmcUZYy1ywAdO9dDt_XrtFEJWFpG-3csavuMjXmqfQQ9SmPwDw-3toO64NuZVv6qVqoUlPPj57sLx4bLtVbB4pdqgyJYcrDHg7sgwz4d1Z3tAeUfSpum9s5ZfELequfpLoZMXn6CaYZhePaoK-CxeU3KPBPTPOVPKZZ19s7QY10VdkxLULknqf9opdvLs4j8NMimtwoIiHNBFlgQz10Cr7bhDKWugfvSRsICouniIiBJo76wrj5T92s-ztf1FShJuqnQcKE_QLd2A'
head = {'Authorization': 'Bearer {}'.format(my_token)}
my_data={
'x' : f"{lng}",
'y' : f"{lat}",
'limit' : '1'
}
response = requests.post(f"{BASE_URL}/nearest_recycling/get", headers=head, data=my_data)
infos = response.json()
trashboxes = []
for trashbox in infos["results"]:
temp_dict = {}
for obj in trashbox["Objects"]:
coord_list = obj["geometry"]
temp_dict["Lat"] = coord_list["coordinates"][1]
temp_dict["Lng"] = coord_list["coordinates"][0]
properties = obj["properties"]
temp_dict["Name"] = properties["title"]
temp_dict["Address"] = properties["address"]
temp_dict["Categories"] = properties["content_text"].split(',')
trashboxes.append(temp_dict)
uniq_trashboxes = [ast.literal_eval(el1) for el1 in set([str(el2) for el2 in trashboxes])]
return JSONResponse(uniq_trashboxes)
@app.get("/{rest_of_path:path}")
async def react_app(req: Request, rest_of_path: str):
return templates.TemplateResponse('index.html', { 'request': req })
@app.post("api/announcement/dispose")
def dispose(despose_data: schemas.DisposeData, current_user: Annotated[schemas.User, Depends(utils.get_current_user)]):
current_user.rating += 60