Skip to content

Instantly share code, notes, and snippets.

@rphillips
Created October 20, 2010 05:28
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save rphillips/635823 to your computer and use it in GitHub Desktop.
Save rphillips/635823 to your computer and use it in GitHub Desktop.
hacky irc
import select
import socket
import sys
import fcntl
import os
# Short aliases for the epoll event-mask bits used throughout the script.
EPOLLIN, EPOLLOUT = select.EPOLLIN, select.EPOLLOUT

# IRC connection settings.
SERVER = 'irc.freenode.net'
PORT = 6667
NICK = "username-test"
ROOM = '#test-room'

# Shared epoll object (60000 is passed as the size hint) and the table that
# maps each registered file descriptor to the object handling its events.
epoll = select.epoll(60000)
connections = {}
class Client(object):
    """Minimal non-blocking IRC client driven by the module-level epoll loop.

    On construction the client connects, registers its socket with the
    shared ``epoll`` object and ``connections`` table, and queues the
    NICK/USER registration commands.  The main loop is expected to call
    ``onInput`` when the socket is readable and ``onOutput`` when it is
    writable.

    Attributes:
        socket: The connected, non-blocking socket.
        nick: Nickname sent during registration.
        room: Channel joined after the welcome reply and targeted by
            ``write``.
        output: FIFO list of outgoing protocol lines (``str``, each
            terminated by CRLF).  Per-instance, not shared.
        sent: Number of encoded bytes of ``output[0]`` already written.
    """

    # Wire encoding used for both directions.
    ENCODING = "utf-8"

    def __init__(self, host, port, nick=None, room=None):
        """Connect to ``host:port`` and queue the registration commands.

        Args:
            host: IRC server host name; also sent in the USER command.
            port: IRC server TCP port.
            nick: Nickname to register; defaults to the module-level NICK.
            room: Channel to join and talk in; defaults to the
                module-level ROOM.

        Raises:
            OSError: If connecting or registering with epoll fails; the
                socket is closed before the error propagates.
        """
        self.nick = NICK if nick is None else nick
        self.room = ROOM if room is None else room
        self.output = []
        self.sent = 0
        self._inbuf = b""  # bytes received but not yet terminated by CRLF

        sock = socket.socket()
        try:
            # Connect while still blocking, then switch to non-blocking.
            sock.connect((host, port))
            sock.setblocking(False)
            fileno = sock.fileno()
            epoll.register(fileno, EPOLLIN | EPOLLOUT)
        except Exception:
            sock.close()  # do not leak the descriptor on failure
            raise
        connections[fileno] = self
        self.socket = sock

        self.output.append("NICK %s\r\n" % self.nick)
        self.output.append(
            "USER %s %s bla :%s\r\n" % (self.nick, host, self.nick))

    def event(self, line):
        """Handle one complete protocol line received from the server.

        Queues a JOIN for ``self.room`` when the line's command is the
        numeric 001 (welcome) reply.  Only the command field is inspected,
        so "001" appearing in a prefix or in message text is ignored.

        Args:
            line: One received line, without its trailing CRLF.
        """
        fields = line.split()
        if fields and fields[0].startswith(":"):
            fields = fields[1:]  # skip the optional ":prefix" field
        if fields and fields[0] == "001":
            print("Joining %s" % self.room)
            self.output.append("JOIN %s\r\n" % self.room)

    def write(self, cmd):
        """Queue ``cmd`` as a PRIVMSG to ``self.room``.

        Text containing CR or LF is split and each non-empty line is queued
        as its own PRIVMSG, so an embedded line break cannot smuggle a raw
        IRC command onto the wire.

        Args:
            cmd: Message text to send to the channel.
        """
        for text in cmd.replace("\r", "\n").split("\n"):
            if text:
                self.output.append(
                    "PRIVMSG %s :%s\r\n" % (self.room, text))

    def onInput(self):
        """Read available bytes and dispatch every complete line to ``event``.

        A trailing partial line is kept in an internal buffer until its CRLF
        arrives.  An orderly shutdown by the peer (empty read) or a socket
        error closes the client.
        """
        try:
            chunk = self.socket.recv(1024)
        except (BlockingIOError, InterruptedError):
            return  # nothing to read right now; wait for the next event
        except OSError as exc:
            print("recv failed: %s" % exc)
            self.close()
            return
        if not chunk:
            self.close()
            return

        self._inbuf += chunk
        lines = self._inbuf.split(b"\r\n")
        self._inbuf = lines.pop()  # last piece has no CRLF yet
        for raw in lines:
            self.event(raw.decode(self.ENCODING, "replace"))

    def onOutput(self):
        """Send as much of the oldest queued line as the socket accepts.

        ``self.sent`` accumulates across calls so a partially sent line
        resumes at the right byte; the line is dequeued once fully sent.
        Does nothing when the queue is empty.

        NOTE: ``__init__`` registers the socket for EPOLLOUT and nothing in
        this class changes that mask, so the event loop may call this
        whenever the socket is writable, even with nothing queued.
        """
        if not self.output:
            return
        head = self.output[0]
        data = head.encode(self.ENCODING) if isinstance(head, str) else head
        remaining = data[self.sent:]
        print('sending... ' + remaining.decode(self.ENCODING, "replace"))
        try:
            self.sent += self.socket.send(remaining)
        except (BlockingIOError, InterruptedError):
            return  # kernel buffer full; retry on the next writable event
        except OSError as exc:
            print("send failed: %s" % exc)
            self.close()
            return
        if self.sent >= len(data):
            self.sent = 0
            self.output.pop(0)

    def close(self):
        """Deregister from the event loop and close the socket.

        Safe to call more than once: a socket that is already closed
        reports a negative file descriptor and is left alone.
        """
        fileno = self.socket.fileno()
        if fileno < 0:
            return
        connections.pop(fileno, None)
        try:
            epoll.unregister(fileno)
        finally:
            self.socket.close()
class UserInteraction(object):
    """Forward lines typed on a file descriptor (stdin by default) to a client.

    The descriptor is switched to non-blocking mode and registered with the
    module-level ``epoll`` object and ``connections`` table; the main loop
    is expected to call ``onInput`` when it becomes readable.

    Attributes:
        fd: The file descriptor being read.
        client: Object whose ``write(text)`` receives each completed line.
        data: Text read so far that has not yet been terminated by a newline.
    """

    def __init__(self, client, fd=None):
        """Register ``fd`` for read events and remember the target client.

        Args:
            client: Object exposing ``write(text)``.
            fd: Descriptor to read from; defaults to ``sys.stdin``'s.
        """
        if fd is None:
            fd = sys.stdin.fileno()
        flags = fcntl.fcntl(fd, fcntl.F_GETFL)
        fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
        epoll.register(fd, EPOLLIN)
        connections[fd] = self
        self.fd = fd
        self.client = client
        self.data = ''

    def onInput(self):
        """Read pending input and pass each complete line to the client.

        Lines are stripped of surrounding whitespace; blank lines are
        skipped.  Text after the last newline stays buffered in
        ``self.data`` until the rest of the line arrives.  End of input
        (an empty read) closes this handler.
        """
        try:
            # Read the raw descriptor: it is non-blocking, and os.read
            # reports "no data yet" and end-of-input unambiguously.
            chunk = os.read(self.fd, 4096)
        except (BlockingIOError, InterruptedError):
            return
        if not chunk:
            self.close()
            return

        self.data += chunk.decode("utf-8", "replace")
        lines = self.data.split('\n')
        self.data = lines.pop()  # unterminated remainder, possibly ''
        for line in lines:
            text = line.strip()
            if text:
                print(text)
                self.client.write(text)

    def onOutput(self):
        """Log a writable event; the descriptor is only registered for reads."""
        print('onOutput')

    def close(self):
        """Deregister this handler from the event loop (idempotent)."""
        if connections.pop(self.fd, None) is not None:
            epoll.unregister(self.fd)
# Script entry: connect the IRC client, attach stdin, then dispatch epoll
# events to the registered handlers until none remain.
c = Client(SERVER, PORT)
UserInteraction(c)
try:
    while connections:
        for fd, event in epoll.poll():
            handler = connections.get(fd)
            if handler is None:
                # Closed earlier in this batch of events; nothing to do.
                continue
            # Hang-up and error conditions go to the read path so the
            # handler can observe the end-of-stream or error itself.
            if event & (EPOLLIN | select.EPOLLHUP | select.EPOLLERR):
                handler.onInput()
            # onInput may have closed the handler; only write if this same
            # handler is still registered for the descriptor.
            if event & EPOLLOUT and connections.get(fd) is handler:
                handler.onOutput()
except KeyboardInterrupt:
    # Ctrl-C is the normal way to quit this interactive script.
    pass
finally:
    epoll.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment