Skip to content

Instantly share code, notes, and snippets.

@antigenius0910
Last active November 3, 2022 09:49
Show Gist options
  • Save antigenius0910/2528b68510a522c4f7ffd22d32e06c49 to your computer and use it in GitHub Desktop.
Save antigenius0910/2528b68510a522c4f7ffd22d32e06c49 to your computer and use it in GitHub Desktop.
Cloudflare instant log
import json
import requests
import websockets
import asyncio
import os
import urllib.request
# Base URL of the Cloudflare v4 REST API.
url = "https://api.cloudflare.com/client/v4/"
# API token taken from the environment; a missing variable raises KeyError.
x_auth_token = os.environ['CLOUDFLARE_LOG_READ_TOKEN']
# Placeholder: substitute the ID of the zone whose logs should be streamed.
zone_id = "$ZONE_ID"
# Public IP of this machine as reported by ident.me. The response is closed
# deterministically and stripped so stray whitespace cannot leak into the
# filter below.
with urllib.request.urlopen('https://ident.me', timeout=10) as ip_response:
    external_ip = ip_response.read().decode('utf8').strip()
# Field selector reference:
# https://developers.cloudflare.com/logs/reference/log-fields/zone/http_requests/
filter_conf = {
    "fields": "ClientRequestHost,ClientRequestMethod,ClientRequestPath,EdgeResponseStatus",
    "sample": 1,
    # The API expects the filter as a JSON-encoded string. json.dumps handles
    # the escaping; compact separators keep the same wire format as a
    # hand-written literal.
    "filter": json.dumps(
        {"where": {"and": [{"key": "ClientIP", "operator": "eq", "value": external_ip}]}},
        separators=(",", ":"),
    ),
    "kind": "instant-logs",
}
# `url` already ends with a slash; strip it so the path does not contain "//".
logpush_url = url.rstrip("/") + "/zones/%s/logpush/edge" % zone_id
headers = {
    'Authorization': 'Bearer ' + x_auth_token,
    'Content-Type': 'application/json',
}
# Create the Instant Logs job.
r = requests.post(logpush_url + "/jobs", headers=headers, json=filter_conf, timeout=30)
print(r.text)
# Fail with a clear HTTP error (after the body has been printed) instead of an
# opaque TypeError/KeyError when the API rejects the request.
r.raise_for_status()
session_id = r.json()['result']['session_id']
async def connect_listen():
    """Print every message of the Instant Logs session until it closes.

    Opens a WebSocket to the Cloudflare Instant Logs endpoint for the
    module-level ``session_id`` and writes each received message to stdout.
    When the connection is closed, a short notice is printed and the
    coroutine returns.
    """
    endpoint = "wss://logs.cloudflare.com/instant-logs/ws/sessions/" + session_id
    async with websockets.connect(endpoint) as ws:
        try:
            # The loop only ends when recv() raises ConnectionClosed.
            while True:
                frame = await ws.recv()
                print(str(frame))
        except websockets.exceptions.ConnectionClosed:
            print('Connection with server closed')
# asyncio.run() creates a fresh event loop, runs the coroutine to completion
# and closes the loop afterwards. Calling asyncio.get_event_loop() with no
# running loop is deprecated and fails on recent Python versions.
asyncio.run(connect_listen())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment