Skip to content

Instantly share code, notes, and snippets.

View dsociative's full-sized avatar
🐒

artem.prikhodin dsociative

🐒
View GitHub Profile
import time
import zmq
context = zmq.Context()
socket = context.socket(zmq.PULL)
sid = 1
socket.connect("ipc://jewels_%s" % sid)
while 1:
# coding: utf8
import time
import zmq
context = zmq.Context()
socket = context.socket(zmq.PUSH)
socket.bind("ipc://jewels_1")
socket.bind("ipc://jewels_2")
socket.bind("ipc://jewels_3")
socket.bind("ipc://jewels_4")
# -*- coding: utf-8 -*-
import time
import threading
from itertools import repeat
class KeepAliveThread(threading.Thread):
nodes = []
%% Copyright
-module(serv).
-author("dsociative").
%% API
-export([listen/1, listen/0, talk/1]).
listen() ->
listen(9998).
listen(Port) ->
import logging
import os, sys
def my_excepthook(excType, excValue, traceback, logger=logger):
logger.error("Logging an uncaught exception",
exc_info=(excType, excValue, traceback))
sys.excepthook = my_excepthook
# coding: utf8
from tests.controller.building.BuildingCommon import BuildingCommon
from tycoon.controller.CommandBase import CommandBase
class BuildingBuild(CommandBase, BuildingCommon):
params = 'location_id', 'building_id', 'x', 'y', 'r'
requires = 'user', 'location', 'balance_building',
name = 'building.build'
@dsociative
dsociative / gist:3155154
Created July 21, 2012 08:55
test runner
#!/usr/bin/python
# -*- coding: utf-8 -*-
import unittest
if __name__ == '__main__':
MASK = 'Test'
runner = unittest.TextTestRunner()
def run_callback(self, obj, args=None):
try:
assert hasattr(self, 'on%s' % obj)
f = getattr(self, 'on%s' % obj)
if args:
return f(args)
else:
return f()
except Exception as e: