Skip to content

Instantly share code, notes, and snippets.

@piroyoung
Created February 18, 2018 02:35
Show Gist options
  • Save piroyoung/1447877cd4a6aa60a3e792356f07a2ad to your computer and use it in GitHub Desktop.
Save piroyoung/1447877cd4a6aa60a3e792356f07a2ad to your computer and use it in GitHub Desktop.
一定時間内に発生したイベント数を返すAPI
import redis
import time
from flask import jsonify, Flask
class EventCounter:
def __init__(self, seconds: float):
pool: redis.ConnectionPool = redis.ConnectionPool(host='localhost', port=6379, db=0)
self._r: redis.StrictRedis = redis.StrictRedis(connection_pool=pool)
self._s: float = seconds
@staticmethod
def get_byte_timestamp() -> bytes:
return time.time().hex().encode()
def touch(self, event_name: str) -> None:
self._r.sadd(event_name.encode(), self.get_byte_timestamp())
self._r.expire(event_name.encode(), self._s)
return None
def count(self, event_name: str) -> int:
now = time.time()
key = event_name.encode()
with self._r.pipeline() as p:
for v in self._r.smembers(key):
if now - float.fromhex(v.decode()) > self._s:
p.srem(key, v)
p.execute()
return self._r.scard(key)
_app = Flask(__name__)
_counter = EventCounter(seconds=30)
@_app.route('/touch/<event_name>')
def touch(event_name: str):
try:
_counter.touch(event_name=event_name)
return jsonify({'success': True})
except:
return jsonify({'success': False})
@_app.route('/count/<event_name>')
def count(event_name: str):
try:
return jsonify(
{
'success': True,
'count': _counter.count(event_name)
}
)
except:
return jsonify(
{
'success': False
}
)
if __name__ == '__main__':
_app.run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment