Last active
September 24, 2021 02:47
-
-
Save AdamBanham/e1def562e67844068709ba0ef66748b1 to your computer and use it in GitHub Desktop.
Handling a pm4py event log with more specificity
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import pm4py | |
from pm4py.objects.log.obj import EventLog,Event,Trace # under the hood objects within pm4py, subject to change | |
import pandas as pd | |
from tqdm import tqdm | |
from joblib import Parallel,delayed | |
from typing import Tuple | |
def thread_work(group:str,concept:Tuple[str,str,str],xtrace:int,exo_series:pd.DataFrame) -> Trace:
    """
    Build a single pm4py trace from the rows of one dataframe group.

    Parameters
    ----------
    group:
        Label used as the prefix of every event's ``concept:name``
        (events are named ``"<group>_datapoint"``).
    concept:
        Key values of the dataframe groupby that produced ``exo_series``;
        passed to ``handle_concept`` to derive the trace's ``concept:name``.
    xtrace:
        Trace instance number. Currently unused, kept so existing callers
        do not break.
    exo_series:
        Rows to convert into events. Must contain the columns ``charttime``
        and ``value``; rows missing either one are skipped.

    Returns
    -------
    Trace
        A trace whose events are ordered by timestamp.

    Notes
    -----
    ``handle_concept`` is not defined in this file; it must be available at
    module scope (and importable by worker processes) before this is called.
    """
    # build a trace object
    trace_ins = Trace()
    # to add attributes to a trace, use the .attributes member of the trace
    # .attributes is a dictionary
    trace_ins.attributes["concept:name"] = handle_concept(concept)
    # keep only rows that can become a complete event
    events = exo_series.dropna(subset=["charttime", "value"]).copy()
    # important to convert to pd.Timestamp so the log parses in ProM; doing it
    # before sorting also means we order by real time, not by string value
    events["charttime"] = pd.to_datetime(events["charttime"])
    # always sort your events by timestamp; mergesort is stable, so events
    # sharing a timestamp keep their original row order
    events = events.sort_values("charttime", kind="mergesort")
    # convert rows into events
    for row in events.itertuples(index=False):
        # create a new event object
        event_ins = Event()
        # to add attributes to an event object, use dictionary notation dict['name of attribute'] = value
        event_ins["concept:name"] = f"{group}_datapoint"
        event_ins["time:timestamp"] = row.charttime
        event_ins["exogenous:value"] = row.value
        # add the created event to the trace, a trace supports list like functions and access
        trace_ins.append(event_ins)
    # return trace
    return trace_ins
if __name__ == "__main__":
    # label shared by the event names and the progress bar
    group = "foo"
    # load in your tabular event log, where rows are an event/s
    dataset = pd.read_csv("some_event_universe.csv")
    # make a worker pool; a negative n_jobs means (n_cpus + 1 + n_jobs) workers,
    # so -3 leaves two CPUs free. increase verbose to get more info about the pool
    process_pool = Parallel(n_jobs=-3, verbose=3)
    # create an empty event log carrying only log-level attributes
    event_log = EventLog(attributes={"concept:name": "name of event log"})
    # one group (and therefore one trace) per unique key combination
    grouped = dataset.groupby(["subject_id", "hadm_id", "icustay_id"])
    # create a trace per group, using the worker pool
    traces = process_pool(
        delayed(thread_work)(group, concept, xtrace, exo_series)
        for xtrace, (concept, exo_series) in enumerate(tqdm(grouped, desc=group))
    )
    # link our event log with the created traces through its list-like API
    for trace in traces:
        event_log.append(trace)
    # save out event log, using helpful pm4py function
    pm4py.write_xes(event_log, "foo_bar.xes")  # do not use the gz version, it will take years with many events
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment