포스트

[FastAPI] <5: Security - 4. password(and hashing)가 포함된 OAuth2, Bearer가 포함된 JWT tokens >

0. 이전 글

보안 - 1

보안 - 2

보안 - 3

공식문서

JWT 토큰에 대한 설명

앞에서 작성했던 모든 개념을 활용해 JWT 토큰과 비밀번호 해싱을 사용한 안전한 어플리케이션을 만들어보자.

1. Hash 및 비밀번호 확인

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
from passlib.context import CryptContext

pwd_context = CryptContext(schemes=["bcrypt"], deprecated = "auto")

def verify_password(plain_password, hashed_password):
		return pwd_context.verify(plain_password, hashed_password)

def get_password_hash(password):
    return pwd_context.hash(password)

def authenticate_user(fake_db, username: str, password: str):
    user = get_user(fake_db, username)
    if not user:
        return False
    if not verify_password(password, user.hashed_password):
        return False
    return user

  • verify_password 함수는 검증할 일반 텍스트 비밀번호 plain_password 와 이미 해싱되어있는 hashed_password 두 가지 매개변수를 받는다.

    두 비밀번호가 pwd_context 에 정의된 규칙에 따라 동일하다면 True, 아니라면 False를 반환한다.

  • get_password_hash 는 입력받은 일반 password를 안전하게 해싱하는 역할을 한다.
  • authenticate_user 는 사용자 인증을 수행한다.

    유저가 올바른 경우엔 user를 반환하고, 올바르지 않은 경우엔 False를 반환한다.

2. JWT 토큰 처리

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
from jose import JWTError, jwt

SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30

class Token(BaseModel):
		access_token: str
		token_type: str

def create_access_token(data: dict, expires_delta: timedelta | None = None):
		to_encode = data.copy()
		if expires_delta:
				expire = datetime.now(timezone.utc) + expires_delta
		else:
				expire = datetime.now(timezone.utc) + timedelta(minutes=15)
		to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt
  • create_access_token 은 주어진 매개변수 및 상수(SECRET_KEY, ALGORITHM, …)으로 클라이언트에게 부여할 엑세스 토큰을 생성한다.
  • 원본 데이터를 변형하지 않기 위해 to_encode 에 data를 복사해 할당한다.
  • expires_delta 가 주어진다면 엑세스 토큰의 만료 시간을 주어진 시간으로 설정한다.
  • 주어지지 않는다면 15분을 기본적으로 설정한다.
  • payload의 만료시간(exp)을 설정해 암호화 한 후 서명하고 리턴한다.

3. 종속성 업데이트

이전에 작성한 get_current_user와 동일한 토큰을 받되, 이번엔 위에서 생성한 JWT 토큰을 사용해보자.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

async def get_current_user(token: Annotated[str, Depends(oauth2_scheme)]):
		credentials_exception = HTTPException(
				status_code = status.HTTP_401_UNAUTHORIZED,
				detail = "Could not validate credentials",
				headers = {"WWW-Authenticate": "Bearer"},
		)
		try:
				payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if username is None:
            raise credentials_exception
        token_data = TokenData(username=username)
    except JWTError:
        raise credentials_exception
    user = get_user(fake_users_db, username=token_data.username)
    if user is None:
        raise credentials_exception
    return user
		
  • get_current_user 는 현재 이 사용자를 가져오는 비동기 함수이다. 사용자의 JWT 토큰을 매개변수로 받는다.
  • try - except 문을 사용해 사용자의 JWT 토큰을 해독한다. 유저 네임이 없으면 인증 예외(401)가 발생한다. 그 외의 JWTError 를 받으면 except로 처리해 인증 예외를 발생시킨다.
  • 검색한 사용자가 데이터베이스에 존재하지 않는다면 인증 예외를 발생시킨다.
  • 모두 유효하다면 현재 유저를 반환한다.

4. /token 경로 작업 업데이트

상기 작성한 함수들을 사용해 실제 경로에서 JWT 엑세스 토큰을 생성하고 반환한다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@app.post("/token")
async def login_for_access_token(
    form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
) -> Token:
    user = authenticate_user(fake_users_db, form_data.username, form_data.password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.username}, expires_delta=access_token_expires
    )
    return Token(access_token=access_token, token_type="bearer")
  • /token 경로로 사용자 입력을 받는다.
  • 유저의 이름과 비밀번호를 확인해 존재하지 않는다면 401 예외를 발생시킨다.
  • 유저가 존재한다면 엑세스 토큰의 만료 시간을 30분으로 설정한 후 엑세스 토큰을 반환한다.

5. 전체 코드

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
from datetime import datetime, timedelta, timezone
from typing import Annotated

from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jose import JWTError, jwt
from passlib.context import CryptContext
from pydantic import BaseModel

# to get a string like this run:
# openssl rand -hex 32
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30

fake_users_db = {
    "johndoe": {
        "username": "johndoe",
        "full_name": "John Doe",
        "email": "johndoe@example.com",
        "hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",
        "disabled": False,
    }
}

class Token(BaseModel):
    access_token: str
    token_type: str

class TokenData(BaseModel):
    username: str | None = None

class User(BaseModel):
    username: str
    email: str | None = None
    full_name: str | None = None
    disabled: bool | None = None

class UserInDB(User):
    hashed_password: str

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

app = FastAPI()

def verify_password(plain_password, hashed_password):
    return pwd_context.verify(plain_password, hashed_password)

def get_password_hash(password):
    return pwd_context.hash(password)

def get_user(db, username: str):
    if username in db:
        user_dict = db[username]
        return UserInDB(**user_dict)

def authenticate_user(fake_db, username: str, password: str):
    user = get_user(fake_db, username)
    if not user:
        return False
    if not verify_password(password, user.hashed_password):
        return False
    return user

def create_access_token(data: dict, expires_delta: timedelta | None = None):
    to_encode = data.copy()
    if expires_delta:
        expire = datetime.now(timezone.utc) + expires_delta
    else:
        expire = datetime.now(timezone.utc) + timedelta(minutes=15)
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt

async def get_current_user(token: Annotated[str, Depends(oauth2_scheme)]):
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if username is None:
            raise credentials_exception
        token_data = TokenData(username=username)
    except JWTError:
        raise credentials_exception
    user = get_user(fake_users_db, username=token_data.username)
    if user is None:
        raise credentials_exception
    return user

async def get_current_active_user(
    current_user: Annotated[User, Depends(get_current_user)],
):
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="Inactive user")
    return current_user

@app.post("/token")
async def login_for_access_token(
    form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
) -> Token:
    user = authenticate_user(fake_users_db, form_data.username, form_data.password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.username}, expires_delta=access_token_expires
    )
    return Token(access_token=access_token, token_type="bearer")

@app.get("/users/me/", response_model=User)
async def read_users_me(
    current_user: Annotated[User, Depends(get_current_active_user)],
):
    return current_user

@app.get("/users/me/items/")
async def read_own_items(
    current_user: Annotated[User, Depends(get_current_active_user)],
):
    return [{"item_id": "Foo", "owner": current_user.username}]


이 기사는 저작권자의 CC BY 4.0 라이센스를 따릅니다.