[FastAPI] <5: Security - 4. password(and hashing)가 포함된 OAuth2, Bearer가 포함된 JWT tokens >
0. 이전 글
앞에서 작성했던 모든 개념을 활용해 JWT 토큰과 비밀번호 해싱을 사용한 안전한 어플리케이션을 만들어보자.
1. Hash 및 비밀번호 확인
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
from passlib.context import CryptContext
pwd_context = CryptContext(schemes=["bcrypt"], deprecated = "auto")
def verify_password(plain_password, hashed_password):
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password):
return pwd_context.hash(password)
def authenticate_user(fake_db, username: str, password: str):
user = get_user(fake_db, username)
if not user:
return False
if not verify_password(password, user.hashed_password):
return False
return user
verify_password
함수는 검증할 일반 텍스트 비밀번호plain_password
와 이미 해싱되어있는hashed_password
두 가지 매개변수를 받는다.두 비밀번호가
pwd_context
에 정의된 규칙에 따라 동일하다면True
, 아니라면False
를 반환한다.get_password_hash
는 입력받은 일반 password를 안전하게 해싱하는 역할을 한다.authenticate_user
는 사용자 인증을 수행한다.유저가 올바른 경우엔 user를 반환하고, 올바르지 않은 경우엔 False를 반환한다.
2. JWT 토큰 처리
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
from jose import JWTError, jwt
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
class Token(BaseModel):
access_token: str
token_type: str
def create_access_token(data: dict, expires_delta: timedelta | None = None):
to_encode = data.copy()
if expires_delta:
expire = datetime.now(timezone.utc) + expires_delta
else:
expire = datetime.now(timezone.utc) + timedelta(minutes=15)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
create_access_token
은 주어진 매개변수 및 상수(SECRET_KEY
,ALGORITHM
, …)으로 클라이언트에게 부여할 엑세스 토큰을 생성한다.- 원본 데이터를 변형하지 않기 위해
to_encode
에 data를 복사해 할당한다. expires_delta
가 주어진다면 엑세스 토큰의 만료 시간을 주어진 시간으로 설정한다.- 주어지지 않는다면 15분을 기본적으로 설정한다.
- payload의 만료시간(
exp
)을 설정해 암호화 한 후 서명하고 리턴한다.
3. 종속성 업데이트
이전에 작성한 get_current_user와 동일한 토큰을 받되, 이번엔 위에서 생성한 JWT 토큰을 사용해보자.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
async def get_current_user(token: Annotated[str, Depends(oauth2_scheme)]):
credentials_exception = HTTPException(
status_code = status.HTTP_401_UNAUTHORIZED,
detail = "Could not validate credentials",
headers = {"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise credentials_exception
token_data = TokenData(username=username)
except JWTError:
raise credentials_exception
user = get_user(fake_users_db, username=token_data.username)
if user is None:
raise credentials_exception
return user
get_current_user
는 현재 이 사용자를 가져오는 비동기 함수이다. 사용자의 JWT 토큰을 매개변수로 받는다.try - except
문을 사용해 사용자의 JWT 토큰을 해독한다. 유저 네임이 없으면 인증 예외(401)가 발생한다. 그 외의JWTError
를 받으면 except로 처리해 인증 예외를 발생시킨다.- 검색한 사용자가 데이터베이스에 존재하지 않는다면 인증 예외를 발생시킨다.
- 모두 유효하다면 현재 유저를 반환한다.
4. /token
경로 작업 업데이트
상기 작성한 함수들을 사용해 실제 경로에서 JWT 엑세스 토큰을 생성하고 반환한다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@app.post("/token")
async def login_for_access_token(
form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
) -> Token:
user = authenticate_user(fake_users_db, form_data.username, form_data.password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user.username}, expires_delta=access_token_expires
)
return Token(access_token=access_token, token_type="bearer")
/token
경로로 사용자 입력을 받는다.- 유저의 이름과 비밀번호를 확인해 존재하지 않는다면 401 예외를 발생시킨다.
- 유저가 존재한다면 엑세스 토큰의 만료 시간을 30분으로 설정한 후 엑세스 토큰을 반환한다.
5. 전체 코드
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
from datetime import datetime, timedelta, timezone
from typing import Annotated
from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jose import JWTError, jwt
from passlib.context import CryptContext
from pydantic import BaseModel
# to get a string like this run:
# openssl rand -hex 32
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
fake_users_db = {
"johndoe": {
"username": "johndoe",
"full_name": "John Doe",
"email": "johndoe@example.com",
"hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",
"disabled": False,
}
}
class Token(BaseModel):
access_token: str
token_type: str
class TokenData(BaseModel):
username: str | None = None
class User(BaseModel):
username: str
email: str | None = None
full_name: str | None = None
disabled: bool | None = None
class UserInDB(User):
hashed_password: str
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
app = FastAPI()
def verify_password(plain_password, hashed_password):
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password):
return pwd_context.hash(password)
def get_user(db, username: str):
if username in db:
user_dict = db[username]
return UserInDB(**user_dict)
def authenticate_user(fake_db, username: str, password: str):
user = get_user(fake_db, username)
if not user:
return False
if not verify_password(password, user.hashed_password):
return False
return user
def create_access_token(data: dict, expires_delta: timedelta | None = None):
to_encode = data.copy()
if expires_delta:
expire = datetime.now(timezone.utc) + expires_delta
else:
expire = datetime.now(timezone.utc) + timedelta(minutes=15)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
async def get_current_user(token: Annotated[str, Depends(oauth2_scheme)]):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise credentials_exception
token_data = TokenData(username=username)
except JWTError:
raise credentials_exception
user = get_user(fake_users_db, username=token_data.username)
if user is None:
raise credentials_exception
return user
async def get_current_active_user(
current_user: Annotated[User, Depends(get_current_user)],
):
if current_user.disabled:
raise HTTPException(status_code=400, detail="Inactive user")
return current_user
@app.post("/token")
async def login_for_access_token(
form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
) -> Token:
user = authenticate_user(fake_users_db, form_data.username, form_data.password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user.username}, expires_delta=access_token_expires
)
return Token(access_token=access_token, token_type="bearer")
@app.get("/users/me/", response_model=User)
async def read_users_me(
current_user: Annotated[User, Depends(get_current_active_user)],
):
return current_user
@app.get("/users/me/items/")
async def read_own_items(
current_user: Annotated[User, Depends(get_current_active_user)],
):
return [{"item_id": "Foo", "owner": current_user.username}]