Sessions management

This commit is contained in:
Peter Vacho 2024-11-28 01:06:02 +01:00
parent 8e0837f7f4
commit b226432d77
Signed by: school
GPG key ID: 8CFC3837052871B4
2 changed files with 110 additions and 0 deletions

View file

@ -12,6 +12,7 @@ from src.db.init import initialize_db
from src.utils.logging import get_logger
from .auth import router as auth_router
from .sessions import router as sessions_router
log = get_logger(__name__)
@ -51,6 +52,7 @@ app = FastAPI(
)
app.include_router(auth_router)
app.include_router(sessions_router)
@app.get("/ping")

108
src/api/sessions.py Normal file
View file

@ -0,0 +1,108 @@
from datetime import datetime
from typing import Literal, final
from beanie import PydanticObjectId
from fastapi import APIRouter, HTTPException, status
from pydantic import BaseModel
from src.api.auth.jwt import invalidate_access_tokens
from src.db.models.token import Token
from src.utils.logging import get_logger
from .auth import AnyTokenDep, CurrentUserDep
__all__ = ["router"]
log = get_logger(__name__)
base_router = APIRouter(tags=["Session Management"])
sessions_router = APIRouter(tags=["Session Management"], prefix="/sessions")
@final
class UserSession(BaseModel):
"""Information about given user session."""
id: str
session_type: Literal["refresh", "access"]
parent_session_id: str | None
expires_at: datetime
created_at: datetime
revoked: bool
@classmethod
def from_token(cls, token: Token) -> "UserSession":
"""Construct UserSession from database Token object."""
if token.id is None:
raise RuntimeError("Never")
if token.parent_token and token.parent_token.id is None:
raise RuntimeError("Never")
return cls(
id=str(token.id),
session_type=token.tok_type,
parent_session_id=str(token.parent_token.id) if token.parent_token else None,
expires_at=token.expires_at,
created_at=token.issued_at,
revoked=token.revoked,
)
@base_router.get("/users/{user_id}/sessions")
async def get_user_sessions(user_id: PydanticObjectId, user: CurrentUserDep) -> list[UserSession]:
"""Get all sessions of given user.
Note that you can only access your own sessions.
"""
if user.id is None:
raise RuntimeError("Never")
if user.id != user_id:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="You can only see your own sessions")
tokens = await Token.find(Token.user == user).to_list()
return [UserSession.from_token(token) for token in tokens]
@base_router.get("/session")
async def get_current_session(token_data: AnyTokenDep) -> UserSession:
"""Get information about the current session."""
_, db_token = token_data
return UserSession.from_token(db_token)
@sessions_router.get("/{session_id}")
async def get_session(session_id: PydanticObjectId, user: CurrentUserDep) -> UserSession:
"""Get details about given session."""
token = await Token.get(session_id)
if token is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="No such session")
if token.user != user:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="You can only see your own sessions")
return UserSession.from_token(token)
@sessions_router.delete("/{session_id}")
async def delete_session(session_id: PydanticObjectId, user: CurrentUserDep) -> None:
"""Invalidate the specified session.
If this is the active session, this will log you out.
"""
token = await Token.get(session_id)
if token is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="No such session")
if token.user != user:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="You can only delete your own sessions")
if token.revoked:
raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail="This session is already invalidated")
if token.tok_type == "refresh":
await invalidate_access_tokens(token)
token.revoked = True
_ = await token.replace()
router = APIRouter()
router.include_router(base_router)
router.include_router(sessions_router)