# docPhil99/numpyPipe.md

Last active March 16, 2018 16:59
Piping numpy arrays to other processes

To pipe data from one process to another as a stream in Python, we need to pickle the object and write it to the pipe. This example uses NumPy arrays, but the same approach works for any object that can be pickled. This took far too long to get working, and I could find little information online on how to put it all together, so here it is. The code is Python 3 only, and I've only run it on a Mac.

I've used a binary stream rather than text purely for efficiency: NumPy arrays can get huge! This means `readline()` is not going to work. Instead, I send a single control byte: 1 for data and 0 for stop. This could be extended to include other control operations. For a data message, I then send the length of the data as an 8-byte int, followed by the data itself.
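As a rough, self-contained sketch of the framing described above, the round trip can be tried against an in-memory stream before any real pipe is involved. The helper names `pack_frame` and `read_frame` are hypothetical and are not part of the scripts in this gist; treating end of stream the same as a stop byte is also an assumption made for brevity.

```python
import io
import pickle

CONTROL_DATA = bytes([1])  # a data message follows
CONTROL_STOP = bytes([0])  # no more messages

def pack_frame(obj):
    """Hypothetical helper: control byte + 8-byte big-endian length + pickled payload."""
    payload = pickle.dumps(obj)
    return CONTROL_DATA + len(payload).to_bytes(8, byteorder='big') + payload

def read_frame(stream):
    """Hypothetical helper: return the next object, or None on a stop byte or end of stream."""
    control = stream.read(1)
    if control != CONTROL_DATA:
        return None
    length = int.from_bytes(stream.read(8), byteorder='big')
    return pickle.loads(stream.read(length))

# Round trip through an in-memory binary stream instead of a real pipe.
stream = io.BytesIO(pack_frame([1, 2, 3]) + pack_frame({'p': 4}) + CONTROL_STOP)
first = read_frame(stream)
second = read_frame(stream)
third = read_frame(stream)
print(first, second, third)
```

Because the length is sent ahead of the payload, the reader always knows exactly how many bytes to request, which is what makes `readline()` unnecessary.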

### simpleSend.py

```python
import numpy as np
import pickle
import sys
import io
import time

#define some control bytes
control_data=bytes([1])
control_stop=bytes([0])

def send_data(arr):
    dataStr=pickle.dumps(arr)  #pickle the data array into a byte array
    dlen=len(dataStr).to_bytes(8, byteorder='big')  #length of the pickled data as 8 bytes
    #latin-1 maps each byte 0-255 to exactly one character, so decoding and printing round-trips the bytes
    print(control_data.decode('latin-1'), end='', flush=True)  #control byte
    print(dlen.decode('latin-1'), end='', flush=True)  #data length
    print(dataStr.decode('latin-1'), end='', flush=True)  #the data; end='' suppresses the trailing newline

def send_stop():
    print(control_stop.decode('latin-1'), end='', flush=True)

#set stdout so that it prints in latin-1; sys.stdout.detach() is a binary stream
#newline='' stops '\n' characters in the data being translated on platforms where os.linesep differs
sys.stdout = io.TextIOWrapper(sys.stdout.detach(), encoding='latin-1', newline='')

for p in range(10):
    arr=np.ones((5000,500))*p  #generate some data
    send_data(arr)
    #the sleep is purely for testing and can be removed, i.e. does the reader fall over after a long delay
    time.sleep(.1)
send_stop()
```

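### simpleReceiver.py

```python
import numpy as np
import sys
import pickle

#define some control bytes
control_data=bytes([1])
control_stop=bytes([0])

while True:
    data=sys.stdin.buffer.read(1)  #read the control byte
    if data==control_data:
        data=sys.stdin.buffer.read(8)  #read the data length
        dlen=int.from_bytes(data, byteorder='big')
        print('data length %d'%dlen)
        data=sys.stdin.buffer.read(dlen)  #read the data
        npd=pickle.loads(data)  #unpickle
        print(npd.shape)
        print(npd.max())
    elif data==control_stop:
        print('stopped')
        break
    else:
        print('Oh no')  #unknown control byte, or the stream ended without a stop byte
        break
```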

To run this: `python simpleSend.py | python simpleReceiver.py`

If we want to use Python's subprocess module to start `simpleReceiver.py`, we need to `write` to the child's STDIN instead of using `print`.

```python
import numpy as np
import pickle
import sys
import subprocess as sp

#define some control bytes
control_data=bytes([1])
control_stop=bytes([0])

def send_data(arr,mp):
    dataStr=pickle.dumps(arr)  #pickle the data array into a byte array
    dlen=len(dataStr).to_bytes(8, byteorder='big')  #length of the pickled data as 8 bytes
    mp.stdin.write(control_data)
    mp.stdin.write(dlen)
    mp.stdin.write(dataStr)
    mp.stdin.flush()  #the pipe is buffered, so flush to hand the message to the receiver now

def send_stop(mp):
    mp.stdin.write(control_stop)
    mp.stdin.flush()

try:
    mp = sp.Popen(["python3", "simpleReceiver.py"], stdin=sp.PIPE)
except OSError as err:  #Popen raises OSError if the program cannot be started
    print('ERROR:', err)
    sys.exit(-1)

for p in range(10):
    arr=np.ones((5000,5000))*p  #generate some data
    send_data(arr,mp)
send_stop(mp)
mp.stdin.close()
mp.wait()  #wait for the receiver to finish
```

With an array as large as 5000x5000 this takes some time. Running it through the Python profiler indicates that about 75% of the time is taken by `pickle.dumps`, and most of the remaining 25% by the write operation. NumPy's own methods give a speed increase: replacing `dataStr=pickle.dumps(arr)` with `dataStr=arr.tobytes()` and `npd=pickle.loads(data)` with `npd=np.frombuffer(data)` more than halves the time taken, but loses the shape and dtype information. This would have to be sent along with the data.
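One possible way to send the shape and dtype along with the raw bytes is a small header in front of the data. The sketch below is only an illustration under stated assumptions: the header layout and the helper names `pack_array` and `unpack_array` are hypothetical, and it is only intended for plain numeric dtypes. Note that `np.frombuffer(data)` with no `dtype` argument assumes `float64`, so the dtype has to be passed explicitly for anything else.

```python
import io
import math
import numpy as np

def pack_array(arr):
    """Hypothetical helper: dtype string and shape as a header, then the raw bytes."""
    dtype_str = arr.dtype.str.encode('ascii')            # e.g. b'<f8' for little-endian float64
    header = len(dtype_str).to_bytes(1, 'big') + dtype_str
    header += arr.ndim.to_bytes(1, 'big')                # number of dimensions
    for dim in arr.shape:
        header += dim.to_bytes(8, 'big')                 # each dimension as 8 bytes
    return header + arr.tobytes()                        # data in C (row-major) order

def unpack_array(stream):
    """Hypothetical helper: read the header back, then rebuild the array."""
    dtype_len = int.from_bytes(stream.read(1), 'big')
    dtype = np.dtype(stream.read(dtype_len).decode('ascii'))
    ndim = int.from_bytes(stream.read(1), 'big')
    shape = tuple(int.from_bytes(stream.read(8), 'big') for _ in range(ndim))
    data = stream.read(math.prod(shape) * dtype.itemsize)
    return np.frombuffer(data, dtype=dtype).reshape(shape)

# Round trip through an in-memory stream instead of a real pipe.
arr = np.arange(12, dtype=np.int32).reshape(3, 4)
out = unpack_array(io.BytesIO(pack_array(arr)))
print(out.shape, out.dtype)
```

The array returned by `np.frombuffer` is a read-only view of the received bytes; call `.copy()` on it if the receiver needs to modify the data.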