Skip to content

Instantly share code, notes, and snippets.

@moriyoshi
Created December 10, 2009 06:47
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save moriyoshi/253169 to your computer and use it in GitHub Desktop.
Save moriyoshi/253169 to your computer and use it in GitHub Desktop.
# encoding: utf-8
import urllib2
import cookielib
import re
import xml.dom as dom
import xml.dom.minidom as minidom
from BeautifulSoup import BeautifulSoup, Tag, Comment, NavigableString
__all__ = (
'AmebaNowClientException',
'UnsupportedContentTypeError',
'AuthenticationError',
'PostError',
'UnexpectedResponseError',
'AmebaNowClient',
)
class AmebaNowClientException(Exception):
pass
class UnsupportedContentTypeError(AmebaNowClientException):
pass
class AuthenticationError(AmebaNowClientException):
pass
class UnexpectedResponseError(AmebaNowClientException):
pass
class PostError(AmebaNowClientException):
pass
class AmebaNowClient(object):
LOGIN_URL = 'http://www.ameba.jp/login.do'
FORM_URL = 'http://now.ameba.jp/'
POST_URL = 'http://ucsnow.ameba.jp/post'
API_MYTIMELINE = 'http://now.ameba.jp/api/timeline'
API_TIMELINE = 'http://now.ameba.jp/api/entryList'
API_ENCODING = 'utf-8'
def __init__(self, credentials):
self.credentials = credentials
self.cookiejar = cookielib.CookieJar()
self.opener = urllib2.build_opener(
urllib2.HTTPCookieProcessor(self.cookiejar))
@staticmethod
def _textify(nodelist):
retval = ''
for n in nodelist:
if isinstance(n, Comment):
pass
elif isinstance(n, Tag):
if n.name == 'br':
retval += "\n"
else:
retval += AmebaNowClient._textify(n)
elif isinstance(n, NavigableString):
retval += unicode(n)
return retval
def _urlread(self, url, data=None, default_charset='utf-8'):
r = self.opener.open(url, data)
headers = r.info()
resp = r.read()
if headers.gettype() == 'text/html':
encoding = headers.getparam('charset')
if encoding == None:
encoding = default_charset
match = re.search(r'<meta\s+http-equiv=(["\']?)Content-Type\1\s+content=("[^"]*"|\'[^\']*\'|[^"\'\s]*)[^>]*>', resp, re.IGNORECASE)
value = match.group(2)
if value[0] == '"' or value[0] == "'":
value = value[1:-1]
# XXX: should take care of quoted values
tmp = re.split(r'\s*;\s*', value)
for k, v in (t.split('=') for t in tmp[1:]):
if k.lower() == 'charset':
encoding = v
elif headers.gettype() == 'text/xml':
encoding = headers.getparam('charset')
if encoding == None:
encoding = default_charset
match = re.match(r'''<?xml\s+[^?>]*encoding=("[^"]*"|'[^']*'|[^"'?>]*)[^?>]*?>''', resp, re.IGNORECASE)
value = match.group(1)
if value[0] == '"' or value[0] == "'":
value = value[1:-1]
encoding = v
else:
raise UnsupportedContentTypeError(headers.gettype())
r.close()
return r, encoding, resp.decode(encoding)
def login(self):
_, _, resp = self._urlread(self.LOGIN_URL,
'password=%s&amebaId=%s' % (
urllib2.quote(self.credentials['password']),
urllib2.quote(self.credentials['ameba_id'])))
error_node = BeautifulSoup(resp).find('div', 'errorId', recursive=True)
if error_node:
raise AuthenticationError(
AmebaNowClient._textify(error_node).strip())
@staticmethod
def _buildquery(params, encoding):
data = []
for k, v in params.iteritems():
if isinstance(k, unicode): k = k.encode(encoding)
if isinstance(v, unicode): v = v.encode(encoding)
k = str(k)
v = str(v)
data.append('%s=%s' % (urllib2.quote(k), urllib2.quote(v)))
return '&'.join(data)
def post(self, text, reply_to=None):
_, encoding, resp = self._urlread(self.FORM_URL)
form = BeautifulSoup(resp).find('form', id='inputForm')
params = {}
for n in form.findAll('input', type='hidden'):
params[n['name']] = n['value']
text_area_name = form.find('textarea')['name']
params[text_area_name] = text
if reply_to is not None:
params['replyEntryId'] = reply_to['id']
params['replyAmebaId'] = reply_to['ameba_id']
r, encoding, resp = self._urlread(self.POST_URL, AmebaNowClient._buildquery(params, encoding))
if r.url != self.FORM_URL:
if r.url == self.POST_URL:
error_node = BeautifulSoup(resp).find('p', id='errorArea', recursive=True)
if error_node:
raise PostError(AmebaNowClient._textify(error_node).strip())
raise PostError()
@staticmethod
def _selectonenode(parent, name):
nodes = parent.getElementsByTagName(name)
if len(nodes) == 0:
raise UnexpectedResponseError('No <%s> element' % name)
elif len(nodes) > 1:
raise UnexpectedResponseError('More than one <%s> elements found' % name)
return nodes[0]
@staticmethod
def _getnodevalue(node, name):
n = AmebaNowClient._selectonenode(node, name)
if len(n.childNodes) > 0:
return ''.join(i.nodeValue if i.nodeValue is not None else '' for i in n.childNodes)
else:
return None
@staticmethod
def _parseentrylist(entry_list_node):
result = []
getnodevalue = AmebaNowClient._getnodevalue
for i in entry_list_node.childNodes:
if i.nodeType == dom.Node.ELEMENT_NODE:
result.append({
'id': getnodevalue(i, 'entryId'),
'ameba_id': getnodevalue(i, 'amebaId'),
'text': getnodevalue(i, 'entryText'),
'reply_to': {
'ameba_id': getnodevalue(i, 'replyAmebaId'),
'id': getnodevalue(i, 'replyEntryId'),
},
'nickname': getnodevalue(i, 'thumbnailNickname'),
'image': {
'url': getnodevalue(i, 'thumbnailImagePath'),
'width': getnodevalue(i, 'thumbnailImageWidth'),
'height': getnodevalue(i, 'thumbnailImageHeight'),
},
'reply_allowed': \
not bool(int(getnodevalue(i, 'denyReplyFlag'))),
'mine': bool(int(getnodevalue(i, 'isMyEntry'))),
})
return result
def getmytimeline(self, offset=0, limit=20):
doc = minidom.parse(self.opener.open(
self.API_MYTIMELINE + '?' \
+ AmebaNowClient._buildquery(dict(offset=offset),
self.API_ENCODING)))
if doc.documentElement.nodeName != u'response':
raise UnexpectedResponseError(doc.documentElement.nodeName)
entry_list_node = AmebaNowClient._selectonenode(doc.documentElement, 'entryList')
offset = entry_list_node.getAttribute('offset')
return int(offset), AmebaNowClient._parseentrylist(entry_list_node)
def gettimeline(self, ameba_id, offset=0, limit=20):
doc = minidom.parse(self.opener.open(
self.API_TIMELINE + '/%s' % urllib2.quote(ameba_id) + '?' \
+ AmebaNowClient._buildquery(dict(offset=offset),
self.API_ENCODING)))
doc = minidom.parseString(resp)
if doc.documentElement.nodeName != u'response':
raise UnexpectedResponseError(doc.documentElement.nodeName)
entry_list_node = AmebaNowClient._selectonenode(doc.documentElement, 'entryList')
offset = entry_list_node.getAttribute('offset')
return int(offset), AmebaNowClient._parseentrylist(entry_list_node)
if __name__ == '__main__':
import os
c = AmebaNowClient(dict(ameba_id=os.environ['AMEBA_ID'], password=os.environ['AMEBA_PASSWORD']))
c.login()
offset, entries = c.getmytimeline()
for entry in entries:
print entry['id'], entry['nickname'], entry['text']
c.post(u'ちんこ')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment