Elasticsearch RSS feed indexer with Spacy entity extraction

feeds.txt (the list of RSS feeds read by the indexer):
http://feeds.bbci.co.uk/news/world/rss.xml
http://feeds.feedburner.com/breitbart
http://feeds.feedburner.com/time/topstories
http://feeds.foxnews.com/foxnews/latest
http://feeds.reuters.com/reuters/UKWorldNews
http://feeds.skynews.com/feeds/rss/home.xml
http://feeds.skynews.com/feeds/rss/world.xml
http://news.yahoo.com/rss/
http://rss.cnn.com/rss/cnn_topstories.rss
http://rssfeeds.usatoday.com/UsatodaycomNation-TopStories
http://www.dailymail.co.uk/news/articles.rss
http://www.nbcnews.com/id/3032091/device/rss/rss.xml
http://www.telegraph.co.uk/newsfeed/rss/news_uk.xml
http://www.theguardian.com/media/newspapers/rss
https://feeds.a.dj.com/rss/RSSWorldNews.xml
https://rss.nytimes.com/services/xml/rss/nyt/HomePage.xml
https://www.cnbc.com/id/100727362/device/rss/rss.html
https://www.ft.com/news-feed
https://www.huffingtonpost.com/section/front-page/feed
https://www.independent.co.uk/news/world/rss
https://www.politico.com/rss/politics.xml
https://www.thesun.co.uk/news/worldnews/feed
http://abcnews.go.com/abcnews/internationalheadlines
http://feeds.washingtonpost.com/rss/world

The indexer script:

import time
import urllib.parse

import feedparser
import html2text
import spacy
from elasticsearch import Elasticsearch, helpers
print ("Loading spacy model")
nlp = spacy.load('en_core_web_sm')
print ("Loading feed")
indexName = "newsfeeds"
es = Elasticsearch()

indexSettings = {
    "settings": {
        "index.number_of_replicas": 0,
        "index.number_of_shards": 1,
        "analysis": {
            "analyzer": {
                # Analyzer used on headlines: strips stop words, then emits
                # unigrams plus bigram shingles.
                "analyzer_shingle": {
                    "tokenizer": "standard",
                    # Note: the deprecated "standard" token filter (a no-op)
                    # was removed in Elasticsearch 7.0, so it is omitted here.
                    "filter": [
                        "apostrophe",
                        "filter_case_sensitive_stop_word",
                        "lowercase",
                        "filter_case_insensitive_stop_word",
                        "filter_shingle",
                        "trim",
                        "my_length"
                    ]
                },
                "my_english_analyzer": {
                    "tokenizer": "standard",
                    "filter": [
                        "lowercase",
                        "apostrophe"
                    ]
                },
                "lowerKeyword": {
                    "type": "custom",
                    "tokenizer": "keyword",
                    "filter": [
                        "lowercase"
                    ]
                }
            },
            "filter": {
                # Emit bigram shingles alongside the original unigrams.
                "filter_shingle": {
                    "type": "shingle",
                    "max_shingle_size": 2,
                    "min_shingle_size": 2,
                    "filler_token": "",
                    "output_unigrams": "true"
                },
                "filter_case_insensitive_stop_word": {
                    "type": "stop",
                    "stopwords": [
                        "from", "for", "with", "a", "of", "to", "by", "the", "and", "has", "had", "said",
                        "was", "on", "this", "that", "its", "at", "or", "which", "an", "in", "they", "his",
                        "are", "were", "as", "if", "would", "be"
                    ]
                },
                # Runs before the lowercase filter, so "It" is stripped while
                # tokens like "IT" survive.
                "filter_case_sensitive_stop_word": {
                    "type": "stop",
                    "stopwords": [
                        "will", "who", "it", "It", "is"
                    ]
                },
                # Drop single-character tokens left over after filtering.
                "my_length": {
                    "type": "length",
                    "min": 2
                }
            }
        }
    },
    "mappings": {
        "properties": {
            "id": {"type": "keyword"},
            "url": {"type": "keyword"},
            # annotated_text requires the mapper-annotated-text plugin.
            "headline": {"type": "annotated_text", "analyzer": "analyzer_shingle"},
            "published": {"type": "date"},
            "feedLink": {"type": "keyword"},
            "tags": {"type": "keyword"}
        }
    }
}
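
# Once the index exists, the shingle analyzer can be inspected with the
# _analyze API, e.g. (an illustrative sketch, assuming a local cluster):
#   es.indices.analyze(index=indexName, body={
#       "analyzer": "analyzer_shingle", "text": "The quick brown fox"})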

if not es.indices.exists(index=indexName):
    es.indices.create(index=indexName, body=indexSettings)

actions = []

def getIso(channel_name, item):
    # Prefer the item's update time, then its publish time, else "now".
    t = time.gmtime()
    if "updated_parsed" in item and isinstance(item["updated_parsed"], time.struct_time):
        t = item["updated_parsed"]
    elif "published_parsed" in item and isinstance(item["published_parsed"], time.struct_time):
        t = item["published_parsed"]
    else:
        print("Warning: An item in channel '%s' is missing both update and publish time in item data. Using processing time." % channel_name)
    return time.strftime("%Y-%m-%dT%H:%M:%SZ", t)

def getEntities(text):
    # Run the spacy model over the text
    parsedDoc = nlp(text)
    ents = []
    # Keep only entity types that make useful tags. Note spacy label names
    # use underscores (WORK_OF_ART), not spaces.
    # See https://spacy.io/api/doc#ents
    skipLabels = {"ORDINAL", "CARDINAL", "DATE", "TIME", "WORK_OF_ART", "QUANTITY"}
    for ent in parsedDoc.ents:
        if ent.label_ not in skipLabels:
            ents.append(ent)
    annotatedText = ""
    lastOffset = 0
    entLabels = []
    for token in ents:
        entValue = token.text.strip()
        if len(entValue) > 1:
            offset = token.start_char
            if offset > lastOffset:
                annotatedText += text[lastOffset:offset]
            if entValue not in entLabels:
                entLabels.append(entValue)
            # Add the annotation in annotated_text markup: the annotation values
            # are the url-encoded entity value and entity type, joined by "&".
            annotatedText += "[" + text[offset:token.end_char] + "](" + \
                urllib.parse.quote(entValue) + "&" + urllib.parse.quote(token.label_) + ")"
            lastOffset = token.end_char
    if len(annotatedText) == 0:
        annotatedText = text
    elif lastOffset < len(text):
        annotatedText += text[lastOffset:]
    return {
        "entities": entLabels,
        "annotated_text": annotatedText
    }
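
# Illustrative example (actual entities depend on the spacy model version):
#   getEntities("Trump visits Paris") might return
#   {"entities": ["Trump", "Paris"],
#    "annotated_text": "[Trump](Trump&PERSON) visits [Paris](Paris&GPE)"}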

numItems = 0
with open("feeds.txt", "r") as feedsFile:
    for feed in feedsFile:
        # See https://pythonhosted.org/feedparser/reference.html
        feed = feed.strip()
        print("=======", feed)
        d = None
        try:
            d = feedparser.parse(feed)
        except Exception as e:
            print("Error parsing feed " + feed + ":", e)
            continue
        if "title" not in d.feed:
            print("Error parsing feed " + feed + ", missing title:", d)
            continue
        print(d.feed.title)
        print(d.feed.link)
        # Not every feed supplies a subtitle.
        print(d.feed.get("subtitle", ""))
        for post in d.entries:
            numItems = numItems + 1
            titleText = post.title
            # Thank you independent.co.uk for html in what is supposed to be a plain-text string
            titleText = html2text.html2text(titleText)
            tags = []
            ents = {
                "annotated_text": titleText
            }
            try:
                ents = getEntities(titleText)
                for ent in ents["entities"]:
                    tags.append(ent)
            except Exception as e:
                print("Exception marking up entity", e)
            doc = {
                "headline": ents["annotated_text"],
                "url": post.link,
                "published": getIso(d['feed']['title'], post),
                "feedLink": d['feed']['link']
            }
            if len(tags) > 0:
                doc["tags"] = tags
            # Not every feed supplies a guid, so fall back to the link as the doc id.
            postId = post.get("id") or post.get("link", "")
            if len(postId) > 0:
                actions.append({
                    "_index": indexName,
                    "_op_type": "index",
                    "_id": postId,
                    "_source": doc
                })
            if numItems % 1000 == 0:
                print(numItems)
            # Flush bulk indexing actions if necessary
            if len(actions) >= 1000:
                helpers.bulk(es, actions)
                # TODO: check for failures and take appropriate action
                del actions[0:len(actions)]

# Flush any remaining actions.
if len(actions) > 0:
    helpers.bulk(es, actions)
    del actions[0:len(actions)]
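
A quick way to sanity-check the result once the script has run (a minimal
sketch, assuming a default Elasticsearch cluster on localhost:9200; the
"newsfeeds" index name and the "tags" keyword field come from the script
above):

from elasticsearch import Elasticsearch

es = Elasticsearch()
# Count indexed headlines and list the most frequent entity tags.
print(es.count(index="newsfeeds")["count"], "documents indexed")
res = es.search(index="newsfeeds", body={
    "size": 0,
    "aggs": {"top_tags": {"terms": {"field": "tags", "size": 10}}}
})
for bucket in res["aggregations"]["top_tags"]["buckets"]:
    print(bucket["key"], bucket["doc_count"])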