#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import sys
import traceback
import sqlite3
import logging
import urllib.parse
import http.client
from queue import Queue
from queue import Empty
import threading
import multiprocessing
import functools
import signal
import faulthandler  # since python 3.3 https://docs.python.org/3/library/faulthandler.html
import configparser
log = logging.getLogger("py_utils")
#lib_dir_path = os.path.join(os.path.dirname(__file__), "../py-lib")
#
#sys.path.append(lib_dir_path)
#sys.path.append(os.path.join(lib_dir_path, "protobuf-3.2.0-py2.7.egg"))
#sys.path.append(os.path.join(lib_dir_path, "six-1.10.0-py2.py3-none-any.whl"))
#sys.path.append(os.path.join(lib_dir_path, "thrift"))
from thrift import Thrift
from thrift.transport import TSocket
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol

class Connect:
    def __init__(self, address, port):
        self.__address = address
        self.__port = port
        self.__transport = None

    def curl(self, http_type, url, param=None):
        """ issue a plain HTTP request; GET-like methods carry params in the
        query string, POST/PUT send them form-encoded in the body """
        if not (self.__address and self.__port):
            raise ValueError("should provide address and port")
        content = urllib.parse.urlencode(param or {})
        connection = http.client.HTTPConnection(self.__address, self.__port)
        if http_type not in ("POST", "PUT"):
            connection.request(http_type, url + "?" + content)
        else:
            headers = {"Content-type": "application/x-www-form-urlencoded;charset=UTF-8"}
            connection.request(http_type, url, content, headers)
        response = connection.getresponse()
        return response.read().strip()

    def connect(self, Client):
        """ open a framed, buffered, binary-protocol thrift transport and
        return the constructed client """
        if not (self.__address and self.__port):
            raise ValueError("should provide address and port")
        self.__transport = TSocket.TSocket(self.__address, self.__port)
        self.__transport = TTransport.TFramedTransport(self.__transport)
        self.__transport = TTransport.TBufferedTransport(self.__transport)
        self.__protocol = TBinaryProtocol.TBinaryProtocol(self.__transport)
        self.__client = Client(self.__protocol)
        self.__transport.open()
        return self.__client

    def close(self):
        if self.__transport:
            self.__transport.close()
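
# Usage sketch for Connect (hypothetical names: MyService stands for a
# thrift-generated module and ping() for an RPC it defines):
#
#   conn = Connect("127.0.0.1", 9090)
#   try:
#       client = conn.connect(MyService.Client)
#       client.ping()
#   finally:
#       conn.close()
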
class Sqlite3Writer:
    """ sqlite3 connections can not be safely shared between threads, so all
    writes go through this class: other threads only enqueue write
    operations, and a single writer thread executes them """

    CREATE_HOST_TABLE = """CREATE TABLE IF NOT EXISTS host (
        cluster_name text NOT NULL,
        ip text NOT NULL,
        host_name text,
        tags text)"""

    CREATE_HOST_INDEX = """CREATE INDEX IF NOT EXISTS host_index ON host (cluster_name, ip);"""

    CREATE_JOB_TABLE = """CREATE TABLE IF NOT EXISTS job (
        cluster_name text NOT NULL,
        user_name text NOT NULL,
        service_name text NOT NULL,
        offset integer NOT NULL,
        host_ip text)"""

    CREATE_JOB_INDEX = """CREATE INDEX IF NOT EXISTS job_index ON job (
        cluster_name, user_name, service_name, host_ip);"""

    def __init__(self, queue, db_path="data/matrix_data.db"):
        self.db_path = db_path
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        cursor.execute(Sqlite3Writer.CREATE_HOST_TABLE)
        cursor.execute(Sqlite3Writer.CREATE_HOST_INDEX)
        cursor.execute(Sqlite3Writer.CREATE_JOB_TABLE)
        cursor.execute(Sqlite3Writer.CREATE_JOB_INDEX)
        conn.commit()
        conn.close()
        self.running = True
        self.queue = queue

    def process(self):
        """ writer loop: drain (sql, params) tuples from the queue and commit
        in batches of 1000; call close() to stop """
        conn = sqlite3.connect(self.db_path)
        try:
            cursor = conn.cursor()
            batch = 0
            while self.running:
                data = None
                try:
                    data = self.queue.get(True, 1)
                    cursor.execute(*data)
                    batch += 1
                    if batch > 1000:
                        conn.commit()
                        batch = 0
                except Empty:
                    # queue drained, flush any pending batch
                    if batch > 0:
                        conn.commit()
                        batch = 0
                except Exception as e:
                    traceback.print_exc()
                    log.warning("data is %s, error is %s", str(data), str(e))
        finally:
            conn.commit()
            conn.close()

    def close(self):
        self.running = False
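
# Usage sketch for Sqlite3Writer: run process() in a dedicated writer thread
# and feed it (sql, params) tuples from any other thread (values below are
# illustrative only):
#
#   write_queue = Queue()
#   writer = Sqlite3Writer(write_queue)
#   t = threading.Thread(target=writer.process, name="sqlite3-writer")
#   t.start()
#   write_queue.put(("INSERT INTO host VALUES (?, ?, ?, ?)",
#                    ("cluster-a", "10.0.0.1", "node-1", "")))
#   writer.close()  # stop the loop; pending rows are committed on exit
#   t.join()
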
def thread_pool_map(target, args, thread_pool_size=5):
    """ block until all tasks finish; args SHOULD be a list of tuples,
    each tuple being the positional arguments for one call """
    def wrapper(queue, *args, **kwargs):
        """ wrapper fn to log any exception and always deliver a result """
        result = None
        try:
            result = target(*args, **kwargs)
        except Exception as e:
            log.info("apply target failed with args %s, kwargs %s", str(args), str(kwargs))
            log.exception(e)
        finally:
            queue.put(result)

    thread_pool = []
    result = []
    for arg in args:
        if not isinstance(arg, tuple):
            log.warning("arg %s is not of type tuple, ignore it", str(arg))
            continue
        while len(thread_pool) >= thread_pool_size:
            # pool is full, poll until one worker exits
            for i in range(len(thread_pool)):
                # TODO we can reuse finished thread
                t = thread_pool[i]
                t.join(0.2)
                if not t.is_alive():
                    thread_pool.pop(i)
                    break
        queue = Queue()
        result.append(queue)
        processor = functools.partial(wrapper, queue)
        thread_pool.append(threading.Thread(target=processor, args=arg,
                                            name="thread-worker-" + str(len(result))))
        thread_pool[-1].start()
    log.info("finished adding all tasks")
    for t in thread_pool:
        t.join()
    log.info("all tasks finished")

    def getter(queue):
        """ wrapper getter, returns None on empty queue """
        try:
            return queue.get(False)
        except Empty:
            return None
    return [getter(q) for q in result]
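
# Usage sketch for thread_pool_map (fetch_url is a hypothetical worker; note
# that every element of args must be a tuple of positional arguments):
#
#   results = thread_pool_map(fetch_url, [(u,) for u in urls])
#
# A concrete runnable example is in the __main__ block at the bottom.
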
def process_pool_map(target, args, process_pool_size=5):
    """ block until all tasks finish; args SHOULD be a list of tuples,
    each tuple being the positional arguments for one call """
    def wrapper(queue, *args, **kwargs):
        """ wrapper fn to log any exception and always deliver a result """
        result = None
        try:
            result = target(*args, **kwargs)
        except Exception as e:
            log.info("apply target failed with args %s, kwargs %s", str(args), str(kwargs))
            log.exception(e)
        finally:
            queue.put(result)

    process_pool = []
    result = []
    for arg in args:
        if not isinstance(arg, tuple):
            log.warning("arg %s is not of type tuple, ignore it", str(arg))
            continue
        while len(process_pool) >= process_pool_size:
            # pool is full, poll until one worker exits
            for i in range(len(process_pool)):
                # TODO we can reuse finished process
                p = process_pool[i]
                p.join(0.2)
                if not p.is_alive():
                    process_pool.pop(i)
                    break
        queue = multiprocessing.Queue()
        result.append(queue)
        processor = functools.partial(wrapper, queue)
        process_pool.append(multiprocessing.Process(target=processor, args=arg,
                                                    name="process-worker-" + str(len(result))))
        process_pool[-1].start()
    log.info("finished adding all tasks to %s", str(target))
    for p in process_pool:
        p.join()
    log.info("all tasks finished for %s", str(target))

    def getter(queue):
        """ wrapper getter, returns None on empty queue """
        try:
            return queue.get(False)
        except Empty:
            return None
    return [getter(q) for q in result]
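
# Note on process_pool_map: under the "spawn" start method (the default on
# Windows and macOS) the target must be a picklable, module-level function;
# lambdas and closures only work where "fork" is the default, as on Linux:
#
#   def work(x):  # module-level so it survives pickling
#       return x * 2
#   process_pool_map(work, [(i,) for i in range(10)])
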
def register_stack_trace_dump():
    faulthandler.register(signal.SIGTRAP, all_threads=True, chain=False)
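
# Once registered, a stack dump of every thread can be triggered from outside
# without killing the process; faulthandler writes the tracebacks to stderr:
#
#   kill -TRAP <pid>
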
def load_and_write_ini(path):
    config = configparser.ConfigParser()
    config.optionxform = str  # keep option names case sensitive
    config.read(path)
    print(config["foo"])  # "foo" is a placeholder section name
    # a section behaves like a mapping: assign keys on it, not a bare string
    config["foo"]["key"] = "bar"
    with open(path, "w") as configfile:
        config.write(configfile)
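
# Example input for load_and_write_ini (assumed layout; "foo" is just the
# placeholder section name used above):
#
#   [foo]
#   SomeKey = some value
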
if __name__ == '__main__':
    register_stack_trace_dump()
    logging.basicConfig(format='%(asctime)s : %(levelname)s : %(message)s', level=logging.INFO)
    print(thread_pool_map(lambda x: x + 1, [(x,) for x in range(100)]))