Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
NB. parpool.ijs
NB. Attempt to make a J parpool
NB. ----------------------------
NB. Todo's:
NB. - Spin up tasks
NB. - Use net/clientserver's jsss/jssc for workers resp master
NB. - Figure out how to automatically find data from sentence and send to each worker if needed
NB. - keep old data? or discard after return?
NB. - exploit rank (IRS) write parfor / blockprocessing
NB. ...
NB. - possibility: share launched pool with other clients/parpools: J cluster.
NB. - Allow for spinning up instances on remote machines ...
NB.
NB. Practical todo's:
NB. - Take out logfile from jsss. it makes no sense here
NB. - take arguments from create instead of hardcoded ones
coclass 'Parpool'
create =: 3 : 0
NB. Settings (will become arguments to a constructor)
nWorkers =: 3 NB. Number of J servers to spin up to do the work
baseport =: 1209 NB. Port number to start from (each worker increases by one)
NB. path =: 'C:\Users\Jan-Pieter\Dropbox\scripts\J\J-Envi\' NB. Path to the worker base script.
path =: './' NB. Path to the worker base script.
NB. Generate random alpha-numeric passwords for each worker
pwd =: (u:(48+i.10),,(65 97)+"0 1 i.26) {~ ? (nWorkers,64)$62
NB. Verb to call for forking a worker, appends the correct password and port for workerID.
bootstrap =: [: jpath '~bin/jconsole ',path,'workerBase.ijs ',('"'([,,~) pwd {~ ]), ' ',[: ": baseport + ]
NB. Start pool
smoutput 'Forking workers'
fork_jtask_@bootstrap"0] i. nWorkers
6!:3 (1)NB. wait a second
smoutput 'Connecting workers'
NB. Connect to each worker.
load 'net/clientserver/jssc'
conns =: (<"0 baseport+i.nWorkers) conew each < 'jssc'
NB. Get su rights in each worker
6!:3 (1)NB. wait a second
NB. Get administrativer rights in all workers
(su ineach) <"1 pwd
)
ineach =: 1 : 0
result =. (#conns)$a:
NB. try.
for_k. > conns do.
result =. (<u__k k{:: y) k} result
end.
NB. catch.
NB. cocurrent ret
NB. NB. in the future: propagate error
NB. end.
result
:
result =. (#conns)$a:
NB. try.
for_k. i. #conns do.
result=. (<x u__k&(k&{::) y) k} result
end.
NB. catch.
NB. cocurrent ret
NB. NB. in the future: propagate error
NB. end.
result
)
NB. test the ineach facility
NB. (ping ineach conns) 3$<'hello'
NB. NB. x (v queue conns) y
NB. NB. queue processing of left and right blocks on number of available workers
NB. queueOn =. 2 : 0
NB. assert. x -:&$ y
NB. NB. find entire runs and how long the rest is going to be.
NB. nx =. $@,@x NB. number of cells in x
NB. nw =. #n NB. number of workers
NB. (nw$<'cmd_current') (su_assignAtomic ineach conns) nw$<(5!:1<'u')
NB. 'entire part' =. nx (<.@% ; |~) nw
NB. part =. (*part)$part NB. if part = 0, make it an empty list instead
NB. result =. nx$<''
NB. for_k. i. entire do.
NB. ix =. (k*nw)+i.nw NB. indices of data and result
NB. result =. (x (current ineach n)&(ix&{) y) ix} result
NB. end.
NB. NB. add result for last part
NB. ($x) $ x (current ineach ((->:i.part)&{n))&((->:i.part)&{) y (->:i.part)} result
NB. )
NB. x (v DR rank) y
NB. Distributed rank: execute verb at rank, and do v on all cells parallel
NB.DR =. 2 : 0
NB.NB. upload verb to all workers
NB.nw =. #n NB. number of workers
NB.(nw$<'cmd_current') (assignAtomic ineach conns) nw$<(5!:1<'u')
NB.NB. from Rank and Uniformity (Roger K. W. Hui)
NB. rk =. #@$ NB. Rank of y
NB. er =. (0:>.(+rk))`(<.rk) @. (0:<:[) NB. x er y: rank x (also neg) of y
NB. NB. converted to positive
NB. fr =. -@er }. $@] NB. Frame dimensions
NB. cs =. -@er {. $@] NB. cell dimensions
NB. boxr =. ]`(<@$ , [ $: */@[}.])@.(*@#@]) NB. box cells
NB. cells =. fr $ cs boxr ,@] NB. Boxed cells in frame
NB.NB. Prefix aggreement between ranks
NB. pfx =. <.&rk NB. length of common prefix of x and y
NB. agree =. (pfx {. $@[) -: (pfx {. $@]) NB. Do they agree?
NB. frame =. (3+&i. 1:)`($@([^:(>&rk))) @. agree NB. generate lenght error if not agree
NB. NB. This is more consistent with general J errors
NB. rag =. (rk@]}.$@[) $"1 0 ] NB. return right matched cells
NB.NB. Maximum rank
NB.xcells =. (n cells x)
NB.ycells =. (n cells y)
NB.xcellMatch =. ycells rag xcells
NB.ycellMatch =. xcells rag ycells
NB.ressize =. $ xcellMatch
NB.result =. > ressize $ x (current queue)&, y
NB.)
NB. (su_interrupt ineach conns) 3$<''
NB. Clean up workers, closing the pool
destroy =: 3 : 0
(su_shutdown ineach conns) nWorkers$<''
codestroy ''
)
cocurrent'base'
NB. ==============================
NB. ==============================
NB. workerbase.ijs
require '~addons/net/clientserver/jsss.ijs'
cocurrent 'jsss'
NB. Define any cmd_ methods
cmd_ping =: 'pong'"_@smoutput
cmd_current=: ]
cmd_assignAtomic =: 4 : ('[:';':';'(x) =: y 5!:0')
password =: 2{:: ARGV_j_
cocurrent 'base'
init ". 3{:: ARGV_j_
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.