Skip to content

Instantly share code, notes, and snippets.

@psychemedia
Created February 23, 2017 17:14
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save psychemedia/51bf0219b831bd4adec0cdb683ce30f0 to your computer and use it in GitHub Desktop.
Save psychemedia/51bf0219b831bd4adec0cdb683ce30f0 to your computer and use it in GitHub Desktop.
parlibot code - alexa
import logging
from random import randint
from flask import Flask, render_template
from flask_ask import Ask, statement, question, session
# Flask application object; `application` is a second name bound to the same
# object (presumably for WSGI servers that look for that name - TODO confirm).
app = Flask(__name__)
application=app
# Flask-Ask handler attached to the app at the "/" route.
ask = Ask(app, "/")
# Emit flask_ask log records at DEBUG level.
logging.getLogger("flask_ask").setLevel(logging.DEBUG)
# Department abbreviation -> full department name; used when filtering
# written statements by department.
deptMap={'FCO':'Foreign and Commonwealth Office','PM':'Prime Minister','DWP':'Department for Work and Pensions',
'DCLG':'Department for Communities and Local Government','MOD':'Ministry of Defence'}
import datetime
from dateutil.relativedelta import relativedelta
import re
def tagstrip(txt):
    """Return *txt* with every ``<...>`` span (shortest match) removed."""
    return re.sub('<.*?>', '', txt)
def today(date=None,iso=False):
    """Return *date*, falling back to the current local date when it is None.

    When ``iso`` is true the result is the ``isoformat()`` string of that
    date instead of the date object itself.
    """
    current = datetime.date.today() if date is None else date
    return current.isoformat() if iso else current
def yesterday(date=None,iso=False):
    """Return the day before *date*.

    date: reference date; defaults to the current local date.
    iso:  when true, return the ``isoformat()`` string instead of the date.
    """
    if date is None:
        date = datetime.date.today()
    # Subtract from the supplied date itself. The previous code used
    # ``date.today()``, which always yields the current date and therefore
    # silently ignored the argument.
    previous = date - datetime.timedelta(days=1)
    return previous.isoformat() if iso else previous
def tomorrow(date=None,iso=False):
    """Return the day after *date*.

    date: reference date; defaults to the current local date.
    iso:  when true, return the ``isoformat()`` string instead of the date.
    """
    if date is None:
        date = datetime.date.today()
    # Add to the supplied date itself. The previous code used
    # ``date.today()``, which always yields the current date and therefore
    # silently ignored the argument.
    following = date + datetime.timedelta(days=1)
    return following.isoformat() if iso else following
def last_week(date=None,daterange=True,iso=False):
    """Return (Monday, Sunday) of the week before the one containing *date*.

    date defaults to today(). With ``iso`` true both ends are isoformat
    strings. When ``daterange`` is false nothing is computed for return and
    the result is None.
    """
    anchor = today() if date is None else date
    # weekday() is 0 for Monday, so this lands on Monday of the previous week.
    monday = anchor + relativedelta(days=-anchor.weekday(), weeks=-1)
    sunday = monday + relativedelta(days=6)
    if not daterange:
        return None
    if iso:
        return monday.isoformat(), sunday.isoformat()
    return monday, sunday
def this_week(date=None,daterange=True,iso=False):
    """Return (Monday, Sunday) of the week containing *date*.

    date defaults to today(). With ``iso`` true both ends are isoformat
    strings. When ``daterange`` is false the result is None.
    """
    anchor = today() if date is None else date
    # weekday() is 0 for Monday, so stepping back that many days gives Monday.
    monday = anchor + relativedelta(days=-anchor.weekday())
    sunday = monday + relativedelta(days=6)
    if not daterange:
        return None
    if iso:
        return monday.isoformat(), sunday.isoformat()
    return monday, sunday
def next_week(date=None,daterange=True,iso=False):
    """Return (Monday, Sunday) of the week after the one containing *date*.

    date defaults to today(). With ``iso`` true both ends are isoformat
    strings. When ``daterange`` is false the result is None.
    """
    anchor = today() if date is None else date
    # Back to this week's Monday, then forward one week.
    monday = anchor + relativedelta(days=-anchor.weekday(), weeks=1)
    sunday = monday + relativedelta(days=6)
    if not daterange:
        return None
    if iso:
        return monday.isoformat(), sunday.isoformat()
    return monday, sunday
def later_this_week(date=None,incl=False,daterange=True,iso=False):
    """Return the remainder of the current week as (start, end).

    The end is the week end reported by this_week(date). The start is
    today(date) when ``incl`` is true, otherwise tomorrow(date). With
    ``iso`` true both ends are isoformat strings; when ``daterange`` is
    false the result is None.
    """
    date = today(date)
    _, week_end = this_week(date)
    range_start = today(date) if incl else tomorrow(date)
    # TODO: no check is made for the reference day being the last day of
    # the week, in which case the start can fall after the end.
    if not daterange:
        return None
    if iso:
        return range_start.isoformat(), week_end.isoformat()
    return range_start, week_end
def earlier_this_week(date=None,incl=False,daterange=True,iso=False):
    """Return the elapsed part of the current week as (start, end).

    The start is the week start reported by this_week(date). The end is
    today(date) when ``incl`` is true, otherwise yesterday(date). With
    ``iso`` true both ends are isoformat strings; when ``daterange`` is
    false the result is None.
    """
    date = today(date)
    week_start, _ = this_week(date)
    range_end = today(date) if incl else yesterday(date)
    # TODO: no check is made for the reference day being the first day of
    # the week, in which case the end can fall before the start.
    if not daterange:
        return None
    if iso:
        return week_start.isoformat(), range_end.isoformat()
    return week_start, range_end
def last_month(date=None,daterange=True,iso=False):
    """Return (first day, last day) of the month before the one holding *date*.

    date defaults to today(). With ``iso`` true both ends are isoformat
    strings. When ``daterange`` is false the result is None.
    """
    anchor = today() if date is None else date
    # The day before the 1st of this month is the last day of last month.
    month_end = anchor.replace(day=1) - relativedelta(days=1)
    month_start = month_end.replace(day=1)
    if not daterange:
        return None
    if iso:
        return month_start.isoformat(), month_end.isoformat()
    return month_start, month_end
def next_month(date=None,daterange=True,iso=False):
    """Return (first day, last day) of the month after the one holding *date*.

    date defaults to today(). With ``iso`` true both ends are isoformat
    strings. When ``daterange`` is false the result is None.
    """
    anchor = today() if date is None else date
    # Jump two months ahead, snap to the 1st, then step back one day to
    # reach the last day of next month.
    two_months_on = anchor + relativedelta(months=2)
    month_end = two_months_on.replace(day=1) - relativedelta(days=1)
    month_start = month_end.replace(day=1)
    if not daterange:
        return None
    if iso:
        return month_start.isoformat(), month_end.isoformat()
    return month_start, month_end
def this_month(date=None,daterange=True,iso=False):
    """Return (first day, last day) of the month holding *date*.

    date defaults to today(). With ``iso`` true both ends are isoformat
    strings. When ``daterange`` is false the result is None.
    """
    anchor = today() if date is None else date
    # The day before the start of next month is the last day of this month.
    first_of_next = next_month(anchor)[0]
    month_end = first_of_next - relativedelta(days=1)
    month_start = month_end.replace(day=1)
    if not daterange:
        return None
    if iso:
        return month_start.isoformat(), month_end.isoformat()
    return month_start, month_end
def earlier_this_month(date=None,incl=False,daterange=True,iso=False):
    """Return the elapsed part of the current month as (start, end).

    The start is the month start reported by this_month(date). The end is
    today(date) when ``incl`` is true, otherwise yesterday(date). With
    ``iso`` true both ends are isoformat strings; when ``daterange`` is
    false the result is None.
    """
    if date is None:
        date = today()
    month_start, _ = this_month(date)
    range_end = today(date) if incl else yesterday(date)
    # TODO: no check is made for the reference day being the first day of
    # the month, in which case the end can fall before the start.
    if not daterange:
        return None
    if iso:
        return month_start.isoformat(), range_end.isoformat()
    return month_start, range_end
def later_this_month(date=None,incl=False,daterange=True,iso=False):
    """Return the remainder of the current month as (start, end).

    The end is the month end reported by this_month(date). The start is
    today(date) when ``incl`` is true, otherwise tomorrow(date). With
    ``iso`` true both ends are isoformat strings; when ``daterange`` is
    false the result is None.
    """
    if date is None:
        date = today()
    _, month_end = this_month(date)
    range_start = today(date) if incl else tomorrow(date)
    # TODO: no check is made for the reference day being the last day of
    # the month, in which case the start can fall after the end.
    if not daterange:
        return None
    if iso:
        return range_start.isoformat(), month_end.isoformat()
    return range_start, month_end
#via http://stackoverflow.com/a/2384407/454773
# Weekday indices 0..6, Monday first (the same numbering as date.weekday()).
MON, TUE, WED, THU, FRI, SAT, SUN = range(7)
# English day name -> weekday index.
dw={'Monday':MON,'Tuesday': TUE,'Wednesday': WED,
'Thursday':THU, 'Friday':FRI,'Saturday': SAT,'Sunday': SUN}
# Keys view of the day names; used for membership tests on spoken periods.
dotw=dw.keys()
def sayday(d):
    """Format *d* for speech as 'Weekday, Month DD, YYYY'."""
    spoken_format = "%A, %B %d, %Y"
    return d.strftime(spoken_format)
def day_lastweek(day=MON,date=None,iso=False):
    """Return the given weekday of the week before the one containing *date*.

    day:  weekday index (MON..SUN, i.e. 0..6) counted from that week's start.
    date: reference date; defaults to today().
    iso:  when true, return the ``isoformat()`` string instead of the date.
    """
    if date is None:
        date = today()
    # Pass the reference date through. The previous code called last_week()
    # with no argument, so a caller-supplied date was silently ignored.
    week_start, _ = last_week(date)
    qday = week_start + relativedelta(days=day)
    return qday.isoformat() if iso else qday
def day_thisweek(day=MON,date=None,iso=False):
    """Return the given weekday of the week containing *date*.

    day:  weekday index (MON..SUN, i.e. 0..6) counted from that week's start.
    date: reference date; defaults to today().
    iso:  when true, return the ``isoformat()`` string instead of the date.
    """
    if date is None:
        date = today()
    # Pass the reference date through. The previous code called this_week()
    # with no argument, so a caller-supplied date was silently ignored.
    week_start, _ = this_week(date)
    qday = week_start + relativedelta(days=day)
    return qday.isoformat() if iso else qday
def day_nextweek(day=MON,date=None,iso=False):
    """Return the given weekday of the week after the one containing *date*.

    day:  weekday index (MON..SUN, i.e. 0..6) counted from that week's start.
    date: reference date; defaults to today().
    iso:  when true, return the ``isoformat()`` string instead of the date.
    """
    if date is None:
        date = today()
    # Pass the reference date through. The previous code called next_week()
    # with no argument, so a caller-supplied date was silently ignored.
    week_start, _ = next_week(date)
    qday = week_start + relativedelta(days=day)
    return qday.isoformat() if iso else qday
from urllib.parse import urlencode
import requests
import re
import feedparser
import requests_cache
# Install a requests cache named 'test_cache' on the sqlite backend with
# expire_after=600 (seconds, per the requests_cache API - TODO confirm).
requests_cache.install_cache('test_cache', backend='sqlite', expire_after=600)
import inflect
# Shared inflect engine; its join() is used below to turn lists into
# spoken-style enumerations.
p = inflect.engine()
class UKParliamentReader():
    """
    Chat to the UK Parliament API.

    Queries the research briefings endpoint on lda.data.parliament.uk and
    the members query endpoints on data.parliament.uk, and turns the JSON
    replies into sentences.
    """

    # Seconds to wait on a Parliament endpoint before giving up, so a
    # stalled server cannot hang a request indefinitely.
    REQUEST_TIMEOUT = 10

    def __init__(self):
        """ Need to think more about the structure of this... """
        pass

    def qpatch(self,query):
        """Rewrite a free-text query into '(a AND b) OR (c)' search syntax.

        The query is split into alternatives on the standalone word "or"
        (any case); the words of each alternative are joined with AND.
        Empty alternatives are dropped, so an empty query yields ''.
        """
        # Split on the *word* "or" only. A plain str.split('or') also cut
        # inside words such as "forestry" or "transport".
        alternatives = re.split(r'\bor\b', query, flags=re.IGNORECASE)
        groups = []
        for alternative in alternatives:
            words = alternative.split()
            if words:
                groups.append('({})'.format(' AND '.join(words)))
        return ' OR '.join(groups)

    def _search_one(self,query, typ='Research Papers',page=0,ps=100,noLink=True):
        """Fetch one page of research briefings matching *query*.

        Returns a list of titles, or of 'title [summary URL]' strings when
        ``noLink`` is false. Collection stops once the space-joined text
        exceeds 7000 characters.
        """
        url='http://lda.data.parliament.uk/researchbriefings.json'
        urlargs={'_view':typ,'_pageSize':ps,'_search':self.qpatch(query),'_page':page}
        r=requests.get(url,params=urlargs,timeout=self.REQUEST_TIMEOUT)
        data=r.json()
        response=[]
        for i in data['result']['items']:
            if noLink:
                response.append(i['title'])
            else:
                response.append("{} [http://researchbriefings.parliament.uk/ResearchBriefing/Summary/{}]".format( i['title'],i['identifier']['_value']))
            if len(' '.join(response))>7000: break
        return response

    def _search_all(self,query, typ='Research Papers',ps=100):
        """Placeholder: not implemented, always returns None."""
        return

    def responder_search(self,query,typ='Research Papers'):
        """Return a sentence listing briefings found for *query* (max 7900 chars)."""
        items=p.join(['One on {}'.format(i) for i in self._search_one(query, typ)])
        return "I know about the following Parliamentary {typ}:\n\n {items}".format(typ=typ,items=items)[:7900]

    def _getJSON(self,url):
        """GET *url* asking for JSON and return the decoded body."""
        headers={'accept': 'application/json'}
        r=requests.get(url,headers=headers,timeout=self.REQUEST_TIMEOUT)
        # Decode as utf-8-sig so a leading byte-order mark does not break
        # JSON parsing.
        r.encoding = 'utf-8-sig'
        return r.json()

    def committeeMembers(self,c):
        """Return a sentence naming the Commons members of committee *c*."""
        url='http://data.parliament.uk/membersdataplatform/services/mnis/members/query/House=commons%7C{}/Committees/'
        urlargs={'committee':c}
        members=self._getJSON(url.format(urlencode(urlargs)))
        if members['Members'] is None: return "I don't recognise that committee name."
        tl=[]
        # 'Member' is normalised with _getXasList, as _parseMemberCommittee
        # does, so a single-member reply (a dict) is handled too.
        for m in self._getXasList(members['Members']['Member']):
            tl.append('{} ({})'.format(m['FullTitle'],m['Party']['#text']))
        return 'The members of the {} are {}'.format(c,p.join(tl))

    def _committeesXisOnURL(self,x):
        """Build the members-query URL used to look up committees for name *x*."""
        n=urlencode({'name':x}).replace('=','*')
        url='http://data.parliament.uk/membersdataplatform/services/mnis/members/query/{}/Committees/'.format(n)
        return url

    def _committeesRegExpXisOnURL(self,x):
        """Extract a name from a 'committees ... X is on' phrase and build its URL.

        Returns None when the phrase does not match the pattern.
        """
        regexp=re.compile(r'.*(?:committees[ (?:that|does|is)]*) (.*?)(( (:?is )?(on|sits on|sit on|a member of))|$)')
        m=re.match(regexp,x)
        if m:
            return self._committeesXisOnURL(m.group(1))
        return None

    def _getXasList(self,x):
        """Wrap a lone dict in a list; return anything else unchanged."""
        return [x] if isinstance(x, dict) else x

    def _parseMemberCommittee(self,mj):
        """Describe, one paragraph per member, the committees each sits on.

        Committees whose 'EndDate' is a string are skipped (presumably
        memberships that have ended - TODO confirm against the API).
        """
        txt=[]
        for m in self._getXasList(mj['Members']['Member']):
            stub='{} ({}) is currently a member of '.format(m['FullTitle'],m['Party']['#text'])
            raw=(m['Committees'] or {}).get('Committee') or []
            cl=[c['Name'] for c in self._getXasList(raw) if not isinstance(c['EndDate'], str)]
            if cl:
                txt.append('{}{}.'.format(stub,p.join(['the {}'.format(c) for c in cl])))
            else:
                # Covers both "no committee data" and "only ended
                # memberships"; previously the latter was read out as
                # "the None".
                txt.append('{}no committees.'.format(stub))
        return '\n\n'.join(txt)

    def responder_Q_committeesForX(self,x):
        """Return a spoken answer listing the committees the MP named *x* is on."""
        if not x:
            return "Sorry, I didn't catch the name of the MP."
        #Should do proper cleaning/fuzzy match?
        x=x.strip()
        # Drop a trailing possessive only. rstrip("'s") removed *any* run of
        # trailing "'" / "s" characters, turning "Phillips" into "Phillip".
        if x.endswith("'s"):
            x=x[:-2]
        c=self._getJSON(self._committeesXisOnURL(x))
        if c['Members'] is None: return "Sorry, I don't know an MP called {}".format(x)
        return self._parseMemberCommittee(c)
# Module-level reader instance shared by the intent handlers below.
ukpr=UKParliamentReader()
def getQuestionsAskedByPeriod(fromDate='',toDate='',currentSessionStart='2016-05-17',house='',dayOnly=False):
    """Fetch and parse the 'QuestionsOnly' QnA RSS feed for a period.

    With neither date given, the period starts at ``currentSessionStart``.
    ``dayOnly`` makes the end date equal to the start date. ``house`` is
    only sent when it is 'Lords' or 'Commons'. Returns whatever
    feedparser.parse() yields for the assembled URL.
    """
    if fromDate=='' and toDate=='':
        fromDate=currentSessionStart
    if dayOnly and fromDate!='':
        toDate=fromDate
    pieces=['http://www.parliament.uk/g/rss/generic/?type=QnA&pageInstanceId=87749&questionType=QuestionsOnly']
    if fromDate:
        pieces.append('&fromDate={fr}'.format(fr=fromDate))
    if toDate:
        pieces.append('&toDate={to}'.format(to=toDate))
    if house in ('Lords','Commons'):
        pieces.append('&house={house}'.format(house=house))
    return feedparser.parse(''.join(pieces))
def getQuestionsAnsweredByPeriod(fromDate='',toDate='',currentSessionStart='2016-05-17',house=None,dayOnly=False):
    """Fetch and parse the 'QuestionsWithAnswersOnly' QnA RSS feed for a period.

    With neither date given, the period starts at ``currentSessionStart``.
    ``dayOnly`` makes the end date equal to the start date. ``house`` is
    only sent when it is 'Lords' or 'Commons'. Returns whatever
    feedparser.parse() yields for the assembled URL.
    """
    if fromDate=='' and toDate=='':
        fromDate=currentSessionStart
    if dayOnly and fromDate!='':
        toDate=fromDate
    pieces=['http://www.parliament.uk/g/rss/generic/?type=QnA&pageInstanceId=87749&questionType=QuestionsWithAnswersOnly']
    if fromDate:
        pieces.append('&fromDate={fr}'.format(fr=fromDate))
    if toDate:
        pieces.append('&toDate={to}'.format(to=toDate))
    if house in ('Lords','Commons'):
        pieces.append('&house={house}'.format(house=house))
    return feedparser.parse(''.join(pieces))
def askQnAParseId(txt,iso=False):
    '''
    Break a QnA feed entry id into a dict of its '-'-separated fields.

    Example ids:
    'Lords-06/09/2016 00:00:00-HL1591-Answered-20/09/2016 00:00:00'
    'Lords-15/09/2016 00:00:00-HL1950-Tabled-15/09/2016 00:00:00'

    Keys: house, tabled, status, statusdate, id. The two dates are
    datetime objects, or ISO date strings when ``iso`` is true.
    '''
    stamp='%d/%m/%Y %H:%M:%S'
    fields=txt.split('-')

    def when(raw):
        return datetime.datetime.strptime(raw, stamp)

    info={'house':fields[0],
          'tabled':when(fields[1]),
          'status':fields[3],
          'statusdate':when(fields[4]),
          'id':fields[2]}
    if iso:
        for key in ('tabled','statusdate'):
            info[key]=info[key].date().isoformat()
    return info
def probeQnA(summary):
    """Return 'Q' when *summary* has exactly two ``<br/>``-separated parts, else 'A'.

    Doubled ``<br/><br/>`` breaks are collapsed to one before splitting.
    """
    segments = summary.replace('<br/><br/>','<br/>').split('<br/>')
    if len(segments) == 2:
        return 'Q'
    return 'A'
def askQnAParseSummary(summary):
    """Split a QnA feed summary into its tag-stripped parts.

    The summary is cut on ``<br/>`` (doubled breaks collapsed first). Part 0
    is stored as 'tabled_by' and part 1 as 'question'; when there are
    exactly four parts, parts 2 and 3 are stored as 'answered_by' and
    'answer', otherwise those two stay ''.
    """
    chunks = summary.replace('<br/><br/>','<br/>').split('<br/>')
    answered = len(chunks) == 4
    return {
        'tabled_by': tagstrip(chunks[0]),
        'answered_by': tagstrip(chunks[2]) if answered else '',
        'question': tagstrip(chunks[1]),
        'answer': tagstrip(chunks[3]) if answered else '',
    }
def QnABatcher(feed):
    """Count the questions in *feed* per addressee.

    The addressee is the question text up to its first comma, with a leading
    'To ask ' and then a leading 'the ' removed. Returns a dict of the form
    ``{addressee: {'count': n}}``.
    """
    depts={}
    for entry in feed['entries']:
        question=askQnAParseSummary(entry['summary'])['question']
        # Drop the comma from this department's title so that splitting on
        # the first comma below does not cut the name short.
        question=question.replace('Secretary of State for Business, Energy and Industrial Strategy',
                                  'Secretary of State for Business Energy and Industrial Strategy')
        question=question.lstrip()
        # Remove each prefix as a whole string. str.lstrip('the ') strips a
        # *set of characters*, so it also ate the start of the next word
        # (e.g. "the hon. Member" became "on. Member").
        for prefix in ('To ask ', 'the '):
            if question.startswith(prefix):
                question=question[len(prefix):]
        dept=question.split(',')[0]
        depts.setdefault(dept, {'count':0})['count']+=1
    return depts
def responder_QnABatcher(feed):
    """Build a spoken summary of question counts per addressee for *feed*.

    Starts from 'I know about' and appends ', <count> for the <addressee>'
    for each entry from QnABatcher(); once the text passes 7000 characters
    it is closed with '. There were more.' and the rest is dropped.
    """
    batch=QnABatcher(feed)
    phrases=[
        '{count} for the {dept}'.format(count=info['count'],dept=dept).replace('the Mr','Mr')
        for dept, info in batch.items()
    ]
    txt='I know about'
    for phrase in phrases:
        txt=', '.join((txt, phrase))
        if len(txt)>7000:
            txt='{}. There were more.'.format(txt)
            break
    return txt
@app.route('/test3')
def getWrittenStatementsByPeriod(fromDate='',toDate='',currentSessionStart='2016-05-17',dayOnly=False):
    """Fetch and parse the written statements ('WS') RSS feed for a period.

    With neither date given, the period starts at ``currentSessionStart``.
    ``dayOnly`` makes the end date equal to the start date. Returns whatever
    feedparser.parse() yields for the assembled URL.
    """
    if fromDate=='' and toDate=='':
        fromDate=currentSessionStart
    if dayOnly and fromDate!='':
        toDate=fromDate
    pieces=['http://www.parliament.uk/g/rss/generic/?type=WS&pageInstanceId=95194']
    if fromDate:
        pieces.append('&fromDate={fr}'.format(fr=fromDate))
    if toDate:
        pieces.append('&toDate={to}'.format(to=toDate))
    return feedparser.parse(''.join(pieces))
@app.route('/test4')
def feedtest():
    """Test route: return the title of a fixed written-statements feed query."""
    url='http://www.parliament.uk/g/rss/generic/?type=WS&pageInstanceId=95194&fromDate=2016-09-10&toDate=2016-09-15'
    return feedparser.parse(url)['feed']['title']
@app.route('/test2')
def responder_getWrittenStatementsByPeriod(fromDate='',toDate='',dayOnly=False, filterby=None):
    """Summarise the written statements for a period as two strings.

    fromDate/toDate/dayOnly are passed to getWrittenStatementsByPeriod().
    filterby, when non-empty, keeps only entries whose summary (first 60
    characters, case-insensitive, commas ignored) contains it; a deptMap
    abbreviation is expanded to the full department name first.

    Returns ``(headline, listing)``: the headline gives the count, the
    listing describes each statement ('' when there are none). The listing
    is cut off with '. There were others.' once it passes 7000 characters.
    """
    feed=getWrittenStatementsByPeriod(fromDate=fromDate,toDate=toDate,dayOnly=dayOnly)
    if not feed['entries']:
        return "I don't know of any for then.",""
    # One definition of "filtering" used throughout, so filterby='' behaves
    # exactly like filterby=None.
    filtering=filterby is not None and filterby!=''
    if filtering:
        for abbrev, fullname in deptMap.items():
            if abbrev.lower()==filterby.lower():
                filterby=fullname
                break
        needle=filterby.lower().replace(',','')
        items=[i for i in feed['entries']
               if needle in i['summary'][:60].lower().replace(',','')]
    else:
        items=feed['entries']
    txt='I know of {cnt} written statements for then. '.format(cnt=len(items))
    #Really need to find a way of paging the responses - eg "okay, there's rather a lot, doing 10 at a time".."next 10?"
    descriptions=[]
    for i in items:
        # The first line of the summary is split on ' - ' into
        # (who made the statement, department).
        byline=tagstrip(i['summary'].split('<br/>')[0]).split(' - ')
        agent=byline[0]
        if filtering or len(byline)<2:
            # The department is already known when filtering, and cannot
            # be named when the byline carries no ' - ' separator.
            descriptions.append('one on {} by {}'.format(i['title'],agent))
        else:
            descriptions.append('one on {} by {} from {}'.format(i['title'],agent,byline[1]))
    # Accumulate the listing. The previous code called str.join() with two
    # arguments (a TypeError) and left txt2 unbound when nothing matched.
    txt2=''
    for description in descriptions:
        txt2=description if not txt2 else '{}, {}'.format(txt2,description)
        if len(txt2)>7000:
            txt2='{}. There were others.'.format(txt2)
            break
    return txt,txt2
@app.route('/test')
def hello_world():
    """Test route returning a fixed greeting."""
    greeting = 'Hello, World!'
    return greeting
def statementGrabber(period='',myperiod='',filterby=None):
    """Resolve a requested period and fetch the written statements for it.

    period:   a 'YYYY-MM-DD' date string (anything else is ignored).
    myperiod: a spoken period: 'this week', 'last week', 'last <Day>' or
              '<Day> last week', where <Day> is a key of ``dw``.
    filterby: optional department filter, passed through unchanged.

    A recognised ``myperiod`` takes precedence; otherwise a valid ``period``
    date is used. Returns ``(headline, listing)``; when nothing could be
    resolved the headline starts with 'No idea...' and the listing is ''.
    """
    txt='No idea...'
    tmp=''
    try:
        # Keep only the date part so isoformat() below yields YYYY-MM-DD,
        # the form the feed URL expects, rather than a full timestamp.
        dt=datetime.datetime.strptime(period,'%Y-%m-%d').date()
    except (TypeError, ValueError):
        # TypeError: period is None (slot not filled); ValueError: not a
        # YYYY-MM-DD string.
        dt=None
    handled=False
    if myperiod is not None and myperiod!='':
        txt='{} by {}'.format(txt,myperiod)
        dg=myperiod.split()
        lastweekdayguess=None
        # Length checks guard against one-word periods, which used to
        # raise IndexError on dg[1].
        if len(dg)>1 and dg[1] in dotw and myperiod.startswith('last'):
            lastweekdayguess=dg[1]
        if dg and dg[0] in dotw and myperiod.endswith('last week'):
            lastweekdayguess=dg[0]
        if lastweekdayguess is not None:
            dayindex=dw[lastweekdayguess]
            txt='Written statements for {}.'.format(sayday(day_lastweek(dayindex)))
            txt2,tmp=responder_getWrittenStatementsByPeriod(day_lastweek(dayindex,iso=True),dayOnly=True,filterby=filterby)
            txt='{} {}'.format(txt,txt2)
            handled=True
        elif myperiod in ('this week','last week'):
            txt='Written statements for {}.'.format(myperiod)
            span=this_week(iso=True) if myperiod=='this week' else last_week(iso=True)
            txt2,tmp=responder_getWrittenStatementsByPeriod(*span,filterby=filterby)
            txt='{} {}'.format(txt,txt2)
            handled=True
    if not handled and dt is not None:
        txt='Written statements for {}.'.format(sayday(dt))
        txt2,tmp=responder_getWrittenStatementsByPeriod(dt.isoformat(),dayOnly=True,filterby=filterby)
        txt='{} {}'.format(txt,txt2)
    # Have the speech engine spell out "HM" letter by letter.
    tmp=tmp.replace('HM',' <say-as interpret-as="spell-out">HM</say-as>')
    return txt,tmp
@ask.launch
def welcome():
    """Launch handler: ask the user the rendered 'welcome' template."""
    return question(render_template('welcome'))
@ask.intent("AMAZON.StopIntent")
def bye():
    """Stop handler: reply with the rendered 'bye' template as a statement."""
    return statement(render_template('bye'))
@ask.intent("AMAZON.HelpIntent")
def help():
    """Help handler: ask the user the rendered 'help' template."""
    return question(render_template('help'))
@ask.intent("MPCommitteeIntent")
def getCommitteeForMP(mp):
    """Answer which committees the MP in the ``mp`` slot is on."""
    reply = ukpr.responder_Q_committeesForX(mp)
    return question(reply)
@ask.intent("CommitteeMembersIntent")
def getCommitteeMembers(committee):
    """Answer who the members of the committee in the ``committee`` slot are."""
    reply = ukpr.committeeMembers(committee)
    return question(reply)
@ask.intent("WrittenStatementIntent")
def writtenStatement(period,myperiod):
    """Report written statements for the requested period.

    The period slots and the intent name are stored in the session so that
    follow-up intents can repeat the query. If a listing is available the
    headline is returned with an offer to read it all out.
    """
    txt,tmp=statementGrabber(period=period,myperiod=myperiod)
    remembered=(('period',period),('myperiod',myperiod),('typ','WrittenStatementIntent'))
    for key, value in remembered:
        session.attributes[key] = value
    if tmp=='':
        return question("I don't know of any.")
    return question('{} Do you want to hear them all?'.format(txt))
@ask.intent("AllOfThemIntent")
def sayThemAll():
    """Read out the full listing for the period remembered in the session.

    If no period has been stored by an earlier intent, prompts the user
    with the list of things that can be asked.
    """
    attrs=session.attributes
    if "period" not in attrs and "myperiod" not in attrs:
        return question("I missed that. Ask me about written statements, tabled question, written answers, library research reports or committee membership")
    # .get(): the guard above only guarantees that one of the two keys is
    # present, so direct indexing could raise KeyError.
    period=attrs.get('period')
    myperiod=attrs.get('myperiod')
    _,tmp=statementGrabber(period=period,myperiod=myperiod)
    if tmp=='':
        # Same wording writtenStatement uses when there is no listing;
        # avoids sending an empty speech response.
        return question("I don't know of any.")
    return question(tmp)
@ask.intent("LimitByDeptIntent")
def limitByDept(dept):
    """Repeat the remembered written-statements query for one department.

    ``dept`` may be a deptMap abbreviation, which is expanded to the full
    name. Prompts the user instead when no period is stored in the session
    or when the department slot is empty.
    """
    attrs=session.attributes
    # Same guard as sayThemAll: without a remembered period there is
    # nothing to limit.
    if "period" not in attrs and "myperiod" not in attrs:
        return question("I missed that. Ask me about written statements, tabled question, written answers, library research reports or committee membership")
    if not dept:
        return question("I missed that. Which department do you want to limit it to?")
    # Normalise the American spelling to the one used in department names.
    if dept.lower()=='defense': dept='defence'
    period=attrs.get('period')
    myperiod=attrs.get('myperiod')
    for abbrev, fullname in deptMap.items():
        if abbrev.lower()==dept.lower():
            dept=fullname
            break
    tmp2="Okay, limiting it to {}. ".format(dept)
    txt,tmp=statementGrabber(period=period,myperiod=myperiod,filterby=dept)
    return question('{} {} {}'.format(tmp2,txt,tmp))
@ask.intent("WrittenQuestionIntent")
def writtenQuestions():
    """Summarise Lords questions asked from last week's Wednesday onwards."""
    since = day_lastweek(WED)
    feed = getQuestionsAskedByPeriod(since, house='Lords')
    return question(responder_QnABatcher(feed))
@ask.intent("WrittenAnswerIntent")
def writtenAnswerss():
    """Summarise Lords questions answered from last week's Wednesday onwards."""
    since = day_lastweek(WED)
    feed = getQuestionsAnsweredByPeriod(since, house='Lords')
    return question(responder_QnABatcher(feed))
@ask.intent("LibraryResearchReportIntent")
def researchreports(freetext):
    """List research papers matching the ``freetext`` slot.

    Prompts for a topic when the slot is missing or empty.
    """
    if freetext in (None, ''):
        return question('I missed that. What research paper topic are you interested in?')
    return question(ukpr.responder_search(freetext))
# Run the Flask development server when the file is executed directly.
# NOTE(review): debug=True enables Flask's debug mode - confirm this is
# never used for a publicly reachable deployment.
if __name__ == '__main__':
    app.run(debug=True)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment