Skip to content

Instantly share code, notes, and snippets.

@mike-lawrence
Created April 11, 2011 19:02
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mike-lawrence/914070 to your computer and use it in GitHub Desktop.
Save mike-lawrence/914070 to your computer and use it in GitHub Desktop.
minimal (well, somewhat) example to demonstrate strange time-travelling behaviour in the multiprocessing module
# Summarise the worker's log (temp.txt): for every trial, find the first
# sample recorded under each event label and compare its timestamp with the
# time at which the main process intended to send that event.
library(plyr)

# One row per sample: trial number, event label, seconds since trial start
a <- read.table('temp.txt')
names(a) <- c('trial', 'event', 'time')

# Remember each sample's position in the file so gaps between events
# can be expressed as a number of samples
a$row <- as.numeric(row.names(a))

# Order the event labels chronologically (the order the main process sends them)
a$event <- factor(a$event, levels = c('collect_data', 'A', 'B', 'C', 'write_data'))

# Keep only the first sample of every trial x event combination
b <- ddply(
	.data = a
	, .variables = .(trial, event)
	, .fun = function(x) x[1, ]
)

# Events are sent 0, 1, 2, 3 and 4 seconds into each trial, in factor-level
# order. Deriving the expected time from the level (instead of recycling
# 0:4 over the rows) stays correct even if a trial is missing an event.
b$expected_time <- as.integer(b$event) - 1L
b$time_diff <- b$time - b$expected_time
b$row_diff <- c(NA, diff(b$row))
print(b)
trial event time row expected_time time_diff row_diff
1 1 collect_data 9.536743e-07 1 0 9.536743e-07 NA
2 1 A 9.003241e-01 174147 1 -9.967589e-02 174146
3 1 B 1.908913e+00 367401 2 -9.108710e-02 193254
4 1 C 2.969911e+00 570822 3 -3.008890e-02 203421
5 1 write_data 3.900322e+00 750213 4 -9.967804e-02 179391
6 2 collect_data 5.960464e-06 750214 0 5.960464e-06 1
7 2 A 2.090931e-04 750215 1 -9.997909e-01 1
8 2 B 2.700012e-01 802051 2 -1.729999e+00 51836
9 2 C 1.265457e+00 991264 3 -1.734543e+00 189213
10 2 write_data 2.279008e+00 1193456 4 -1.720992e+00 202192
11 3 collect_data 6.914139e-06 1193457 0 6.914139e-06 1
12 3 A 4.192169e-01 1275981 1 -5.807831e-01 82524
13 3 B 1.390950e+00 1464989 2 -6.090500e-01 189008
14 3 C 2.405614e+00 1662223 3 -5.943861e-01 197234
15 3 write_data 3.390909e+00 1855735 4 -6.090910e-01 193512
16 4 collect_data 1.215935e-05 1855736 0 1.215935e-05 1
17 4 A 3.650188e-04 1855737 1 -9.996350e-01 1
18 4 B 4.714012e-03 1855971 2 -1.995286e+00 234
19 4 C 9.980690e-01 2047554 3 -2.001931e+00 191583
20 4 write_data 1.998041e+00 2244853 4 -2.001959e+00 197299
21 5 collect_data 5.006790e-06 2244854 0 5.006790e-06 1
22 5 A 5.426459e-01 2347254 1 -4.573541e-01 102400
23 5 B 1.573082e+00 2551555 2 -4.269180e-01 204301
24 5 C 2.544557e+00 2740059 3 -4.554431e-01 188504
25 5 write_data 3.542654e+00 2931585 4 -4.573460e-01 191526
26 6 collect_data 6.914139e-06 2931586 0 6.914139e-06 1
27 6 A 2.160072e-04 2931587 1 -9.997840e-01 1
28 6 B 5.342839e-01 3035573 2 -1.465716e+00 103986
29 6 C 1.534276e+00 3230524 3 -1.465724e+00 194951
30 6 write_data 2.534287e+00 3424108 4 -1.465713e+00 193584
31 7 collect_data 6.914139e-06 3424109 0 6.914139e-06 1
32 7 A 2.414820e-01 3469317 1 -7.585180e-01 45208
33 7 B 1.231899e+00 3661549 2 -7.681010e-01 192232
34 7 C 2.231901e+00 3861064 3 -7.680991e-01 199515
35 7 write_data 3.231890e+00 4058763 4 -7.681100e-01 197699
36 8 collect_data 2.861023e-06 4058764 0 2.861023e-06 1
37 8 A 2.088547e-04 4058765 1 -9.997911e-01 1
38 8 B 4.090550e-01 4137335 2 -1.590945e+00 78570
39 8 C 1.450758e+00 4338125 3 -1.549242e+00 200790
40 8 write_data 2.441662e+00 4532371 4 -1.558338e+00 194246
41 9 collect_data 5.006790e-06 4532372 0 5.006790e-06 1
42 9 A 1.579721e-01 4563257 1 -8.420279e-01 30885
43 9 B 1.157796e+00 4757562 2 -8.422039e-01 194305
44 9 C 2.177286e+00 4957171 3 -8.227139e-01 199609
45 9 write_data 3.200134e+00 5154536 4 -7.998660e-01 197365
46 10 collect_data 5.006790e-06 5154537 0 5.006790e-06 1
47 10 A 2.059937e-04 5154538 1 -9.997940e-01 1
48 10 B 4.303179e-01 5238730 2 -1.569682e+00 84192
49 10 C 1.470674e+00 5442405 3 -1.529326e+00 203675
50 10 write_data 2.459069e+00 5635571 4 -1.540931e+00 193166
# Import libraries
import sys
import time
import multiprocessing
#create a queue that will serve as a messaging system to the worker:
# the main process put()s `message` objects on it and worker_fun get()s them
queue = multiprocessing.Queue()
#create a message class for talking to the worker
class message(object):
    """A small envelope sent through the queue to the worker.

    Attributes:
        type: string naming the kind of message. The worker in this file
            reacts to 'collect_data', 'event', 'write_data' and 'done'.
        value: payload whose meaning depends on `type` (the trial number
            for 'collect_data', the event label for 'event').
    """

    def __init__(self, type, value):
        #NOTE: `type` shadows the builtin inside this method; the parameter
        # name is kept so existing positional and keyword callers still work
        self.type = type
        self.value = value

    def __repr__(self):
        return 'message(type=%r, value=%r)' % (self.type, self.value)
#define what the worker should do
def worker_fun(queue, outfile):
    """Poll `queue` for messages and log timestamped samples to `outfile`.

    The loop handles at most one message per iteration and, while a trial
    is being collected, appends one (elapsed time, current event label)
    sample per iteration.

    Recognised message types (read from the message's `.type` attribute):
        'collect_data': start a new trial; `.value` is the trial number.
            Clears the sample buffers and resets the trial clock.
        'event': `.value` becomes the label attached to later samples.
        'write_data': write the buffered samples to the file, one
            tab-separated "trial, event, time" line each, then stop
            collecting until the next 'collect_data'.
        'done': leave the loop and close the file.

    Args:
        queue: object providing empty() and get(); get() must return
            objects with `.type` and `.value` attributes.
        outfile: path of the text file to (over)write.
    """
    collect_data = False
    write_data = False
    done = False
    start = time.time()
    #give the per-trial state defined values up front so an 'event' or
    # 'write_data' message that arrives before any 'collect_data' cannot
    # hit an unbound local
    trial_num = None
    event = None
    t = [] #time collector
    event_list = [] #event collector
    #the with-block closes the file even if the loop raises
    with open(outfile, 'w') as out:
        while not done:
            if not queue.empty():
                from_queue = queue.get()
                if from_queue.type == 'done':
                    done = True
                elif from_queue.type == 'collect_data':
                    collect_data = True
                    trial_num = from_queue.value
                    event = 'collect_data'
                    t = [] #initialize a time collector
                    event_list = [] #initialize an event collector
                    #NOTE(review): the trial clock starts when this message
                    # is *dequeued*, not when it was sent. If the worker is
                    # still busy writing the previous trial, every time
                    # recorded for this trial is presumably shifted by that
                    # delay -- confirm against the sender's timing.
                    start = time.time()
                elif from_queue.type == 'write_data':
                    write_data = True
                    event = 'write_data'
                elif from_queue.type == 'event':
                    event = from_queue.value
            if collect_data:
                #usually I'd poll a usb device here,
                # but for simplicity this has been removed
                t.append(time.time() - start)
                event_list.append(event)
            if write_data:
                #usually there'd be data from the usb device
                # to write out here, but for simplicity this
                # has been removed
                #pair the buffers with zip instead of indexing the result
                # of map(), which is an iterator (no len()) on Python 3
                trial_label = str(trial_num)
                out.writelines(
                    '\t'.join((trial_label, str(label), str(stamp))) + '\n'
                    for label, stamp in zip(event_list, t)
                )
                write_data = False
                collect_data = False
#define a function that is like time.sleep(), but more active
# and referenced to a specifiable start time
def wait(duration, start):
    """Busy-wait until `duration` seconds after `start`.

    Args:
        duration: number of seconds to wait, counted from `start`.
        start: reference time on the time.time() clock.

    Returns immediately (after one clock read) when start + duration has
    already passed.
    """
    done = False
    while not done:
        now = time.time()
        if now >= (start + duration):
            done = True
        #usually there are other things going on here (watching for
        # keypresses, etc), but for simplicity these have been removed


#Only start the worker and run the trials when executed as a script.
# With the "spawn" start method the child process re-imports this module;
# without this guard it would try to start a worker of its own and re-run
# the trial loop.
if __name__ == '__main__':
    #initialize and start a worker
    worker = multiprocessing.Process(target=worker_fun, args=(queue, 'temp.txt',))
    worker.start()

    #run 10 "trials", where each trials has three events that occur,
    # spaced by 1 second
    repetitions = 10
    trial_num = 0
    for rep in range(repetitions):
        trial_num = trial_num + 1
        start = time.time()
        #each wait() below ends one second after the previous one, so the
        # messages go out at start+1 (A), +2 (B), +3 (C) and +4 (write_data)
        queue.put(message('collect_data', trial_num))
        wait(1, start)
        queue.put(message('event', 'A'))
        wait(1, start + 1)
        queue.put(message('event', 'B'))
        wait(1, start + 2)
        queue.put(message('event', 'C'))
        wait(1, start + 3)
        queue.put(message('write_data', 0))
        #NOTE(review): the worker gets one second to write the trial before
        # the next 'collect_data' is sent; if writing takes longer, the
        # worker presumably starts the next trial late -- confirm
        wait(1, start + 4)

    #clean up and quit
    queue.put(message('done', 0))
    worker.join()
    queue.close()
    queue.join_thread()
    sys.exit()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment