포스트

[FastAPI] <5 : 보안 - 3. 패스워드와 Bearer를 이용한 간단한 OAuth2>

0. 이전 글

보안 - 1

보안 - 2

공식문서

이전 글들에서 설명했던 코드들을 활용해 유저 로그인, 유저 조회 기능을 완성해보자.

1. usernamepassword 얻기

FastAPI에서 제공하는 데이터 폼을 사용해 usernamepassword를 얻어보자.

FastAPI에선 OAuth2PasswordRequestFrom 을 활용해 얻어 올 수 있다.

1
2
3
4
5
6
7
8
9
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm

app = FastAPI()

@app.post("/token")
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
    user_dict = fake_users_db.get(form_data.username)
    
    ## 이하 생략...

OAuth2PasswordRequestFrom 은 이하를 제공한다.

  • username
  • password
  • scope : 일반적으로 특정 보안 권한을 선언하는데 사용된다.
  • grant_type (선택사항)
  • client_id (선택사항)
  • client_secret (선택사항)

1.1 해당 유저가 존재하지 않는 경우

유저가 db에 존재하지 않는다면 HTTP 400(Bad Request)요청을 반환하도록 하자.

먼저 임시 유저 db를 만든다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
fake_users_db = {
    "johndoe": {
        "username": "johndoe",
        "full_name": "John Doe",
        "email": "johndoe@example.com",
        "hashed_password": "fakehashedsecret",
        "disabled": False,
    },
    "alice": {
        "username": "alice",
        "full_name": "Alice Wonderson",
        "email": "alice@example.com",
        "hashed_password": "fakehashedsecret2",
        "disabled": True,
    },
}

이 db에 존재하지 않는다면 HTTP 400 예외를 반환하자.

1
2
3
4
5
6
7
8
9
10
from fastapi import Depends, FastAPI, HTTPException, status

app = FastAPI()

@app.post("/token")
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
    user_dict = fake_users_db.get(form_data.username)
    if not user_dict:
        raise HTTPException(status_code=400, detail="Incorrect username or password")
    ## 이하 생략...

form_data.username 으로 유저 이름을 불러오고 해당 이름을 key로 가진 value를 user_dict으로 선언한다.

user_dict 이 존재하지 않는다면 HTTPException 이 발생한다.

1.2 패스워드 확인하기.

유저가 입력한 패스워드도 확인해야 한다. 이때 사용자가 입력한 비밀번호를 그대로 db에 저장하게 된다면 유출 시 문제가 있을 수 있다.

그렇기 때문에 사전에 지정한 문자열을 유저가 입력한 비밀번호에 추가한 후 db에 저장한다.

그렇게 한다면 비밀번호가 유출되어도, 사전에 지정한 문자열이 알려지지 않는다면 로그인 할 수 없다.

1
2
def fake_hash_password(password: str):
    return "fakehashed" + password

fake_hash_password 함수를 통과한다면 “fakehashed”라는 문자열이 비밀번호에 추가된다.

사용자 이름과 같은 dict을 불러온 후 비밀번호가 같은지 확인해보자.

1
2
3
4
5
6
7
8
class User(BaseModel):
    username: str
    email: str | None = None
    full_name: str | None = None
    disabled: bool | None = None

class UserInDB(User):
    hashed_password: str

먼저 User와 UserInDB를 정의하자. User를 추후에 비밀번호 없이 사용 가능하게 UserInDB를 따로 만든다.

1
2
3
4
5
6
7
8
9
@app.post("/token")
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
    user_dict = fake_users_db.get(form_data.username)
    if not user_dict:
        raise HTTPException(status_code=400, detail="Incorrect username or password")
    user = UserInDB(**user_dict)
    hashed_password = fake_hash_password(form_data.password)
    if not hashed_password == user.hashed_password:
        raise HTTPException(status_code=400, detail="Incorrect username or password")

이때 사용자가 입력한 후 hash한 hashed_password 와 db에서 가져온 user.hashed_password 가 같은지 비교한 후, 다르다면 예외를 발생시킨다.

2. 토큰 반환하기

/token 엔드포인트는 응답이 Json 객체여야 한다. 반환되는 Json객체는 다음을 포함한다.

  • token_type : 이 예제에선 bearer 이다.
  • access_token : 유저에게 발행하는 목적의 토큰. 특정 유저를 식별하고 제 3자에 의한 위조를 방지해야 한다.
1
2
3
4
5
6
7
8
9
10
11
@app.post("/token")
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
    user_dict = fake_users_db.get(form_data.username)
    if not user_dict:
        raise HTTPException(status_code=400, detail="Incorrect username or password")
    user = UserInDB(**user_dict)
    hashed_password = fake_hash_password(form_data.password)
    if not hashed_password == user.hashed_password:
        raise HTTPException(status_code=400, detail="Incorrect username or password")

    return {"access_token": user.username, "token_type": "bearer"}

3. 의존성 업데이트

이전에 작성했던 현재 접속한 유저를 확인하는 함수를 적용해보자.

또한 접속해 있더라도, 사용자가 옵션을 비활성화 한 경우에는 예외를 반환한다.

즉 사용자가 db에 존재하고, 인증되었으며, 활성 상태인 경우에만 사용자를 반환한다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
async def get_current_user(token: str = Depends(oauth2_scheme)):
    user = fake_decode_token(token)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid authentication credentials",
            headers={"WWW-Authenticate": "Bearer"},
        )
    return user

async def get_current_active_user(current_user: User = Depends(get_current_user)):
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="Inactive user")
    return current_user
    
@app.get("/users/me")
async def read_users_me(current_user: User = Depends(get_current_active_user)):
    return current_user

먼저 /users/me 엔드포인트로 이동한다면 get_current_active_user를 의존하고 있어 그 함수가 실행된다.

get_current_active_userget_current_user를 의존하고 있어, 의존하는 함수를 먼저 실행한다.

get_current_user 를 실행한다면 oauth2_scheme를 통해 요청 헤더에서 토큰을 받아온다.

fake_decode_token 함수를 사용해 해당 유저가 db에 있는지 확인해본다.

1
2
3
4
5
6
7
8
9
10
def get_user(db, username: str):
    if username in db:
        user_dict = db[username]
        return UserInDB(**user_dict)

def fake_decode_token(token):
    # This doesn't provide any security at all
    # Check the next version
    user = get_user(fake_users_db, token)
    return user

해당 엑세스 토큰이 적절하지 않다면 HTTP 401 (UNAUTHORIZED)를 띄워준다.

적절한 토큰이지만, 활성화된 유저가 아니라면 HTTP 400을 띄워준다.

4. 전체 코드

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from pydantic import BaseModel

fake_users_db = {
    "johndoe": {
        "username": "johndoe",
        "full_name": "John Doe",
        "email": "johndoe@example.com",
        "hashed_password": "fakehashedsecret",
        "disabled": False,
    },
    "alice": {
        "username": "alice",
        "full_name": "Alice Wonderson",
        "email": "alice@example.com",
        "hashed_password": "fakehashedsecret2",
        "disabled": True,
    },
}

app = FastAPI()

def fake_hash_password(password: str):
    return "fakehashed" + password

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

class User(BaseModel):
    username: str
    email: str | None = None
    full_name: str | None = None
    disabled: bool | None = None

class UserInDB(User):
    hashed_password: str

def get_user(db, username: str):
    if username in db:
        user_dict = db[username]
        return UserInDB(**user_dict)

def fake_decode_token(token):
    # This doesn't provide any security at all
    # Check the next version
    user = get_user(fake_users_db, token)
    return user

async def get_current_user(token: str = Depends(oauth2_scheme)):
    user = fake_decode_token(token)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid authentication credentials",
            headers={"WWW-Authenticate": "Bearer"},
        )
    return user

async def get_current_active_user(current_user: User = Depends(get_current_user)):
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="Inactive user")
    return current_user

@app.post("/token")
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
    user_dict = fake_users_db.get(form_data.username)
    if not user_dict:
        raise HTTPException(status_code=400, detail="Incorrect username or password")
    user = UserInDB(**user_dict)
    hashed_password = fake_hash_password(form_data.password)
    if not hashed_password == user.hashed_password:
        raise HTTPException(status_code=400, detail="Incorrect username or password")

    return {"access_token": user.username, "token_type": "bearer"}

@app.get("/users/me")
async def read_users_me(current_user: User = Depends(get_current_active_user)):
    return current_user

http://127.0.0.1:8000/docs 로 진입한다면 대화형 문서를 통해 직접 확인해 볼 수 있다.



이 기사는 저작권자의 CC BY 4.0 라이센스를 따릅니다.