Last active
August 29, 2015 14:24
-
-
Save toffer/9321837e3a8c27bd40f5 to your computer and use it in GitHub Desktop.
Modified server from Thrift tutorial, to demonstrate how to record thrift transactions as background tasks. PythonClient.py is unchanged.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# | |
# Licensed to the Apache Software Foundation (ASF) under one | |
# or more contributor license agreements. See the NOTICE file | |
# distributed with this work for additional information | |
# regarding copyright ownership. The ASF licenses this file | |
# to you under the Apache License, Version 2.0 (the | |
# "License"); you may not use this file except in compliance | |
# with the License. You may obtain a copy of the License at | |
# | |
# http://www.apache.org/licenses/LICENSE-2.0 | |
# | |
# Unless required by applicable law or agreed to in writing, | |
# software distributed under the License is distributed on an | |
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
# KIND, either express or implied. See the License for the | |
# specific language governing permissions and limitations | |
# under the License. | |
# | |
# | |
# NEW RELIC INITIALIZATION | |
# | |
# The following 3 lines must go at the top of whichever file you are running | |
# server.serve() from. They are responsible for initializing the agent and | |
# for registering it with New Relic. If you set log_level = debug in your | |
# newrelic.ini configuration file, you should see output that shows the agent | |
# registering with New Relic, even before you run any thrift transactions. | |
# Note that register_application() returns an application instance, which | |
# you can pass into BackgroundTask. | |
import newrelic.agent | |
newrelic.agent.initialize('newrelic.ini') | |
application_instance = newrelic.agent.register_application(timeout=5.0) | |
# END NEW RELIC INITIALIZATION | |
import sys, glob | |
sys.path.append('gen-py') | |
# sys.path.insert(0, glob.glob('../../lib/py/build/lib.*')[0]) | |
import Queue | |
import threading | |
from tutorial import Calculator | |
from tutorial.ttypes import * | |
from shared.ttypes import SharedStruct | |
from thrift.transport import TSocket | |
from thrift.transport import TTransport | |
from thrift.protocol import TBinaryProtocol | |
from thrift.server import TServer | |
import logging | |
console = logging.StreamHandler() | |
logger = logging.getLogger('Thrift') | |
logger.addHandler(console) | |
# InstrumentedTThreadPoolServer is a copy of the TThreadPoolServer, with | |
# the addition of the BackgroundTask context manager that wraps the Thrift | |
# transaction. | |
class InstrumentedTThreadPoolServer(TServer.TServer): | |
"""Server with a fixed size pool of threads which service requests.""" | |
def __init__(self, *args, **kwargs): | |
TServer.TServer.__init__(self, *args) | |
self.clients = Queue.Queue() | |
self.threads = 10 | |
self.daemon = kwargs.get("daemon", False) | |
def setNumThreads(self, num): | |
"""Set the number of worker threads that should be created""" | |
self.threads = num | |
def serveThread(self): | |
"""Loop around getting clients from the shared queue and process them.""" | |
while True: | |
try: | |
client = self.clients.get() | |
self.serveClient(client) | |
except Exception as x: | |
logger.exception(x) | |
def serveClient(self, client): | |
"""Process input/output from a client for as long as possible""" | |
itrans = self.inputTransportFactory.getTransport(client) | |
otrans = self.outputTransportFactory.getTransport(client) | |
iprot = self.inputProtocolFactory.getProtocol(itrans) | |
oprot = self.outputProtocolFactory.getProtocol(otrans) | |
# Use the BackgroundTask context manager to wrap the Thrift transaction | |
with newrelic.agent.BackgroundTask(application_instance, client.host): | |
try: | |
while True: | |
self.processor.process(iprot, oprot) | |
except TTransport.TTransportException as tx: | |
pass | |
except Exception as x: | |
logger.exception(x) | |
itrans.close() | |
otrans.close() | |
def serve(self): | |
"""Start a fixed number of worker threads and put client into a queue""" | |
for i in range(self.threads): | |
try: | |
t = threading.Thread(target=self.serveThread) | |
t.setDaemon(self.daemon) | |
t.start() | |
except Exception as x: | |
logger.exception(x) | |
# Pump the socket for clients | |
self.serverTransport.listen() | |
while True: | |
try: | |
client = self.serverTransport.accept() | |
if not client: | |
continue | |
self.clients.put(client) | |
except Exception as x: | |
logger.exception(x) | |
class CalculatorHandler: | |
def __init__(self): | |
self.log = {} | |
def ping(self): | |
print 'ping()' | |
def add(self, n1, n2): | |
print 'add(%d,%d)' % (n1, n2) | |
return n1+n2 | |
def calculate(self, logid, work): | |
print 'calculate(%d, %r)' % (logid, work) | |
if work.op == Operation.ADD: | |
val = work.num1 + work.num2 | |
elif work.op == Operation.SUBTRACT: | |
val = work.num1 - work.num2 | |
elif work.op == Operation.MULTIPLY: | |
val = work.num1 * work.num2 | |
elif work.op == Operation.DIVIDE: | |
if work.num2 == 0: | |
x = InvalidOperation() | |
x.whatOp = work.op | |
x.why = 'Cannot divide by 0' | |
raise x | |
val = work.num1 / work.num2 | |
else: | |
x = InvalidOperation() | |
x.whatOp = work.op | |
x.why = 'Invalid operation' | |
raise x | |
log = SharedStruct() | |
log.key = logid | |
log.value = '%d' % (val) | |
self.log[logid] = log | |
return val | |
def getStruct(self, key): | |
print 'getStruct(%d)' % (key) | |
return self.log[key] | |
def zip(self): | |
print 'zip()' | |
handler = CalculatorHandler() | |
processor = Calculator.Processor(handler) | |
transport = TSocket.TServerSocket(port=9090) | |
tfactory = TTransport.TBufferedTransportFactory() | |
pfactory = TBinaryProtocol.TBinaryProtocolFactory() | |
server = InstrumentedTThreadPoolServer(processor, transport, tfactory, pfactory) | |
print 'Starting the server...' | |
server.serve() | |
print 'done.' |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment