Skip to content

Instantly share code, notes, and snippets.

@mercuryseries
Created August 1, 2023 19:10
Show Gist options
  • Save mercuryseries/2891f0131436f19a946b9cf19e7fe9cc to your computer and use it in GitHub Desktop.
Save mercuryseries/2891f0131436f19a946b9cf19e7fe9cc to your computer and use it in GitHub Desktop.
Clerk OAuth2 Login
import requests
import asyncio
import websockets
import json
# OAuth app information
oauth_info = {
"authorize_url": "https://clerk.your-domain.com/oauth/authorize",
"callback_url": "https://oauth-client.com/oauth2/callback",
"client_id": "d9g4CT4WYiCBm7EU",
"client_secret": "VVgbT7i6sPo7sTljq2zj12fjmg0jPL5k",
"scopes": "profile email",
"token_fetch_url": "https://clerk.your-domain.com/oauth/token",
"user_info_url": "https://clerk.your-domain.com/oauth/userinfo"
}
async def handle_message(websocket, message):
# Here we can process the received message from the browser
print("Received message:", message)
# Parse the json message
data = json.loads(message)
# Extract the authorization code from the received message
if auth_code := data.get('authorization_code', ''):
# Exchange authorization code for an access token
access_token = get_access_token(auth_code)
# Use the access token to access protected resources
user_info = get_user_info(access_token)
# print("User Information:")
# print(user_info)
print()
print('-' * 20)
print("User Information")
print('-' * 20)
print(f"ID: {user_info.get('user_id')}")
print(f"Name: {user_info.get('name')}")
print(f"Username: {user_info.get('username')}")
print(f"Email: {user_info.get('email')}")
# Send the access token back to the client (browser)
await websocket.send(json.dumps({"login_succeeded": True}))
await websocket.close()
exit(0)
async def server(websocket, path):
try:
async for message in websocket:
await handle_message(websocket, message)
except websockets.exceptions.ConnectionClosedError:
# When the client (browser) disconnects
print("Client disconnected")
# Redirect user to the authorization URL
def get_authorization_url():
params = {
"client_id": oauth_info["client_id"],
"redirect_uri": oauth_info["callback_url"],
"response_type": "code",
"scope": oauth_info["scopes"]
}
return oauth_info["authorize_url"] + "?" + "&".join([f"{key}={value}" for key, value in params.items()])
# Exchange authorization code for access token
def get_access_token(authorization_code):
token_params = {
"client_id": oauth_info["client_id"],
"client_secret": oauth_info["client_secret"],
"code": authorization_code,
"grant_type": "authorization_code",
"redirect_uri": oauth_info["callback_url"]
}
response = requests.post(oauth_info["token_fetch_url"], data=token_params)
json_response = response.json()
# print(f'{json_response=}')
access_token = json_response.get("access_token")
print()
print("Logged in successfully 🎉")
print('-' * 20)
print("Authentication Token")
print('-' * 20)
print(f"Access Token: {json_response.get('access_token')}")
print(f"Refresh Token: {json_response.get('refresh_token')}")
print(f"Expires In (seconds): {json_response.get('expires_in')}")
print(f"Token Type: {json_response.get('token_type')}")
return access_token
# Use the access token to get user information
def get_user_info(access_token):
headers = {
"Authorization": f"Bearer {access_token}"
}
response = requests.get(oauth_info["user_info_url"], headers=headers)
return response.json()
# Main execution
if __name__ == "__main__":
# Redirect user to the authorization URL
authorization_url = get_authorization_url()
print(f"Please visit the following URL and grant permission: {authorization_url}")
start_server = websockets.serve(server, "localhost", 8765)
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()
import { oauth2CodeSchema } from '@/app/api/settings'
import { OAuthCard } from '@/components/oauth-card'
export default function OAuth2Handler({
searchParams
}: {
searchParams: { [key: string]: string | string[] | undefined }
}) {
// Retrieve the authorization code from the query parameters
const { code } = oauth2CodeSchema.parse(searchParams)
return <OAuthCard authorizationCode={code} />
}
"use client";
import { useEffect } from "react";
import { useRouter } from "next/navigation";
import {
Card,
CardContent,
CardDescription,
CardHeader,
CardTitle,
} from "@/components/ui/card";
import { Loader } from "@/components/icons";
import { useToast } from "@/components/ui/use-toast";
export function OAuthCard({
authorizationCode,
}: {
authorizationCode: string;
}) {
const { toast } = useToast();
const router = useRouter();
useEffect(() => {
const socket = new WebSocket("ws://localhost:8765/");
socket.onopen = (event) => {
console.log("WebSocket connection opened.");
if (authorizationCode) {
const message = JSON.stringify({
authorization_code: authorizationCode,
});
socket.send(message);
}
};
socket.onmessage = (event) => {
const message = JSON.parse(event.data);
if (message.login_succeeded === true) {
toast({
title: "Logged in successfully 🎉. ",
description: "You can now close this tab.",
});
router.push("/");
}
};
socket.onclose = (event) => {
console.log("WebSocket connection closed.");
};
// Clean up the WebSocket connection on component unmount
return () => {
socket.close();
};
}, [authorizationCode]);
return (
<div className="mt-20 px-4 md:w-[500px] md:px-0 mx-auto">
<Card>
<CardHeader>
<CardTitle>Polaris Hub Authentication</CardTitle>
<CardDescription>
Thank you for your patience as we securely authenticate you...
</CardDescription>
</CardHeader>
<CardContent>
<div className="flex items-center">
<Loader className="animate-spin mr-3 h-5 w-5" />
<span>Processing...</span>
</div>
</CardContent>
</Card>
</div>
);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment