@ -1,3 +1,224 @@
<< << << < HEAD
#подключение библиотек
from fastapi import FastAPI , Response , Path , Depends , Body , Form , Query , status , HTTPException , APIRouter , UploadFile , File
from fastapi . responses import HTMLResponse , FileResponse , JSONResponse , RedirectResponse
from fastapi . staticfiles import StaticFiles
from fastapi . security import OAuth2PasswordRequestForm , OAuth2PasswordBearer
from fastapi . templating import Jinja2Templates
from fastapi . requests import Request
from pydantic import json
from starlette . staticfiles import StaticFiles
import requests
from uuid import uuid4
import ast
import pathlib
import shutil
import os
from . utils import *
from . models import Announcement , Trashbox , UserDatabase , Base
from . db import engine , SessionLocal
from . import schema
Base . metadata . create_all ( bind = engine )
db = SessionLocal ( )
app = FastAPI ( )
templates = Jinja2Templates ( directory = " ./front/dist " )
app . mount ( " /static " , StaticFiles ( directory = " ./front/dist " ) )
app . mount ( " /uploads " , StaticFiles ( directory = " ./uploads " ) )
@app.get ( " /api/announcements " ) #адрес объявлений
def annoncements_list ( user_id : int = None , metro : str = None , category : str = None , booked_by : int = - 1 ) :
# Считываем данные из Body и отображаем их на странице.
# В последствии будем вставлять данные в html-форму
a = db . query ( Announcement )
b = db . query ( Announcement )
c = db . query ( Announcement )
d = db . query ( Announcement )
e = db . query ( Announcement )
if user_id != None :
b = a . filter ( Announcement . user_id == user_id )
if metro != None :
c = a . filter ( Announcement . metro == metro )
if category != None :
d = a . filter ( Announcement . category == category )
if booked_by != - 1 :
e = a . filter ( Announcement . booked_by == booked_by )
if not any ( [ category , user_id , metro ] ) and booked_by == - 1 :
result = a . all ( )
else :
result = b . intersect ( c , d , e ) . all ( )
return { " Success " : True , " list_of_announcements " : result }
@app.get ( " /api/announcement " ) #адрес объявлений
def single_annoncement ( user_id : int ) :
# Считываем данные из Body и отображаем их на странице.
# В последствии будем вставлять данные в html-форму
try :
annoncement = db . get ( Announcement , user_id )
return { " id " : annoncement . id , " user_id " : annoncement . user_id , " name " : annoncement . name ,
" category " : annoncement . category , " best_by " : annoncement . best_by , " address " : annoncement . address ,
" description " : annoncement . description , " metro " : annoncement . metro , " latitude " : annoncement . latitude ,
" longtitude " : annoncement . longtitude , " trashId " : annoncement . trashId , " src " : annoncement . src ,
" booked_by " : annoncement . booked_by }
except :
return { " Answer " : False } #если неуданый доступ, то сообщаем о б этом
# Занести объявление в базу
@app.put ( " /api/announcement " ) #адрес объявлений
def put_in_db ( name : Annotated [ str , Form ( ) ] , category : Annotated [ str , Form ( ) ] , bestBy : Annotated [ int , Form ( ) ] , address : Annotated [ str , Form ( ) ] , longtitude : Annotated [ float , Form ( ) ] , latitude : Annotated [ float , Form ( ) ] , description : Annotated [ str , Form ( ) ] , src : UploadFile , metro : Annotated [ str , Form ( ) ] , trashId : Annotated [ int , Form ( ) ] = None ) :
# try:
userId = 1 # temporary
uploaded_name = " "
f = src . file
f . seek ( 0 , os . SEEK_END )
if f . tell ( ) > 0 :
f . seek ( 0 )
destination = pathlib . Path ( " ./uploads/ " + str ( hash ( src . file ) ) + pathlib . Path ( src . filename ) . suffix . lower ( ) )
with destination . open ( ' wb ' ) as buffer :
shutil . copyfileobj ( src . file , buffer )
uploaded_name = " /uploads/ " + destination . name
temp_ancmt = Announcement ( user_id = userId , name = name , category = category , best_by = bestBy , address = address , longtitude = longtitude , latitude = latitude , description = description , src = uploaded_name , metro = metro , trashId = trashId )
db . add ( temp_ancmt ) # добавляем в бд
db . commit ( ) # сохраняем изменения
db . refresh ( temp_ancmt ) # обновляем состояние объекта
return { " Answer " : True }
# except:
# return {"Answer" : False}
# Удалить объявления из базы
@app.delete ( " /api/announcement " ) #адрес объявления
def delete_from_db ( data = Body ( ) ) : #функция удаления объекта из БД
try :
db . delete ( user_id = data . user_id ) #удаление из БД
db . commit ( ) # сохраняем изменения
return { " Answer " : True }
except :
return { " Answer " : False }
# Забронировать объявление
@app.post ( " /api/book " )
def change_book_status ( data : schema . Book ) :
try :
# Получаем id пользователя, который бронирует объявление
temp_user_id = 1
# Находим объявление по данному id
announcement_to_change = db . query ( Announcement ) . filter ( id == data . id ) . first ( )
# Изменяем поле booked_status на полученный id
announcement_to_change . booked_status = temp_user_id
return { " Success " : True }
except :
return { " Success " : False }
@app.post ( " /api/signup " )
def create_user ( data = Body ( ) ) :
if db . query ( UserDatabase ) . filter ( UserDatabase . email == data [ " email " ] ) . first ( ) == None :
new_user = UserDatabase ( id = data [ " id " ] , email = data [ " email " ] , password = data [ " password " ] , name = data [ " name " ] , surname = data [ " surname " ] )
db . add ( new_user )
db . commit ( )
db . refresh ( new_user ) # обновляем состояние объекта
return { " Success " : True }
return { " Success " : False , " Message " : " Пользователь с таким email уже зарегестрирован. " }
@app.post ( " /api/token " , response_model = Token )
async def login_for_access_token (
form_data : Annotated [ OAuth2PasswordRequestForm , Depends ( ) ]
) :
# разобраться с первым параметром
user = authenticate_user ( db . query ( UserDatabase ) . all ( ) , form_data . username , form_data . password )
if not user :
raise HTTPException (
status_code = status . HTTP_401_UNAUTHORIZED ,
detail = " Incorrect username or password " ,
headers = { " WWW-Authenticate " : " Bearer " } ,
)
access_token_expires = timedelta ( minutes = ACCESS_TOKEN_EXPIRE_MINUTES )
access_token = create_access_token (
data = { " user_id " : user . id } , expires_delta = access_token_expires
)
return { " access_token " : access_token }
@app.get ( " /api/users/me/ " , response_model = User )
async def read_users_me (
current_user : Annotated [ User , Depends ( get_current_active_user ) ]
) :
return current_user
@app.get ( " /api/users/me/items/ " )
async def read_own_items (
current_user : Annotated [ User , Depends ( get_current_active_user ) ]
) :
return [ { " Current user name " : current_user . name , " Current user surname " : current_user . surname } ]
@app.get ( " /api/trashbox " )
def get_trashboxes ( lat : float , lng : float ) : #крутая функция для работы с api
BASE_URL = ' https://geointelect2.gate.petersburg.ru ' #адрес сайта и мой токин
my_token = ' eyJhbGciOiJSUzI1NiIsInR5cCIgOiAiSldUIiwia2lkIiA6ICJhU1RaZm42bHpTdURYcUttRkg1SzN5UDFhT0FxUkhTNm9OendMUExaTXhFIn0.eyJleHAiOjE3Nzg2NTk4MjEsImlhdCI6MTY4Mzk2NTQyMSwianRpIjoiOTI2ZGMyNmEtMGYyZi00OTZiLWI0NTUtMWQyYWM5YmRlMTZkIiwiaXNzIjoiaHR0cHM6Ly9rYy5wZXRlcnNidXJnLnJ1L3JlYWxtcy9lZ3MtYXBpIiwiYXVkIjoiYWNjb3VudCIsInN1YiI6ImJjYjQ2NzljLTU3ZGItNDU5ZC1iNWUxLWRlOGI4Yzg5MTMwMyIsInR5cCI6IkJlYXJlciIsImF6cCI6ImFkbWluLXJlc3QtY2xpZW50Iiwic2Vzc2lvbl9zdGF0ZSI6IjE2MGU1ZGVkLWFmMjMtNDkyNS05OTc1LTRhMzM0ZjVmNTkyOSIsImFjciI6IjEiLCJhbGxvd2VkLW9yaWdpbnMiOlsiLyoiXSwicmVhbG1fYWNjZXNzIjp7InJvbGVzIjpbImRlZmF1bHQtcm9sZXMtZWdzLWFwaSIsIm9mZmxpbmVfYWNjZXNzIiwidW1hX2F1dGhvcml6YXRpb24iXX0sInJlc291cmNlX2FjY2VzcyI6eyJhY2NvdW50Ijp7InJvbGVzIjpbIm1hbmFnZS1hY2NvdW50IiwibWFuYWdlLWFjY291bnQtbGlua3MiLCJ2aWV3LXByb2ZpbGUiXX19LCJzY29wZSI6ImVtYWlsIHByb2ZpbGUiLCJzaWQiOiIxNjBlNWRlZC1hZjIzLTQ5MjUtOTk3NS00YTMzNGY1ZjU5MjkiLCJlbWFpbF92ZXJpZmllZCI6ZmFsc2UsIm5hbWUiOiLQktC70LDQtNC40LzQuNGAINCv0LrQvtCy0LvQtdCyIiwicHJlZmVycmVkX3VzZXJuYW1lIjoiZTBmYzc2OGRhOTA4MjNiODgwZGQzOGVhMDJjMmQ5NTciLCJnaXZlbl9uYW1lIjoi0JLQu9Cw0LTQuNC80LjRgCIsImZhbWlseV9uYW1lIjoi0K_QutC-0LLQu9C10LIifQ.BRyUIyY-KKnZ9xqTNa9vIsfKF0UN2VoA9h4NN4y7IgBVLiiS-j43QbeE6qgjIQo0pV3J8jtCAIPvJbO-Ex-GNkw_flgMiGHhKEpsHPW3WK-YZ-XsZJzVQ_pOmLte-Kql4z97WJvolqiXT0nMo2dlX2BGvNs6JNbupvcuGwL4YYpekYAaFNYMQrxi8bSN-R7FIqxP-gzZDAuQSWRRSUqVBLvmgRhphTM-FAx1sX833oXL9tR7ze3eDR_obSV0y6cKVIr4eIlKxFd82qiMrN6A6CTUFDeFjeAGERqeBPnJVXU36MHu7Ut7eOVav9OUARARWRkrZRkqzTfZ1iqEBq5Tsg '
head = { ' Authorization ' : ' Bearer {} ' . format ( my_token ) }
my_data = {
' x ' : f " { lng } " ,
' y ' : f " { lat } " ,
' limit ' : ' 1 '
}
response = requests . post ( f " { BASE_URL } /nearest_recycling/get " , headers = head , data = my_data )
infos = response . json ( )
trashboxes = [ ]
for trashbox in infos [ " results " ] :
temp_dict = { }
for obj in trashbox [ " Objects " ] :
coord_list = obj [ " geometry " ]
temp_dict [ " Lat " ] = coord_list [ " coordinates " ] [ 1 ]
temp_dict [ " Lng " ] = coord_list [ " coordinates " ] [ 0 ]
properties = obj [ " properties " ]
temp_dict [ " Name " ] = properties [ " title " ]
temp_dict [ " Address " ] = properties [ " address " ]
temp_dict [ " Categories " ] = properties [ " content_text " ] . split ( ' , ' )
trashboxes . append ( temp_dict )
uniq_trashboxes = [ ast . literal_eval ( el1 ) for el1 in set ( [ str ( el2 ) for el2 in trashboxes ] ) ]
return JSONResponse ( uniq_trashboxes )
@app.get ( " / { rest_of_path:path} " )
async def react_app ( req : Request , rest_of_path : str ) :
return templates . TemplateResponse ( ' index.html ' , { ' request ' : req } )
== == == =
#подключение библиотек
from fastapi import FastAPI , Response , Path , Depends , Body , Form , Query , status , HTTPException , APIRouter , UploadFile , File
from fastapi . responses import HTMLResponse , FileResponse , JSONResponse , RedirectResponse
@ -216,3 +437,4 @@ def get_trashboxes(lat:float, lng:float):#крутая функция для р
@app.get ( " / { rest_of_path:path} " )
async def react_app ( req : Request , rest_of_path : str ) :
return templates . TemplateResponse ( ' index.html ' , { ' request ' : req } )
>> >> >> > 3668e8 c33f71b7a79a0c83d41a106d9b55e2df71