import hashlib
from datetime import datetime, timezone

import pandas as pd
from pandas.io.json import json_normalize  # pandas < 1.0; use pd.json_normalize on newer versions
import requests
import feedparser
from bs4 import BeautifulSoup
from google.cloud import storage
from google.cloud import pubsub_v1


class news_feed_parser:

    # Parse an RSS feed URL
    def parse_rss_feed(self, url):
        # Read the feed XML data.
        # Retry the request up to 4 times on error.
        for i in range(0, 4):
            try:
                news_feed = feedparser.parse(url)
                break
            except:
                print('ERROR calling URL:', url, 'iter: ', (i + 1))
                pass

        # Flatten the feed entries into a DataFrame
        df_news_feed = json_normalize(news_feed.entries)
        return df_news_feed
    # Process a single article and return it as a one-row dataset
    def process_article(self, article_url, article_title):
        articles_dataset = pd.DataFrame(columns=['url', 'title', 'text'])
        # Parse the article URL and extract its main text
        article_text = self.parse_article(article_url)
        if article_text is not None:
            articles_dataset = articles_dataset.append({'url': article_url,
                                                        'title': article_title,
                                                        'text': article_text
                                                        }, ignore_index=True, sort=False)
            articles_dataset['text_length'] = articles_dataset['text'].apply(lambda x: len(x))
        return articles_dataset
    # Build and return parent hierarchy
    def get_parent_hierarchy(self, article_parents):
        # Loop for each paragraph parent to extract its element name and id
        parents_list = []
        for parent in article_parents:
            # Extract the parent id attribute if it exists
            Parent_id = ''
            try:
                Parent_id = parent['id']
            except:
                pass
            # Append the parent name and id to the parents list
            parents_list.append(parent.name + 'id: ' + Parent_id)

        # 2.2 Construct paragraph parent hierarchy
        parent_element_list = ['' if (x == 'None' or x is None) else x for x in parents_list]
        parent_element_list.reverse()
        parent_hierarchy = ' -> '.join(parent_element_list)
        return parent_hierarchy
    # Parse a URL and return the article text
    def parse_article(self, article_url):
        # Request the article URL to get the web page content
        article_result = requests.get(article_url)

        # 1. Extract all paragraph elements inside the page body
        article_content = BeautifulSoup(article_result.content, 'html.parser')
        articles_body = article_content.findAll('body')
        p_blocks = articles_body[0].findAll('p')

        # 2. For each paragraph, construct its parents' element hierarchy
        # Create a dataframe to collect p_blocks data
        p_blocks_df = pd.DataFrame(columns=['element_name', 'parent_hierarchy',
                                            'element_text', 'element_text_Count'])
        # 2.1 Loop over each paragraph block
        article_text = ''
        for i in range(0, len(p_blocks)):
            # Get the paragraph parent hierarchy
            parent_hierarchy = self.get_parent_hierarchy(p_blocks[i].parents)
            # Append the current paragraph data to p_blocks_df
            p_blocks_df = p_blocks_df.append({"element_name": p_blocks[i].name,
                                              "parent_hierarchy": parent_hierarchy,
                                              "element_text": p_blocks[i].text,
                                              "element_text_Count": len(str(p_blocks[i].text))
                                              },
                                             ignore_index=True,
                                             sort=False
                                             )

        # 3. Concatenate paragraphs under the same parent hierarchy
        if len(p_blocks_df) > 0:
            # Group paragraphs by parent_hierarchy
            p_blocks_df_groupby_parent_hierarchy = p_blocks_df.groupby(by=['parent_hierarchy'])
            # Sum the paragraph length for each paragraph group
            p_blocks_df_groupby_parent_hierarchy_sum = p_blocks_df_groupby_parent_hierarchy[['element_text_Count']].sum()
            p_blocks_df_groupby_parent_hierarchy_sum.reset_index(inplace=True)

            # 4. Select the longest group of paragraphs as the main article
            max_id = p_blocks_df_groupby_parent_hierarchy_sum.loc[
                p_blocks_df_groupby_parent_hierarchy_sum['element_text_Count'].idxmax(),
                'parent_hierarchy']
            article_text = '\n'.join(p_blocks_df.loc[p_blocks_df['parent_hierarchy'] == max_id, 'element_text'].to_list())

        # Return the article text
        return article_text
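

# Illustrative sketch: a minimal, assumed example of exercising news_feed_parser locally.
# The Google News RSS URL is only a placeholder assumption; any feed exposing
# 'link' and 'title' fields should work the same way.
def _demo_news_feed_parser(feed_url='https://news.google.com/rss'):
    parser = news_feed_parser()
    # Flatten the feed entries into a DataFrame of articles
    feed_df = parser.parse_rss_feed(feed_url)
    # Extract the main text of the first article and report its length
    first = feed_df.iloc[0]
    article_df = parser.process_article(first['link'], first['title'])
    return article_df[['url', 'title', 'text_length']]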


class gcp_cloud_util:

    def __init__(self):
        self._projectid = "xxxxxx-xxxxxxx-xxxxxxx"

    def connect_gcp_storage(self, bucketid):
        # Open the Cloud Storage connection
        storage_client = storage.Client(project=self._projectid)
        self._bucket = storage_client.get_bucket(bucketid)
        print('Connected Cloud Storage: Project/Bucket: {}/{}'.format(self._projectid, bucketid))

    def connect_gcp_pubsub(self, topic):
        # Open the Pub/Sub connection
        self.pubsub_client = pubsub_v1.PublisherClient()
        # topic = 'projects/' + PROJECTID + '/topics/' + topic
        self.topic = self.pubsub_client.topic_path(self._projectid, topic)
        print('Connected Cloud Pub/Sub.')

    def upload_from_string(self, filename, dataset):
        # Upload the dataset string as a blob named filename
        blob = self._bucket.blob(filename)
        return blob.upload_from_string(dataset)

    def publish_message(self, data, url, title):
        # Publish a message with url and title attributes to the topic
        response = self.pubsub_client.publish(self.topic, data, url=url, title=title)
        return response
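

# Illustrative sketch: how the gcp_cloud_util helpers could be wired together by hand.
# The bucket name and topic name are placeholder assumptions, and running this
# requires valid Google Cloud credentials for the configured project.
def _demo_gcp_cloud_util():
    gcp = gcp_cloud_util()
    # Upload a small JSON payload to Cloud Storage
    gcp.connect_gcp_storage('my-article-bucket')  # assumed bucket name
    gcp.upload_from_string('landing/demo.json', '{"demo": true}')
    # Publish a message carrying url/title attributes to Pub/Sub
    gcp.connect_gcp_pubsub('news_article_to_process')
    future = gcp.publish_message(b'demo', url='https://example.com', title='Demo article')
    return future.result()  # blocks until the Pub/Sub message id is returned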


# Cloud Function to web scrape a news article
def process_rss_feed_article(event, context):
    # Try to read the url attribute
    try:
        url = event['attributes']['url']
    except:
        print("Error: Missing attribute 'url' ")
        raise

    # Try to read the title attribute
    try:
        title = event['attributes']['title']
    except:
        print("Error: Missing attribute 'title' ")
        raise

    # Try to run the extract process
    try:
        # Get the article text from the article URL
        my_rssFeed = news_feed_parser()
        news_feed_data = my_rssFeed.process_article(url, title)
    except:
        print("Error: calling news feed url: {} ".format(url))
        raise

    # Connect to Cloud Storage
    try:
        my_storage = gcp_cloud_util()
        my_storage.connect_gcp_storage("xxxxxx-xxxxxxx-xxxxxxx")
    except:
        print("Error: connecting to Cloud Storage")
        raise

    # Upload the news article as a JSON file
    print('news_feed_data len: ', len(news_feed_data))
    for _, row in news_feed_data.iterrows():
        try:
            hash_link = hashlib.md5(row.url.encode())
            filename = 'landing/' + str(datetime.now().isoformat()) + '_' + hash_link.hexdigest() + '.json'
            dataset = row.to_json()
            res = my_storage.upload_from_string(filename, dataset)
        except:
            print("Error: File '{}' Upload failed.".format(filename))
            raise
    return 'OK'


# Cloud Function to web scrape an RSS news feed
def process_rss_feed_search(event, context):
    # Try to read the url attribute
    try:
        url = event['attributes']['url']
    except:
        print("Error: Missing attribute 'url' ")
        raise

    # Try to run the search process
    try:
        # Get the list of articles from the news feed
        my_rssFeed = news_feed_parser()
        news_feed_data = my_rssFeed.parse_rss_feed(url)
    except:
        print("Error: calling news feed url: {} ".format(url))
        raise

    # Connect to Cloud Pub/Sub
    topic = 'news_article_to_process'
    try:
        my_pubsub = gcp_cloud_util()
        my_pubsub.connect_gcp_pubsub(topic)
    except:
        print("Error: connecting to Cloud Pub/Sub.")
        raise

    # Publish each article to the topic for downstream processing
    print('news_feed_data len: ', len(news_feed_data))
    for _, row in news_feed_data.iterrows():
        try:
            response = my_pubsub.publish_message(b'Publish article to be processed.', url=row.link, title=row.title)
            print('Publish article:\nURL: {url},\nTitle: {title}'.format(url=row.link, title=row.title))
        except:
            print("Error: Publication failed to topic: {}".format(topic))
            raise
    return 'OK'
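

# Illustrative sketch: invoking the two Cloud Functions locally with hand-built
# Pub/Sub-style events shaped like the payload Cloud Functions would pass in.
# The attribute values are placeholder assumptions and a configured GCP project is required.
def _demo_cloud_functions():
    # The search function expects a 'url' attribute pointing at an RSS feed
    search_event = {'attributes': {'url': 'https://news.google.com/rss'}}
    process_rss_feed_search(search_event, context=None)
    # The article function expects 'url' and 'title' attributes for a single article
    article_event = {'attributes': {'url': 'https://example.com/some-article',
                                    'title': 'Some article'}}
    process_rss_feed_article(article_event, context=None)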