Handling a pm4py event log with more specificity
import pm4py
from pm4py.objects.log.obj import EventLog,Event,Trace # under the hood objects within pm4py, subject to change
import pandas as pd
from tqdm import tqdm
from joblib import Parallel,delayed
from typing import Tuple
def thread_work(group:str, concept:Tuple[str,str,str], xtrace:int, exo_series:pd.DataFrame) -> Trace:
    """
    Thread work to create a single trace, given a group id, the key values from the
    dataframe groupby, the trace instance number and a dataframe of events belonging
    to this trace.
    """
    # build a trace object
    trace_ins = Trace()
    # to add attributes to a trace, use the .attributes member of the trace
    # .attributes is a dictionary
    # here the trace name is simply derived from the groupby key tuple
    trace_ins.attributes["concept:name"] = "_".join(str(part) for part in concept)
    # convert rows into events
    exo_series = exo_series.sort_values("time:timestamp") # always sort your events by timestamp
    for _, event_data in exo_series.iterrows():
        if (not pd.isna(event_data.charttime) and not pd.isna(event_data.value)):
            # create a new event object
            event_ins = Event()
            # to add attributes to an event object, use dictionary notation dict['name of attribute'] = value
            event_ins["concept:name"] = f"{group}_datapoint"
            event_ins["time:timestamp"] = pd.to_datetime(event_data.charttime) # important to convert to pd.Timestamp so it parses into ProM
            event_ins["exogenous:value"] = event_data.value
            # add the created event to the trace; a trace supports list-like functions and access
            trace_ins.append(event_ins)
    # return trace
    return trace_ins
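
# A minimal, single-trace sketch of how thread_work can be called directly (outside the
# thread pool) for a quick check. The column names `charttime`, `value` and `time:timestamp`
# match the ones used above; the group name, key tuple and values below are illustrative
# assumptions only. Uncomment to try it with dummy data:
# demo_df = pd.DataFrame({
#     "charttime": ["2021-01-01 10:00:00", "2021-01-01 11:00:00"],
#     "value": [1.0, 2.0],
#     "time:timestamp": pd.to_datetime(["2021-01-01 10:00:00", "2021-01-01 11:00:00"]),
# })
# demo_trace = thread_work("heart_rate", ("p1", "adm1", "icu1"), 0, demo_df)
# print(demo_trace.attributes, len(demo_trace))
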
if __name__ == "__main__":
    # load in your tabular event log, where each row is an event
    dataset = pd.read_csv("some_event_universe.csv")
    # make a thread pool
    process_pool = Parallel(n_jobs=-3, verbose=3) # increase verbose to get more info about the thread pool
    # create a dummy event log
    event_log = EventLog(
        **{
            "attributes" : {
                "concept:name" : "name of event log"
            }
        }
    )
    # create a trace per groupby key (one trace per concept:name), using the thread pool
    group = "foo"
    traces = process_pool(
        delayed(thread_work)(group, concept, xtrace, exo_series)
        for xtrace, (concept, exo_series)
        in enumerate(tqdm(dataset.groupby(["subject_id","hadm_id","icustay_id"]), desc=f"{group}"))
    )
    # link our dummy event log with the created traces
    event_log._list = traces
    # save the event log, using the helpful pm4py function
    pm4py.write_xes(event_log, "foo_bar.xes") # do not use the gz version, it will take years with many events
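    # Optional sanity check (a sketch, assuming pm4py's simplified interface): read the
    # log back and peek at it. Note that depending on your pm4py version, read_xes
    # returns either an EventLog object or a pandas DataFrame.
    reloaded = pm4py.read_xes("foo_bar.xes")
    print(pm4py.convert_to_dataframe(reloaded).head())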