Skip to content

Instantly share code, notes, and snippets.

@zmstone
Created July 7, 2024 22:25
Show Gist options
  • Save zmstone/599893c09f417c74bd49e07fb9fd4bda to your computer and use it in GitHub Desktop.
Save zmstone/599893c09f417c74bd49e07fb9fd4bda to your computer and use it in GitHub Desktop.
Test JWT authentication/authorization (authn/authz) for EMQX using an RSA public key
#!/bin/bash -e
# Generate a JWT with jwt-gen.py, print its claims, then subscribe to 'a/#'
# with mqttx using the token as the MQTT password.

# Repeat -e here: the shebang flag is ignored when run as `bash <script>`.
set -e

password="$(python3 ./jwt-gen.py)"

# A JWT is header.payload.signature; the claims are the second field.
json="$(printf '%s' "$password" | cut -d '.' -f 2)"

# JWT segments are base64url without padding, while `base64 -d` expects the
# standard alphabet with padding: map the alphabet back and restore the '='.
json="$(printf '%s' "$json" | tr '_-' '/+')"
case $(( ${#json} % 4 )) in
    2) json="${json}==" ;;
    3) json="${json}=" ;;
esac

echo -n 'using: '
# Showing the claims is informational only, so a decode failure is not fatal.
echo "${json}" | base64 -d 2>/dev/null || true
echo
mqttx sub -t 'a/#' -h localhost -p 1883 -i aaa -P "$password" -u pub1
#!/bin/bash
# Generate a 2048-bit RSA key pair: private_key.pem and public_key.pem.

# Stop on the first failure so a failed key generation is not followed by a
# public-key export from a missing or stale private_key.pem.
set -e

# Private key.
openssl genpkey -algorithm RSA -out private_key.pem -pkeyopt rsa_keygen_bits:2048
# Matching public key, derived from the private key.
openssl rsa -pubout -in private_key.pem -out public_key.pem
# EMQX authentication chain with a single JWT authenticator whose tokens are
# checked against an inline PEM public key.
authentication = [
{
# Signature check uses a public key (as opposed to a shared secret).
algorithm = "public-key"
# NOTE(review): presumably drops the connection once the token's "exp"
# passes — confirm against the EMQX docs.
disconnect_after_expire = true
# The JWT is read from the MQTT password field.
from = password
mechanism = jwt
# PEM public key; presumably must be the counterpart of the private key
# used to sign the tokens (e.g. the contents of public_key.pem) — verify.
public_key = """~
-----BEGIN PUBLIC KEY-----
MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA2R4P49PEajv6MWfeEnzl
yXxsqyN3c05aLnoT7Ifs/xOO4QyrHiKSMJiUZbjqVGU6uTFMQYwvvAIFxGmAsBY/
llvUElcNYO7JMbzwzQisxerL0M9UgzKCUsHPWfnylR4wy0IchXXhjl6mjvXMoxLe
IJhZQeKujgqKG8EQ6Z0pCaUftgubngJCAvyJSjz6d73hW5jx2+PenMg+6m0eiG1k
IVpqsjJQgXyvi0JS5IyRZuHjnhFN0biRCl5B94WFr2CKopturzR2Qe6UY4Vh/NIs
fXhIQxw2EjCcWvWhn/8AfdzpciYtxYiezXdjgMCHlj6LCtHVa0FZoLOhPeBxvJz6
LQIDAQAB
-----END PUBLIC KEY-----~"""
# The key is supplied inline above rather than via JWKS.
use_jwks = false
# No additional claim checks are configured.
verify_claims {}
}
]
#!/usr/bin/env python3
from datetime import datetime, timedelta, timezone

import jwt
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
def load_rsa_private_key(path, password=None):
    """Load a private key from a PEM file.

    Args:
        path: Path to the PEM-encoded private key file.
        password: Passphrase for an encrypted key, as ``bytes``. A ``str``
            is UTF-8 encoded for convenience. Leave as ``None`` (the
            default) for an unencrypted key.

    Returns:
        The key object returned by ``serialization.load_pem_private_key``.
    """
    if isinstance(password, str):
        password = password.encode("utf-8")
    # Read the whole file first so the handle is closed before parsing.
    with open(path, "rb") as key_file:
        pem_data = key_file.read()
    return serialization.load_pem_private_key(
        pem_data,
        password=password,
        backend=default_backend(),
    )
# Path to the RSA private key used to sign the token.
private_key_path = 'private_key.pem'
private_key = load_rsa_private_key(private_key_path)

# Current time as a timezone-aware UTC datetime. datetime.utcnow() is
# deprecated since Python 3.12 and returns a naive value.
now = datetime.now(timezone.utc)

# JWT payload: a username, an ACL list and a short expiry.
payload = {
    "username": "pub1",
    "acl": [
        {"permission": "allow", "action": "sub", "topic": "eq a/#"},
        {"permission": "allow", "action": "pub", "topic": "a/1"},
    ],
    # Expire one minute from now.
    "exp": now + timedelta(minutes=1),
}

# Sign the payload with RS256 using the private key.
encoded_jwt = jwt.encode(payload, private_key, algorithm="RS256")

# PyJWT 1.x returns bytes while PyJWT 2.x returns str; calling .decode()
# unconditionally raises AttributeError on 2.x, so only decode bytes.
if isinstance(encoded_jwt, bytes):
    decoded_jwt = encoded_jwt.decode('utf-8')
else:
    decoded_jwt = encoded_jwt

# Print the token alone on stdout so callers can capture it.
print(decoded_jwt)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment