Skip to content

Instantly share code, notes, and snippets.

@chand1012
Created March 5, 2023 14:57
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save chand1012/6238065bd9d1b4cdd61f25f844e88f2d to your computer and use it in GitHub Desktop.
Save chand1012/6238065bd9d1b4cdd61f25f844e88f2d to your computer and use it in GitHub Desktop.
Script to use Cloudflare Workers with Python via Requests
import os
import requests
def endpoint(key):
accountID = os.getenv("WORKERS_KV_ACCOUNT_ID")
namespaceID = os.getenv("WORKERS_KV_NAMESPACE_ID")
return f"https://api.cloudflare.com/client/v4/accounts/{accountID}/storage/kv/namespaces/{namespaceID}/values/{key}"
def set(key, value, ttl=0, expiration=0):
headers = {
"X-Auth-Email": os.getenv("CLOUDFLARE_EMAIL"),
"Authorization": f"Bearer {os.getenv('CLOUDFLARE_KV_API_TOKEN')}",
"Content-Type": "application/json"
}
payload = [{"key": key, "value": value, "expiration_ttl": ttl, "expiration": expiration}]
response = requests.put(
f"https://api.cloudflare.com/client/v4/accounts/{os.getenv('WORKERS_KV_ACCOUNT_ID')}/storage/kv/namespaces/{os.getenv('WORKERS_KV_NAMESPACE_ID')}/bulk",
headers=headers,
json=payload
)
json_response = response.json()
if not json_response["success"]:
raise Exception(", ".join(json_response.get("errors", [])) or "Unknown error")
return json_response["result"]
def get(key):
headers = {
"X-Auth-Email": os.getenv("CLOUDFLARE_EMAIL"),
"Authorization": f"Bearer {os.getenv('CLOUDFLARE_KV_API_TOKEN')}",
"Content-Type": "application/json"
}
response = requests.get(endpoint(key), headers=headers)
json_response = response.json()
if json_response.get("errors"):
return None
return response.text
def delete(key):
headers = {
"X-Auth-Email": os.getenv("CLOUDFLARE_EMAIL"),
"Authorization": f"Bearer {os.getenv('CLOUDFLARE_KV_API_TOKEN')}",
"Content-Type": "application/json"
}
response = requests.delete(endpoint(key), headers=headers)
json_response = response.json()
if not json_response["success"]:
raise Exception(", ".join(json_response.get("errors", [])) or "Unknown error")
return json_response["result"]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment