Header Ads

  • Breaking News

    Cho phép người dùng đăng ký

    Cho phép người dùng đăng ký

    Trước tiên, hãy cho phép người dùng đăng ký, vì tất cả các dịch vụ của chúng tôi cần được người dùng đã xác thực truy cập. Chúng tôi viết trình xử lý yêu cầu đầu tiên của chúng tôi bằng cách sử dụng lược đồ UserCreatevà UserBaseđược định nghĩa ở trên.
    @app.post("/api/users", response_model=schemas.User)
    def signup(user_data: schemas.UserCreate, db: Session = Depends(get_db)):
       """add new user"""
       user = crud.get_user_by_email(db, user_data.email)
       if user:
        raise HTTPException(status_code=409,
                            detail="Email already registered.")
       signedup_user = crud.create_user(db, user_data)
       return signedup_user
    
    Có rất nhiều điều xảy ra trong đoạn mã ngắn này. Chúng tôi đã sử dụng trình trang trí để chỉ định động từ HTTP, URI và lược đồ của các phản hồi thành công. Để đảm bảo rằng người dùng đã gửi đúng dữ liệu, chúng tôi đã nhập gợi ý vào phần thân yêu cầu bằng một UserCreatelược đồ được xác định trước đó Phương thức xác định một tham số khác để nhận một xử lý trên cơ sở dữ liệu — đây là hoạt động chèn phụ thuộc và sẽ được thảo luận sau trong hướng dẫn này.

    Bảo mật API của chúng tôi

    Chúng tôi muốn có các tính năng bảo mật sau trong ứng dụng của mình:
    • Băm mật khẩu
    • Xác thực dựa trên JWT
    Để băm mật khẩu, chúng ta có thể sử dụng Passlib. Hãy xác định các hàm xử lý băm mật khẩu và kiểm tra xem mật khẩu có đúng không.
    from passlib.context import CryptContext
     
    pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
     
    def verify_password(plain_password, hashed_password):
       return pwd_context.verify(plain_password, hashed_password)
     
     
    def get_password_hash(password):
       return pwd_context.hash(password)
     
    def authenticate_user(db, email: str, password: str):
       user = crud.get_user_by_email(db, email)
       if not user:
        return False
       if not verify_password(password, user.hashed_password):
        return False
       return user
    
    Để kích hoạt xác thực dựa trên JWT, chúng tôi cần tạo JWT cũng như giải mã chúng để lấy thông tin đăng nhập của người dùng. Chúng tôi xác định các chức năng sau để cung cấp chức năng này.
    # install PyJWT
    import jwt
    from fastapi.security import OAuth2PasswordBearer
     
    SECRET_KEY = os.environ['SECRET_KEY']
    ALGORITHM = os.environ['ALGORITHM']
     
    def create_access_token(*, data: dict, expires_delta: timedelta = None):
       to_encode = data.copy()
       if expires_delta:
        expire = datetime.utcnow() + expires_delta
       else:
        expire = datetime.utcnow() + timedelta(minutes=15)
       to_encode.update({"exp": expire})
       encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
       return encoded_jwt
    def decode_access_token(db, token):
       credentials_exception = HTTPException(
        status_code=HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
       )
       try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        email: str = payload.get("sub")
        if email is None:
            raise credentials_exception
        token_data = schemas.TokenData(email=email)
       except PyJWTError:
        raise credentials_exception
       user = crud.get_user_by_email(db, email=token_data.email)
       if user is None:
        raise credentials_exception
       return user
    

    Phát hành mã thông báo khi đăng nhập thành công

    Bây giờ, chúng ta sẽ xác định điểm cuối Đăng nhập và triển khai luồng mật khẩu OAuth2. Điểm cuối này sẽ nhận được email và mật khẩu. Chúng tôi sẽ kiểm tra thông tin xác thực dựa trên cơ sở dữ liệu và khi thành công, hãy cấp mã thông báo web JSON cho người dùng.
    Để nhận được thông tin xác thực, chúng tôi sẽ sử dụng OAuth2PasswordRequestForm, đây là một phần của các tiện ích bảo mật của FastAPI.
    @app.post("/api/token", response_model=schemas.Token)
    def login_for_access_token(db: Session = Depends(get_db),
                           form_data: OAuth2PasswordRequestForm = Depends()):
       """generate access token for valid credentials"""
       user = authenticate_user(db, form_data.username, form_data.password)
       if not user:
        raise HTTPException(
            status_code=HTTP_401_UNAUTHORIZED,
            detail="Incorrect email or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
       access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
       access_token = create_access_token(data={"sub": user.email},
                                       expires_delta=access_token_expires)
       return {"access_token": access_token, "token_type": "bearer"}
    

    Không có nhận xét nào

    Post Top Ad

    ad728

    Post Bottom Ad

    ad728