Source code for derp.auth.providers.github

"""GitHub OAuth provider implementation."""

from __future__ import annotations

import logging
from urllib.parse import urlencode

import httpx

from derp.auth.providers.base import BaseOAuthProvider, OAuthTokens, OAuthUserInfo
from derp.config import GitHubOAuthConfig

logger = logging.getLogger(__name__)


class GitHubProvider(BaseOAuthProvider[GitHubOAuthConfig]):
    """GitHub OAuth provider.

    Builds the GitHub authorization URL, exchanges an authorization code for
    an access token, and fetches the authenticated user's profile and email.
    Client credentials, the default redirect URI and the default scopes are
    read from ``self._config``.
    """

    provider_name = "github"

    # GitHub OAuth endpoints
    AUTHORIZATION_URL = "https://github.com/login/oauth/authorize"
    TOKEN_URL = "https://github.com/login/oauth/access_token"
    USERINFO_URL = "https://api.github.com/user"
    EMAILS_URL = "https://api.github.com/user/emails"

    def get_authorization_url(
        self,
        state: str,
        scopes: list[str] | None = None,
        redirect_uri: str | None = None,
    ) -> str:
        """Generate GitHub OAuth authorization URL.

        Args:
            state: Opaque value sent as the ``state`` query parameter.
            scopes: Scopes to request; falls back to the configured scopes
                when omitted or empty.
            redirect_uri: Callback URL; falls back to the configured
                redirect URI when omitted or empty.

        Returns:
            ``AUTHORIZATION_URL`` followed by the URL-encoded query string.
        """
        params = {
            "client_id": self._config.client_id,
            "redirect_uri": redirect_uri or self._config.redirect_uri,
            "state": state,
            "scope": " ".join(scopes or self._config.scopes),
        }
        # urlencode() renders None as the literal text "None", so unset
        # values are left out of the query string instead.
        query = urlencode(
            {key: value for key, value in params.items() if value is not None}
        )
        return f"{self.AUTHORIZATION_URL}?{query}"

    async def exchange_code(
        self,
        code: str,
        redirect_uri: str | None = None,
    ) -> OAuthTokens | None:
        """Exchange authorization code for GitHub tokens.

        Args:
            code: Authorization code to send to ``TOKEN_URL``.
            redirect_uri: Callback URL; falls back to the configured
                redirect URI when omitted or empty.

        Returns:
            The tokens on success, or ``None`` (after logging an error) when
            the response status is not 200, the payload contains an
            ``error`` key, or the payload carries no access token.
        """
        data = {
            "client_id": self._config.client_id,
            "client_secret": self._config.client_secret,
            "code": code,
        }
        effective_redirect_uri = redirect_uri or self._config.redirect_uri
        # Only send redirect_uri when one is actually set.
        if effective_redirect_uri is not None:
            data["redirect_uri"] = effective_redirect_uri

        async with httpx.AsyncClient() as client:
            response = await client.post(
                self.TOKEN_URL,
                data=data,
                headers={
                    "Accept": "application/json",
                    "Content-Type": "application/x-www-form-urlencoded",
                },
            )

        if response.status_code != 200:
            logger.error("GitHub code exchange failed: %s", response.text)
            return None

        token_data = response.json()

        # Failures can also be reported through an "error" key in the body
        # of a 200 response, so the payload is checked as well.
        if "error" in token_data:
            message = token_data.get("error_description", token_data["error"])
            logger.error("GitHub code exchange failed: %s", message)
            return None

        access_token = token_data.get("access_token")
        if not access_token:
            # Keep the "None on failure" contract instead of raising KeyError.
            # The payload is deliberately not logged.
            logger.error(
                "GitHub code exchange failed: %s",
                "response did not include an access token",
            )
            return None

        return OAuthTokens(
            access_token=access_token,
            token_type=token_data.get("token_type", "bearer"),
            scope=token_data.get("scope"),
        )

    @staticmethod
    def _select_email(emails: list[dict]) -> tuple[str | None, bool]:
        """Pick the best address from a list of GitHub email entries.

        Priority: primary and verified, then any verified, then the first
        entry that has an address at all. Entries without an address are
        skipped.

        Returns:
            ``(email, verified)``; ``(None, False)`` when no entry has an
            address.
        """
        for entry in emails:
            if entry.get("primary") and entry.get("verified") and entry.get("email"):
                return entry["email"], True

        for entry in emails:
            if entry.get("verified") and entry.get("email"):
                return entry["email"], True

        # Last resort: an unverified address, reported as such.
        for entry in emails:
            if entry.get("email"):
                return entry["email"], False

        return None, False

    async def get_user_info(self, access_token: str) -> OAuthUserInfo | None:
        """Get user info from GitHub.

        Args:
            access_token: Token sent as a ``Bearer`` credential.

        Returns:
            The user's info, or ``None`` (after logging an error) when the
            profile request does not return 200 or no email can be found.
        """
        headers = {
            "Authorization": f"Bearer {access_token}",
            "Accept": "application/vnd.github+json",
            "X-GitHub-Api-Version": "2022-11-28",
        }

        async with httpx.AsyncClient() as client:
            # Get basic user info
            response = await client.get(self.USERINFO_URL, headers=headers)
            if response.status_code != 200:
                logger.error("GitHub get user info failed: %s", response.text)
                return None

            user_data = response.json()

            # GitHub may not include email in the user response. An address
            # taken from the profile is reported as unverified because its
            # verification status is never checked here.
            email = user_data.get("email")
            email_verified = False

            if not email:
                email_response = await client.get(self.EMAILS_URL, headers=headers)
                if email_response.status_code == 200:
                    email, email_verified = self._select_email(
                        email_response.json()
                    )

        if not email:
            logger.error(
                "GitHub OAuth: could not retrieve email. "
                "Ensure the user:email scope is granted."
            )
            return None

        return OAuthUserInfo(
            id=str(user_data["id"]),
            email=email,
            email_verified=email_verified,
            name=user_data.get("name") or user_data.get("login"),
            picture=user_data.get("avatar_url"),
            raw_data=user_data,
        )