Created
April 24, 2018 06:42
-
-
Save smileboywtu/39ede732814f64408112366e51b96830 to your computer and use it in GitHub Desktop.
simple python daemon worker
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
""" | |
Sync device status data from Hive into the Redis cache.
""" | |
import argparse | |
import json | |
import logging.handlers | |
import operator | |
import signal | |
import threading | |
import time | |
from functools import partial | |
from functools import wraps | |
import os | |
import pymysql | |
import requests | |
from concurrent.futures import ThreadPoolExecutor, ALL_COMPLETED, wait | |
from redis import StrictRedis | |
from config_manager import ConfigManager | |
from daemons.ifflow.cardflow import get_device_cardflow | |
from daemons.ifflow.presto import Presto | |
# Module-wide logger: every record is written to a rotating file under /tmp.
logger = logging.getLogger("root")
logger.setLevel(logging.DEBUG)
formatter = logging.Formatter(
    "[%(asctime)s %(filename)s:%(lineno)s - %(funcName)15s()] %(message)s"
)
# RotatingFileHandler never rolls over while maxBytes keeps its default of 0,
# so the file would grow without bound. Cap each file and keep a bounded
# number of backups.
handler = logging.handlers.RotatingFileHandler(
    "/tmp/status_sync.log",
    maxBytes=10 * 1024 * 1024,
    backupCount=5,
)
handler.setFormatter(formatter)
logger.addHandler(handler)

# Prefix of the Redis keys holding the cached status; the device hash is
# appended to it. (The spelling is kept because other code uses this name.)
DEVICE_STATUS_INFO_CACHE_PRIFIX = "nsfocus:cache:zelda:device:status:"

# config.yaml is looked up one directory above the directory of this file.
config_file = os.path.join(os.path.dirname(os.path.abspath(__file__)), os.path.pardir, "config.yaml")
cm = ConfigManager(config_file=config_file)
class Timeit(object):
    """Context manager that logs how long the wrapped block took.

    After the block finishes, the elapsed wall-clock time in seconds is
    available as ``cost`` and is written to the module logger.

    :param tag: label placed at the start of the log line.
    """

    def __init__(self, tag):
        self.tag = tag

    def __enter__(self):
        self.start = time.time()
        # Hand the instance back so ``with Timeit(...) as t`` binds the timer
        # and ``t.cost`` can be read afterwards.
        return self

    def __exit__(self, *unused):
        # Returns None, so an exception raised inside the block keeps
        # propagating after the duration has been logged.
        self.cost = time.time() - self.start
        logger.info("%s took about: %ss" % (self.tag, self.cost))
def tag(name):
    """Build a decorator that labels a function with *name*.

    The label is stored on the function as its ``__tag__`` attribute; the
    function itself is handed back unwrapped.

    :param name: label to attach.
    :return: decorator that sets ``__tag__`` and returns its argument.
    """
    def attach_label(target):
        setattr(target, "__tag__", name)
        return target

    return attach_label
def exec_profile(func):
    """Decorate *func* so that every call is timed and logged via ``Timeit``.

    The log label is the function's ``__tag__`` attribute when it has one and
    the function's own name otherwise.

    :param func: callable to wrap.
    :return: wrapper that forwards all arguments and returns what *func*
        returns.
    """
    @wraps(func)
    def timed(*args, **kwds):
        # Fall back to the function name so that a function decorated
        # without a tag can still be called instead of raising
        # AttributeError here.
        label = getattr(func, "__tag__", func.__name__)
        with Timeit(label):
            return func(*args, **kwds)

    return timed
def exception_handler(expect_value=None):
    """Build a decorator that replaces any ``Exception`` with a fallback.

    The decorated function runs normally; if it raises an ``Exception`` the
    error is logged and *expect_value* is returned instead of propagating.

    :param expect_value: value returned when the wrapped call fails. The very
        same object is returned on every failure, so callers should treat a
        mutable fallback as read-only.
    :return: decorator producing the guarded function.
    """
    def wrapper(func):
        @wraps(func)
        def handler(*args, **kwargs):
            try:
                return func(*args, **kwargs)
            except Exception as e:
                # Use the function name when no tag was applied, so that
                # reporting the failure cannot itself raise AttributeError.
                label = getattr(func, "__tag__", func.__name__)
                logger.error("{0} raise exception {1}".format(label, str(e)))
                return expect_value

        return handler

    return wrapper
def parse_args():
    """Parse the command line of the sync daemon.

    :return: ``argparse.Namespace`` whose ``PERIOD`` attribute is the pause
        between two sync rounds, in seconds (default 120).
    """
    def positive_int(text):
        # The period ends up in time.sleep(): zero would make the main loop
        # spin and a negative value makes time.sleep() raise, so reject both
        # here with a regular argparse usage error.
        value = int(text)
        if value <= 0:
            raise argparse.ArgumentTypeError(
                "period must be a positive integer, got {0!r}".format(text)
            )
        return value

    parser = argparse.ArgumentParser(description="Sync Device Network Information To Redis")
    parser.add_argument(
        "-p", "--period",
        type=positive_int,
        dest="PERIOD",
        default=2 * 60,
        help="Sync Time Period"
    )
    return parser.parse_args()
@exec_profile
@exception_handler(expect_value={})
@tag("get_device_status_li")
def get_device_status_detail(dev_hashes):
    """
    Fetch the status records of the given devices over HTTP.

    :param dev_hashes: iterable of device hash strings; they are sent
        comma-joined in the ``hashs`` query parameter.
    :return: dict mapping ``device_hash`` to its status record. An empty dict
        is returned when the HTTP status is not 2xx/3xx, when the ``status``
        field of the payload is not 2000, or when any exception is raised
        (the decorator logs it). The fallback is a dict, like every other
        return value, because callers use dict methods on the result; treat
        it as read-only.
    """
    url = "http://nscloud.api.nsfocus.com/api/nuri/lan/devicestatus/"
    # Without a timeout requests can block indefinitely on a stalled server
    # and hold up the whole sync round.
    resp = requests.get(
        url,
        params={"show_status": 2, "hashs": ",".join(dev_hashes)},
        timeout=30,
    )
    if resp.status_code // 100 not in (2, 3):
        return {}
    rjson = resp.json()
    if rjson["status"] != 2000:
        return {}
    # NOTE(review): rjson is indexed like a dict just above, so this loop
    # walks its keys (strings) rather than status records and the subscript
    # below fails. The records presumably live under one field of the
    # payload; confirm which one against the API and iterate over that.
    status_li = {}
    for status in rjson:
        status_li[status["device_hash"]] = status
    return status_li
@exec_profile
@exception_handler(expect_value={})
@tag("get_device_protocol_li")
def get_device_protocol(hashs):
    """
    Fetch the protocol type of the given devices over HTTP.

    :param hashs: iterable of device hash strings; they are sent comma-joined
        in the ``device_hash`` query parameter.
    :return: dict mapping ``device_hash`` to ``protocol_type``. An empty dict
        is returned when the HTTP status is not 2xx/3xx, when the ``status``
        field of the payload is not 2000, or when any exception is raised
        (the decorator logs it). The fallback is a dict, like every other
        return value, because callers use dict methods on the result; treat
        it as read-only.
    """
    url = "http://nscloud.api.nsfocus.com/api/nuri/etau/device"
    # Without a timeout requests can block indefinitely on a stalled server
    # and hold up the whole sync round.
    resp = requests.get(
        url,
        params={"device_hash": ",".join(hashs)},
        timeout=30,
    )
    if resp.status_code // 100 not in (2, 3):
        return {}
    rjson = resp.json()
    if rjson["status"] != 2000:
        return {}
    # NOTE(review): rjson is indexed like a dict just above, so this loop
    # walks its keys (strings) rather than device records and the subscripts
    # below fail. The records presumably live under one field of the
    # payload; confirm which one against the API and iterate over that.
    protocols = {}
    for device in rjson:
        protocols[device["device_hash"]] = device["protocol_type"]
    return protocols
class Worker(object):
    """Periodically collect per-device status and cache it in Redis.

    A background thread refreshes the list of device hashes from MySQL, while
    the main loop fans the per-device work out to a thread pool.

    :param period: pause between two sync rounds, in seconds.
    :param redis_conf: keyword arguments forwarded to ``StrictRedis``.
    :param mysql_conf: mapping providing ``host``, ``port``, ``user`` and
        ``password`` for the MySQL connection.
    :param hv_conf: keyword arguments forwarded to ``Presto``.
    """

    def __init__(self, period, redis_conf, mysql_conf, hv_conf):
        self._pool = ThreadPoolExecutor(max_workers=20)
        self.period = period
        # Both loops (run_worker and sync_device) poll this flag.
        self.running = True

        self.mysql_connect_param = dict(
            host=mysql_conf['host'],
            port=mysql_conf['port'],
            db="zelda",
            user=mysql_conf['user'],
            password=mysql_conf['password'],
            charset='utf8mb4',
            cursorclass=pymysql.cursors.DictCursor,
            read_timeout=30,
            write_timeout=30,
            # No connection is opened while the object is constructed.
            defer_connect=True,
        )
        self.mc = pymysql.Connection(**self.mysql_connect_param)

        self.rc = StrictRedis(
            socket_timeout=30,
            socket_connect_timeout=10,
            retry_on_timeout=True,
            **redis_conf)

        self.hv_conf = hv_conf
        # Device hashes known so far; rebound by the sync_device thread.
        self.devices = []

    def renew_mysql(self):
        """Close the current MySQL connection and replace it with a new one."""
        try:
            if self.mc:
                self.mc.close()
        except Exception as e:
            # Closing is best effort: the old connection is discarded anyway.
            logger.debug("closing mysql connection failed: {0}".format(e))
        finally:
            self.mc = pymysql.connect(**self.mysql_connect_param)

    def sync_device(self):
        """Refresh ``self.devices`` from MySQL roughly every 30 seconds.

        Loops until ``self.running`` becomes false; ``run_worker`` uses it as
        the target of a background thread.
        """
        @exec_profile
        @exception_handler(expect_value=None)
        @tag("sync_device")
        def syncer():
            # Give the connection up to three chances to come (back) up.
            retry = 3
            while retry:
                try:
                    self.mc.ping(reconnect=True)
                except Exception:
                    retry -= 1
                    logger.warning("mysql connection retrying({0})...".format(retry))
                else:
                    break
            if retry <= 0:
                return
            with self.mc.cursor() as cursor:
                cursor.execute("select hash from waf where hash IS NOT NULL")
                rows = cursor.fetchall()
            # Build a real list: a lazy map object (Python 3) has no len()
            # and could be consumed only once.
            self.devices = [row["hash"] for row in rows]
            logger.info("current device number: {0}".format(len(self.devices)))

        counter = 0
        while self.running:
            if counter - 30 >= 0:
                job = self._pool.submit(syncer)
                wait([job], timeout=30)
                counter = 0
            # One-second ticks keep the loop responsive to self.running.
            time.sleep(1)
            counter += 1

    @exec_profile
    @tag("hv_query")
    def deal_devices(self, dev_hash, status_detail_li, protocol):
        """Collect the status of one device and store it in Redis as JSON.

        :param dev_hash: hash of the device to process.
        :param status_detail_li: dict of status records keyed by device hash.
        :param protocol: forwarded to ``get_device_cardflow``; ``task``
            passes 0 for devices whose protocol type is 0 and 1 otherwise.
        :return: None. Any exception is logged with its traceback and
            swallowed.
        """
        try:
            # Constructed inside the try so that a failure here is logged
            # instead of vanishing inside a future nobody inspects.
            presto = Presto(**self.hv_conf)
            info = dict(netflow={})
            base_status = status_detail_li.get(dev_hash, {})
            if base_status:
                online = base_status.get("online_status", 0)
                if online:
                    # The device is online right now: stamp the current time.
                    lasttimepoint = int(time.time())
                else:
                    lasttimepoint = base_status.get("lasttimepoint")
                info.update(
                    cpu=base_status.get("cpu_usage", 0),
                    mem=base_status.get("mem_usage", 0),
                    online=online,
                    lasttimepoint=lasttimepoint,
                )
            status_data = get_device_cardflow([dev_hash], protocol=protocol, presto=presto)
            if status_data:
                info.update(
                    netflow=status_data[dev_hash]
                )
            logger.debug("device hash: <{0}>, cache data: {1}".format(dev_hash, status_data))
            self.rc.set(
                name="{0}{1}".format(DEVICE_STATUS_INFO_CACHE_PRIFIX, dev_hash),
                value=json.dumps(info)
            )
        except Exception as e:
            logger.error(str(e), exc_info=True)

    @exec_profile
    @exception_handler(expect_value=None)
    @tag("task_running")
    def task(self):
        """Run one sync round over the currently known devices.

        Devices reported with a protocol type other than 0 are processed
        first, then those with protocol type 0. Each batch is waited for at
        most three seconds per known device.
        """
        # Snapshot the list: the sync_device thread may rebind self.devices
        # while this round is in progress.
        devices = list(self.devices)
        if not devices:
            return
        timeout = len(devices) * 3
        # Either helper may hand back an empty non-dict fallback when it
        # fails; coerce anything falsy to a dict so the dict methods used
        # below are always valid.
        protocol_li = get_device_protocol(devices) or {}
        status_detail_li = get_device_status_detail(devices) or {}
        # .items() works on Python 2 and 3 alike.
        protocol_old = [key for key, value in protocol_li.items() if value == 0]
        protocol_new = list(set(protocol_li) - set(protocol_old))
        new_device_parser = partial(self.deal_devices, protocol=1, status_detail_li=status_detail_li)
        old_device_parser = partial(self.deal_devices, protocol=0, status_detail_li=status_detail_li)
        fs_new = [self._pool.submit(new_device_parser, dev) for dev in protocol_new]
        wait(fs_new, timeout=timeout, return_when=ALL_COMPLETED)
        fs_old = [self._pool.submit(old_device_parser, dev) for dev in protocol_old]
        wait(fs_old, timeout=timeout, return_when=ALL_COMPLETED)
        logger.info("Sync Status Done.")

    def stop(self, signum, frame):
        """Signal handler: ask both loops to finish and shut the pool down.

        :param signum: number of the received signal (unused).
        :param frame: interrupted stack frame (unused).
        """
        self.running = False
        self._pool.shutdown()

    def run_worker(self):
        """Start the device-list thread and run sync rounds until stopped."""
        data_job = threading.Thread(target=self.sync_device)
        data_job.start()
        while self.running:
            self.task()
            # Sleep in one-second slices so that a stop request is honoured
            # promptly rather than only after a whole period has elapsed.
            remaining = self.period
            while self.running and remaining > 0:
                time.sleep(min(1, remaining))
                remaining -= 1
def main():
    """Build a ``Worker`` from the CLI options and the config file, run it.

    SIGTERM, SIGINT and SIGQUIT are all routed to the worker's ``stop``
    method before the worker loop starts.
    """
    cli = parse_args()

    # Worker keyword -> section name in the config file, read in this order.
    sections = (
        ("mysql_conf", "mysql_setting"),
        ("redis_conf", "cache_redis_setting"),
        ("hv_conf", "hv"),
    )
    settings = dict((kwarg, cm.get(section)) for kwarg, section in sections)
    worker = Worker(period=cli.PERIOD, **settings)

    for signame in ("SIGTERM", "SIGINT", "SIGQUIT"):
        signal.signal(getattr(signal, signame), worker.stop)

    worker.run_worker()


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment