Source code for derp.auth.providers.google

"""Google OAuth 2.0 provider implementation."""

from __future__ import annotations

import logging
from urllib.parse import urlencode

import httpx

from derp.auth.providers.base import BaseOAuthProvider, OAuthTokens, OAuthUserInfo
from derp.config import GoogleOAuthConfig

logger = logging.getLogger(__name__)


[docs] class GoogleProvider(BaseOAuthProvider[GoogleOAuthConfig]): """Google OAuth 2.0 provider.""" provider_name = "google" # Google OAuth endpoints AUTHORIZATION_URL = "https://accounts.google.com/o/oauth2/v2/auth" TOKEN_URL = "https://oauth2.googleapis.com/token" USERINFO_URL = "https://www.googleapis.com/oauth2/v2/userinfo"
[docs] def get_authorization_url( self, state: str, scopes: list[str] | None = None, redirect_uri: str | None = None, ) -> str: """Generate Google OAuth authorization URL.""" params = { "client_id": self._config.client_id, "redirect_uri": redirect_uri or self._config.redirect_uri, "response_type": "code", "scope": " ".join(scopes or self._config.scopes), "state": state, "access_type": "offline", # Request refresh token "prompt": "consent", # Force consent screen for refresh token } return f"{self.AUTHORIZATION_URL}?{urlencode(params)}"
[docs] async def exchange_code( self, code: str, redirect_uri: str | None = None, ) -> OAuthTokens | None: """Exchange authorization code for Google tokens.""" data = { "client_id": self._config.client_id, "client_secret": self._config.client_secret, "code": code, "grant_type": "authorization_code", "redirect_uri": redirect_uri or self._config.redirect_uri, } async with httpx.AsyncClient() as client: response = await client.post( self.TOKEN_URL, data=data, headers={"Content-Type": "application/x-www-form-urlencoded"}, ) if response.status_code != 200: error_data = response.json() if response.content else {} message = error_data.get("error_description", response.text) logger.error("Google code exchange failed: %s", message) return None token_data = response.json() return OAuthTokens( access_token=token_data["access_token"], token_type=token_data.get("token_type", "bearer"), refresh_token=token_data.get("refresh_token"), expires_in=token_data.get("expires_in"), scope=token_data.get("scope"), id_token=token_data.get("id_token"), )
[docs] async def get_user_info(self, access_token: str) -> OAuthUserInfo | None: """Get user info from Google.""" async with httpx.AsyncClient() as client: response = await client.get( self.USERINFO_URL, headers={"Authorization": f"Bearer {access_token}"}, ) if response.status_code != 200: logger.error("Google get user info failed: %s", response.text) return None user_data = response.json() return OAuthUserInfo( id=user_data["id"], email=user_data["email"], email_verified=user_data.get("verified_email", False), name=user_data.get("name"), picture=user_data.get("picture"), raw_data=user_data, )