Skip to content

Instantly share code, notes, and snippets.

@Bluscream
Created April 6, 2021 15:24
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Bluscream/ada7aa791868da812c7ba32cf2d6850f to your computer and use it in GitHub Desktop.
Convert adguard logs to HAR or OpenAPI3/Swagger
import json
from urllib.parse import urlparse
from pprint import pformat, pprint
# --- Configuration -----------------------------------------------------------
# Only log records from this process, against this API host, are converted.
app = "ChilloutVR.exe"
host = "api.alphablend.cloud"
filepath = 'C:/Users/Shadow/Desktop/20210406_adguard_filtering_log_records_chilloutvr.json'

urllist = []        # scratch list (only used by the commented-out code below)
LogRecords = []     # records that survive filtering
filepath_swagger = filepath + '.swagger.json'

# OpenAPI 3 document skeleton; "servers" and "paths" get filled in from the log.
swagger = {
    "openapi": "3.0.0",
    "servers": [],
    "info": {
        "description": "Generated from Adguard Log",
        "version": "1.0",
        "title": "ChilloutVR API",
        "contact": {"email": "hello@abinteractive.net"},
        "license": {
            "name": "WTFPL",
            "url": "http://www.whatthefuck.org/licenses/WTFPL.html",
        },
    },
    "tags": [
        {"name": "admins", "description": "Secured Admin-only calls"},
        {"name": "developers", "description": "Operations available to regular developers"},
    ],
    "paths": {},
    "components": {},
}
# --- Load the Adguard log and keep only the records we care about ------------
with open(filepath) as json_file:
    data = json.load(json_file)

for p in data['LogRecords']:
    # Check for an empty/missing URL FIRST: the original parsed and
    # substring-tested p["Url"] before this guard, so a None URL raised
    # TypeError before the check could skip the record.
    if not p["Url"]:
        continue
    url = urlparse(p["Url"])
    if app and app != p["AppPath"]:
        continue  # request came from a different application
    if host and host != url.hostname:
        continue  # request went to a different host
    # Drop image traffic, either by Accept header or by file extension.
    if p["RequestHeaders"] is not None and "accept" in p["RequestHeaders"] and "image" in p["RequestHeaders"]["accept"]:
        continue
    if ".png" in p["Url"] or ".jpg" in p["Url"]:
        continue
    if p["ProtocolType"] != "Tcp":
        continue
    LogRecords.append(p)

data['LogRecords'] = LogRecords
# --- Build the OpenAPI document from the filtered records --------------------
for p in data['LogRecords']:
    url = urlparse(p["Url"])
    # if url.hostname != "api.alphablend.cloud": continue
    # if p["Url"] in urllist: continue
    # urllist.append(p["Url"])
    print(f'{p["RemoteIpAddress"]} => {pformat(url)}')

    # Fresh local name: the original reassigned the module-level `host`
    # filter constant here, shadowing the configuration value.
    server_url = url.scheme + "://" + url.hostname
    if not any(d['url'] == server_url for d in swagger["servers"]):
        swagger["servers"].append({"description": url.hostname, "url": server_url})

    # Direct membership tests: the original used `any(key in d for d in dict)`,
    # which iterates the dict's KEYS and substring-matches against each one —
    # e.g. once "/a" exists, "/ab" is never created and indexing it raises
    # KeyError. `key not in dict` is the correct check.
    if url.path not in swagger["paths"]:
        swagger["paths"][url.path] = {}
    Method = p["Method"].lower()
    if Method not in swagger["paths"][url.path]:
        swagger["paths"][url.path][Method] = {"responses": {}, "parameters": [], "requestBody": {"content": {}}}
    operation = swagger["paths"][url.path][Method]

    StatusCode = str(p["StatusCode"])
    if StatusCode not in operation["responses"]:
        operation["responses"][StatusCode] = {"description": p["StatusDescription"], "headers": {}, "content": {}}
    response = operation["responses"][StatusCode]

    if p["RequestHeaders"] is not None:
        for h, v in p["RequestHeaders"].items():
            if h == "Accept":
                pass  # response media types are captured from the response headers instead
            elif h == "Content-Type":
                operation["requestBody"]["content"][v] = {}
            elif not any(d['name'] == h for d in operation["parameters"]):
                operation["parameters"].append({"in": "header", "name": h, "schema": {"type": "string"}, "description": v})

    if p["ResponseHeaders"] is not None:
        for h, v in p["ResponseHeaders"].items():
            if h in ("Accept", "Content-Type"):
                # Record the response media type under "content": the original
                # wrote the mime type as a top-level key of the response
                # object, which is not valid OpenAPI 3.
                response["content"][v] = {}
            else:
                response["headers"][h] = {"schema": {"type": "string"}, "description": v}
# --- Write the generated OpenAPI document ------------------------------------
with open(filepath_swagger, 'w') as outfile:
    json.dump(swagger, outfile, indent=4, sort_keys=True)

# Stop here: everything below is inert scratch code kept in string literals.
# `raise SystemExit` instead of exit(): the exit() builtin is injected by the
# `site` module and is not guaranteed to exist (e.g. frozen apps, python -S).
raise SystemExit
"""
with open(filepath, 'w') as outfile:
json.dump(data, outfile, indent=4, sort_keys=True)
"""
"""
with open(filepath_har, 'w') as outfile:
json.dump(har, outfile, indent=4, sort_keys=True)
"""
"""
filepath_har = filepath + '.har'
har = {
"log": {
"version": "1.2",
"creator": {
"name": "WebInspector",
"version": "537.36"
},
"pages": [
{
"startedDateTime": "2021-04-06T12:56:33.907Z",
"id": "page_1",
"title": "https://www.medianova.com/en-blog/2019/07/15/a-step-by-step-guide-to-generating-a-har-file",
"pageTimings": {
"onContentLoad": 1016.4359999998851,
"onLoad": 1775.1450000005207
}
}
],
"entries": [
]
}
}
"""
"""
entry = {
"_initiator": {
"type": "other"
},
"_priority": "VeryHigh",
"_resourceType": "document",
"cache": {},
"connection": "5156",
"pageref": "page_1",
"request": {
"method": p["Method"],
"url": p["Url"],
"httpVersion": "http/2.0",
"headers": [
],
"queryString": [],
"cookies": [
],
"headersSize": -1,
"bodySize": 0
},
"response": {
"status": p["StatusCode"],
"statusText": p["StatusDescription"],
"httpVersion": "http/2.0",
"headers": [
],
"cookies": [],
"content": {
"size": p["ResponseSizeBytes"],
"mimeType": "text/json"
},
"redirectURL": "",
"headersSize": -1,
"bodySize": 0,
"_transferSize": 86,
"_error": None
},
"serverIPAddress": p["RemoteIpAddress"],
"startedDateTime": p["Time"],
"time": 15.251000000716886,
"timings": {
"blocked": 5.708000000047381,
"dns": -1,
"ssl": -1,
"connect": -1,
"send": 0.16800000000000015,
"wait": 7.075999999993772,
"receive": 2.2990000006757327,
"_blocked_queueing": 4.304000000047381
}
}
if p["ResponseHeaders"] is not None:
for h, v in p["ResponseHeaders"].items():
entry["response"]["headers"].append({ "name": h, "value": v })
try: entry["response"]["content"]["mimeType"] = p["ResponseHeaders"]["content-type"].split(";")[0]
except: pass
if p["RequestHeaders"] is not None:
for h, v in p["RequestHeaders"].items():
entry["request"]["headers"].append({ "name": h, "value": v })
har["log"]["entries"].append(entry)
"""
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment