Skip to content

Instantly share code, notes, and snippets.

View piroyoung's full-sized avatar
🏠
Working from home

Hiroki Mizukami piroyoung

🏠
Working from home
View GitHub Profile
import abc
import functools
import json
import time
import uuid
from dataclasses import asdict
from dataclasses import dataclass
from logging import INFO
from logging import Logger
from logging import basicConfig
@piroyoung
piroyoung / dimcheck.py
Last active September 16, 2018 05:36
dimcheck.py
from typing import Callable
from numpy import array
from numpy import ndarray
def dimension(*expected: int):
def wrap_up(func: Callable) -> Callable:
def wrapped(*args, **kwargs) -> ndarray:
result: ndarray = func(*args, **kwargs)
@piroyoung
piroyoung / tf_util.py
Last active May 7, 2018 15:23
可変長の1階テンソルをskip-gramの2階テンソルに変換する. tf.data.Datasetと一緒にお使いください
import tensorflow as tf
class VarLenSeries:
"""
sess = tf.InteractiveSession()
series = tf.constant(list(range(5)))
sess.run(VarLenSeries(series).to_skip_gram(3))
# # it returns below
@piroyoung
piroyoung / pipeline.py
Last active March 27, 2018 17:06
シングルノードマルチスレッドでサクッとパイプライン処理できたらいいなと思ったけど,普通にアクターモデルとかに乗っかった方がいい気がするのでボツ
import queue
import threading
from typing import Callable, Generator, Any
_TERM = threading.Event()
class Worker(threading.Thread):
def __init__(self, source: queue.Queue, destination: queue.Queue, batch_size: int,
callback: Callable[[Generator], Any]):
@piroyoung
piroyoung / event_counter.py
Created February 18, 2018 02:35
一定時間内に発生したイベント数を返すAPI
import redis
import time
from flask import jsonify, Flask
class EventCounter:
def __init__(self, seconds: float):
pool: redis.ConnectionPool = redis.ConnectionPool(host='localhost', port=6379, db=0)
self._r: redis.StrictRedis = redis.StrictRedis(connection_pool=pool)
self._s: float = seconds
@piroyoung
piroyoung / leak_killer.py
Last active January 17, 2018 16:14
定期的にリークしてそうなオブジェクトを殺すマン.別スレッドでインスタンス作ってqueueに積む実装にするとなお良さそう
import threading
import contextlib
class LeakObject:
def __init__(self, lifetime: int, cls: object, **kwargs):
self._counter = 0
self.lifetime = lifetime
self._kwargs = kwargs
@piroyoung
piroyoung / sample_leak.py
Created January 17, 2018 12:09
人為的にメモリリークさせる例
import threading
import tracemalloc
import time
tracemalloc.start()
class Sub(threading.Thread):
def run(self):
while True:
time.sleep(3)
@piroyoung
piroyoung / pt.py
Created January 9, 2018 22:51
Decorator that logs a function's elapsed time
import time
from typing import Callable, Any
def print_timestamp(func: Callable) -> Callable:
def wrap_up(**kwargs) -> Any:
t0 = time.time()
res = func(**kwargs)
t1 = time.time()
logger.debug({'elapsed_time': t1-t0})
@piroyoung
piroyoung / deco.py
Created December 8, 2017 01:43
このデコレータべんり
def retry_if_fail(callback: Callable):
"""
bigtable sdk auto-reconnect works only if failed.
:param callback: some callback function
:return:
"""
def wrap(*args, **kwargs):
try:
return callback(*args, **kwargs)
from typing import List, Callable, TypeVar
from functools import reduce, lru_cache
T = TypeVar('T')

# Sentinel that distinguishes "no initial value supplied" from an explicit
# ``None`` seed, which is a legitimate value to fold from.
_NO_INITIAL = object()


class MyCollection(List[T]):
    """
    A list that exposes ``functools.reduce`` as a method.
    """

    def reduce(self, f: Callable[[T, T], T], initial=_NO_INITIAL):
        """
        Fold the elements left to right with ``f``.

        :param f: binary function combining the running result with the next element
        :param initial: optional seed placed before the first element; when given,
            it is returned as-is for an empty collection
        :return: the folded value
        :raises TypeError: if the collection is empty and no ``initial`` is given
        """
        # Inside the method body ``reduce`` resolves to the module-level
        # functools.reduce, not to this method.
        if initial is _NO_INITIAL:
            return reduce(f, self)
        return reduce(f, self, initial)


# example usage
obj = MyCollection(range(5))
obj.reduce(lambda x, y: x + y)