Created
October 22, 2012 21:29
-
-
Save anthonyrisinger/3934532 to your computer and use it in GitHub Desktop.
SIGSEGV when running tracebacker on loaded workers
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import os | |
import sys | |
import time | |
import logging | |
logging.basicConfig() | |
logger = logging.getLogger(__name__) | |
logger.setLevel(logging.DEBUG) | |
import socket | |
from os import path, environ | |
from threading import Thread | |
from pprint import pformat | |
from functools import partial | |
from itertools import count | |
from kombu import (Connection, Exchange, | |
Queue, Producer, Consumer) | |
from kombu.common import uuid | |
from kombu.entity import DELIVERY_MODES | |
from kombu.mixins import ConsumerMixin | |
class Blizzard(ConsumerMixin): | |
connect_max_retries = 10 | |
should_stop = False | |
consumer = None | |
producer = None | |
uri = 'memory://' | |
fqn = 'sigsegv' | |
adl = True | |
dur = False | |
pfx = '' | |
exchange = Exchange(fqn, 'topic', durable=True) | |
queues = [ | |
Queue('.'.join(filter(None, (fqn, pfx, q))), exchange=exchange, | |
auto_delete=adl, durable=dur, | |
routing_key='.'.join(filter(None, (pfx, q, '#')))) | |
for q in ('loopback',) | |
] | |
def __init__(self, connection=Connection(uri)): | |
self.connection = connection | |
self.producer = Producer(channel=connection.channel(), | |
exchange=self.exchange, | |
auto_declare=True, | |
serializer='json') | |
def get_consumers(self, Consumer, channel): | |
self.consumer = Consumer(self.queues, callbacks=[self.on_message]) | |
self.consumer.qos(prefetch_count=0) | |
return [self.consumer] | |
def on_message(self, body, message): | |
key = message.delivery_info['routing_key'][len(self.pfx):].strip('.') | |
logger.debug('MESSAGE(%s):\n%s', key, pformat(message.properties)) | |
message.ack() | |
def push(self, body=None, **kwds): | |
if body is None: | |
body = dict() | |
headers = kwds.setdefault('headers', {}) | |
if 'routing_key' not in kwds: | |
kwds['routing_key'] = self.queues[0].routing_key.rstrip('#.') + '.me' | |
if 'delivery_mode' not in kwds: | |
kwds['delivery_mode'] = DELIVERY_MODES['persistent'] | |
if 'message_id' not in kwds: | |
kwds['message_id'] = uuid() | |
if 'correlation_id' not in kwds: | |
kwds['correlation_id'] = uuid() | |
logger.debug('SENDING(%s):\n%s', kwds['routing_key'], pformat(kwds)) | |
if self.pfx: | |
kwds['routing_key'] = '.'.join((self.pfx, kwds['routing_key'])) | |
if 'body' not in kwds: | |
kwds['body'] = body | |
publish = self.connection.ensure( | |
self.producer, self.producer.publish, | |
max_retries=3, interval_start=3, | |
) | |
return publish(**kwds) | |
def bind(self): | |
logger.info('Starting vassal (pid %d)' % os.getpid()) | |
while True: | |
try: | |
self.run() | |
except Exception: | |
pass | |
def push(): | |
time.sleep(3) | |
t = Blizzard() | |
while True: | |
time.sleep(0.05) | |
t.push({'abc':123}) | |
def spawn(): | |
t = Thread(target=push) | |
t.daemon = True | |
t.start() | |
def main(core_id): | |
spawn() | |
Blizzard().bind() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment