# Code snippet 1:
import time
def timer(func):
    """Decorate *func* so that every call prints how long it took.

    The wrapper recognises one extra keyword argument, ``timer_timeout``
    (seconds, default ``0``).  When it is positive and the call took longer
    than that, the printed line is tagged ``(mark as timeout)``.  The keyword
    is consumed by the wrapper and is never forwarded to *func*.

    Args:
        func: The callable to time.

    Returns:
        A wrapper carrying *func*'s metadata that returns whatever *func*
        returns.
    """
    import functools  # local import keeps the decorator self-contained

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # pop(), not get(): the threshold is an option of the decorator, so
        # it has to be removed before calling ``func``; a function that does
        # not declare it would otherwise raise TypeError.
        timeout = kwargs.pop('timer_timeout', 0)
        start = time.time()
        ret = func(*args, **kwargs)
        duration = time.time() - start
        # Chained comparison: flag only when a positive threshold was given
        # and the call exceeded it.
        tips = '(mark as timeout)' if duration > timeout > 0 else ''
        # A single parenthesised argument prints the same text whether
        # ``print`` is a statement or a function.
        print('function %s totally use: %.3f %s'
              % (func.__name__, duration, tips))
        return ret

    return wrapper
@timer
def add(a, b):
    """Print the sum of two integers as ``a + b = result``.

    Args:
        a: First addend (formatted with ``%d``).
        b: Second addend (formatted with ``%d``).
    """
    # A single parenthesised argument prints the same text whether ``print``
    # is a statement or a function.
    print("%d + %d = %d" % (a, b, a + b))


# Demo calls: the first relies on the decorator's default threshold, the
# second supplies ``timer_timeout`` explicitly.
add(3, 4)
add(3, 4, timer_timeout=0.1)
# Code snippet 2:
from threading import Thread
from multiprocessing import Process
from dbmodels import PrsPolicyConfigRiskSyslog
class BaseConsumer(Thread):
    """Base class for consumers; subclasses supply the work in ``_start``.

    NOTE(review): ``start`` shadows ``Thread.start``, so calling it runs
    ``_start`` synchronously in the caller's thread and does not spawn a new
    thread.  Callers that want concurrency have to provide it themselves.
    """

    def start(self, *args, **kwargs):
        """Run the consumer by delegating to ``_start``.

        Positional and keyword arguments are forwarded unchanged so a caller
        can hand the consumer its input; calling with no arguments still
        works.
        """
        self._start(*args, **kwargs)

    def _start(self, *args, **kwargs):
        """Hook holding the consumer's actual work; must be overridden.

        Raises:
            NotImplementedError: Always, in this base implementation.
        """
        # ``NotImplemented`` is the sentinel used for binary-operator
        # dispatch; returning it here was silently discarded by ``start``.
        # Raising makes a missing override visible.
        raise NotImplementedError(
            '%s must override _start()' % type(self).__name__)
class ApiConsumer(BaseConsumer):
    """Consumer mapped to the ``"api"`` send type (body is a placeholder)."""

    def _start(self, data=None):
        """Run the API consumer's work.

        Args:
            data: Optional input for this run.  ``start_sub_process`` passes
                a record's ``to_dict()`` result as the positional argument
                when it invokes a consumer; the parameter makes this hook
                call-compatible with that.  Defaults to ``None`` so a
                no-argument call still works.
        """
        # do some business here
        pass
class SyslogConsumer(BaseConsumer):
    """Consumer mapped to the ``"syslog"`` send type (body is a placeholder)."""

    def _start(self, data=None):
        """Run the syslog consumer's work.

        Args:
            data: Optional input for this run.  ``start_sub_process`` passes
                a record's ``to_dict()`` result as the positional argument
                when it invokes a consumer; the parameter makes this hook
                call-compatible with that.  Defaults to ``None`` so a
                no-argument call still works.
        """
        # do some business here
        pass
def start_sub_process(send_type):
    """Run one consumer thread per matching record and wait for them all.

    Meant to be the entry point of a child process.  It iterates the
    ``PrsPolicyConfigRiskSyslog`` query filtered on
    ``risk_type == send_type``, starts one daemon thread per record that
    calls the matching consumer's ``start`` with the record's ``to_dict()``
    result, and then blocks until those threads have finished.

    Args:
        send_type: ``"api"`` or ``"syslog"``.

    Raises:
        KeyError: If *send_type* is not a known consumer type.
    """
    send_map = {
        "api": ApiConsumer,
        "syslog": SyslogConsumer,
    }
    # Resolve once, before querying, so an unknown type fails immediately
    # rather than only when a matching record happens to exist.
    consumer_cls = send_map[send_type]

    workers = []
    records = PrsPolicyConfigRiskSyslog.select().where(
        PrsPolicyConfigRiskSyslog.risk_type == send_type)
    for data in records:
        t = Thread(target=consumer_cls().start, args=(data.to_dict(),))
        t.daemon = True
        t.start()
        workers.append(t)

    # Daemon threads are killed as soon as the owning process exits, so the
    # process has to stay alive for as long as they are working.
    for t in workers:
        t.join()


def _spawn_consumer_process(send_type):
    """Start a child process running ``start_sub_process(send_type)``.

    Returns:
        The already started ``Process`` object.
    """
    # Pass the callable and its arguments separately.  Writing
    # ``target=start_sub_process(send_type)`` would run the function right
    # here in the parent and give the child its return value (``None``) as
    # the target.
    p = Process(target=start_sub_process, args=(send_type,))
    p.start()
    return p


def start_api_consumer():
    """Run the ``"api"`` consumers in a child process; block until it exits."""
    _spawn_consumer_process("api").join()


def start_syslog_consumer():
    """Run the ``"syslog"`` consumers in a child process; block until it exits."""
    _spawn_consumer_process("syslog").join()


class SyslogWorker(object):
    """Launches the API and the syslog consumer processes."""

    def process(self):
        """Start both consumer processes, then wait for both to exit.

        Both children are started before either one is joined; joining the
        first before starting the second would keep the second type of
        consumer from running until the first child had exited.
        """
        children = [
            _spawn_consumer_process("api"),
            _spawn_consumer_process("syslog"),
        ]
        for child in children:
            child.join()