Skip to content

Instantly share code, notes, and snippets.

@plumps
Last active October 12, 2016 20:21
Show Gist options
  • Save plumps/0d617329506dcaed08d1afc1ff215962 to your computer and use it in GitHub Desktop.
Save plumps/0d617329506dcaed08d1afc1ff215962 to your computer and use it in GitHub Desktop.
converted to python3
import os
import mmap
import ctypes
import struct
class SHMWriter(object):
def __init__(self, filename='/tmp/mmaptest'):
self.filename = filename
# Create new empty file to back memory map on disk
self.fd = os.open('/tmp/mmaptest', os.O_CREAT | os.O_TRUNC | os.O_RDWR)
# Zero out the file to insure it's the right size
assert os.write(self.fd, b'\x00' * mmap.PAGESIZE) == mmap.PAGESIZE
# Create the mmap instace with the following params:
# fd: File descriptor which backs the mapping or -1 for anonymous mapping
# length: Must in multiples of PAGESIZE (usually 4 KB)
# flags: MAP_SHARED means other processes can share this mmap
# prot: PROT_WRITE means this process can write to this mmap
if os.name == 'nt':
self.buf = mmap.mmap(0, 65536, 'GlobalSharedMemory)
else:
self.buf = mmap.mmap(self.fd, mmap.PAGESIZE,
mmap.MAP_SHARED, mmap.PROT_WRITE)
self.i = ctypes.c_int.from_buffer(self.buf)
self.i.value = 0
assert self.i.value == 0
# Before we create a new value, we need to find the offset of the next free
# memory address within the mmap
self.offset = struct.calcsize(self.i._type_)
# The offset should be uninitialized ('\x00')
#assert self.buf[self.offset] == '\x00'
# Now ceate a string containing 'foo' by first creating a c_char array
r = b''
self.s_type = ctypes.c_char * len(r)
self.i.value = len(r)
# Now create the ctypes instance
self.s = self.s_type.from_buffer(self.buf, self.offset)
# And finally set it
self.s.raw = r
def write(self, r):
r = str(r).encode('utf-8')
self.s_type = ctypes.c_char * len(r)
self.s = self.s_type.from_buffer(self.buf, self.offset)
self.s.raw = r
self.i.value = len(r)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment