Cho phép người dùng đăng ký
Cho phép người dùng đăng ký
Trước tiên, hãy cho phép người dùng đăng ký, vì tất cả các dịch vụ của chúng tôi cần được người dùng đã xác thực truy cập. Chúng tôi viết trình xử lý yêu cầu đầu tiên của chúng tôi bằng cách sử dụng lược đồ
UserCreate
và UserBase
được định nghĩa ở trên.@app.post("/api/users", response_model=schemas.User)
def signup(user_data: schemas.UserCreate, db: Session = Depends(get_db)):
"""add new user"""
user = crud.get_user_by_email(db, user_data.email)
if user:
raise HTTPException(status_code=409,
detail="Email already registered.")
signedup_user = crud.create_user(db, user_data)
return signedup_user
Có rất nhiều điều xảy ra trong đoạn mã ngắn này. Chúng tôi đã sử dụng trình trang trí để chỉ định động từ HTTP, URI và lược đồ của các phản hồi thành công. Để đảm bảo rằng người dùng đã gửi đúng dữ liệu, chúng tôi đã nhập gợi ý vào phần thân yêu cầu bằng một
UserCreate
lược đồ được xác định trước đó . Phương thức xác định một tham số khác để nhận một xử lý trên cơ sở dữ liệu — đây là hoạt động chèn phụ thuộc và sẽ được thảo luận sau trong hướng dẫn này.Bảo mật API của chúng tôi
Chúng tôi muốn có các tính năng bảo mật sau trong ứng dụng của mình:
- Băm mật khẩu
- Xác thực dựa trên JWT
Để băm mật khẩu, chúng ta có thể sử dụng Passlib. Hãy xác định các hàm xử lý băm mật khẩu và kiểm tra xem mật khẩu có đúng không.
from passlib.context import CryptContext
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def verify_password(plain_password, hashed_password):
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password):
return pwd_context.hash(password)
def authenticate_user(db, email: str, password: str):
user = crud.get_user_by_email(db, email)
if not user:
return False
if not verify_password(password, user.hashed_password):
return False
return user
Để kích hoạt xác thực dựa trên JWT, chúng tôi cần tạo JWT cũng như giải mã chúng để lấy thông tin đăng nhập của người dùng. Chúng tôi xác định các chức năng sau để cung cấp chức năng này.
# install PyJWT
import jwt
from fastapi.security import OAuth2PasswordBearer
SECRET_KEY = os.environ['SECRET_KEY']
ALGORITHM = os.environ['ALGORITHM']
def create_access_token(*, data: dict, expires_delta: timedelta = None):
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=15)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
def decode_access_token(db, token):
credentials_exception = HTTPException(
status_code=HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
email: str = payload.get("sub")
if email is None:
raise credentials_exception
token_data = schemas.TokenData(email=email)
except PyJWTError:
raise credentials_exception
user = crud.get_user_by_email(db, email=token_data.email)
if user is None:
raise credentials_exception
return user
Phát hành mã thông báo khi đăng nhập thành công
Bây giờ, chúng ta sẽ xác định điểm cuối Đăng nhập và triển khai luồng mật khẩu OAuth2. Điểm cuối này sẽ nhận được email và mật khẩu. Chúng tôi sẽ kiểm tra thông tin xác thực dựa trên cơ sở dữ liệu và khi thành công, hãy cấp mã thông báo web JSON cho người dùng.
Để nhận được thông tin xác thực, chúng tôi sẽ sử dụng
OAuth2PasswordRequestForm
, đây là một phần của các tiện ích bảo mật của FastAPI.@app.post("/api/token", response_model=schemas.Token)
def login_for_access_token(db: Session = Depends(get_db),
form_data: OAuth2PasswordRequestForm = Depends()):
"""generate access token for valid credentials"""
user = authenticate_user(db, form_data.username, form_data.password)
if not user:
raise HTTPException(
status_code=HTTP_401_UNAUTHORIZED,
detail="Incorrect email or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(data={"sub": user.email},
expires_delta=access_token_expires)
return {"access_token": access_token, "token_type": "bearer"}
Không có nhận xét nào