Skip to content

Instantly share code, notes, and snippets.

@hillar
Last active August 29, 2015 13:58
Show Gist options
  • Save hillar/10100156 to your computer and use it in GitHub Desktop.
Save hillar/10100156 to your computer and use it in GitHub Desktop.
dumps last {window_time} events in room on every {purge_time} to json file (used for anonymous vsroom demo)
import idiokit
from abusehelper.core import bot, taskfarm
# please move Roombot from contrib to core
class RoomBot(bot.ServiceBot):
def __init__(self, *args, **keys):
bot.ServiceBot.__init__(self, *args, **keys)
self.room_handlers = taskfarm.TaskFarm(self._handle_room)
@idiokit.stream
def _handle_room(self, name):
msg = "room {0!r}".format(name)
attrs = events.Event(type="room", service=self.bot_name, room=name)
with self.log.stateful(repr(self.xmpp.jid), "room", repr(name)) as log:
log.open("Joining " + msg, attrs, status="joining")
room = yield self.xmpp.muc.join(name, self.bot_name)
log.open("Joined " + msg, attrs, status="joined")
try:
yield room
finally:
log.close("Left " + msg, attrs, status="left")
def to_room(self, name):
return self.room_handlers.inc(name) | idiokit.consume()
def from_room(self, name):
return idiokit.consume() | self.room_handlers.inc(name)
#
# dumps room last (window_time) events on every (purge_time) to json file
#
import time
import collections
from abusehelper.core import events, services
from abusehelper.core.archivebot import ensure_dir
try:
import simplejson as json
except ImportError:
import json
class Dump2JsonBot(RoomBot):
dump_dir = bot.Param(default='./')
_state = collections.deque()
@idiokit.stream
def session(self, state, src_room, window_time=3600*24, purge_time=60):
window_time = int(window_time)
purge_time = int(purge_time)
self.dump_dir = ensure_dir(self.dump_dir)
file_name = self.dump_dir + '/' + src_room +'.json'
self.log.info('setting dump file to %r', file_name)
if state:
self.log.info('got state %r',state)
queue = state
else:
queue = collections.deque()
try:
yield (self.from_room(src_room)
| events.stanzas_to_events()
| self.collect(queue, window_time)
| self.dump_and_purge(queue, purge_time,file_name))
except services.Stop:
idiokit.stop(queue)
@idiokit.stream
def collect(self, queue, window_time):
while True:
event = yield idiokit.next()
current_time = time.time()
expire_time = current_time + window_time
eve = dict()
for key, value in event.items():
eve[key] = value
queue.append((expire_time, eve))
@idiokit.stream
def dump_and_purge(self, queue, purge_time,file_name):
yield idiokit.sleep(purge_time/2)
while True:
current_time = time.time()
while queue and queue[0][0] <= current_time:
expire_time, event = queue.popleft()
if queue:
_json = []
for _, event in queue:
_json.append(event)
self.log.info('writing to %r event count %r', file_name, len(_json))
json_file = open(file_name, 'w')
json.dump(_json, json_file, indent=0)
json_file.close()
yield idiokit.sleep(purge_time)
if __name__ == "__main__":
Dump2JsonBot.from_command_line().execute()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment