Skip to content

Instantly share code, notes, and snippets.

@e000
Created January 21, 2011 22:49
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save e000/790588 to your computer and use it in GitHub Desktop.
Save e000/790588 to your computer and use it in GitHub Desktop.
youtube module
from twisted.internet.task import LoopingCall
from twisted.web.client import getPage
from twisted.internet.defer import DeferredLock, inlineCallbacks, returnValue
from Core.ZalgoUtil.CmdWraps import command, hook
from Core.ZalgoUtil.ModUtil import dateDiff, utf8
from Core.ZalgoBase.BaseModule import BaseModule
from Core.ZalgoUtil.DeferredCache import deferred_lfu_cache
from Core.ZalgoHooks import Hooks
from xml.etree.cElementTree import fromstring
import re
from time import time, mktime
from iso8601 import parse_date
class YoutubeVideo:
@staticmethod
def fromTreeObject(xml):
vid = YoutubeVideo()
find = xml.find
vid.id = find('{http://www.w3.org/2005/Atom}id').text
vid.published = parse_date(find('{http://www.w3.org/2005/Atom}published').text)
vid.title = find('{http://www.w3.org/2005/Atom}title').text
vid.author = find('{http://www.w3.org/2005/Atom}author/{http://www.w3.org/2005/Atom}name').text
vid.rating = dict(find('{http://schemas.google.com/g/2005}rating').items())
vid.views = int(find('{http://gdata.youtube.com/schemas/2007}statistics').get('viewCount'))
vid.duration = int(find('{http://search.yahoo.com/mrss/}group/{http://gdata.youtube.com/schemas/2007}duration').get('seconds'))
vid.got = time()
return vid
@staticmethod
def fromXml(xml):
return YoutubeVideo.fromTreeObject(fromstring(xml))
def __repr__(self):
return '<YoutubeVideo title: %s, author: %s, duration: %s, rating: %s, views: %s>' % (
self.title, self.author, self.duration, self.rating['average'], self.views
)
class Module(BaseModule):
parseCommands = True
parseHooks = True
regex = re.compile('youtube\.com\/watch\?v=([0-9A-z-_]{11})')
match = regex.search
finditer = regex.findall
def init(self):
self.lc = LoopingCall(self.getYoutubeVideo.clear) # clear the cache every hour.
self.lc.start(60*60, now = False)
def onUnload(self):
if self.lc.running:
self.lc.stop() # stop the cache clearer.
del self.lc
@hook(Hooks.PRIVMSG)
def isYtLink(self, hook, user, channel, line, params):
if self.match(line):
q = []
for vid in self.finditer(line):
if vid not in q:
self.findYoutubeLink(channel, vid)
q.append(id)
return Hooks.STOP
@inlineCallbacks
def findYoutubeLink(self, channel, videoId):
try:
video, isCached = yield self.getYoutubeVideo(videoId)
out = "\x02\x037,01[\x030You\x034Tube\x037] \x0307%s \x0315(%i:%02i) \x0308by \x0311%s \x0308%.2f\x0315/\x03095 \x0308stars\x038, \x0309%i\x038 views\x0315" % (
video.title, video.duration//60, video.duration % 60, video.author, float(video.rating['average']), video.views,
)
if isCached:
out += ' cached about %s ago.' % (dateDiff(time() - video.got))
else:
out += '.'
self.bot.privmsg(channel, utf8(out))
except:
self.bot.privmsg(channel, "\x02\x037,01[\x030You\x034Tube\x037] \x034Video \x0308%s\x034 not found." % videoId)
@deferred_lfu_cache(maxsize=25)
def getYoutubeVideo(self, videoID):
return getPage(
'http://gdata.youtube.com/feeds/api/videos/%s' % videoID,
agent='Mozilla/5.0 (X11; U; Linux i686; en-US) AppleWebKit/534.16 (KHTML, like Gecko) Chrome/10.0.634.0 Safari/534.16'
).addCallback(
lambda xml: YoutubeVideo.fromXml(xml)
)
@command('.youtube-cachestats', level = 1)
def stats(self, cmd, user, channel, line, params):
f = self.getYoutubeVideo
self.bot.privmsg(channel,
"[\x02YouTubeParser\x02] deferred_lfu_cache hits: %s, misses: %s, size: %s, waiting: %s, maxsize: %s" %
(f.hits, f.misses, f.size(), f.waiting(), f.maxsize))
@command('.youtube-cacheclear', level = 1)
def clear(self, cmd, user, channel, line, params):
self.getYoutubeVideo.clear()
self.bot.privmsg(channel, "[\x02YouTubeParser\x02] deferred_lfu_cache cleared.")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment