プログラミング初心者です。
現在、FastAPIでWebアプリ開発を行っております。
質問内容についてですが、OAuth2のOAuth2Passwordを用いてログイン機能を作成しております。
しかし、認証がうまくいかず、困っております。
from typing import Optional from datetime import datetime, timedelta from fastapi import APIRouter, Depends, HTTPException, Form from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm from jose import jwt from passlib.context import CryptContext from settings.db import session from models.users import Users from routers.users import create_user from schemas.users import UserResponse, UserRequest from schemas.admin import Tokens router = APIRouter() pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") OAuth2_scheme = OAuth2PasswordBearer(tokenUrl="token", auto_error=False) def authenticate(name:str, password:str): """ユーザーの認証 """ session.commit() try: user = session.query(Users).filter(Users.name == name).first() except: session.rollback() if pwd_context.verify(password, user.password) == False: raise HTTPException(status_code=401, detail="パスワードが違います。") finally: session.remove() return user def create_tokens(user_id:int): """トークンの発行 """ access_payload={ "token_type": "access_token", "exp": datetime.utcnow() + timedelta(minutes=60), "user_id": user_id } refresh_payload={ "token_type": "refresh_token", "exp": datetime.utcnow() + timedelta(days=60), "user_id": user_id } access_token= jwt.encode(access_payload, "SECRET_KEY1234", algorithm="HS256") refresh_token= jwt.encode(refresh_payload, "SECRET_KEY1234", algorithm="HS256") user= session.query(Users).filter(Users.id == user_id).first() user.refresh_token=refresh_token session.commit() return {'access_token': access_token, 'refresh_token': refresh_token, 'token_type': 'bearer'} def get_current_user_from_token(token:str, token_type:str): """tokenでユーザーの取得 """ payload = jwt.decode(token, "SECRET_KEY1234", algorithm=["HS256"]) if payload["token_type"] != token_type: raise HTTPException(status_code=401, detail='トークンタイプが一致しません') session.commit() user = session.query(Users).filter(Users.id == payload.user_id).first() if token_type == 'refresh_token' and user.refresh_token != token: print(user.refresh_token, '¥n', token) raise HTTPException(status_code=401, detail='リフレッシュトークンが一致しません') return user async def get_current_user(token:str=Depends(OAuth2_scheme)): """アクセストークンからuserの取得""" return get_current_user_from_token(token, "access_token") async def get_current_user_with_refersh_token(token:str=Depends(OAuth2_scheme)): """リフレッシュトークンからuserの取得""" return get_current_user_from_token(token, "refreah_token") @router.post("/token", response_model=Tokens) async def login(form:OAuth2PasswordRequestForm=Depends()): """トークン発行 """ user = authenticate(form.username, form.password) return create_tokens(user.id) @router.get("/refresh_token", response_model=Tokens) async def refresh_token(current_user:Users=Depends(get_current_user_with_refersh_token)): """リフレッシュトークンでトークンの取得""" return create_tokens(current_user.id) @router.get("/users/me", response_model=UserResponse) async def read_user_me(current_user:Users=Depends(get_current_user)): """ログインユーザーの取得""" return current_user
from pydantic import BaseModel class Tokens(BaseModel): access_token:str refresh_token:str token_type:str class Config: orm_mode=True
from datetime import datetime from sqlalchemy import Column, String, Integer, DateTime from sqlalchemy.orm import relationship from passlib.context import CryptContext from settings.db import Base from .groups import users_groups pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") class Users(Base): """ユーザーアカウント.""" __tablename__ = "users" name = Column(String(50), nullable=False) email = Column(String(50), nullable=False, unique=True) password = Column(String(255), nullable=False) age = Column(Integer, nullable=False) gender = Column(String(3), nullable=False) height = Column(Integer, nullable=False) weight = Column(Integer, nullable=False) kind_of_sport = Column(String(6), nullable=False) type_of_team = Column(String(20), nullable=False) years_of_experience = Column(Integer, nullable=False) group_id = Column(Integer, nullable=True) last_login = Column(DateTime, nullable=False, default=datetime.now) created_at = Column(DateTime, nullable=False, default=datetime.now) updated_at = Column(DateTime, nullable=True, default=None) refresh_token = Column(String(255), nullable=True)
上記の様に実装しました。
![]
swagger上でログイン関数を実行するとトークンを発行できるのですが、Oauth2passworddbearerで認証を行うとnot foundのエラーが返ってきます。
どの部分でエラーが起こっているのかが分からない状況なのでどう解決していいのかも分かりません。
考えられる不具合など教えて頂けますと嬉しいです。
解決にご助力頂きたいです。
バッドをするには、ログインかつ
こちらの条件を満たす必要があります。