Skip to content

Instantly share code, notes, and snippets.

@anthonyrisinger
Created October 22, 2012 21:29
Show Gist options
  • Save anthonyrisinger/3934532 to your computer and use it in GitHub Desktop.
Save anthonyrisinger/3934532 to your computer and use it in GitHub Desktop.
SIGSEGV when running tracebacker on loaded workers
#!/usr/bin/env python
import os
import sys
import time
import logging
# Install the default root handler so log records are actually emitted.
logging.basicConfig()
# Module-level logger; DEBUG is required for the per-message traces that
# Blizzard.push() and Blizzard.on_message() write via logger.debug().
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
import socket
from os import path, environ
from threading import Thread
from pprint import pformat
from functools import partial
from itertools import count
from kombu import (Connection, Exchange,
Queue, Producer, Consumer)
from kombu.common import uuid
from kombu.entity import DELIVERY_MODES
from kombu.mixins import ConsumerMixin
class Blizzard(ConsumerMixin):
    """Loopback publisher/consumer on a single topic exchange.

    An instance can publish JSON messages to the ``sigsegv`` exchange via
    :meth:`push` and consume them again from the ``loopback`` queue via
    :meth:`bind` (which drives ``ConsumerMixin.run``).

    Class attributes:
        uri: broker URL used when no connection is passed to ``__init__``.
        fqn: exchange name and leading component of every queue name.
        adl / dur: ``auto_delete`` / ``durable`` flags applied to the queues.
        pfx: optional routing-key prefix; empty means "no prefix".
    """

    connect_max_retries = 10
    should_stop = False
    consumer = None
    producer = None

    uri = 'memory://'
    fqn = 'sigsegv'
    adl = True
    dur = False
    pfx = ''

    exchange = Exchange(fqn, 'topic', durable=True)

    # Built with an explicit loop instead of a list comprehension: on
    # Python 3 a comprehension in a class body has its own scope and cannot
    # see the class-level names (fqn, pfx, exchange, adl, dur) used here.
    queues = []
    for _q in ('loopback',):
        queues.append(Queue(
            '.'.join(filter(None, (fqn, pfx, _q))),
            exchange=exchange,
            auto_delete=adl,
            durable=dur,
            routing_key='.'.join(filter(None, (pfx, _q, '#')))
        ))
    del _q  # do not leave the loop variable behind as a class attribute

    def __init__(self, connection=None):
        """Create the producer on *connection*.

        :param connection: an existing kombu ``Connection``. When omitted, a
            new connection to ``self.uri`` is opened for this instance, so
            instances created in different threads do not share one.
        """
        if connection is None:
            # Created per instance rather than as a default argument value,
            # which would be evaluated once and shared by every instance.
            connection = Connection(self.uri)
        self.connection = connection
        self.producer = Producer(
            channel=connection.channel(),
            exchange=self.exchange,
            auto_declare=True,
            serializer='json',
        )

    def get_consumers(self, Consumer, channel):
        """Return the single consumer bound to ``self.queues``.

        Called by ``ConsumerMixin``; *Consumer* is the factory it supplies,
        *channel* is accepted for interface compatibility and not used.
        """
        self.consumer = Consumer(self.queues, callbacks=[self.on_message])
        self.consumer.qos(prefetch_count=0)
        return [self.consumer]

    def on_message(self, body, message):
        """Log the received message's properties and acknowledge it."""
        # Drop the routing-key prefix (and any separator dots) for logging.
        key = message.delivery_info['routing_key'][len(self.pfx):].strip('.')
        logger.debug('MESSAGE(%s):\n%s', key, pformat(message.properties))
        message.ack()

    def push(self, body=None, **kwds):
        """Publish *body* through the producer, retrying on connection errors.

        :param body: message payload; defaults to an empty dict.
        :param kwds: passed to ``Producer.publish``. Missing ``headers``,
            ``routing_key``, ``delivery_mode``, ``message_id`` and
            ``correlation_id`` are filled in. ``routing_key`` is given
            without ``pfx``; the prefix is added here. A ``body`` keyword
            takes precedence over the positional *body*.
        :returns: whatever the wrapped ``Producer.publish`` call returns.
        """
        if body is None:
            body = dict()
        kwds.setdefault('headers', {})
        if 'routing_key' not in kwds:
            # The queue's binding key already carries the prefix; strip it so
            # the prefix is not applied a second time further down.
            base = self.queues[0].routing_key.rstrip('#.')
            prefix = self.pfx + '.'
            if self.pfx and base.startswith(prefix):
                base = base[len(prefix):]
            kwds['routing_key'] = base + '.me'
        kwds.setdefault('delivery_mode', DELIVERY_MODES['persistent'])
        if 'message_id' not in kwds:
            kwds['message_id'] = uuid()
        if 'correlation_id' not in kwds:
            kwds['correlation_id'] = uuid()
        # Logged before prefixing so the trace shows the logical key.
        logger.debug('SENDING(%s):\n%s', kwds['routing_key'], pformat(kwds))
        if self.pfx:
            kwds['routing_key'] = '.'.join((self.pfx, kwds['routing_key']))
        kwds.setdefault('body', body)
        publish = self.connection.ensure(
            self.producer, self.producer.publish,
            max_retries=3, interval_start=3,
        )
        return publish(**kwds)

    def bind(self):
        """Run the consumer loop, restarting it after any failure.

        Returns only once ``should_stop`` has been set; otherwise the
        consumer is restarted whenever ``run()`` returns or raises.
        """
        logger.info('Starting vassal (pid %d)', os.getpid())
        while not self.should_stop:
            try:
                self.run()
            except Exception:
                # Keep the worker alive, but record the failure and pause
                # briefly so a persistent error does not become a busy loop.
                logger.exception('Consumer loop failed; restarting')
                time.sleep(1)
def push():
    """Publish a small test message every 50 ms, forever.

    Waits three seconds before creating its own ``Blizzard`` instance, then
    publishes ``{'abc': 123}`` in an endless loop. Never returns.
    """
    time.sleep(3)
    publisher = Blizzard()
    interval = 0.05
    while True:
        time.sleep(interval)
        payload = {'abc': 123}
        publisher.push(payload)
def spawn():
    """Start ``push`` in a background daemon thread."""
    worker = Thread(target=push)
    # Daemon thread: it must not keep the process alive on exit.
    worker.daemon = True
    worker.start()
def main(core_id):
    """Start the background publisher, then consume in the calling thread.

    *core_id* is accepted but not used by this function.
    """
    spawn()
    consumer = Blizzard()
    consumer.bind()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment