Simple Python daemon worker
# -*- coding: utf-8 -*- | |
""" | |
Sync device status data from Hive.
""" | |
import argparse | |
import json | |
import logging.handlers | |
import operator | |
import signal | |
import threading | |
import time | |
from functools import partial | |
from functools import wraps | |
import os | |
import pymysql | |
import requests | |
from concurrent.futures import ThreadPoolExecutor, ALL_COMPLETED, wait | |
from redis import StrictRedis | |
from config_manager import ConfigManager | |
from daemons.ifflow.cardflow import get_device_cardflow | |
from daemons.ifflow.presto import Presto | |
# Module-wide logger: every function and class in this file logs through it.
logger = logging.getLogger("root")
logger.setLevel(logging.DEBUG)
formatter = logging.Formatter(
    "[%(asctime)s %(filename)s:%(lineno)s - %(funcName)15s()] %(message)s"
)
# RotatingFileHandler only rolls over when maxBytes is non-zero; with the
# defaults (maxBytes=0, backupCount=0) the file of a long-running daemon grows
# without bound, so explicit limits are given here.
handler = logging.handlers.RotatingFileHandler(
    "/tmp/status_sync.log",
    maxBytes=50 * 1024 * 1024,
    backupCount=5,
)
handler.setFormatter(formatter)
logger.addHandler(handler)
# Redis key prefix; the device hash is appended to build the full key.
DEVICE_STATUS_INFO_CACHE_PRIFIX = "nsfocus:cache:zelda:device:status:"

# config.yaml is looked up one directory above the one holding this file.
config_file = os.path.join(
    os.path.dirname(os.path.abspath(__file__)),
    os.path.pardir,
    "config.yaml",
)
cm = ConfigManager(config_file=config_file)
class Timeit(object):
    """Context manager that logs how long its ``with`` body took.

    After the block exits, ``cost`` holds the elapsed wall-clock seconds.
    Exceptions raised inside the block are not suppressed.

    :param tag: label placed at the front of the log line.
    """

    def __init__(self, tag):
        self.tag = tag
        # Set by __enter__ / __exit__; None until the block has run.
        self.start = None
        self.cost = None

    def __enter__(self):
        self.start = time.time()
        # Hand the instance back so ``with Timeit(...) as t`` can read t.cost.
        return self

    def __exit__(self, *unused):
        self.cost = time.time() - self.start
        logger.info("%s took about: %ss", self.tag, self.cost)
def tag(name):
    """Build a decorator that labels a function with ``name``.

    The label is stored on the function as ``__tag__`` and the very same
    function object is handed back; no wrapper is created.

    :param name: label to attach.
    :return: decorator taking and returning the function.
    """
    def attach(target):
        setattr(target, "__tag__", name)
        return target
    return attach
def exec_profile(func):
    """Decorator that logs the run time of every call to ``func``.

    Timing is done by ``Timeit`` and labelled with ``func.__tag__``. A
    function that was never tagged falls back to its ``__name__`` instead of
    raising ``AttributeError`` on each call.

    :param func: callable to wrap.
    :return: wrapper with the same arguments and return value as ``func``.
    """
    @wraps(func)
    def timed(*args, **kwargs):
        # Resolved per call so a tag attached after decoration is still used.
        label = getattr(func, "__tag__", getattr(func, "__name__", repr(func)))
        with Timeit(label):
            return func(*args, **kwargs)
    return timed
def exception_handler(expect_value=None):
    """Build a decorator that turns any ``Exception`` into a fallback value.

    The wrapped call's result is passed through untouched. When it raises,
    the error is logged (with traceback) and ``expect_value`` is returned
    instead.

    :param expect_value: value returned when the wrapped call raises. The
        same object is returned every time, so callers should treat a
        mutable fallback as read-only.
    :return: decorator producing the guarded function.
    """
    def wrapper(func):
        @wraps(func)
        def guarded(*args, **kwargs):
            try:
                return func(*args, **kwargs)
            except Exception as e:
                # Fall back to the function name: an untagged function must
                # not make the error path itself raise AttributeError.
                label = getattr(func, "__tag__", getattr(func, "__name__", repr(func)))
                logger.error(
                    "{0} raise exception {1}".format(label, str(e)),
                    exc_info=True,
                )
                return expect_value
        return guarded
    return wrapper
def parse_args():
    """Read the daemon's command-line options from ``sys.argv``.

    :return: namespace whose ``PERIOD`` attribute is the sync period as an
        int (``-p`` / ``--period``; 120 when the option is not given).
    """
    period_option = dict(
        type=int,
        dest="PERIOD",
        default=2 * 60,
        help="Sync Time Period",
    )
    cli = argparse.ArgumentParser(description="Sync Device Network Information To Redis")
    cli.add_argument("-p", "--period", **period_option)
    return cli.parse_args()
# The fallback is a dict (not a list) so it has the same type as the normal
# return value; it is the same object on every failure, so treat it as
# read-only.
@exec_profile
@exception_handler(expect_value={})
@tag("get_device_status_li")
def get_device_status_detail(dev_hashes):
    """Fetch the status records of the given devices over HTTP.

    :param dev_hashes: iterable of device hash strings; sent comma-joined in
        the ``hashs`` query parameter.
    :return: dict mapping ``device_hash`` to its status record. An empty
        dict is returned when the HTTP status is not 2xx/3xx, when the
        payload's ``status`` field is not 2000, or when anything raises (the
        decorator logs the exception).
    """
    url = "http://nscloud.api.nsfocus.com/api/nuri/lan/devicestatus/"
    # Without a timeout requests can wait forever on a stalled server, which
    # would hang the calling sync round.
    resp = requests.get(
        url,
        params={"show_status": 2, "hashs": ",".join(dev_hashes)},
        timeout=30,
    )
    if resp.status_code // 100 not in (2, 3):
        return {}
    rjson = resp.json()
    if rjson["status"] != 2000:
        return {}
    # NOTE(review): rjson is indexed like a dict just above, so iterating it
    # yields its keys rather than device records. The record list presumably
    # lives under a payload key of the response - confirm against the API and
    # iterate that instead.
    return {status["device_hash"]: status for status in rjson}
# The fallback is a dict (not a list) so it has the same type as the normal
# return value; it is the same object on every failure, so treat it as
# read-only.
@exec_profile
@exception_handler(expect_value={})
@tag("get_device_protocol_li")
def get_device_protocol(hashs):
    """Fetch the protocol type of the given devices over HTTP.

    :param hashs: iterable of device hash strings; sent comma-joined in the
        ``device_hash`` query parameter.
    :return: dict mapping ``device_hash`` to its ``protocol_type``. An empty
        dict is returned when the HTTP status is not 2xx/3xx, when the
        payload's ``status`` field is not 2000, or when anything raises (the
        decorator logs the exception).
    """
    url = "http://nscloud.api.nsfocus.com/api/nuri/etau/device"
    # Without a timeout requests can wait forever on a stalled server, which
    # would hang the calling sync round.
    resp = requests.get(
        url,
        params={"device_hash": ",".join(hashs)},
        timeout=30,
    )
    if resp.status_code // 100 not in (2, 3):
        return {}
    rjson = resp.json()
    if rjson["status"] != 2000:
        return {}
    # NOTE(review): rjson is indexed like a dict just above, so iterating it
    # yields its keys rather than device records. The record list presumably
    # lives under a payload key of the response - confirm against the API and
    # iterate that instead.
    return {device["device_hash"]: device["protocol_type"] for device in rjson}
class Worker(object):
    """Daemon worker that caches per-device status in Redis.

    Two loops cooperate until ``stop`` is called:

    * ``sync_device`` (background thread) re-reads the device hashes from
      MySQL about every 30 seconds and rebinds ``self.devices``.
    * ``run_worker`` (calling thread) runs ``task`` every ``period`` seconds;
      ``task`` fans the devices out to the thread pool, and each
      ``deal_devices`` call writes one JSON document to Redis.
    """

    def __init__(self, period, redis_conf, mysql_conf, hv_conf):
        """
        :param period: seconds to sleep between two ``task`` runs.
        :param redis_conf: keyword arguments forwarded to ``StrictRedis``.
        :param mysql_conf: mapping providing ``host``, ``port``, ``user`` and
            ``password``.
        :param hv_conf: keyword arguments forwarded to ``Presto``.
        """
        self._pool = ThreadPoolExecutor(max_workers=20)
        self.period = period
        self.running = True
        self.mysql_connect_param = dict(
            host=mysql_conf['host'],
            port=mysql_conf['port'],
            db="zelda",
            user=mysql_conf['user'],
            password=mysql_conf['password'],
            charset='utf8mb4',
            cursorclass=pymysql.cursors.DictCursor,
            read_timeout=30,
            write_timeout=30,
            defer_connect=True,
        )
        # defer_connect=True: nothing is connected here; the first
        # ping(reconnect=True) in sync_device establishes the connection.
        self.mc = pymysql.Connection(**self.mysql_connect_param)
        self.rc = StrictRedis(
            socket_timeout=30,
            socket_connect_timeout=10,
            retry_on_timeout=True,
            **redis_conf)
        self.hv_conf = hv_conf
        # Device hashes; rebound as a whole by the sync thread.
        self.devices = []

    def renew_mysql(self):
        """Close the current MySQL connection (best effort) and replace it."""
        try:
            if self.mc:
                self.mc.close()
        except Exception as e:
            # Closing a dead connection may fail; that must not block renewal.
            logger.debug("closing mysql connection failed: {0}".format(e))
        finally:
            self.mc = pymysql.connect(**self.mysql_connect_param)

    def sync_device(self):
        """Refresh ``self.devices`` from MySQL until the worker is stopped.

        Every 30 one-second ticks a refresh is submitted to the thread pool
        and waited on for at most 30 seconds. The first refresh therefore
        happens about 30 seconds after this method starts.
        """
        @exec_profile
        @exception_handler(expect_value=None)
        @tag("sync_device")
        def syncer():
            # Up to three ping attempts; the logged number is the count of
            # attempts still left. Give up quietly when all of them fail.
            for attempts_left in (2, 1, 0):
                try:
                    self.mc.ping(reconnect=True)
                except Exception:
                    logger.warning("mysql connection retrying({0})...".format(attempts_left))
                else:
                    break
            else:
                return
            with self.mc.cursor() as cursor:
                cursor.execute("select hash from waf where hash IS NOT NULL")
                rows = cursor.fetchall()
            # Build a real list: on Python 3 map() returns a one-shot iterator
            # that has no len() and would be exhausted after its first use.
            self.devices = [row["hash"] for row in rows]
            logger.info("current device number: {0}".format(len(self.devices)))

        counter = 0
        while self.running:
            if counter >= 30:
                job = self._pool.submit(syncer)
                wait([job], timeout=30)
                counter = 0
            time.sleep(1)
            counter += 1

    @exec_profile
    @tag("hv_query")
    def deal_devices(self, dev_hash, status_detail_li, protocol):
        """Collect one device's status and netflow and cache them in Redis.

        The cached value is a JSON object that always has ``netflow`` and,
        when a status record exists for the device, ``cpu``, ``mem``,
        ``online`` and ``lasttimepoint``. The key is
        ``DEVICE_STATUS_INFO_CACHE_PRIFIX`` followed by ``dev_hash``.

        Failures are logged with traceback and never propagate, because the
        futures running this method are not inspected afterwards.

        :param dev_hash: device hash.
        :param status_detail_li: dict mapping device hash to status record.
        :param protocol: forwarded to ``get_device_cardflow``.
        """
        try:
            # Constructed inside the try so a failure here is logged as well.
            presto = Presto(**self.hv_conf)
            info = dict(netflow={})
            base_status = status_detail_li.get(dev_hash, {})
            if base_status:
                online = base_status.get("online_status", 0)
                if online:
                    # An online device is stamped with the current time.
                    last_seen = int(time.time())
                else:
                    last_seen = base_status.get("lasttimepoint")
                info.update(
                    cpu=base_status.get("cpu_usage", 0),
                    mem=base_status.get("mem_usage", 0),
                    online=online,
                    lasttimepoint=last_seen,
                )
            status_data = get_device_cardflow([dev_hash], protocol=protocol, presto=presto)
            if status_data:
                info.update(
                    netflow=status_data[dev_hash]
                )
            logger.debug("device hash: <{0}>, cache data: {1}".format(dev_hash, status_data))
            self.rc.set(
                name="{0}{1}".format(DEVICE_STATUS_INFO_CACHE_PRIFIX, dev_hash),
                value=json.dumps(info)
            )
        except Exception as e:
            logger.error(str(e), exc_info=True)

    @exec_profile
    @exception_handler(expect_value=None)
    @tag("task_running")
    def task(self):
        """Run one sync round over all known devices.

        Devices whose protocol type is not 0 are processed first (with
        ``protocol=1``), then those with protocol type 0. Each batch is
        waited on for at most 3 seconds per known device. Devices missing
        from the protocol lookup are skipped.
        """
        # Snapshot: the sync thread may rebind self.devices while this runs.
        devices = self.devices
        if not devices:
            return
        timeout = len(devices) * 3
        # Normalise an empty fallback of any type to a dict, since
        # .items() / .get() are used on these results.
        protocol_li = get_device_protocol(devices) or {}
        status_detail_li = get_device_status_detail(devices) or {}
        # .items() instead of the Python-2-only .iteritems().
        protocol_old = [key for key, value in protocol_li.items() if value == 0]
        protocol_new = list(set(protocol_li) - set(protocol_old))
        new_device_parser = partial(self.deal_devices, protocol=1, status_detail_li=status_detail_li)
        old_device_parser = partial(self.deal_devices, protocol=0, status_detail_li=status_detail_li)
        fs_new = [self._pool.submit(new_device_parser, dev) for dev in protocol_new]
        wait(fs_new, timeout=timeout, return_when=ALL_COMPLETED)
        fs_old = [self._pool.submit(old_device_parser, dev) for dev in protocol_old]
        wait(fs_old, timeout=timeout, return_when=ALL_COMPLETED)
        logger.info("Sync Status Done.")

    def stop(self, signum, frame):
        """Signal handler: end both loops and shut the thread pool down."""
        self.running = False
        self._pool.shutdown()

    def _sleep_while_running(self, seconds):
        """Sleep up to ``seconds``, returning early once ``stop`` was called."""
        deadline = time.time() + seconds
        while self.running:
            remaining = deadline - time.time()
            if remaining <= 0:
                break
            time.sleep(min(1, remaining))

    def run_worker(self):
        """Start the device-sync thread, then run ``task`` every ``period``
        seconds until the worker is stopped."""
        data_job = threading.Thread(target=self.sync_device)
        data_job.start()
        while self.running:
            self.task()
            # On Python 3.5+ time.sleep() is resumed after a signal handler
            # returns, so one long sleep would delay shutdown by up to a full
            # period; sleep in short slices instead.
            self._sleep_while_running(self.period)
def main():
    """Entry point: build a ``Worker`` from the CLI options and the config
    file, hook it to the termination signals and run it."""
    time_period = parse_args().PERIOD
    runner = Worker(
        mysql_conf=cm.get("mysql_setting"),
        redis_conf=cm.get("cache_redis_setting"),
        hv_conf=cm.get("hv"),
        period=time_period,
    )
    # The same handler serves every termination signal.
    for signum in (signal.SIGTERM, signal.SIGINT, signal.SIGQUIT):
        signal.signal(signum, runner.stop)
    runner.run_worker()


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment