Skip to content

Instantly share code, notes, and snippets.

@pipoket
Forked from dustin/flume.py
Last active August 29, 2015 14:14
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save pipoket/193ee5bc8a11a9eeadc7 to your computer and use it in GitHub Desktop.
Save pipoket/193ee5bc8a11a9eeadc7 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
#
# tap -> flume
#
# requires: python thrift bindings + compiled flume thrift binding.
#
import sys
import time
import struct
from thrift import Thrift
from thrift.transport import TSocket
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol
sys.path.extend(['ep_man', 'thriftflume'])
import tap
import memcacheConstants
from flume import ThriftFlumeEventServer
class FlumeDest(object):
    """Tap callback that forwards mutation packets to a Flume Thrift server.

    An instance is callable with the
    ``(identifier, cmd, extra, key, val, cas)`` arguments it is handed by
    ``tap.mainLoop``.  Each ``CMD_TAP_MUTATION`` it receives is wrapped in a
    ``ThriftFlumeEvent`` and appended through the Thrift client.

    The instance can also be used as a context manager so the underlying
    transport is closed when the ``with`` block exits.
    """

    def __init__(self, host, port):
        """Open a buffered, binary-protocol Thrift connection to host:port.

        host -- address passed to ``TSocket.TSocket``.
        port -- port passed to ``TSocket.TSocket``.
        """
        self.transport = TSocket.TSocket(host, port)
        self.transport = TTransport.TBufferedTransport(self.transport)
        self.protocol = TBinaryProtocol.TBinaryProtocol(self.transport)
        self.client = ThriftFlumeEventServer.Client(self.protocol)
        self.transport.open()
        # Number of events appended so far; drives the progress output.
        self.n = 0

    def close(self):
        """Close the transport that was opened in ``__init__``."""
        self.transport.close()

    def __enter__(self):
        """Return this instance for use in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the transport; never suppresses an in-flight exception."""
        self.close()
        return False

    def __call__(self, identifier, cmd, extra, key, val, cas):
        """Forward one tap packet to Flume if it is a mutation.

        identifier -- unused here.
        cmd        -- tap command code; only ``CMD_TAP_MUTATION`` is handled.
        extra      -- packed header, decoded with ``TAP_MUTATION_PKT_FMT``.
        key        -- item key, sent as the ``key`` event field.
        val        -- item value, sent as the event body.
        cas        -- unused here.

        Packets with any other command code are ignored.
        """
        if cmd == memcacheConstants.CMD_TAP_MUTATION:
            assert key is not None
            assert val is not None
            # The format yields five values; the first and third are not
            # forwarded, hence the underscore-prefixed names.
            _el, flags, _ttl, iflags, exp = struct.unpack(
                memcacheConstants.TAP_MUTATION_PKT_FMT, extra)
            pri = ThriftFlumeEventServer.Priority.INFO
            # NOTE(review): timestamp is whole seconds since the epoch;
            # confirm the unit the receiving Flume server expects.
            evt = ThriftFlumeEventServer.ThriftFlumeEvent(
                timestamp=int(time.time()),
                priority=pri,
                body=val,
                nanos=0,
                host='localhost',
                fields={'key': key,
                        'flags': str(flags),
                        'iflags': str(iflags),
                        'exp': str(exp)})
            self.client.append(evt)
            self.n += 1
            # Progress indicator: print the running total every 1000 events.
            if (self.n % 1000) == 0:
                # Parenthesised single-argument form prints the same thing
                # under Python 2 and is valid syntax under Python 3.
                print(self.n)
if __name__ == '__main__':
    # Script entry point: every tap packet read by tap.mainLoop is handed
    # to a FlumeDest connected to localhost:1234.
    cli_args = sys.argv[1:]
    flume_sink = FlumeDest('localhost', 1234)
    tap.mainLoop(cli_args, flume_sink)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment