Skip to content

Instantly share code, notes, and snippets.

@joelverhagen

joelverhagen/main.py

Created May 17, 2020
Embed
What would you like to do?
Convert NuGet restore logs to XES (process mining) format
import collections
import os
import urllib
import urllib.parse

from pm4py.objects.log.log import EventLog, Trace, Event
from pm4py.objects.log.exporter.xes import exporter as xes_exporter
# Two-level grouping of the parsed restore logs:
#   solution name -> repr(sorted feed list) -> EventLog holding one Trace per log file.
solutionNameToSourcesToEventLog = {}

for (dirPath, dirNames, fileNames) in os.walk(r"logs"):
    for fileName in fileNames:
        # Only files named like "restoreLog-<solution>-<rest>" are processed.
        if not fileName.startswith("restoreLog-"):
            continue

        print("Processing " + fileName + "...")

        # Parse the log file and convert it to a Trace
        filePath = os.path.join(dirPath, fileName)
        trace = Trace()
        trace.attributes["concept:name"] = filePath
        sources = []
        inSourceList = False
        with open(filePath, "r") as logFile:
            for rawLine in logFile:
                stripped = rawLine.strip()

                # While inside the "Feeds used:" section every non-blank line
                # is recorded as a feed; the first blank line ends the section.
                if inSourceList:
                    if stripped:
                        sources.append(stripped)
                    else:
                        inSourceList = False

                if rawLine.startswith(" GET "):
                    # Request line: "<method> <url>"
                    (method, url) = stripped.split(" ", 1)
                    trace.append(Event(**{
                        "concept:name": url,
                        "lifecycle:transition": "start",
                        "nuget:http_method": method
                    }))
                elif rawLine.startswith((" OK ", " NotFound ", " InternalServerError ")):
                    # Response line: "<status> <url> <duration>".
                    # InternalServerError responses are recorded with the
                    # "ate_abort" transition, the other statuses with "complete".
                    (status, url, duration) = stripped.split(" ", 2)
                    if rawLine.startswith(" InternalServerError "):
                        transition = "ate_abort"
                    else:
                        transition = "complete"
                    trace.append(Event(**{
                        "concept:name": url,
                        "lifecycle:transition": transition,
                        "nuget:http_status": status,
                        "nuget:http_duration": duration
                    }))
                elif rawLine.startswith(" ") and len(rawLine) > 2 and rawLine[2] != " " and ("http://" in rawLine or "https://" in rawLine):
                    # An indented line mentioning a URL that matched none of
                    # the known prefixes: fail loudly rather than drop it.
                    raise ValueError("Unknown request/response line: " + rawLine)
                elif stripped == "Feeds used:":
                    inSourceList = True

        if not sources:
            raise ValueError("No feeds found.")
        # Sorted so that the same set of feeds always yields the same key.
        sources.sort()

        # Add the trace to the proper Event Log
        (_, solutionName, _) = fileName.split("-", 2)
        sourcesToEventLog = solutionNameToSourcesToEventLog.setdefault(solutionName, {})
        sourcesKey = repr(sources)
        eventLog = sourcesToEventLog.get(sourcesKey)
        if eventLog is None:
            eventLog = EventLog()
            eventLog.attributes["concept:name"] = solutionName
            eventLog.attributes["nuget:sources"] = sources
            sourcesToEventLog[sourcesKey] = eventLog
        eventLog.append(trace)
def _describe_sources(sources):
    """Return a short file-name fragment describing a list of feed URLs.

    The fragment is the host name of the first feed, followed by a suffix
    counting the remaining feeds ("-and-1-other-source",
    "-and-N-other-sources"). A single feed yields just the host name.
    """
    # hostname is None when the first feed has no network location (for
    # example a folder path); fall back to a placeholder so the string
    # concatenation below cannot raise a TypeError.
    description = urllib.parse.urlparse(sources[0]).hostname or "unknown-host"
    # The first feed is named explicitly, so only the remaining ones count
    # as "other" sources.
    otherCount = len(sources) - 1
    if otherCount == 1:
        description += "-and-1-other-source"
    elif otherCount > 1:
        description += "-and-" + str(otherCount) + "-other-sources"
    return description


# Export every collected event log to its own .xes file. The running index
# keeps file names distinct across all solutions and feed combinations.
index = 0
for sourcesToEventLog in solutionNameToSourcesToEventLog.values():
    for eventLog in sourcesToEventLog.values():
        sources = eventLog.attributes["nuget:sources"]
        sourceDescription = _describe_sources(sources)
        exportName = eventLog.attributes["concept:name"] + "_" + str(index) + "_" + sourceDescription + ".xes"
        print("Exporting " + exportName + "...")
        xes_exporter.apply(eventLog, exportName)
        index += 1
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment