parlibot code - alexa
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging | |
from random import randint | |
from flask import Flask, render_template | |
from flask_ask import Ask, statement, question, session | |
# Flask app; `application` is a second module-level name bound to the same object.
app = Flask(__name__)
application = app
# Flask-Ask handler attached to the app at route "/".
ask = Ask(app, "/")
logging.getLogger("flask_ask").setLevel(logging.DEBUG)
# Department abbreviation -> full department name.
deptMap = {
    'FCO': 'Foreign and Commonwealth Office',
    'PM': 'Prime Minister',
    'DWP': 'Department for Work and Pensions',
    'DCLG': 'Department for Communities and Local Government',
    'MOD': 'Ministry of Defence',
}
import datetime | |
from dateutil.relativedelta import relativedelta | |
import re | |
def tagstrip(txt):
    """Return ``txt`` with every ``<...>`` span (shortest match) removed."""
    return re.compile('<.*?>').sub('', txt)
def today(date=None, iso=False):
    """Return ``date``, or the current date when ``date`` is None.

    With ``iso`` true the result is the ISO-format string instead.
    """
    current = datetime.date.today() if date is None else date
    return current.isoformat() if iso else current
def yesterday(date=None, iso=False):
    """Return the day before ``date``.

    Args:
        date: reference date; defaults to the current date when None.
        iso: when true, return the ISO-format string instead of the date.
    """
    if date is None:
        date = datetime.date.today()
    # Offset from the supplied date; date.today() is a classmethod that
    # returns the real current date and would discard the argument.
    previous = date - datetime.timedelta(days=1)
    return previous.isoformat() if iso else previous
def tomorrow(date=None, iso=False):
    """Return the day after ``date``.

    Args:
        date: reference date; defaults to the current date when None.
        iso: when true, return the ISO-format string instead of the date.
    """
    if date is None:
        date = datetime.date.today()
    # Offset from the supplied date; date.today() is a classmethod that
    # returns the real current date and would discard the argument.
    following = date + datetime.timedelta(days=1)
    return following.isoformat() if iso else following
def last_week(date=None, daterange=True, iso=False):
    """Return (Monday, Sunday) of the week before the one containing ``date``.

    ``date`` defaults to the current date. With ``iso`` true both values are
    ISO-format strings. Returns None when ``daterange`` is false.
    """
    ref = datetime.date.today() if date is None else date
    monday = ref - datetime.timedelta(days=ref.weekday() + 7)
    sunday = monday + datetime.timedelta(days=6)
    if not daterange:
        return None
    if iso:
        return monday.isoformat(), sunday.isoformat()
    return monday, sunday
def this_week(date=None, daterange=True, iso=False):
    """Return (Monday, Sunday) of the week containing ``date``.

    ``date`` defaults to the current date. With ``iso`` true both values are
    ISO-format strings. Returns None when ``daterange`` is false.
    """
    ref = datetime.date.today() if date is None else date
    monday = ref - datetime.timedelta(days=ref.weekday())
    sunday = monday + datetime.timedelta(days=6)
    if not daterange:
        return None
    if iso:
        return monday.isoformat(), sunday.isoformat()
    return monday, sunday
def next_week(date=None, daterange=True, iso=False):
    """Return (Monday, Sunday) of the week after the one containing ``date``.

    ``date`` defaults to the current date. With ``iso`` true both values are
    ISO-format strings. Returns None when ``daterange`` is false.
    """
    ref = datetime.date.today() if date is None else date
    monday = ref + datetime.timedelta(days=7 - ref.weekday())
    sunday = monday + datetime.timedelta(days=6)
    if not daterange:
        return None
    if iso:
        return monday.isoformat(), sunday.isoformat()
    return monday, sunday
def later_this_week(date=None, incl=False, daterange=True, iso=False):
    """Return the range from now to the end of the week containing ``date``.

    The start is ``today(date)`` when ``incl`` is true, otherwise
    ``tomorrow(date)``; the end is the end of ``this_week(date)``.
    Returns None when ``daterange`` is false.
    """
    ref = today(date)
    week_end = this_week(ref)[1]
    first = today(ref) if incl else tomorrow(ref)
    # Original author's open question: check if today is the last day of the week?
    if not daterange:
        return None
    if iso:
        return first.isoformat(), week_end.isoformat()
    return first, week_end
def earlier_this_week(date=None, incl=False, daterange=True, iso=False):
    """Return the range from the start of the week containing ``date`` to now.

    The end is ``today(date)`` when ``incl`` is true, otherwise
    ``yesterday(date)``; the start is the start of ``this_week(date)``.
    Returns None when ``daterange`` is false.
    """
    ref = today(date)
    week_start = this_week(ref)[0]
    last = today(ref) if incl else yesterday(ref)
    # Original author's open question: check if today is the first day of the week?
    if not daterange:
        return None
    if iso:
        return week_start.isoformat(), last.isoformat()
    return week_start, last
def last_month(date=None, daterange=True, iso=False):
    """Return (first day, last day) of the month before the one containing ``date``.

    ``date`` defaults to the current date. With ``iso`` true both values are
    ISO-format strings. Returns None when ``daterange`` is false.
    """
    ref = datetime.date.today() if date is None else date
    # The day before the 1st of this month is the last day of the previous month.
    month_end = ref.replace(day=1) - datetime.timedelta(days=1)
    month_start = month_end.replace(day=1)
    if not daterange:
        return None
    if iso:
        return month_start.isoformat(), month_end.isoformat()
    return month_start, month_end
def next_month(date=None, daterange=True, iso=False):
    """Return (first day, last day) of the month after the one containing ``date``.

    ``date`` defaults to the current date. With ``iso`` true both values are
    ISO-format strings. Returns None when ``daterange`` is false.
    """
    ref = datetime.date.today() if date is None else date
    # Find the 1st of the month two months ahead, then step back one day to
    # land on the last day of next month.
    year, month_index = divmod(ref.year * 12 + (ref.month - 1) + 2, 12)
    month_end = ref.replace(year=year, month=month_index + 1, day=1) - datetime.timedelta(days=1)
    month_start = month_end.replace(day=1)
    if not daterange:
        return None
    if iso:
        return month_start.isoformat(), month_end.isoformat()
    return month_start, month_end
def this_month(date=None, daterange=True, iso=False):
    """Return (first day, last day) of the month containing ``date``.

    The last day is taken as the day before the start of ``next_month(date)``.
    Returns None when ``daterange`` is false.
    """
    ref = today() if date is None else date
    month_end = next_month(ref)[0] - relativedelta(days=1)
    month_start = month_end.replace(day=1)
    if not daterange:
        return None
    if iso:
        return month_start.isoformat(), month_end.isoformat()
    return month_start, month_end
def earlier_this_month(date=None, incl=False, daterange=True, iso=False):
    """Return the range from the start of the month containing ``date`` to now.

    The end is ``today(date)`` when ``incl`` is true, otherwise
    ``yesterday(date)``; the start is the start of ``this_month(date)``.
    Returns None when ``daterange`` is false.
    """
    ref = today() if date is None else date
    month_start = this_month(ref)[0]
    last = today(ref) if incl else yesterday(ref)
    # Original author's open question: check if today is the first day of the month?
    if not daterange:
        return None
    if iso:
        return month_start.isoformat(), last.isoformat()
    return month_start, last
def later_this_month(date=None, incl=False, daterange=True, iso=False):
    """Return the range from now to the end of the month containing ``date``.

    The start is ``today(date)`` when ``incl`` is true, otherwise
    ``tomorrow(date)``; the end is the end of ``this_month(date)``.
    Returns None when ``daterange`` is false.
    """
    ref = today() if date is None else date
    month_end = this_month(ref)[1]
    first = today(ref) if incl else tomorrow(ref)
    # Original author's open question: check if today is the last day of the month?
    if not daterange:
        return None
    if iso:
        return first.isoformat(), month_end.isoformat()
    return first, month_end
#via http://stackoverflow.com/a/2384407/454773
# Weekday offsets counted from Monday (0) through Sunday (6).
MON, TUE, WED, THU, FRI, SAT, SUN = range(7)
# Day name -> weekday offset.
dw = dict(zip(
    ('Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday', 'Sunday'),
    (MON, TUE, WED, THU, FRI, SAT, SUN),
))
# Recognised day names (a live view of dw's keys).
dotw = dw.keys()
def sayday(d):
    """Format ``d`` as weekday name, month name, day and year for speech."""
    spoken_format = "%A, %B %d, %Y"
    return d.strftime(spoken_format)
def day_lastweek(day=MON, date=None, iso=False):
    """Return the given weekday in the week before the one containing ``date``.

    Args:
        day: offset from Monday (MON..SUN, i.e. 0..6).
        date: reference date; defaults to the current date when None.
        iso: when true, return the ISO-format string instead of the date.
    """
    if date is None:
        date = today()
    # Pass the reference date through; calling last_week() with no argument
    # always anchored on the real current date and ignored ``date``.
    qday = last_week(date)[0] + relativedelta(days=day)
    return qday.isoformat() if iso else qday
def day_thisweek(day=MON, date=None, iso=False):
    """Return the given weekday in the week containing ``date``.

    Args:
        day: offset from Monday (MON..SUN, i.e. 0..6).
        date: reference date; defaults to the current date when None.
        iso: when true, return the ISO-format string instead of the date.
    """
    if date is None:
        date = today()
    # Pass the reference date through; calling this_week() with no argument
    # always anchored on the real current date and ignored ``date``.
    qday = this_week(date)[0] + relativedelta(days=day)
    return qday.isoformat() if iso else qday
def day_nextweek(day=MON, date=None, iso=False):
    """Return the given weekday in the week after the one containing ``date``.

    Args:
        day: offset from Monday (MON..SUN, i.e. 0..6).
        date: reference date; defaults to the current date when None.
        iso: when true, return the ISO-format string instead of the date.
    """
    if date is None:
        date = today()
    # Pass the reference date through; calling next_week() with no argument
    # always anchored on the real current date and ignored ``date``.
    qday = next_week(date)[0] + relativedelta(days=day)
    return qday.isoformat() if iso else qday
from urllib.parse import urlencode | |
import requests | |
import re | |
import feedparser | |
import requests_cache | |
# Install a requests cache named 'test_cache' on the sqlite backend
# (expire_after=600 -- presumably seconds; confirm against requests_cache docs).
requests_cache.install_cache(
    'test_cache',
    backend='sqlite',
    expire_after=600,
)
import inflect
# Shared inflect engine; its join() is used below to build spoken lists.
p = inflect.engine()
class UKParliamentReader():
    """
    Chat to the UK Parliament API
    """

    # Matches the standalone word "or" only, never the letters "or" inside
    # another word (e.g. "reform", "Lords").
    _OR_WORD = re.compile(r'\bor\b', re.IGNORECASE)

    def __init__(self):
        """ Need to think more about the structure of this... """
        pass

    def qpatch(self, query):
        """Rewrite a free-text query as ``(a AND b) OR (c)``.

        Alternatives are separated by the word "or"; the words inside each
        alternative are joined with AND. Empty alternatives are dropped.
        """
        groups = []
        for alternative in self._OR_WORD.split(query):
            words = alternative.split()
            if words:
                groups.append('({})'.format(' AND '.join(words)))
        return ' OR '.join(groups)

    def _search_one(self, query, typ='Research Papers', page=0, ps=100, noLink=True):
        """Fetch one page of research briefings matching ``query``.

        Returns a list of titles, or of "title [summary URL]" strings when
        ``noLink`` is false. Collection stops once the space-joined text
        exceeds 7000 characters.
        """
        url = 'http://lda.data.parliament.uk/researchbriefings.json'
        urlargs = {'_view': typ, '_pageSize': ps, '_search': self.qpatch(query), '_page': page}
        data = requests.get(url, params=urlargs).json()
        response = []
        joined_len = -1  # running length of ' '.join(response)
        for i in data['result']['items']:
            if noLink:
                entry = i['title']
            else:
                entry = "{} [http://researchbriefings.parliament.uk/ResearchBriefing/Summary/{}]".format(
                    i['title'], i['identifier']['_value'])
            response.append(entry)
            joined_len += len(entry) + 1
            if joined_len > 7000:
                break
        return response

    def _search_all(self, query, typ='Research Papers', ps=100):
        """Placeholder: not implemented, always returns None."""
        return

    def responder_search(self, query, typ='Research Papers'):
        """Return a spoken list of matching titles, truncated to 7900 characters."""
        items = p.join(['One on {}'.format(i) for i in self._search_one(query, typ)])
        return "I know about the following Parliamentary {typ}:\n\n {items}".format(typ=typ, items=items)[:7900]

    def _getJSON(self, url):
        """GET ``url`` asking for JSON and return the decoded body."""
        headers = {'accept': 'application/json'}
        r = requests.get(url, headers=headers)
        # Decoded as utf-8-sig -- presumably because responses carry a BOM.
        r.encoding = 'utf-8-sig'
        return r.json()

    def committeeMembers(self, c):
        """Return a sentence listing the Commons members of committee ``c``."""
        url = 'http://data.parliament.uk/membersdataplatform/services/mnis/members/query/House=commons%7C{}/Committees/'
        urlargs = {'committee': c}
        members = self._getJSON(url.format(urlencode(urlargs)))
        if members['Members'] is None:
            return "I don't recognise that committee name."
        # 'Member' may be a single dict rather than a list; normalise it so
        # a one-member result is not iterated key by key.
        tl = ['{} ({})'.format(m['FullTitle'], m['Party']['#text'])
              for m in self._getXasList(members['Members']['Member'])]
        return 'The members of the {} are {}'.format(c, p.join(tl))

    def _committeesXisOnURL(self, x):
        """Build the members-query URL for the member named ``x``."""
        n = urlencode({'name': x}).replace('=', '*')
        url = 'http://data.parliament.uk/membersdataplatform/services/mnis/members/query/{}/Committees/'.format(n)
        return url

    def _committeesRegExpXisOnURL(self, x):
        """Extract a name from a "committees ... X is on" phrase and build its URL.

        Returns None when the phrase does not match the pattern.
        """
        regexp = re.compile(r'.*(?:committees[ (?:that|does|is)]*) (.*?)(( (:?is )?(on|sits on|sit on|a member of))|$)')
        m = re.match(regexp, x)
        if m:
            return self._committeesXisOnURL(m.group(1))
        return None

    def _getXasList(self, x):
        """Wrap a single dict in a list; return anything else unchanged."""
        return [x] if isinstance(x, dict) else x

    def _parseMemberCommittee(self, mj):
        """Describe, one paragraph per member, the committees each sits on."""
        txt = []
        for m in self._getXasList(mj['Members']['Member']):
            stub = '{} ({}) is currently a member of'.format(m['FullTitle'], m['Party']['#text'])
            committees = m['Committees']
            listed = committees.get('Committee') if committees else None
            # Only entries whose EndDate is not a string are kept --
            # presumably a string EndDate marks a membership that has ended.
            current = [c['Name'] for c in self._getXasList(listed or [])
                       if not isinstance(c['EndDate'], str)]
            if current:
                txt.append('{} {}.'.format(stub, p.join(['the {}'.format(c) for c in current])))
            else:
                txt.append('{} no committees.'.format(stub))
        return '\n\n'.join(txt)

    @staticmethod
    def _stripPossessive(name):
        """Remove one trailing possessive "'s" (straight or curly apostrophe)."""
        for suffix in ("'s", "\u2019s"):
            if name.endswith(suffix):
                return name[:-len(suffix)]
        return name

    def responder_Q_committeesForX(self, x):
        """Return a spoken answer listing the committees the member ``x`` sits on."""
        #Should do proper cleaning/fuzzy match?
        if not x:
            return "Sorry, I didn't catch the name."
        # str.rstrip("'s") would strip any run of trailing "'" / "s" characters
        # and so mangle names that simply end in "s".
        x = self._stripPossessive(x.strip())
        c = self._getJSON(self._committeesXisOnURL(x))
        if c['Members'] is None:
            return "Sorry, I don't know an MP called {}".format(x)
        return self._parseMemberCommittee(c)


ukpr = UKParliamentReader()
def getQuestionsAskedByPeriod(fromDate='', toDate='', currentSessionStart='2016-05-17', house='', dayOnly=False):
    """Parse the parliament.uk RSS feed of questions (QuestionsOnly) for a period.

    With neither date given, ``fromDate`` falls back to ``currentSessionStart``.
    ``dayOnly`` sets ``toDate`` to ``fromDate``. ``house`` is added to the
    query only when it is 'Lords' or 'Commons'. Returns feedparser's result.
    """
    if fromDate == '' and toDate == '':
        fromDate = currentSessionStart
    if dayOnly and fromDate != '':
        toDate = fromDate
    pieces = ['http://www.parliament.uk/g/rss/generic/?type=QnA&pageInstanceId=87749&questionType=QuestionsOnly']
    if fromDate:
        pieces.append('&fromDate={fr}'.format(fr=fromDate))
    if toDate:
        pieces.append('&toDate={to}'.format(to=toDate))
    if house in ('Lords', 'Commons'):
        pieces.append('&house={house}'.format(house=house))
    return feedparser.parse(''.join(pieces))
def getQuestionsAnsweredByPeriod(fromDate='', toDate='', currentSessionStart='2016-05-17', house=None, dayOnly=False):
    """Parse the parliament.uk RSS feed of answered questions for a period.

    With neither date given, ``fromDate`` falls back to ``currentSessionStart``.
    ``dayOnly`` sets ``toDate`` to ``fromDate``. ``house`` is added to the
    query only when it is 'Lords' or 'Commons'. Returns feedparser's result.
    """
    if fromDate == '' and toDate == '':
        fromDate = currentSessionStart
    if dayOnly and fromDate != '':
        toDate = fromDate
    pieces = ['http://www.parliament.uk/g/rss/generic/?type=QnA&pageInstanceId=87749&questionType=QuestionsWithAnswersOnly']
    if fromDate:
        pieces.append('&fromDate={fr}'.format(fr=fromDate))
    if toDate:
        pieces.append('&toDate={to}'.format(to=toDate))
    if house in ('Lords', 'Commons'):
        pieces.append('&house={house}'.format(house=house))
    return feedparser.parse(''.join(pieces))
def askQnAParseId(txt, iso=False):
    '''
    Parse a '-'-separated feed id into house, tabled date, id, status and
    status date. Example ids:
    'Lords-06/09/2016 00:00:00-HL1591-Answered-20/09/2016 00:00:00'
    'Lords-15/09/2016 00:00:00-HL1950-Tabled-15/09/2016 00:00:00'
    With iso true the two dates are returned as ISO date strings,
    otherwise as datetime objects.
    '''
    stamp = '%d/%m/%Y %H:%M:%S'
    fields = txt.split('-')
    parse = datetime.datetime.strptime
    tabled = parse(fields[1], stamp)
    status_date = parse(fields[4], stamp)
    if iso:
        tabled = tabled.date().isoformat()
        status_date = status_date.date().isoformat()
    return {'house': fields[0],
            'tabled': tabled,
            'status': fields[3],
            'statusdate': status_date,
            'id': fields[2]}
def probeQnA(summary):
    """Return 'Q' when the summary has exactly two '<br/>'-separated parts, else 'A'."""
    pieces = summary.replace('<br/><br/>', '<br/>').split('<br/>')
    if len(pieces) == 2:
        return 'Q'
    return 'A'
def askQnAParseSummary(summary):
    """Split a feed summary into tabled_by / question / answered_by / answer.

    The first two '<br/>'-separated parts are always read; the answer fields
    are filled only when there are exactly four parts, otherwise they stay ''.
    Markup tags are removed from every value.
    """
    tag = re.compile('<.*?>')
    pieces = summary.replace('<br/><br/>', '<br/>').split('<br/>')
    parts = dict.fromkeys(('tabled_by', 'answered_by', 'question', 'answer'), '')
    parts['tabled_by'] = tag.sub('', pieces[0])
    parts['question'] = tag.sub('', pieces[1])
    if len(pieces) == 4:
        parts['answered_by'] = tag.sub('', pieces[2])
        parts['answer'] = tag.sub('', pieces[3])
    return parts
def QnABatcher(feed):
    """Count feed entries per addressee.

    The addressee is the question text up to its first comma, after removing
    a leading "To ask " and then a leading "the ". Returns a dict mapping
    addressee -> {'count': n}.
    """
    depts = {}
    for entry in feed['entries']:
        question = askQnAParseSummary(entry['summary'])['question']
        # Drop the comma inside this title so the split on ',' below does not cut it short.
        question = question.replace('Secretary of State for Business, Energy and Industrial Strategy',
                                    'Secretary of State for Business Energy and Industrial Strategy')
        # Remove the prefixes as whole strings. str.lstrip() treats its
        # argument as a set of characters and could eat into the name itself.
        for prefix in ('To ask ', 'the '):
            if question.startswith(prefix):
                question = question[len(prefix):]
        dept = question.split(',')[0]
        if dept in depts:
            depts[dept]['count'] += 1
        else:
            depts[dept] = {'count': 1}
    return depts
def responder_QnABatcher(feed):
    """Return a spoken per-addressee count summary for ``feed``.

    Phrases are appended one at a time; once the text passes 7000 characters
    '. There were more.' is added and the rest are skipped.
    """
    batch = QnABatcher(feed)
    phrases = [
        '{count} for the {dept}'.format(count=info['count'], dept=dept).replace('the Mr', 'Mr')
        for dept, info in batch.items()
    ]
    txt = 'I know about'
    for phrase in phrases:
        txt = '{}, {}'.format(txt, phrase)
        if len(txt) > 7000:
            txt = '{}. There were more.'.format(txt)
            break
    return txt
@app.route('/test3')
def getWrittenStatementsByPeriod(fromDate='', toDate='', currentSessionStart='2016-05-17', dayOnly=False):
    """Parse the parliament.uk RSS feed of written statements for a period.

    With neither date given, ``fromDate`` falls back to ``currentSessionStart``.
    ``dayOnly`` sets ``toDate`` to ``fromDate``. Returns feedparser's result.
    """
    if fromDate == '' and toDate == '':
        fromDate = currentSessionStart
    if dayOnly and fromDate != '':
        toDate = fromDate
    pieces = ['http://www.parliament.uk/g/rss/generic/?type=WS&pageInstanceId=95194']
    if fromDate:
        pieces.append('&fromDate={fr}'.format(fr=fromDate))
    if toDate:
        pieces.append('&toDate={to}'.format(to=toDate))
    return feedparser.parse(''.join(pieces))
@app.route('/test4')
def feedtest():
    """Test route: parse a fixed written-statements feed URL and return its title."""
    parsed = feedparser.parse(
        'http://www.parliament.uk/g/rss/generic/?type=WS&pageInstanceId=95194&fromDate=2016-09-10&toDate=2016-09-15'
    )
    return parsed['feed']['title']
@app.route('/test2')
def responder_getWrittenStatementsByPeriod(fromDate='', toDate='', dayOnly=False, filterby=None):
    """Describe the written statements for a period.

    Args:
        fromDate, toDate, dayOnly: passed to getWrittenStatementsByPeriod.
        filterby: optional department name or deptMap abbreviation; only
            entries whose summary starts (first 60 characters, commas
            ignored, case-insensitive) with text containing it are kept.

    Returns:
        (summary sentence, item list text). The item list is '' when there
        is nothing to read out.
    """
    feed = getWrittenStatementsByPeriod(fromDate=fromDate, toDate=toDate, dayOnly=dayOnly)
    if not feed['entries']:
        return "I don't know of any for then.", ""

    filtering = filterby is not None and filterby != ''
    if filtering:
        wanted = filterby.lower()
        for abbrev, fullname in deptMap.items():
            if abbrev.lower() == wanted:
                filterby = fullname
        needle = filterby.lower().replace(',', '')
        items = [i for i in feed['entries']
                 if needle in i['summary'][:60].lower().replace(',', '')]
    else:
        items = feed['entries']

    txt = 'I know of {cnt} written statements for then. '.format(cnt=len(items))
    #Really need to find a way of paging the responses - eg "okay, there's rather a lot, doing 10 at a time".."next 10?"
    phrases = []
    for i in items:
        # First summary line is split on ' - ' into agent and department.
        byline = tagstrip(i['summary'].split('<br/>')[0]).split(' - ')
        agent = byline[0]
        dept = byline[1] if len(byline) > 1 else ''
        if filtering or not dept:
            # The department is already known (or absent), so leave it out.
            phrases.append('one on {} by {}'.format(i['title'], agent))
        else:
            phrases.append('one on {} by {} from {}'.format(i['title'], agent, dept))

    # Start from '' so the result is always bound, and accumulate with format();
    # str.join takes a single iterable, so the earlier two-argument call raised.
    txt2 = ''
    for phrase in phrases:
        txt2 = '{}, {}'.format(txt2, phrase) if txt2 else phrase
        if len(txt2) > 7000:
            txt2 = '{}. There were others.'.format(txt2)
            break
    return txt, txt2
@app.route('/test')
def hello_world():
    """Test route: respond with a fixed greeting."""
    greeting = 'Hello, World!'
    return greeting
def statementGrabber(period='', myperiod='', filterby=None):
    """Build the spoken written-statement summary for a requested period.

    Args:
        period: a 'YYYY-MM-DD' date string; anything else is ignored.
        myperiod: a phrase such as 'last Wednesday', 'Wednesday last week',
            'this week' or 'last week' (matched case-insensitively).
        filterby: optional department filter passed to the responder.

    Returns:
        (summary text, item list text). ``myperiod`` is tried first; when it
        is missing or not recognised, a valid ``period`` date is used.
    """
    txt = 'No idea...'
    tmp = ''
    try:
        # Keep only the date part so isoformat() yields a plain YYYY-MM-DD.
        dt = datetime.datetime.strptime(period, '%Y-%m-%d').date()
    except (TypeError, ValueError):
        dt = None  # period is missing or not a plain date

    matched = False
    if myperiod is not None and myperiod != '':
        txt = '{} by {}'.format(txt, myperiod)
        phrase = myperiod.strip().lower()
        words = phrase.split()
        names = {name.lower(): name for name in dw}
        dayname = None
        # "last <day>"
        if len(words) > 1 and words[0] == 'last' and words[1] in names:
            dayname = names[words[1]]
        # "<day> last week"
        if words and phrase.endswith('last week') and words[0] in names:
            dayname = names[words[0]]

        if dayname is not None:
            target = day_lastweek(dw[dayname])
            txt = 'Written statements for {}.'.format(sayday(target))
            txt2, tmp = responder_getWrittenStatementsByPeriod(target.isoformat(), dayOnly=True, filterby=filterby)
            txt = '{} {}'.format(txt, txt2)
            matched = True
        elif phrase in ('this week', 'last week'):
            txt = 'Written statements for {}.'.format(myperiod)
            week = this_week(iso=True) if phrase == 'this week' else last_week(iso=True)
            txt2, tmp = responder_getWrittenStatementsByPeriod(*week, filterby=filterby)
            txt = '{} {}'.format(txt, txt2)
            matched = True

    if not matched and dt is not None:
        txt = 'Written statements for {}.'.format(sayday(dt))
        txt2, tmp = responder_getWrittenStatementsByPeriod(dt.isoformat(), dayOnly=True, filterby=filterby)
        txt = '{} {}'.format(txt, txt2)

    # Have "HM" spelled out letter by letter in the spoken output.
    tmp = tmp.replace('HM', ' <say-as interpret-as="spell-out">HM</say-as>')
    return txt, tmp
@ask.launch
def welcome():
    """Launch handler: ask the rendered 'welcome' template."""
    return question(render_template('welcome'))
@ask.intent("AMAZON.StopIntent")
def bye():
    """Stop handler: reply with the rendered 'bye' template as a statement."""
    return statement(render_template('bye'))
@ask.intent("AMAZON.HelpIntent")
def help():
    """Help handler: ask the rendered 'help' template."""
    return question(render_template('help'))
@ask.intent("MPCommitteeIntent")
def getCommitteeForMP(mp):
    """Ask back with the committees the named MP sits on."""
    reply = ukpr.responder_Q_committeesForX(mp)
    return question(reply)
@ask.intent("CommitteeMembersIntent")
def getCommitteeMembers(committee):
    """Ask back with the members of the named committee."""
    reply = ukpr.committeeMembers(committee)
    return question(reply)
@ask.intent("WrittenStatementIntent")
def writtenStatement(period, myperiod):
    """Summarise written statements for the period and offer to read them all.

    The period slots and the intent name are stored in the session
    attributes for use by follow-up intents.
    """
    summary, details = statementGrabber(period=period, myperiod=myperiod)
    remembered = (('period', period),
                  ('myperiod', myperiod),
                  ('typ', 'WrittenStatementIntent'))
    for key, value in remembered:
        session.attributes[key] = value
    if details == '':
        reply = "I don't know of any."
    else:
        reply = '{} Do you want to hear them all?'.format(summary)
    return question(reply)
@ask.intent("AllOfThemIntent")
def sayThemAll():
    """Read out every written statement for the period stored in the session.

    Re-prompts when no period has been stored yet, and says so when there
    is nothing to read instead of returning empty speech.
    """
    attrs = session.attributes
    # Both keys are read below, so either one missing means nothing was asked yet.
    if "period" not in attrs or "myperiod" not in attrs:
        return question("I missed that. Ask me about written statements, tabled question, written answers, library research reports or committee membership")
    txt, tmp = statementGrabber(period=attrs['period'], myperiod=attrs['myperiod'])
    if tmp == '':
        return question("I don't know of any.")
    return question(tmp)
@ask.intent("LimitByDeptIntent")
def limitByDept(dept):
    """Repeat the stored written-statement query limited to one department.

    ``dept`` may be a deptMap abbreviation, which is expanded to the full
    name. Re-prompts when the slot is empty or no period has been stored.
    """
    attrs = session.attributes
    if not dept or "period" not in attrs or "myperiod" not in attrs:
        return question("I missed that. Ask me about written statements, tabled question, written answers, library research reports or committee membership")
    # Map the US spelling onto the one the comparison expects.
    if dept.lower() == 'defense':
        dept = 'defence'
    wanted = dept.lower()
    for abbrev, fullname in deptMap.items():
        if abbrev.lower() == wanted:
            dept = fullname
    tmp2 = "Okay, limiting it to {}. ".format(dept)
    txt, tmp = statementGrabber(period=attrs['period'], myperiod=attrs['myperiod'], filterby=dept)
    return question('{} {} {}'.format(tmp2, txt, tmp))
@ask.intent("WrittenQuestionIntent")
def writtenQuestions():
    """Summarise Lords questions, with fromDate set to last week's Wednesday."""
    feed = getQuestionsAskedByPeriod(day_lastweek(WED), house='Lords')
    #responder_QnABatcher(QnAbyAddressee(cc,'Business, Energy and Industrial Strategy'))
    return question(responder_QnABatcher(feed))
@ask.intent("WrittenAnswerIntent")
def writtenAnswerss():
    """Summarise answered Lords questions, with fromDate set to last week's Wednesday."""
    feed = getQuestionsAnsweredByPeriod(day_lastweek(WED), house='Lords')
    #responder_QnABatcher(QnAbyAddressee(cc,'Business, Energy and Industrial Strategy'))
    return question(responder_QnABatcher(feed))
@ask.intent("LibraryResearchReportIntent")
def researchreports(freetext):
    """Search research papers for ``freetext``; re-prompt when it is missing."""
    if freetext is None or freetext == '':
        return question('I missed that. What research paper topic are you interested in?')
    return question(ukpr.responder_search(freetext))
if __name__ == '__main__':
    # Script entry point: start the Flask server with debug mode switched on.
    app.run(debug=True)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment