Skip to content

Instantly share code, notes, and snippets.

@toffer
Last active August 29, 2015 14:24
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save toffer/9321837e3a8c27bd40f5 to your computer and use it in GitHub Desktop.
Save toffer/9321837e3a8c27bd40f5 to your computer and use it in GitHub Desktop.
Modified server from the Thrift tutorial, demonstrating how to record Thrift transactions as New Relic background tasks. PythonClient.py is unchanged.
#!/usr/bin/env python
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
#
# NEW RELIC INITIALIZATION
#
# The following 3 lines must go at the top of whichever file you are running
# server.serve() from. They are responsible for initializing the agent and
# for registering it with New Relic. If you set log_level = debug in your
# newrelic.ini configuration file, you should see output that shows the agent
# registering with New Relic, even before you run any thrift transactions.
# Note that register_application() returns an application instance, which
# you can pass into BackgroundTask.
import newrelic.agent
# Load the agent configuration from the relative path 'newrelic.ini'.
newrelic.agent.initialize('newrelic.ini')
# Register with New Relic up front rather than on the first transaction; the
# returned application object is what serveClient() passes to BackgroundTask.
# NOTE(review): timeout=5.0 presumably caps how long registration may block,
# in seconds -- confirm against the agent documentation.
application_instance = newrelic.agent.register_application(timeout=5.0)
# END NEW RELIC INITIALIZATION
import sys, glob
sys.path.append('gen-py')
# sys.path.insert(0, glob.glob('../../lib/py/build/lib.*')[0])
import Queue
import threading
from tutorial import Calculator
from tutorial.ttypes import *
from shared.ttypes import SharedStruct
from thrift.transport import TSocket
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol
from thrift.server import TServer
import logging
# Attach a stream handler to the 'Thrift' logger; the server class below
# reports caught exceptions through this logger.
console = logging.StreamHandler()
logger = logging.getLogger('Thrift')
logger.addHandler(console)
# InstrumentedTThreadPoolServer is a copy of the TThreadPoolServer, with
# the addition of the BackgroundTask context manager that wraps the Thrift
# transaction.
class InstrumentedTThreadPoolServer(TServer.TServer):
    """Thread-pool Thrift server that records client sessions in New Relic.

    A fixed number of worker threads take accepted client transports from a
    shared queue. Each client connection is serviced inside a
    ``newrelic.agent.BackgroundTask`` context manager.
    """

    def __init__(self, *args, **kwargs):
        """Initialise the server.

        Positional arguments are forwarded unchanged to ``TServer.TServer``.
        The only keyword argument consulted is ``daemon`` (default ``False``),
        which decides whether the worker threads are daemon threads.
        """
        TServer.TServer.__init__(self, *args)
        self.clients = Queue.Queue()
        self.threads = 10
        self.daemon = kwargs.get("daemon", False)

    def setNumThreads(self, num):
        """Set the number of worker threads that serve() will create."""
        self.threads = num

    def serveThread(self):
        """Worker loop: take clients off the shared queue and serve them.

        Never returns. Any exception escaping a client session is logged so
        that the worker thread survives and moves on to the next client.
        """
        while True:
            try:
                client = self.clients.get()
                self.serveClient(client)
            except Exception as x:
                logger.exception(x)

    def serveClient(self, client):
        """Process requests from one client until its transport fails.

        The whole connection, not each individual call, runs inside a single
        BackgroundTask named after ``client.host``. Both transports are
        closed on every exit path.
        """
        itrans = self.inputTransportFactory.getTransport(client)
        otrans = self.outputTransportFactory.getTransport(client)
        iprot = self.inputProtocolFactory.getProtocol(itrans)
        oprot = self.outputProtocolFactory.getProtocol(otrans)
        try:
            # Use the BackgroundTask context manager to wrap the Thrift
            # transaction.
            with newrelic.agent.BackgroundTask(application_instance,
                                               client.host):
                try:
                    while True:
                        self.processor.process(iprot, oprot)
                except TTransport.TTransportException:
                    # Deliberately silent: this is the normal way out of the
                    # loop (presumably the client closed the connection).
                    pass
                except Exception as x:
                    logger.exception(x)
        finally:
            # Close in a finally so the transports are released even when
            # the context manager itself fails or a non-Exception error
            # (e.g. KeyboardInterrupt) propagates; the nested finally makes
            # sure otrans is closed even if itrans.close() raises.
            try:
                itrans.close()
            finally:
                otrans.close()

    def serve(self):
        """Start the worker threads, then accept clients forever.

        Accepted clients are put on the shared queue for the workers. This
        method does not return; errors while starting a thread or accepting
        a connection are logged and the loop carries on.
        """
        for _ in range(self.threads):
            try:
                worker = threading.Thread(target=self.serveThread)
                worker.daemon = self.daemon
                worker.start()
            except Exception as x:
                logger.exception(x)

        # Pump the socket for clients
        self.serverTransport.listen()
        while True:
            try:
                client = self.serverTransport.accept()
                if not client:
                    continue
                self.clients.put(client)
            except Exception as x:
                logger.exception(x)
class CalculatorHandler(object):
    """Handler implementing the tutorial Calculator service methods.

    Results of calculate() are remembered in ``self.log``, keyed by log id,
    and can be fetched back with getStruct().
    """

    def __init__(self):
        """Create a handler with an empty result log."""
        # Maps logid -> SharedStruct recorded by calculate().
        self.log = {}

    def ping(self):
        """Print a trace line; returns nothing."""
        # Single-argument parenthesised print behaves the same on Python 2
        # and Python 3.
        print('ping()')

    def add(self, n1, n2):
        """Return the sum of n1 and n2."""
        print('add(%d,%d)' % (n1, n2))
        return n1 + n2

    def calculate(self, logid, work):
        """Evaluate ``work`` and record the result under ``logid``.

        ``work`` must expose ``op``, ``num1`` and ``num2``. Returns the
        computed value.

        Raises InvalidOperation for a division by zero or an unrecognised
        ``op``; nothing is recorded in that case.
        """
        print('calculate(%d, %r)' % (logid, work))

        if work.op == Operation.ADD:
            val = work.num1 + work.num2
        elif work.op == Operation.SUBTRACT:
            val = work.num1 - work.num2
        elif work.op == Operation.MULTIPLY:
            val = work.num1 * work.num2
        elif work.op == Operation.DIVIDE:
            if work.num2 == 0:
                x = InvalidOperation()
                x.whatOp = work.op
                x.why = 'Cannot divide by 0'
                raise x
            # Explicit floor division: identical to "/" for integers on
            # Python 2, and keeps the result an integer on Python 3 (it is
            # formatted with '%d' below).
            val = work.num1 // work.num2
        else:
            x = InvalidOperation()
            x.whatOp = work.op
            x.why = 'Invalid operation'
            raise x

        log = SharedStruct()
        log.key = logid
        log.value = '%d' % (val)
        self.log[logid] = log

        return val

    def getStruct(self, key):
        """Return the entry recorded under ``key``.

        Raises KeyError if nothing was recorded for ``key``.
        """
        print('getStruct(%d)' % (key))
        return self.log[key]

    def zip(self):
        """Print a trace line; returns nothing."""
        print('zip()')
# Wire the tutorial handler into a Thrift processor and serve it on TCP port
# 9090 using buffered transports and the binary protocol.
handler = CalculatorHandler()
processor = Calculator.Processor(handler)
transport = TSocket.TServerSocket(port=9090)
tfactory = TTransport.TBufferedTransportFactory()
pfactory = TBinaryProtocol.TBinaryProtocolFactory()

server = InstrumentedTThreadPoolServer(processor, transport, tfactory, pfactory)

# Single-argument parenthesised print behaves the same on Python 2 and 3.
print('Starting the server...')
# serve() runs an endless accept loop, so the line after it is reached only
# if serve() ever returns.
server.serve()
print('done.')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment