66 lines
1.9 KiB
Python
66 lines
1.9 KiB
Python
import os
|
|
import time
|
|
from typing import Any, Dict
|
|
from uuid import uuid4
|
|
|
|
import jwt
|
|
from fastapi import APIRouter, Depends, HTTPException
|
|
|
|
from modules.auth.supabase_bearer import verify_supabase_token_dep
|
|
|
|
|
|
router = APIRouter()
|
|
|
|
TLSYNC_TOKEN_AUDIENCE = "tlsync"
|
|
DEFAULT_TLSYNC_TOKEN_TTL_SECONDS = 300
|
|
|
|
|
|
def _tlsync_token_ttl_seconds() -> int:
|
|
raw_value = os.getenv("TLSYNC_TOKEN_TTL_SECONDS")
|
|
if not raw_value:
|
|
return DEFAULT_TLSYNC_TOKEN_TTL_SECONDS
|
|
|
|
try:
|
|
ttl = int(raw_value)
|
|
except ValueError as exc:
|
|
raise HTTPException(status_code=500, detail="TLSync token TTL is misconfigured") from exc
|
|
|
|
if ttl <= 0 or ttl > 3600:
|
|
raise HTTPException(status_code=500, detail="TLSync token TTL is out of range")
|
|
return ttl
|
|
|
|
|
|
def create_tlsync_token(user_claims: Dict[str, Any]) -> Dict[str, Any]:
|
|
secret = os.getenv("TLSYNC_SECRET")
|
|
if not secret:
|
|
raise HTTPException(status_code=503, detail="TLSync authentication is not configured")
|
|
|
|
user_id = user_claims.get("sub")
|
|
if not user_id:
|
|
raise HTTPException(status_code=401, detail="Authenticated user id is missing")
|
|
|
|
ttl_seconds = _tlsync_token_ttl_seconds()
|
|
issued_at = int(time.time())
|
|
expires_at = issued_at + ttl_seconds
|
|
payload = {
|
|
"sub": user_id,
|
|
"aud": TLSYNC_TOKEN_AUDIENCE,
|
|
"iat": issued_at,
|
|
"exp": expires_at,
|
|
"jti": uuid4().hex,
|
|
}
|
|
|
|
token = jwt.encode(payload, secret, algorithm="HS256")
|
|
return {
|
|
"token": token,
|
|
"token_type": "Bearer",
|
|
"expires_in": ttl_seconds,
|
|
"expires_at": expires_at,
|
|
}
|
|
|
|
|
|
@router.get("/token")
|
|
async def get_tlsync_token(user_claims: Dict[str, Any] = Depends(verify_supabase_token_dep)) -> Dict[str, Any]:
|
|
"""Issue a short-lived TLSync token for an authenticated Supabase user."""
|
|
return create_tlsync_token(user_claims)
|