Добавить в остальные функции проверку токена

This commit is contained in:
DmitryGantimurov 2023-07-16 16:10:37 +03:00
parent 8fc85e415f
commit 8bae63231b
3 changed files with 12 additions and 54 deletions

View File

@ -24,7 +24,6 @@ from .db import engine, SessionLocal
from . import schema
Base.metadata.create_all(bind=engine)
db = SessionLocal()
@ -35,7 +34,8 @@ app = FastAPI()
templates = Jinja2Templates(directory="./front/dist")
app.mount("/static", StaticFiles(directory = "./front/dist"))
app.mount("/uploads", StaticFiles(directory = "./uploads"))
# app.mount("/uploads", StaticFiles(directory = "./uploads"))
@app.get("/api/announcements")#адрес объявлений
def annoncements_list(user_id: int = None, metro: str = None, category: str = None, booked_by: int = -1):
@ -86,7 +86,7 @@ def single_annoncement(user_id:int):
# Занести объявление в базу
@app.put("/api/announcement")#адрес объявлений
def put_in_db(name: Annotated[str, Form()], category: Annotated[str, Form()], bestBy: Annotated[int, Form()], address: Annotated[str, Form()], longtitude: Annotated[float, Form()], latitude: Annotated[float, Form()], description: Annotated[str, Form()], src: Annotated[UploadFile | None, File()], metro: Annotated[str, Form()], trashId: Annotated[int | None, Form()] = -1):
def put_in_db(name: Annotated[str, Form()], category: Annotated[str, Form()], bestBy: Annotated[int, Form()], address: Annotated[str, Form()], longtitude: Annotated[float, Form()], latitude: Annotated[float, Form()], description: Annotated[str, Form()], src: UploadFile, metro: Annotated[str, Form()], trashId: Annotated[int, Form()] = None):
# try:
userId = 1 # temporary
@ -139,7 +139,7 @@ def change_book_status(data: schema.Book):
@app.post("/api/signup")
def create_user(data = Body()):
if db.query(UserDatabase).filter(User.email == data["email"]).first() == None:
if db.query(UserDatabase).filter(UserDatabase.email == data["email"]).first() == None:
new_user = UserDatabase(id=data["id"], email=data["email"], password=data["password"], name=data["name"], surname=data["surname"])
db.add(new_user)
db.commit()
@ -152,6 +152,7 @@ def create_user(data = Body()):
async def login_for_access_token(
form_data: Annotated[OAuth2PasswordRequestForm, Depends()]
):
# разобраться с первым параметром
user = authenticate_user(db.query(UserDatabase).all(), form_data.username, form_data.password)
if not user:
raise HTTPException(
@ -163,7 +164,7 @@ async def login_for_access_token(
access_token = create_access_token(
data={"user_id": user.id}, expires_delta=access_token_expires
)
return {"access_token": access_token, "token_type": "bearer"}
return {"access_token": access_token}
@app.get("/api/users/me/", response_model=User)

View File

@ -1,7 +1,7 @@
from sqlalchemy import Column, Integer, String
from .db import Base
# from db import Base
class UserDatabase(Base):#класс пользователя
__tablename__ = "users"

View File

@ -1,47 +1,3 @@
# from passlib.context import CryptContext
# import os
# from datetime import datetime, timedelta
# from typing import Union, Any
# from jose import jwt
# ACCESS_TOKEN_EXPIRE_MINUTES = 30 # 30 minutes
# REFRESH_TOKEN_EXPIRE_MINUTES = 60 * 24 * 7 # 7 days
# ALGORITHM = "HS256"
# # В предположении, что попыток взлома не будет, возьмем простейший ключ
# JWT_SECRET_KEY = "secret key" # может также быть os.environ["JWT_SECRET_KEY"]
# JWT_REFRESH_SECRET_KEY = "refresh secret key" # может также быть os.environ["JWT_REFRESH_SECRET_KEY"]
# def get_hashed_password(password: str) -> str:
# return password_context.hash(password)
# def verify_password(password: str, hashed_pass: str) -> bool:
# return password_context.verify(password, hashed_pass)
# def create_access_token(subject: Union[str, Any], expires_delta: int = None) -> str:
# if expires_delta is not None:
# expires_delta = datetime.utcnow() + expires_delta
# else:
# expires_delta = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
# to_encode = {"exp": expires_delta, "sub": str(subject)}
# encoded_jwt = jwt.encode(to_encode, JWT_SECRET_KEY, ALGORITHM)
# return encoded_jwt
# def create_refresh_token(subject: Union[str, Any], expires_delta: int = None) -> str:
# if expires_delta is not None:
# expires_delta = datetime.utcnow() + expires_delta
# else:
# expires_delta = datetime.utcnow() + timedelta(minutes=REFRESH_TOKEN_EXPIRE_MINUTES)
# to_encode = {"exp": expires_delta, "sub": str(subject)}
# encoded_jwt = jwt.encode(to_encode, JWT_REFRESH_SECRET_KEY, ALGORITHM)
# return encoded_jwt
from datetime import datetime, timedelta
from typing import Annotated, Union
@ -79,7 +35,7 @@ class TokenData(BaseModel):
class User(BaseModel):
email: str
# email: str
email: Union[str, None] = None
# password: str
# password: Union[str, None] = None
@ -103,16 +59,17 @@ def get_password_hash(password):
return pwd_context.hash(password)
# проблема здесь
def get_user(db, email: str):
user = None
for person_with_correct_email in db:
for person_with_correct_email in db.query(UserDatabase):
if person_with_correct_email.email == email:
user = person_with_correct_email
break
return user #UserInDB(user_email)
def authenticate_user(db, email: str, password: str):
user = get_user(db, email)
if not user: