Navigation Menu

Skip to content

Instantly share code, notes, and snippets.

@zopieux
Last active July 30, 2017 22:05
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save zopieux/84f3bea4895b75fa404872cc09aa8f40 to your computer and use it in GitHub Desktop.
Save zopieux/84f3bea4895b75fa404872cc09aa8f40 to your computer and use it in GitHub Desktop.
from curio import run, spawn, open_connection, sleep
import argparse
import collections
import re
# A countable chat "word": any run of three or more non-whitespace characters.
TOKEN = re.compile(r'([\S]{3,})', flags=re.IGNORECASE)
async def client(opts):
    """Join an IRC channel and keep printing its most frequent recent tokens.

    Connects to ``opts.server``:``opts.port``, authenticates with
    ``opts.password`` / ``opts.user`` and joins ``opts.channel``.  Every chat
    message seen for that channel is lower-cased and split with ``TOKEN``;
    each distinct token of a message is pushed once into a bounded window of
    the last ``opts.window`` tokens.  Once per second the ``opts.count`` most
    common tokens in the window are printed on a single, overwritten line.

    Returns when the server closes the connection or the reader task stops.
    """
    q = collections.deque(maxlen=opts.window)
    pattern = f'PRIVMSG {opts.channel}'
    sock = await open_connection(opts.server, opts.port)
    stream = sock.as_stream()
    # Cleared by the reader task so the display loop knows when to stop.
    connected = True

    async def ping():
        """Send a client-side PING every 30 seconds."""
        while True:
            await sleep(30)
            await stream.write(b'PING\r\n')

    async def read():
        """Answer server PINGs and feed channel messages into the window."""
        nonlocal connected
        try:
            while True:
                raw = await stream.readline()
                if not raw:
                    # Empty read means EOF; looping again would spin forever.
                    return
                # Chat is untrusted input: never let a bad byte kill the task.
                line = raw.rstrip().decode(errors='replace')
                # Only a line that *starts* with PING is a keep-alive; a chat
                # message merely containing the word must still be counted.
                if line.startswith('PING'):
                    payload = line.partition(':')[2]
                    await stream.write(f'PONG :{payload}\r\n'.encode())
                    continue
                if pattern not in line:
                    continue
                # Drop the leading ':' of the prefix, then keep what follows
                # the next ':' (the message text); empty if there is none.
                message = line.lstrip(':').partition(':')[2].lower()
                q.extend(set(TOKEN.findall(message)))
        finally:
            connected = False

    async with stream:
        await stream.write(f'PASS {opts.password}\r\n'.encode())
        await stream.write(f'NICK {opts.user}\r\n'.encode())
        await stream.write(f'JOIN {opts.channel}\r\n'.encode())
        ping_task = await spawn(ping())
        read_task = await spawn(read())
        try:
            while connected:
                best = ' '.join(w for w, _ in collections.Counter(q).most_common(opts.count))
                # Pad to 60 columns and end with '\r' so each refresh
                # overwrites the previous line in place.
                print(f'{best:<60s}', end='\r')
                await sleep(1)
        finally:
            # Stop the helpers before the stream is closed underneath them.
            await ping_task.cancel()
            await read_task.cancel()
if __name__ == '__main__':
    # Command-line entry point: build the option namespace and run the client.
    parser = argparse.ArgumentParser()

    # Optional settings, declared as (flags, keyword arguments) pairs.
    option_specs = (
        (('-s', '--server'), {'default': 'irc.chat.twitch.tv'}),
        (('-w', '--window'), {'type': int, 'default': 30}),
        (('-n', '--count'), {'type': int, 'default': 9}),
        (('-p', '--port'), {'type': int, 'default': 6667}),
    )
    for flags, kwargs in option_specs:
        parser.add_argument(*flags, **kwargs)

    # Required positionals, in the order they must appear on the command line.
    for positional in ('user', 'password', 'channel'):
        parser.add_argument(positional)

    run(client(parser.parse_args()))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment