235 lines
11 KiB
Python
235 lines
11 KiB
Python
#подключение библиотек
|
||
from fastapi import FastAPI, Response, Path, Depends, Body, Form, Query, status, HTTPException, APIRouter, UploadFile, File
|
||
from fastapi.responses import HTMLResponse, FileResponse, JSONResponse, RedirectResponse
|
||
from fastapi.staticfiles import StaticFiles
|
||
from fastapi.security import OAuth2PasswordRequestForm, OAuth2PasswordBearer
|
||
from fastapi.templating import Jinja2Templates
|
||
from fastapi.requests import Request
|
||
|
||
from pydantic import json
|
||
|
||
from starlette.staticfiles import StaticFiles
|
||
|
||
import requests
|
||
from uuid import uuid4
|
||
|
||
import ast
|
||
import pathlib
|
||
import shutil
|
||
import os
|
||
|
||
from .utils import *
|
||
from .db import Base, engine, SessionLocal, database
|
||
from .models import Announcement, Trashbox, UserDatabase
|
||
|
||
from . import schema
|
||
|
||
Base.metadata.create_all(bind=engine)
|
||
|
||
app = FastAPI()
|
||
|
||
templates = Jinja2Templates(directory="./front/dist")
|
||
|
||
app.mount("/static", StaticFiles(directory = "./front/dist"))
|
||
if not os.path.exists("./uploads"):
|
||
os.mkdir("C:/Users/38812/porridger/uploads")
|
||
app.mount("/uploads", StaticFiles(directory = "./uploads"))
|
||
|
||
|
||
# Функция, создающая сессию БД при каждом запросе к нашему API.
|
||
# Срабатывает до запуска остальных функций.
|
||
# Всегда закрывает сессию при окончании работы с ней
|
||
# @app.middleware("http")
|
||
# async def db_session_middleware(request: Request, call_next):
|
||
# response = Response("Internal server error", status_code=500)
|
||
# try:
|
||
# request.state.db = SessionLocal()
|
||
# response = await call_next(request)
|
||
# finally:
|
||
# request.state.db.close()
|
||
# return response
|
||
|
||
|
||
@app.get("/api/announcements")#адрес объявлений
|
||
def annoncements_list(user_id: int = None, metro: str = None, category: str = None, booked_by: int = 0):
|
||
# Считываем данные из Body и отображаем их на странице.
|
||
# В последствии будем вставлять данные в html-форму
|
||
|
||
a = database.query(Announcement)
|
||
b = database.query(Announcement)
|
||
c = database.query(Announcement)
|
||
d = database.query(Announcement)
|
||
e = database.query(Announcement)
|
||
|
||
if user_id != None:
|
||
b = a.filter(Announcement.user_id == user_id)
|
||
|
||
if metro != None:
|
||
c = a.filter(Announcement.metro == metro)
|
||
|
||
if category != None:
|
||
d = a.filter(Announcement.category == category)
|
||
|
||
if booked_by != -1:
|
||
e = a.filter(Announcement.booked_by == booked_by)
|
||
|
||
if not any([category, user_id, metro]) and booked_by == -1:
|
||
result = a.all()
|
||
|
||
else:
|
||
result = b.intersect(c, d, e).all()
|
||
|
||
return {"Success" : True, "list_of_announcements": result}
|
||
|
||
|
||
@app.get("/api/announcement")#адрес объявлений
|
||
def single_annoncement(user_id:int):
|
||
# Считываем данные из Body и отображаем их на странице.
|
||
# В последствии будем вставлять данные в html-форму
|
||
try:
|
||
annoncement = database.get(Announcement, user_id)
|
||
return {"id": annoncement.id, "user_id": annoncement.user_id, "name": annoncement.name,
|
||
"category": annoncement.category, "best_by": annoncement.best_by, "address": annoncement.address,
|
||
"description": annoncement.description, "metro": annoncement.metro, "latitude": annoncement.latitude,
|
||
"longtitude":annoncement.longtitude, "trashId": annoncement.trashId, "src":annoncement.src,
|
||
"booked_by":annoncement.booked_by}
|
||
except:
|
||
return {"Answer" : False} #если неуданый доступ, то сообщаем об этом
|
||
|
||
|
||
# Занести объявление в базу
|
||
@app.put("/api/announcement")#адрес объявлений
|
||
def put_in_db(name: Annotated[str, Form()], category: Annotated[str, Form()], bestBy: Annotated[int, Form()], address: Annotated[str, Form()], longtitude: Annotated[float, Form()], latitude: Annotated[float, Form()], description: Annotated[str, Form()], src: UploadFile, metro: Annotated[str, Form()], trashId: Annotated[int, Form()] = None):
|
||
# try:
|
||
userId = 1 # temporary
|
||
|
||
uploaded_name = ""
|
||
|
||
f = src.file
|
||
f.seek(0, os.SEEK_END)
|
||
if f.tell() > 0:
|
||
f.seek(0)
|
||
destination = pathlib.Path("./uploads/" + str(hash(src.file)) + pathlib.Path(src.filename).suffix.lower())
|
||
with destination.open('wb') as buffer:
|
||
shutil.copyfileobj(src.file, buffer)
|
||
|
||
uploaded_name = "/uploads/"+destination.name
|
||
|
||
temp_ancmt = Announcement(user_id=userId, name=name, category=category, best_by=bestBy, address=address, longtitude=longtitude, latitude=latitude, description=description, src=uploaded_name, metro=metro, trashId=trashId, booked_by=-1)
|
||
db.add(temp_ancmt) # добавляем в бд
|
||
db.commit() # сохраняем изменения
|
||
db.refresh(temp_ancmt) # обновляем состояние объекта
|
||
return {"Answer" : True}
|
||
# except:
|
||
# return {"Answer" : False}
|
||
|
||
|
||
# Удалить объявления из базы
|
||
@app.delete("/api/announcement") #адрес объявления
|
||
def delete_from_db(data = Body()):#функция удаления объекта из БД
|
||
try:
|
||
database.delete(user_id=data.user_id)#удаление из БД
|
||
database.commit() # сохраняем изменения
|
||
return {"Answer" : True}
|
||
except:
|
||
return {"Answer" : False}
|
||
|
||
|
||
# Забронировать объявление
|
||
@app.post("/api/book")
|
||
def change_book_status(data: schema.Book):
|
||
try:
|
||
# Получаем id пользователя, который бронирует объявление
|
||
temp_user_id = 1
|
||
# Находим объявление по данному id
|
||
announcement_to_change = database.query(Announcement).filter(id == data.id).first()
|
||
# Изменяем поле booked_status на полученный id
|
||
announcement_to_change.booked_status = temp_user_id
|
||
return {"Success": True}
|
||
except:
|
||
return {"Success": False}
|
||
|
||
# reginstration
|
||
# {"id":1, "email":"poopka@mail.ru", "password":"good", "name":"Vasya", "surname":"Poopkin"}
|
||
@app.post("/api/signup")
|
||
def create_user(data = Body()):
|
||
if database.query(UserDatabase).filter(UserDatabase.email == data["email"]).first() == None:
|
||
new_user = UserDatabase(id=data["id"], email=data["email"], password=data["password"],
|
||
hashed_password=get_password_hash(data["password"]), name=data["name"], surname=data["surname"])
|
||
database.add(new_user)
|
||
database.commit()
|
||
database.refresh(new_user) # обновляем состояние объекта
|
||
return {"Success": True}
|
||
return {"Success": False, "Message": "Пользователь с таким email уже зарегестрирован."}
|
||
|
||
|
||
@app.post("/api/token", response_model=Token)
|
||
async def login_for_access_token(
|
||
form_data: Annotated[OAuth2PasswordRequestForm, Depends()]
|
||
):
|
||
# разобраться с первым параметром
|
||
user = authenticate_user(database, form_data.username, form_data.password)
|
||
if not user:
|
||
raise HTTPException(
|
||
status_code=status.HTTP_401_UNAUTHORIZED,
|
||
detail="Incorrect username or password",
|
||
headers={"WWW-Authenticate": "Bearer"},
|
||
)
|
||
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
|
||
access_token = create_access_token(
|
||
data={"user_id": user.id}, expires_delta=access_token_expires
|
||
)
|
||
return {"access_token":access_token}
|
||
|
||
|
||
# @app.get("/api/users/me/", response_model=schema.User)
|
||
# async def read_users_me( #!!!!!!!!!!!
|
||
# current_user: Annotated[schema.User, Depends(get_current_active_user)]
|
||
# ):
|
||
# return {"data": current_user}
|
||
|
||
|
||
# @app.get("/api/users/me/items/")
|
||
# async def read_own_items(
|
||
# current_user: Annotated[schema.User, Depends(get_current_active_user)]
|
||
# ):
|
||
# return [{"Current user name": current_user.name, "Current user surname": current_user.surname}]
|
||
|
||
|
||
|
||
@app.get("/api/trashbox")
|
||
def get_trashboxes(lat:float, lng:float):#крутая функция для работы с api
|
||
BASE_URL='https://geointelect2.gate.petersburg.ru'#адрес сайта и мой токин
|
||
my_token='eyJhbGciOiJSUzI1NiIsInR5cCIgOiAiSldUIiwia2lkIiA6ICJhU1RaZm42bHpTdURYcUttRkg1SzN5UDFhT0FxUkhTNm9OendMUExaTXhFIn0.eyJleHAiOjE3ODM3ODk4NjgsImlhdCI6MTY4OTA5NTQ2OCwianRpIjoiNDUzNjQzZTgtYTkyMi00NTI4LWIzYmMtYWJiYTNmYjkyNTkxIiwiaXNzIjoiaHR0cHM6Ly9rYy5wZXRlcnNidXJnLnJ1L3JlYWxtcy9lZ3MtYXBpIiwiYXVkIjoiYWNjb3VudCIsInN1YiI6ImJjYjQ2NzljLTU3ZGItNDU5ZC1iNWUxLWRlOGI4Yzg5MTMwMyIsInR5cCI6IkJlYXJlciIsImF6cCI6ImFkbWluLXJlc3QtY2xpZW50Iiwic2Vzc2lvbl9zdGF0ZSI6ImM2ZDJiOTZhLWMxNjMtNDAxZS05ZjMzLTI0MmE0NDcxMDY5OCIsImFjciI6IjEiLCJhbGxvd2VkLW9yaWdpbnMiOlsiLyoiXSwicmVhbG1fYWNjZXNzIjp7InJvbGVzIjpbImRlZmF1bHQtcm9sZXMtZWdzLWFwaSIsIm9mZmxpbmVfYWNjZXNzIiwidW1hX2F1dGhvcml6YXRpb24iXX0sInJlc291cmNlX2FjY2VzcyI6eyJhY2NvdW50Ijp7InJvbGVzIjpbIm1hbmFnZS1hY2NvdW50IiwibWFuYWdlLWFjY291bnQtbGlua3MiLCJ2aWV3LXByb2ZpbGUiXX19LCJzY29wZSI6ImVtYWlsIHByb2ZpbGUiLCJzaWQiOiJjNmQyYjk2YS1jMTYzLTQwMWUtOWYzMy0yNDJhNDQ3MTA2OTgiLCJlbWFpbF92ZXJpZmllZCI6ZmFsc2UsIm5hbWUiOiLQktC70LDQtNC40LzQuNGAINCv0LrQvtCy0LvQtdCyIiwicHJlZmVycmVkX3VzZXJuYW1lIjoiZTBmYzc2OGRhOTA4MjNiODgwZGQzOGVhMDJjMmQ5NTciLCJnaXZlbl9uYW1lIjoi0JLQu9Cw0LTQuNC80LjRgCIsImZhbWlseV9uYW1lIjoi0K_QutC-0LLQu9C10LIifQ.E2bW0B-c6W5Lj63eP_G8eI453NlDMnW05l11TZT0GSsAtGayXGaolHtWrmI90D5Yxz7v9FGkkCmcUZYy1ywAdO9dDt_XrtFEJWFpG-3csavuMjXmqfQQ9SmPwDw-3toO64NuZVv6qVqoUlPPj57sLx4bLtVbB4pdqgyJYcrDHg7sgwz4d1Z3tAeUfSpum9s5ZfELequfpLoZMXn6CaYZhePaoK-CxeU3KPBPTPOVPKZZ19s7QY10VdkxLULknqf9opdvLs4j8NMimtwoIiHNBFlgQz10Cr7bhDKWugfvSRsICouniIiBJo76wrj5T92s-ztf1FShJuqnQcKE_QLd2A'
|
||
head = {'Authorization': 'Bearer {}'.format(my_token)}
|
||
|
||
my_data={
|
||
'x' : f"{lng}",
|
||
'y' : f"{lat}",
|
||
'limit' : '1'
|
||
}
|
||
|
||
response = requests.post(f"{BASE_URL}/nearest_recycling/get", headers=head, data=my_data)
|
||
infos = response.json()
|
||
|
||
trashboxes = []
|
||
for trashbox in infos["results"]:
|
||
temp_dict = {}
|
||
for obj in trashbox["Objects"]:
|
||
coord_list = obj["geometry"]
|
||
temp_dict["Lat"] = coord_list["coordinates"][1]
|
||
temp_dict["Lng"] = coord_list["coordinates"][0]
|
||
|
||
properties = obj["properties"]
|
||
temp_dict["Name"] = properties["title"]
|
||
temp_dict["Address"] = properties["address"]
|
||
temp_dict["Categories"] = properties["content_text"].split(',')
|
||
trashboxes.append(temp_dict)
|
||
|
||
uniq_trashboxes = [ast.literal_eval(el1) for el1 in set([str(el2) for el2 in trashboxes])]
|
||
return JSONResponse(uniq_trashboxes)
|
||
|
||
@app.get("/{rest_of_path:path}")
|
||
async def react_app(req: Request, rest_of_path: str):
|
||
return templates.TemplateResponse('index.html', { 'request': req })
|