@garyfeng
Last active July 17, 2018 03:37
The following scripts were used on 2018-07-09 to generate descriptive writing features for the 2017 reporting. They were run on Kurt using the pdia Docker image; the run steps are in the comment at the bottom of this page.
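For orientation, here is a minimal sketch of what the pipeline does for a single booklet CSV (the parallel driver at the bottom calls processBooklet_dask per file; processBooklet is the same flow with logging). The file name and feature-set name below are hypothetical placeholders, and the snippet assumes the script's definitions are already loaded:

# Minimal single-file sketch, assuming the definitions below are loaded in the pdia Python 2 env.
# The CSV file name is a hypothetical placeholder.
csvFile = "Grade4/Writing_Grade4_1234567890_ObservableData.csv"
processBooklet(csvFile,
               featureConfig=featureConfig2017,   # 2017 parser/feature configuration
               verbose=False,                     # True also saves intermediate map-step CSVs
               outputFeaturePath=".",             # write the feature CSV to the current directory
               featureSetName="descFeatureTest")  # suffix of the output CSV file name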
# Functions for extracting the descriptive features for the 2017 operational analysis
# Changes from make2017FinalFeatures.py:
# - minJumpDistance for mapEditingFeatures raised from 2 to 5; minRunLength for reduceDeleteFeatures set to 1 (see garyfeng 2018-07-09 comments below)
#############
import StringIO
import os
import traceback
from pdia.durSinceBlockStart import *
from pdia.writing2017.addFeatureTextChange import addTextChangeVars
from pdia.writing2017.addKeyPressVars import addKeyPressVars
from pdia.writing2017.burstFeatures import *
from pdia.writing2017.editingFeatures import mapEditingFeatures, reduceEditingFeatures, reduceDeleteFeatures
from pdia.writing2017.featuresConfig2017 import featureConfig2017
from pdia.writing2017.reduceFeatureInitialKeypress import reduceFeatureInitialKeypress
from pdia.writing2017.getData import getData
from pdia.writing2017.reducePauseFeatures import reducePauseFeatures
from pdia.writing2017.addWordTokens import *
# Configs
# parser config
from pdia.writing2017.reduceWordBasedFeatures import reduceWordBasedFeatures
# 2016 default configuration
# 2017 data config
# Read data
def getData2017(filename, featureConfig=featureConfig2017):
"""
A thin wrapper around getData using the 2017 config
:param filename: the file name to process
:param featureConfig: using the 2017 configuration
:return: the parsed df
"""
return getData(filename, featureConfig=featureConfig)
def mapStep(df, feaConfig, verbose=False):
"""
MAP step: creating keystroke level features, adding columns
:param df: the data frame for a booklet, containing potentially multiple blocks
:param feaConfig: the configuration for data import/parsing
:param verbose: if True, saves the interim data
"""
# asserts
if df is None:
logger.error("MapStep: input df is None; quitting")
return None
if not any([(k in df.columns) for k in feaConfig["byVars"]]):
# keyword missing
return None
studentID = df["BookletNumber"].unique()[0]
# ##### MAP ####
# to handle the feature functions in the featureMap object
# ##############
def mapBlock(d):
# return None if no keystroke log is available
if d.loc[d.Label == "Pilot Observables", :].shape[0] == 0:
# print("mapBlock: No Observable data for the block")
logger.debug("mapBlock: No Observable data for the block")
return None
d = durSinceBlockStart(d) if d is not None else None
#d = addKeyPressVars(d) if d is not None else None
#d = addTextChangeVars(d) if d is not None else None
d = addFeatureIKI(d) if d is not None else None
d = addWordTokens(d) if d is not None else None
# garyfeng 2018-07-09: changing default minJumpDistance from 2 to 5
d = mapEditingFeatures(d, verbose=False, minTextChangeEvents=5, minJumpDistance=5) if d is not None else None
return d
try:
# the following groupby().apply() is causing occasional python crashes
# df = df \
# .groupby(feaConfig["byVars"]) \
# .apply(mapBlock)
# so fall back to an explicit loop over blocks instead
tmp=[]
for b in df["BlockCode"].unique():
tmp.append(df.loc[df.BlockCode == b, :].pipe(mapBlock))
df = pd.concat(tmp)
except Exception as e:
logger.error("Error in mapStep")
logger.exception(e)
exc_buffer = StringIO.StringIO()
traceback.print_exc(file=exc_buffer)
logger.error('Uncaught exception in worker process:\n%s', exc_buffer.getvalue())
df.to_csv("Error_{}_MapStep.csv".format(studentID), encoding="utf-8")
return
# saving
if verbose:
outputFileName = "{}_mapStep.csv".format(
df["BookletNumber"].unique()[0]
)
# drop duplicated columns before saving
df.loc[:, ~df.columns.duplicated()].to_csv(outputFileName, encoding="utf-8")
# simplified for human reading
outputFileName = "{}_mapStep_simplified.csv".format(
df["BookletNumber"].unique()[0]
)
rowsToKeep = df.keystrokeEvent.notnull() & ~df.keystrokeEvent.isin(["Keypress"])
df.loc[rowsToKeep, "textLenReconText"] = df.loc[rowsToKeep, "reconstructedText"].str.len()
colsToKeep = ['BookletNumber', 'BlockCode', 'AccessionNumber', 'rowCount',
'keystrokeEvent', 'keyName', 'durSinceBlockStart', 'IKI',
'reconCursorPosition', 'textLength', "textLenReconText",
'textContext', 'intendedWord', 'currentToken',
# 'interWord', 'wtf', 'isAtWordBoundary', 'isWordInitial',
'intraWord',
'focalWordNum', 'interWordRunNumber', 'interClauseRunNumber', 'isJump', 'isReplace',
'reconstructedText']
# to get rid of duplicated columns, remove the multiple index first
df.loc[rowsToKeep, colsToKeep]\
.to_csv(outputFileName, encoding="utf-8")
return df
# note we are not catching exceptions here, to save time.
# errors are caught at the highest level
def reduceStep(df, feaConfig, verbose=False):
"""
REDUCE step: takes the df from the MAP step and reduces it to features, one row per block.
:param df: the df passed from the mapStep
:param feaConfig: the configuration file with parameters setting the byVars
:param verbose: to be passed to reduce functions to save interim data frame if True
:return: a Pandas data frame with one row per block and features as columns
"""
# asserts
if df is None:
logger.error("ReduceStep: input df is None; quitting")
return None
if not any([(k in df.columns) for k in feaConfig["byVars"]]):
# keyword missing
return None
studentID = df["BookletNumber"].unique()[0]
# #### Reduce ####
# here we begin to summarize the feature columns
# ################
# Repeating the groupby for each feature set below is wasteful and will be optimized later.
# For now it keeps each feature group explicit and easy to read.
try:
dfFeaInitialKeypress = df.groupby(feaConfig["byVars"]).apply(
lambda d: reduceFeatureInitialKeypress(d, verbose=verbose)
).reset_index()
#print dfFeaInitialKeypress
except Exception as e:
logger.error("Error in reduceStep: reduceFeatureInitialKeypress")
logger.exception(e)
exc_buffer = StringIO.StringIO()
traceback.print_exc(file=exc_buffer)
logger.error('Uncaught exception in worker process:\n%s', exc_buffer.getvalue())
df.to_csv("Error_{}_ReduceStep.csv".format(studentID), encoding="utf-8")
return
try:
dfFeaWordBased = df.groupby(feaConfig["byVars"]).apply(
lambda d: reduceWordBasedFeatures(d, verbose=verbose)
).reset_index()
#print dfFeaWordBased
except Exception as e:
logger.error("Error in reduceStep: reduceWordBasedFeatures")
logger.exception(e)
exc_buffer = StringIO.StringIO()
traceback.print_exc(file=exc_buffer)
logger.error('Uncaught exception in worker process:\n%s', exc_buffer.getvalue())
df.to_csv("Error_{}_ReduceStep.csv".format(studentID), encoding="utf-8")
return
try:
dfFeaPauses = df.groupby(feaConfig["byVars"]).apply(
lambda d: reducePauseFeatures(d, verbose=verbose)
).reset_index()
#print dfFeaPauses
except Exception as e:
logger.error("Error in reduceStep: reducePauseFeatures")
logger.exception(e)
exc_buffer = StringIO.StringIO()
traceback.print_exc(file=exc_buffer)
logger.error('Uncaught exception in worker process:\n%s', exc_buffer.getvalue())
df.to_csv("Error_{}_ReduceStep.csv".format(studentID), encoding="utf-8")
return
try:
# garyfeng 2018-07-09: changing minRunLength to 1 for deletions to get sum of time before deletions
dfFeaDelete = df.groupby(feaConfig["byVars"]).apply(
lambda d: reduceDeleteFeatures(d, verbose=verbose, minRunLength = 1)
).reset_index()
#print dfFeaDelete
except Exception as e:
logger.error("Error in reduceStep: reduceDeleteFeatures")
logger.exception(e)
exc_buffer = StringIO.StringIO()
traceback.print_exc(file=exc_buffer)
logger.error('Uncaught exception in worker process:\n%s', exc_buffer.getvalue())
df.to_csv("Error_{}_ReduceStep.csv".format(studentID), encoding="utf-8")
return
try:
dfFeaEditing = df.groupby(feaConfig["byVars"]).apply(
lambda d: reduceEditingFeatures(d, verbose=verbose)
).reset_index()
#print dfFeaEditing
except Exception as e:
logger.error("Error in reduceStep: reduceEditingFeatures")
logger.exception(e)
exc_buffer = StringIO.StringIO()
traceback.print_exc(file=exc_buffer)
logger.error('Uncaught exception in worker process:\n%s', exc_buffer.getvalue())
df.to_csv("Error_{}_ReduceStep.csv".format(studentID), encoding="utf-8")
return
try:
nDiscrepancyMarkers = df.groupby(feaConfig["byVars"]).apply(
lambda d: d\
.loc[d.reconstructedText.notnull()]\
.reconstructedText.iloc[-1].count("`")
).rename("flagDiscrepancyMarkers").reset_index()
except Exception as e:
logger.error("Error in reduceStep: reduceEditingFeatures")
logger.exception(e)
exc_buffer = StringIO.StringIO()
traceback.print_exc(file=exc_buffer)
logger.error('Uncaught exception in worker process:\n%s', exc_buffer.getvalue())
df.to_csv("Error_{}_ReduceStep.csv".format(studentID), encoding="utf-8")
return
try:
adminEventList = feaConfig['adminEventList']
nAdminRaiseHandEvents = df.groupby(feaConfig["byVars"]).apply(
lambda d: d\
.loc[(d.Label.isin(adminEventList)) | (d.AccessionNumber == "RaiseHand")] \
.shape[0]
).rename("flagAdminRaiseHandEvents").reset_index()
except Exception as e:
logger.error("Error in reduceStep: reduceEditingFeatures")
logger.exception(e)
exc_buffer = StringIO.StringIO()
traceback.print_exc(file=exc_buffer)
logger.error('Uncaught exception in worker process:\n%s', exc_buffer.getvalue())
df.to_csv("Error_{}_ReduceStep.csv".format(studentID), encoding="utf-8")
return
try:
dfSummary = pd.concat([dfFeaInitialKeypress, dfFeaWordBased,
dfFeaPauses, dfFeaDelete, dfFeaEditing,
nDiscrepancyMarkers, nAdminRaiseHandEvents], axis=1)
except Exception as e:
logger.error("Error in reduceStep: merging all features")
logger.exception(e)
exc_buffer = StringIO.StringIO()
traceback.print_exc(file=exc_buffer)
logger.error('Uncaught exception in worker process:\n%s', exc_buffer.getvalue())
df.to_csv("Error_{}_ReduceStep.csv".format(studentID), encoding="utf-8")
return
return dfSummary
def processBooklet(filename,
featureConfig,
verbose=False,
outputFeaturePath = ".",
featureSetName = "finalFeatures", ):
"""
Process a single booklet CSV file. Steps: read/QC the data, map, reduce, save.
:param filename: full path to the CSV file
:param featureConfig: the dict with config info
:param verbose: if true, save intermediate data frames to the current directory
:param outputFeaturePath: output path
:param featureSetName: name of the final feature set; will be the last part of the output csv file name
:return: none
"""
# output file path and name
outputFeatureFileName = os.path.join(outputFeaturePath,
os.path.splitext(os.path.split(filename)[1])[0] + "_" + featureSetName +".csv")
# debug
logger.info("Processing %s", filename)
#############
# Get Data
try:
df = getData(filename, featureConfig=featureConfig)
except:
df = None
if df is None:
logger.error("processBooklet: getData failed for %s", filename)
return
studentID = df["BookletNumber"].unique()[0]
#############
# Map
#logger.info("Map %s", filename)
try:
df = mapStep(df, verbose=verbose, feaConfig=featureConfig)
except Exception as e:
logger.error("Error in mapStep: %s", filename)
logger.exception(e)
exc_buffer = StringIO.StringIO()
traceback.print_exc(file=exc_buffer)
logger.error('Uncaught exception in worker process:\n%s', exc_buffer.getvalue())
return
if df is None:
logger.error("processBooklet: mapStep failed for %s", filename)
return
#############
# Reduce
#logger.info("Reduce %s", filename)
try:
df = reduceStep(df, verbose=verbose, feaConfig=featureConfig)
except Exception as e:
logger.error("Error in reduceStep: %s", filename)
logger.exception(e)
exc_buffer = StringIO.StringIO()
traceback.print_exc(file=exc_buffer)
logger.error('Uncaught exception in worker process:\n%s', exc_buffer.getvalue())
return
if df is None:
logger.error("processBooklet: reduceStep failed for %s", filename)
return
#############
# Save Data
# debug
logger.info("Saving %s", filename)
try:
# first drop duplicated columns (introduced by the groupby multi-index),
# then drop the occasional duplicated rows
df \
.loc[:, ~df.columns.duplicated()]\
.drop_duplicates() \
.to_csv(outputFeatureFileName, encoding='utf-8')
except Exception as e:
logger.error("Error writing to_csv: %s", outputFeatureFileName)
logger.exception(e)
exc_buffer = StringIO.StringIO()
traceback.print_exc(file=exc_buffer)
logger.error('Uncaught exception in worker process:\n%s', exc_buffer.getvalue())
return
logger.info("Done. Output= %s", outputFeatureFileName)
return
def processBooklet_dask(filename,
featureConfig,
verbose=False,
outputFeaturePath = ".",
featureSetName = "finalFeatures"):
"""
Process one writing CSV file, for dask parallel processing. Same flow as processBooklet, with most logger calls removed.
:param filename: full path to the CSV file
:param featureConfig: the dict with config info
:param verbose: if true, save intermediate data frames to the current directory
:param outputFeaturePath: output path
:param featureSetName: name of the final feature set; will be the last part of the output csv file name
:return: none
"""
# output file path and name
outputFeatureFileName = os.path.join(outputFeaturePath,
os.path.splitext(os.path.split(filename)[1])[0] + "_" + featureSetName +".csv")
#############
# Get Data
try:
df = getData(filename, featureConfig=featureConfig)
except:
return
if df is None:
logger.error("processBooklet: getData failed for %s", filename)
return
#############
# Map
try:
df = mapStep(df, verbose=verbose, feaConfig=featureConfig)
except:
return
if df is None:
#logger.error("processBooklet: mapStep failed for %s", filename)
return
#############
# Reduce
try:
df = reduceStep(df, verbose=verbose, feaConfig=featureConfig)
except:
return
if df is None:
return
#############
# Save Data
try:
# first drop duplicated columns (introduced by the groupby multi-index),
# then drop the occasional duplicated rows
df \
.loc[:, ~df.columns.duplicated()]\
.drop_duplicates() \
.to_csv(outputFeatureFileName, encoding='utf-8')
except Exception as e:
return
return
import sys
if __name__ == '__main__':
if len(sys.argv) == 1:
print "\nUsage: python Writing2017_descReportingFeatures.py Grade4, to process Grade4 or Grade8\n"
exit()
if sys.argv[1] not in ["Grade4", "Grade8", "test"]:
print "\nUsage: python Writing2017_descReportingFeatures.py Grade4, to process Grade4 or Grade8\n"
exit()
import glob
from pdia import *
from pdia.writing2017.make2017Features import *
import dask.bag as db
from distributed import Client
import datetime
import time
# paths
today=time.strftime("%Y%m%d_%H", time.localtime())  # run timestamp like 20170810_22 (date + hour, 24h)
# garyfeng: to resume from a run:
# today = "20180709_21"
####
grade = sys.argv[1]
inputCSVPath = "{}/".format(grade)
outputFeaturePath = "{}_descFeatures_{}/".format(grade, today)
if not os.path.exists(outputFeaturePath):
os.makedirs(outputFeaturePath)
featureSetName = "descFeature{}".format(today)
print "input folder: {}".format(inputCSVPath)
print "output folder: {}".format(outputFeaturePath)
print "featureSetName: {}".format(featureSetName)
#########
# getting the files to process
print "======= Scanning for CSV files ============"
print datetime.datetime.now()
fileList = glob.glob(os.path.join(inputCSVPath, "*_ObservableData.csv"))
if len(fileList)==0:
print "\nNo CSV files found in the directory\n"
exit()
##########
# garyfeng: to resume by ignoring ones with output already.
finishedFiles = glob.glob(os.path.join(outputFeaturePath, "*_{}.csv".format(featureSetName)))
finishedFiles = [f.replace(outputFeaturePath, inputCSVPath).replace("_"+featureSetName, "") for f in finishedFiles]
fileList = list(set(fileList) - set(finishedFiles))
##########
print "Total input CSV files: %i" % len(fileList)
print datetime.datetime.now()
import gc
def processIt(filename):
processBooklet_dask(filename,
featureConfig=featureConfig2017,
verbose=False,
outputFeaturePath=outputFeaturePath,
featureSetName=featureSetName)
gc.collect()
return
print "======= Begin Processing ============"
print datetime.datetime.now()
print "====================================="
# test with 1 file
# processIt(fileList[0])
# Using distributed clients
client = Client()
# run parallel with dask
db.from_sequence(fileList).map(processIt).compute()
print "======== End Processing ==========="
print datetime.datetime.now()
print "==================================="
# Functions for extracting the descriptive features for the 2017 operational analysis
# Changes from make2017FinalFeatures.py:
# - minJumpDistance for mapEditingFeatures raised from 2 to 5; minRunLength for reduceDeleteFeatures set to 1 (see garyfeng 2018-07-09 comments below)
#############
import StringIO
import os
import traceback
from pdia.durSinceBlockStart import *
from pdia.writing2017.addFeatureTextChange import addTextChangeVars
from pdia.writing2017.addKeyPressVars import addKeyPressVars
from pdia.writing2017.burstFeatures import *
from pdia.writing2017.editingFeatures import mapEditingFeatures, reduceEditingFeatures, reduceDeleteFeatures
from pdia.writing2017.featuresConfig2017 import featureConfig2017
from pdia.writing2017.reduceFeatureInitialKeypress import reduceFeatureInitialKeypress
from pdia.writing2017.getData import getData
from pdia.writing2017.reducePauseFeatures import reducePauseFeatures
from pdia.writing2017.addWordTokens import *
# Configs
# parser config
from pdia.writing2017.reduceWordBasedFeatures import reduceWordBasedFeatures
# 2016 default configuration
# 2017 data config
# Read data
def getData2017(filename, featureConfig=featureConfig2017):
"""
A thin wrapper around getData using the 2017 config
:param filename: the file name to process
:param featureConfig: using the 2017 configuration
:return: the parsed df
"""
return getData(filename, featureConfig=featureConfig)
def mapStep(df, feaConfig, verbose=False):
"""
MAP step: creating keystroke level features, adding columns
:param df: the data frame for a booklet, containing potentially multiple blocks
:param feaConfig: the configuration for data import/parsing
:param verbose: if True, saves the interim data
"""
# asserts
if df is None:
logger.error("MapStep: input df is None; quitting")
return None
if not any([(k in df.columns) for k in feaConfig["byVars"]]):
# keyword missing
return None
studentID = df["BookletNumber"].unique()[0]
# ##### MAP ####
# to handle the feature functions in the featureMap object
# ##############
def mapBlock(d):
# return None if no keystroke log is available
if d.loc[d.Label == "Pilot Observables", :].shape[0] == 0:
# print("mapBlock: No Observable data for the block")
logger.debug("mapBlock: No Observable data for the block")
return None
d = durSinceBlockStart(d) if d is not None else None
#d = addKeyPressVars(d) if d is not None else None
#d = addTextChangeVars(d) if d is not None else None
d = addFeatureIKI(d) if d is not None else None
d = addWordTokens(d) if d is not None else None
# garyfeng 2018-07-09: changing default minJumpDistance from 2 to 5
d = mapEditingFeatures(d, verbose=False, minTextChangeEvents=5, minJumpDistance=5) if d is not None else None
return d
try:
# the following groupby().apply() is causing occasional python crashes
# df = df \
# .groupby(feaConfig["byVars"]) \
# .apply(mapBlock)
# so fall back to an explicit loop over blocks instead
tmp=[]
for b in df["BlockCode"].unique():
tmp.append(df.loc[df.BlockCode == b, :].pipe(mapBlock))
df = pd.concat(tmp)
except Exception as e:
logger.error("Error in mapStep")
logger.exception(e)
exc_buffer = StringIO.StringIO()
traceback.print_exc(file=exc_buffer)
logger.error('Uncaught exception in worker process:\n%s', exc_buffer.getvalue())
df.to_csv("Error_{}_MapStep.csv".format(studentID), encoding="utf-8")
return
# saving
if verbose:
outputFileName = "{}_mapStep.csv".format(
df["BookletNumber"].unique()[0]
)
# drop duplicated columns before saving
df.loc[:, ~df.columns.duplicated()].to_csv(outputFileName, encoding="utf-8")
# simplified for human reading
outputFileName = "{}_mapStep_simplified.csv".format(
df["BookletNumber"].unique()[0]
)
rowsToKeep = df.keystrokeEvent.notnull() & ~df.keystrokeEvent.isin(["Keypress"])
df.loc[rowsToKeep, "textLenReconText"] = df.loc[rowsToKeep, "reconstructedText"].str.len()
colsToKeep = ['BookletNumber', 'BlockCode', 'AccessionNumber', 'rowCount',
'keystrokeEvent', 'keyName', 'durSinceBlockStart', 'IKI',
'reconCursorPosition', 'textLength', "textLenReconText",
'textContext', 'intendedWord', 'currentToken',
# 'interWord', 'wtf', 'isAtWordBoundary', 'isWordInitial',
'intraWord',
'focalWordNum', 'interWordRunNumber', 'interClauseRunNumber', 'isJump', 'isReplace',
'reconstructedText']
# to get rid of duplicated columns, remove the multiple index first
df.loc[rowsToKeep, colsToKeep]\
.to_csv(outputFileName, encoding="utf-8")
return df
# note we are not catching exceptions here, to save time.
# errors are caught at the highest level
def reduceStep(df, feaConfig, verbose=False):
"""
REDUCE step: takes the df from the MAP step and reduces it to features, one row per block.
:param df: the df passed from the mapStep
:param feaConfig: the configuration file with parameters setting the byVars
:param verbose: to be passed to reduce functions to save interim data frame if True
:return: a Pandas data frame with one row per block and features as columns
"""
# asserts
if df is None:
logger.error("ReduceStep: input df is None; quitting")
return None
if not any([(k in df.columns) for k in feaConfig["byVars"]]):
# keyword missing
return None
studentID = df["BookletNumber"].unique()[0]
# #### Reduce ####
# here we begin to summarize the feature columns
# ################
# Repeating the groupby for each feature set below is wasteful and will be optimized later.
# For now it keeps each feature group explicit and easy to read.
try:
dfFeaInitialKeypress = df.groupby(feaConfig["byVars"]).apply(
lambda d: reduceFeatureInitialKeypress(d, verbose=verbose)
).reset_index()
#print dfFeaInitialKeypress
except Exception as e:
logger.error("Error in reduceStep: reduceFeatureInitialKeypress")
logger.exception(e)
exc_buffer = StringIO.StringIO()
traceback.print_exc(file=exc_buffer)
logger.error('Uncaught exception in worker process:\n%s', exc_buffer.getvalue())
df.to_csv("Error_{}_ReduceStep.csv".format(studentID), encoding="utf-8")
return
try:
dfFeaWordBased = df.groupby(feaConfig["byVars"]).apply(
lambda d: reduceWordBasedFeatures(d, verbose=verbose)
).reset_index()
#print dfFeaWordBased
except Exception as e:
logger.error("Error in reduceStep: reduceWordBasedFeatures")
logger.exception(e)
exc_buffer = StringIO.StringIO()
traceback.print_exc(file=exc_buffer)
logger.error('Uncaught exception in worker process:\n%s', exc_buffer.getvalue())
df.to_csv("Error_{}_ReduceStep.csv".format(studentID), encoding="utf-8")
return
try:
dfFeaPauses = df.groupby(feaConfig["byVars"]).apply(
lambda d: reducePauseFeatures(d, verbose=verbose)
).reset_index()
#print dfFeaPauses
except Exception as e:
logger.error("Error in reduceStep: reducePauseFeatures")
logger.exception(e)
exc_buffer = StringIO.StringIO()
traceback.print_exc(file=exc_buffer)
logger.error('Uncaught exception in worker process:\n%s', exc_buffer.getvalue())
df.to_csv("Error_{}_ReduceStep.csv".format(studentID), encoding="utf-8")
return
try:
# garyfeng 2018-07-09: changing minRunLength to 1 for deletions to get sum of time before deletions
dfFeaDelete = df.groupby(feaConfig["byVars"]).apply(
lambda d: reduceDeleteFeatures(d, verbose=verbose, minRunLength = 1)
).reset_index()
#print dfFeaDelete
except Exception as e:
logger.error("Error in reduceStep: reduceDeleteFeatures")
logger.exception(e)
exc_buffer = StringIO.StringIO()
traceback.print_exc(file=exc_buffer)
logger.error('Uncaught exception in worker process:\n%s', exc_buffer.getvalue())
df.to_csv("Error_{}_ReduceStep.csv".format(studentID), encoding="utf-8")
return
try:
dfFeaEditing = df.groupby(feaConfig["byVars"]).apply(
lambda d: reduceEditingFeatures(d, verbose=verbose)
).reset_index()
#print dfFeaEditing
except Exception as e:
logger.error("Error in reduceStep: reduceEditingFeatures")
logger.exception(e)
exc_buffer = StringIO.StringIO()
traceback.print_exc(file=exc_buffer)
logger.error('Uncaught exception in worker process:\n%s', exc_buffer.getvalue())
df.to_csv("Error_{}_ReduceStep.csv".format(studentID), encoding="utf-8")
return
try:
nDiscrepancyMarkers = df.groupby(feaConfig["byVars"]).apply(
lambda d: d\
.loc[d.reconstructedText.notnull()]\
.reconstructedText.iloc[-1].count("`")
).rename("flagDiscrepancyMarkers").reset_index()
except Exception as e:
logger.error("Error in reduceStep: reduceEditingFeatures")
logger.exception(e)
exc_buffer = StringIO.StringIO()
traceback.print_exc(file=exc_buffer)
logger.error('Uncaught exception in worker process:\n%s', exc_buffer.getvalue())
df.to_csv("Error_{}_ReduceStep.csv".format(studentID), encoding="utf-8")
return
try:
adminEventList = feaConfig['adminEventList']
nAdminRaiseHandEvents = df.groupby(feaConfig["byVars"]).apply(
lambda d: d\
.loc[(d.Label.isin(adminEventList)) | (d.AccessionNumber == "RaiseHand")] \
.shape[0]
).rename("flagAdminRaiseHandEvents").reset_index()
except Exception as e:
logger.error("Error in reduceStep: reduceEditingFeatures")
logger.exception(e)
exc_buffer = StringIO.StringIO()
traceback.print_exc(file=exc_buffer)
logger.error('Uncaught exception in worker process:\n%s', exc_buffer.getvalue())
df.to_csv("Error_{}_ReduceStep.csv".format(studentID), encoding="utf-8")
return
try:
dfSummary = pd.concat([dfFeaInitialKeypress, dfFeaWordBased,
dfFeaPauses, dfFeaDelete, dfFeaEditing,
nDiscrepancyMarkers, nAdminRaiseHandEvents], axis=1)
except Exception as e:
logger.error("Error in reduceStep: merging all features")
logger.exception(e)
exc_buffer = StringIO.StringIO()
traceback.print_exc(file=exc_buffer)
logger.error('Uncaught exception in worker process:\n%s', exc_buffer.getvalue())
df.to_csv("Error_{}_ReduceStep.csv".format(studentID), encoding="utf-8")
return
return dfSummary
def processBooklet(filename,
featureConfig,
verbose=False,
outputFeaturePath = ".",
featureSetName = "finalFeatures", ):
"""
Process a single booklet CSV file. Steps: read/QC the data, map, reduce, save.
:param filename: full path to the CSV file
:param featureConfig: the dict with config info
:param verbose: if true, save intermediate data frames to the current directory
:param outputFeaturePath: output path
:param featureSetName: name of the final feature set; will be the last part of the output csv file name
:return: none
"""
# output file path and name
outputFeatureFileName = os.path.join(outputFeaturePath,
os.path.splitext(os.path.split(filename)[1])[0] + "_" + featureSetName +".csv")
# debug
logger.info("Processing %s", filename)
#############
# Get Data
try:
df = getData(filename, featureConfig=featureConfig)
except:
df = None
if df is None:
logger.error("processBooklet: getData failed for %s", filename)
return
studentID = df["BookletNumber"].unique()[0]
#############
# Map
#logger.info("Map %s", filename)
try:
df = mapStep(df, verbose=verbose, feaConfig=featureConfig)
except Exception as e:
logger.error("Error in mapStep: %s", filename)
logger.exception(e)
exc_buffer = StringIO.StringIO()
traceback.print_exc(file=exc_buffer)
logger.error('Uncaught exception in worker process:\n%s', exc_buffer.getvalue())
return
if df is None:
logger.error("processBooklet: mapStep failed for %s", filename)
return
#############
# Reduce
#logger.info("Reduce %s", filename)
try:
df = reduceStep(df, verbose=verbose, feaConfig=featureConfig)
except Exception as e:
logger.error("Error in reduceStep: %s", filename)
logger.exception(e)
exc_buffer = StringIO.StringIO()
traceback.print_exc(file=exc_buffer)
logger.error('Uncaught exception in worker process:\n%s', exc_buffer.getvalue())
return
if df is None:
logger.error("processBooklet: reduceStep failed for %s", filename)
return
#############
# Save Data
# debug
logger.info("Saving %s", filename)
try:
# first drop duplicated columns (introduced by the groupby multi-index),
# then drop the occasional duplicated rows
df \
.loc[:, ~df.columns.duplicated()]\
.drop_duplicates() \
.to_csv(outputFeatureFileName, encoding='utf-8')
except Exception as e:
logger.error("Error writing to_csv: %s", outputFeatureFileName)
logger.exception(e)
exc_buffer = StringIO.StringIO()
traceback.print_exc(file=exc_buffer)
logger.error('Uncaught exception in worker process:\n%s', exc_buffer.getvalue())
return
logger.info("Done. Output= %s", outputFeatureFileName)
return
def processBooklet_dask(filename,
featureConfig,
verbose=False,
outputFeaturePath = ".",
featureSetName = "finalFeatures"):
"""
Process one writing CSV file, for dask parallel processing. Same flow as processBooklet, with most logger calls removed.
:param filename: full path to the CSV file
:param featureConfig: the dict with config info
:param verbose: if true, save intermediate data frames to the current directory
:param outputFeaturePath: output path
:param featureSetName: name of the final feature set; will be the last part of the output csv file name
:return: none
"""
# output file path and name
outputFeatureFileName = os.path.join(outputFeaturePath,
os.path.splitext(os.path.split(filename)[1])[0] + "_" + featureSetName +".csv")
#############
# Get Data
try:
df = getData(filename, featureConfig=featureConfig)
except:
return
if df is None:
logger.error("processBooklet: getData failed for %s", filename)
return
#############
# Map
try:
df = mapStep(df, verbose=verbose, feaConfig=featureConfig)
except:
return
if df is None:
#logger.error("processBooklet: mapStep failed for %s", filename)
return
#############
# Reduce
try:
df = reduceStep(df, verbose=verbose, feaConfig=featureConfig)
except:
return
if df is None:
return
#############
# Save Data
try:
# first drop duplicated columns (introduced by the groupby multi-index),
# then drop the occasional duplicated rows
df \
.loc[:, ~df.columns.duplicated()]\
.drop_duplicates() \
.to_csv(outputFeatureFileName, encoding='utf-8')
except Exception as e:
return
return
import sys
if __name__ == '__main__':
if len(sys.argv) == 1:
print "\nUsage: python Writing2017_descReportingFeatures.py Grade4, to process Grade4 or Grade8\n"
exit()
if sys.argv[1] not in ["Grade4", "Grade8", "test"]:
print "\nUsage: python Writing2017_descReportingFeatures.py Grade4, to process Grade4 or Grade8\n"
exit()
import glob
from pdia import *
from pdia.writing2017.make2017Features import *
import dask.bag as db
from distributed import Client
import datetime
import time
# paths
today=time.strftime("%Y%m%d_%H", time.localtime())  # run timestamp like 20170810_22 (date + hour, 24h)
# today = "20180711_04"
grade = sys.argv[1]
inputCSVPath = "{}/".format(grade)
outputFeaturePath = "{}_descFeatures_{}/".format(grade, today)
if not os.path.exists(outputFeaturePath):
os.makedirs(outputFeaturePath)
featureSetName = "descFeature{}".format(today)
print "input folder: {}".format(inputCSVPath)
print "output folder: {}".format(outputFeaturePath)
print "featureSetName: {}".format(featureSetName)
#########
# getting the files to process
print "======= Scanning for CSV files ============"
print datetime.datetime.now()
fileList = glob.glob(os.path.join(inputCSVPath, "*_ObservableData.csv"))
if len(fileList)==0:
print "\nNo CSV files found in the directory\n"
exit()
##########
# garyfeng: to resume by ignoring ones with output already.
finishedFiles = glob.glob(os.path.join(outputFeaturePath, "*_{}.csv".format(featureSetName)))
finishedFiles = [f.replace(outputFeaturePath, inputCSVPath).replace("_"+featureSetName, "") for f in finishedFiles]
fileList = list(set(fileList) - set(finishedFiles))
# skip booklets that already produced an Error_*.csv in a previous run (note the hard-coded Writing_Grade8_ file-name prefix)
finishedFiles = glob.glob("Error_*.csv")
finishedFiles = ["{}/Writing_Grade8_{}_ObservableData.csv".format(grade, f.split("_")[1]) for f in finishedFiles]
fileList = list(set(fileList) - set(finishedFiles))
##########
print "Total input CSV files: %i" % len(fileList)
print datetime.datetime.now()
import gc
def processIt(filename):
processBooklet_dask(filename,
featureConfig=featureConfig2017,
verbose=False,
outputFeaturePath=outputFeaturePath,
featureSetName=featureSetName)
gc.collect()
return
print "======= Begin Processing ============"
print datetime.datetime.now()
print "====================================="
# test with 1 file
# processIt(fileList[0])
# Using distributed clients
client = Client()
# run parallel with dask
db.from_sequence(fileList).map(processIt).compute()
print "======== End Processing ==========="
print datetime.datetime.now()
print "==================================="
# To restart
# until python run_grade8.py Grade8; do
# echo "Program crashed with exit code $?. Respawning.." >&2
# sleep 1
# done
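REM Windows launcher for the pdia Jupyter Notebook container.
REM The loop below finds the first index NUM with no running container named pdj-NUM.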
SET NUM=0
:LOOP
SET /A NUM = %NUM% + 1
FOR /f "usebackq" %%G in (`docker ps -q -f "name=pdj-%NUM%"`) DO set PDJID=%%G
IF [%PDJID%] == [] GOTO QUERY
SET PDJID=
GOTO LOOP
:QUERY
set INPUT=
set /P INPUT=Which version of pdia would you like to use? (1=master, 2=2018xval): %=%
IF /I "%INPUT%"=="1" GOTO MASTER
IF /I "%INPUT%"=="2" GOTO 2018XVAL
GOTO QUERY
:MASTER
SET PDJTAG=master
GOTO RUN
:2018XVAL
SET PDJTAG=2018xval
GOTO RUN
:RUN
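REM Start the selected image tag: host port 8887+NUM, current directory mounted at /home/jovyan/work,
REM open the notebook in the browser, then stop the container after a keypress.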
SET /A PORT = %NUM% + 8887
docker pull pdia/docked-jupyter:%PDJTAG%
docker run -p %PORT%:8888 -h CONTAINER -d -it --rm --name pdj-%NUM%-%PDJTAG% -v "%cd%":/home/jovyan/work pdia/docked-jupyter:%PDJTAG% jupyter notebook --NotebookApp.token='pdia'
timeout /t 5
start http://localhost:%PORT%/?token=pdia
timeout /t -1
docker stop pdj-%NUM%-%PDJTAG%
@garyfeng
Author

Steps:

  • Put the above files under D:\2017Writing on Kurt, where the writing logs live in the "Grade4" and "Grade8" folders.
  • Start the .bat file. It launches the pdia Docker container, mounts the current directory into it (at /home/jovyan/work), and starts Jupyter Notebook.
  • In the Jupyter home page, open a terminal window.
    • Run ". activate pdia" to switch to the Python 2 environment that has pdia.
  • Run the script: python xxxx.py Grade4 or python xxxx.py Grade8.
    • Important: edit the above script where I hard-coded the feature name (run timestamp); this was needed so a relaunch after a crash resumes into the same output folder.
    • Important: if the script crashes, relaunch it automatically with a bash loop such as "until python xxxx.py Grade4; do sleep 1; done" (see the consolidated sketch below).

This may take 10-20 hours per grade on Kurt.
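Putting the steps together, a consolidated sketch of the terminal session inside the container. The script file name is taken from the usage message in the code above and may differ from the actual file name; the mount path comes from the .bat launcher.

# inside the Jupyter terminal of the pdia container
. activate pdia                                  # Python 2 environment with pdia
cd /home/jovyan/work                             # host directory mounted by the .bat launcher
# relaunch automatically if the script crashes; already-finished booklets are skipped on resume
until python Writing2017_descReportingFeatures.py Grade4; do
  echo "Script crashed with exit code $?. Respawning..." >&2
  sleep 1
done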
