[FastAPI] <5 : 보안 - 3. 패스워드와 Bearer를 이용한 간단한 OAuth2>
0. 이전 글
이전 글들에서 설명했던 코드들을 활용해 유저 로그인, 유저 조회 기능을 완성해보자.
1. username
과 password
얻기
FastAPI에서 제공하는 데이터 폼을 사용해 username
과 password
를 얻어보자.
FastAPI에선 OAuth2PasswordRequestFrom
을 활용해 얻어 올 수 있다.
1
2
3
4
5
6
7
8
9
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
app = FastAPI()
@app.post("/token")
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
user_dict = fake_users_db.get(form_data.username)
## 이하 생략...
OAuth2PasswordRequestFrom
은 이하를 제공한다.
username
password
scope
: 일반적으로 특정 보안 권한을 선언하는데 사용된다.grant_type
(선택사항)client_id
(선택사항)client_secret
(선택사항)
1.1 해당 유저가 존재하지 않는 경우
유저가 db에 존재하지 않는다면 HTTP 400(Bad Request)요청을 반환하도록 하자.
먼저 임시 유저 db를 만든다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
fake_users_db = {
"johndoe": {
"username": "johndoe",
"full_name": "John Doe",
"email": "johndoe@example.com",
"hashed_password": "fakehashedsecret",
"disabled": False,
},
"alice": {
"username": "alice",
"full_name": "Alice Wonderson",
"email": "alice@example.com",
"hashed_password": "fakehashedsecret2",
"disabled": True,
},
}
이 db에 존재하지 않는다면 HTTP 400 예외를 반환하자.
1
2
3
4
5
6
7
8
9
10
from fastapi import Depends, FastAPI, HTTPException, status
app = FastAPI()
@app.post("/token")
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
user_dict = fake_users_db.get(form_data.username)
if not user_dict:
raise HTTPException(status_code=400, detail="Incorrect username or password")
## 이하 생략...
form_data.username
으로 유저 이름을 불러오고 해당 이름을 key로 가진 value를 user_dict
으로 선언한다.
user_dict
이 존재하지 않는다면 HTTPException
이 발생한다.
1.2 패스워드 확인하기.
유저가 입력한 패스워드도 확인해야 한다. 이때 사용자가 입력한 비밀번호를 그대로 db에 저장하게 된다면 유출 시 문제가 있을 수 있다.
그렇기 때문에 사전에 지정한 문자열을 유저가 입력한 비밀번호에 추가한 후 db에 저장한다.
그렇게 한다면 비밀번호가 유출되어도, 사전에 지정한 문자열이 알려지지 않는다면 로그인 할 수 없다.
1
2
def fake_hash_password(password: str):
return "fakehashed" + password
fake_hash_password
함수를 통과한다면 “fakehashed”라는 문자열이 비밀번호에 추가된다.
사용자 이름과 같은 dict을 불러온 후 비밀번호가 같은지 확인해보자.
1
2
3
4
5
6
7
8
class User(BaseModel):
username: str
email: str | None = None
full_name: str | None = None
disabled: bool | None = None
class UserInDB(User):
hashed_password: str
먼저 User와 UserInDB를 정의하자. User를 추후에 비밀번호 없이 사용 가능하게 UserInDB를 따로 만든다.
1
2
3
4
5
6
7
8
9
@app.post("/token")
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
user_dict = fake_users_db.get(form_data.username)
if not user_dict:
raise HTTPException(status_code=400, detail="Incorrect username or password")
user = UserInDB(**user_dict)
hashed_password = fake_hash_password(form_data.password)
if not hashed_password == user.hashed_password:
raise HTTPException(status_code=400, detail="Incorrect username or password")
이때 사용자가 입력한 후 hash한 hashed_password
와 db에서 가져온 user.hashed_password
가 같은지 비교한 후, 다르다면 예외를 발생시킨다.
2. 토큰 반환하기
/token
엔드포인트는 응답이 Json 객체여야 한다. 반환되는 Json객체는 다음을 포함한다.
token_type
: 이 예제에선bearer
이다.access_token
: 유저에게 발행하는 목적의 토큰. 특정 유저를 식별하고 제 3자에 의한 위조를 방지해야 한다.
1
2
3
4
5
6
7
8
9
10
11
@app.post("/token")
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
user_dict = fake_users_db.get(form_data.username)
if not user_dict:
raise HTTPException(status_code=400, detail="Incorrect username or password")
user = UserInDB(**user_dict)
hashed_password = fake_hash_password(form_data.password)
if not hashed_password == user.hashed_password:
raise HTTPException(status_code=400, detail="Incorrect username or password")
return {"access_token": user.username, "token_type": "bearer"}
3. 의존성 업데이트
이전에 작성했던 현재 접속한 유저를 확인하는 함수를 적용해보자.
또한 접속해 있더라도, 사용자가 옵션을 비활성화 한 경우에는 예외를 반환한다.
즉 사용자가 db에 존재하고, 인증되었으며, 활성 상태인 경우에만 사용자를 반환한다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
async def get_current_user(token: str = Depends(oauth2_scheme)):
user = fake_decode_token(token)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authentication credentials",
headers={"WWW-Authenticate": "Bearer"},
)
return user
async def get_current_active_user(current_user: User = Depends(get_current_user)):
if current_user.disabled:
raise HTTPException(status_code=400, detail="Inactive user")
return current_user
@app.get("/users/me")
async def read_users_me(current_user: User = Depends(get_current_active_user)):
return current_user
먼저 /users/me
엔드포인트로 이동한다면 get_current_active_user
를 의존하고 있어 그 함수가 실행된다.
get_current_active_user
는 get_current_user
를 의존하고 있어, 의존하는 함수를 먼저 실행한다.
get_current_user
를 실행한다면 oauth2_scheme
를 통해 요청 헤더에서 토큰을 받아온다.
fake_decode_token
함수를 사용해 해당 유저가 db에 있는지 확인해본다.
1
2
3
4
5
6
7
8
9
10
def get_user(db, username: str):
if username in db:
user_dict = db[username]
return UserInDB(**user_dict)
def fake_decode_token(token):
# This doesn't provide any security at all
# Check the next version
user = get_user(fake_users_db, token)
return user
해당 엑세스 토큰이 적절하지 않다면 HTTP 401 (UNAUTHORIZED
)를 띄워준다.
적절한 토큰이지만, 활성화된 유저가 아니라면 HTTP 400을 띄워준다.
4. 전체 코드
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from pydantic import BaseModel
fake_users_db = {
"johndoe": {
"username": "johndoe",
"full_name": "John Doe",
"email": "johndoe@example.com",
"hashed_password": "fakehashedsecret",
"disabled": False,
},
"alice": {
"username": "alice",
"full_name": "Alice Wonderson",
"email": "alice@example.com",
"hashed_password": "fakehashedsecret2",
"disabled": True,
},
}
app = FastAPI()
def fake_hash_password(password: str):
return "fakehashed" + password
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
class User(BaseModel):
username: str
email: str | None = None
full_name: str | None = None
disabled: bool | None = None
class UserInDB(User):
hashed_password: str
def get_user(db, username: str):
if username in db:
user_dict = db[username]
return UserInDB(**user_dict)
def fake_decode_token(token):
# This doesn't provide any security at all
# Check the next version
user = get_user(fake_users_db, token)
return user
async def get_current_user(token: str = Depends(oauth2_scheme)):
user = fake_decode_token(token)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authentication credentials",
headers={"WWW-Authenticate": "Bearer"},
)
return user
async def get_current_active_user(current_user: User = Depends(get_current_user)):
if current_user.disabled:
raise HTTPException(status_code=400, detail="Inactive user")
return current_user
@app.post("/token")
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
user_dict = fake_users_db.get(form_data.username)
if not user_dict:
raise HTTPException(status_code=400, detail="Incorrect username or password")
user = UserInDB(**user_dict)
hashed_password = fake_hash_password(form_data.password)
if not hashed_password == user.hashed_password:
raise HTTPException(status_code=400, detail="Incorrect username or password")
return {"access_token": user.username, "token_type": "bearer"}
@app.get("/users/me")
async def read_users_me(current_user: User = Depends(get_current_active_user)):
return current_user
http://127.0.0.1:8000/docs 로 진입한다면 대화형 문서를 통해 직접 확인해 볼 수 있다.