from datetime import datetime, timezone
import hashlib

import pandas as pd
import requests
import feedparser
from bs4 import BeautifulSoup

from google.cloud import storage
from google.cloud import pubsub_v1


class news_feed_parser:

    #Parse rss feed url
    def parse_rss_feed(self, url):
        #Read feed xml data
        #Retry up to 3 times if requesting the url fails
        for i in range(3):
            try:
                news_feed = feedparser.parse(url)
                break
            except Exception:
                print('ERROR calling URL:', url, 'iter: ', (i + 1))
        #Flatten the feed entries into a dataframe
        df_news_feed = pd.json_normalize(news_feed.entries)
        return df_news_feed

    #Process a single article into a one-row dataset
    def process_article(self, article_url, article_title):
        articles_dataset = pd.DataFrame(columns=['url', 'title', 'text'])
        #Parse the article url and keep it only if some text was extracted
        article_text = self.parse_article(article_url)
        if article_text:
            articles_dataset = pd.concat([articles_dataset,
                                          pd.DataFrame([{'url': article_url,
                                                         'title': article_title,
                                                         'text': article_text}])],
                                         ignore_index=True)
            articles_dataset['text_length'] = articles_dataset['text'].apply(len)
        return articles_dataset

    #Build and return the parent hierarchy of a paragraph element
    def get_parent_hierarchy(self, article_parents):
        #Loop over each paragraph parent to extract its element name and id
        parents_list = []
        for parent in article_parents:
            #Extract the parent id attribute if it exists
            parent_id = ''
            try:
                parent_id = parent['id']
            except KeyError:
                pass
            #Append the parent name and id to the parents list
            parents_list.append(parent.name + ' id: ' + parent_id)
        #Construct the paragraph parent hierarchy, from root element to paragraph
        parent_element_list = ['' if (x == 'None' or x is None) else x for x in parents_list]
        parent_element_list.reverse()
        parent_hierarchy = ' -> '.join(parent_element_list)
        return parent_hierarchy

    #Parse a url and return the main article text
    def parse_article(self, article_url):
        #Request the article url to get the web page content
        article_result = requests.get(article_url)
        # 1. Extract all paragraph elements inside the page body
        article_content = BeautifulSoup(article_result.content, 'html.parser')
        articles_body = article_content.findAll('body')
        p_blocks = articles_body[0].findAll('p')
        # 2. For each paragraph, construct its parent elements hierarchy
        #Create a dataframe to collect p_blocks data
        p_blocks_df = pd.DataFrame(columns=['element_name', 'parent_hierarchy',
                                            'element_text', 'element_text_Count'])
        # 2.1 Loop over each paragraph block
        article_text = ''
        for p_block in p_blocks:
            #Get the paragraph parent hierarchy
            parent_hierarchy = self.get_parent_hierarchy(p_block.parents)
            #Append the current paragraph data to p_blocks_df
            p_blocks_df = pd.concat([p_blocks_df,
                                     pd.DataFrame([{'element_name': p_block.name,
                                                    'parent_hierarchy': parent_hierarchy,
                                                    'element_text': p_block.text,
                                                    'element_text_Count': len(str(p_block.text))}])],
                                    ignore_index=True)
        # 3. Concatenate paragraphs that share the same parent hierarchy
        if len(p_blocks_df) > 0:
            #Group paragraphs by parent_hierarchy
            p_blocks_df_groupby_parent_hierarchy = p_blocks_df.groupby(by=['parent_hierarchy'])
            #Sum the paragraph length for each paragraph group
            p_blocks_df_groupby_parent_hierarchy_sum = p_blocks_df_groupby_parent_hierarchy[['element_text_Count']].sum()
            p_blocks_df_groupby_parent_hierarchy_sum.reset_index(inplace=True)
            # 4. Select the longest paragraph group as the main article
            max_id = p_blocks_df_groupby_parent_hierarchy_sum.loc[
                p_blocks_df_groupby_parent_hierarchy_sum['element_text_Count'].idxmax(),
                'parent_hierarchy']
            article_text = '\n'.join(p_blocks_df.loc[p_blocks_df['parent_hierarchy'] == max_id, 'element_text'].to_list())
        #Return the article text
        return article_text
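

#Usage sketch (an assumption, not part of the original gist): news_feed_parser
#has no GCP dependency, so it can be tried on its own. demo_parse_feed and the
#feed url below are hypothetical placeholders, not names from the original code.
def demo_parse_feed(feed_url='https://example.com/news/rss.xml'):
    #Fetch the feed, then scrape the first article it links to
    my_parser = news_feed_parser()
    feed_df = my_parser.parse_rss_feed(feed_url)
    first = feed_df.iloc[0]
    article_df = my_parser.process_article(first['link'], first['title'])
    print(article_df.head())
    return article_df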


class gcp_cloud_util:

    def __init__(self):
        self._projectid = "xxxxxx-xxxxxxx-xxxxxxx"

    def connect_gcp_storage(self, bucketid):
        #Open a Cloud Storage connection and get the target bucket
        storage_client = storage.Client(project=self._projectid)
        self._bucket = storage_client.get_bucket(bucketid)
        print('Connected Cloud Storage: Project/Bucket: {}/{}'.format(self._projectid, bucketid))

    def connect_gcp_pubsub(self, topic):
        #Open a Pub/Sub publisher connection
        self.pubsub_client = pubsub_v1.PublisherClient()
        #Build the full topic path: 'projects/<projectid>/topics/<topic>'
        self.topic = self.pubsub_client.topic_path(self._projectid, topic)
        print('Connected Cloud Pub/Sub.')

    def upload_from_string(self, filename, dataset):
        #Upload a string payload to the bucket under the given filename
        blob = self._bucket.blob(filename)
        return blob.upload_from_string(dataset)

    def publish_message(self, data, url, title):
        #Publish a message with url and title attributes
        response = self.pubsub_client.publish(self.topic, data, url=url, title=title)
        return response
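

#Usage sketch (an assumption, not part of the original gist): how gcp_cloud_util
#could be exercised. demo_upload, the bucket name and the filename are hypothetical,
#and the call only works with a real project id in __init__ and valid application
#default credentials.
def demo_upload(bucket_name='my-example-bucket'):
    #Connect to Cloud Storage and upload a small test payload
    my_storage = gcp_cloud_util()
    my_storage.connect_gcp_storage(bucket_name)
    return my_storage.upload_from_string('landing/demo.json', '{"hello": "world"}')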


#Cloud Function to web scrape a single news article
def process_rss_feed_article(event, context):
    #Try to read the url attribute
    try:
        url = event['attributes']['url']
    except KeyError:
        print("Error: Missing attribute 'url'")
        raise
    #Try to read the title attribute
    try:
        title = event['attributes']['title']
    except KeyError:
        print("Error: Missing attribute 'title'")
        raise
    #Try to run the search-and-extract process
    try:
        #Scrape the article linked from the news feed
        my_rssFeed = news_feed_parser()
        news_feed_data = my_rssFeed.process_article(url, title)
    except Exception:
        print("Error: calling news feed url: {}".format(url))
        raise
    try:
        my_storage = gcp_cloud_util()
        my_storage.connect_gcp_storage("xxxxxx-xxxxxxx-xxxxxxx")
    except Exception:
        print("Error: connecting to Cloud Storage")
        raise
    #Upload the news article as a Json file
    print('news_feed_data len: ', len(news_feed_data))
    for _, row in news_feed_data.iterrows():
        #Build a unique filename from the current timestamp and the article url hash
        hash_link = hashlib.md5(row.url.encode())
        filename = 'landing/' + str(datetime.now().isoformat()) + '_' + hash_link.hexdigest() + '.json'
        try:
            dataset = row.to_json()
            res = my_storage.upload_from_string(filename, dataset)
        except Exception:
            print("Error: File '{}' Upload failed.".format(filename))
            raise
    return 'OK'


#Cloud Function to web scrape an RSS news feed
def process_rss_feed_search(event, context):
    #Try to read the url attribute
    try:
        url = event['attributes']['url']
    except KeyError:
        print("Error: Missing attribute 'url'")
        raise
    #Try to run the search-and-extract process
    try:
        #Get the list of articles from the news feed
        my_rssFeed = news_feed_parser()
        news_feed_data = my_rssFeed.parse_rss_feed(url)
    except Exception:
        print("Error: calling news feed url: {}".format(url))
        raise
    topic = 'news_article_to_process'
    try:
        my_pubsub = gcp_cloud_util()
        my_pubsub.connect_gcp_pubsub(topic)
    except Exception:
        print("Error: connecting to Cloud Pub/Sub.")
        raise
    #Publish each article to the processing topic
    print('news_feed_data len: ', len(news_feed_data))
    for _, row in news_feed_data.iterrows():
        try:
            response = my_pubsub.publish_message(b'Publish article to be processed.', url=row.link, title=row.title)
            print('Publish article:\nURL: {url},\nTitle: {title}'.format(url=row.link, title=row.title))
        except Exception:
            print("Error: Publication failed to topic: {}".format(topic))
            raise
    return 'OK'
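

#Local test sketch (an assumption, not part of the original gist): both Cloud
#Functions read Pub/Sub-style event attributes, so they can be exercised locally
#with hand-built event dicts. The urls are hypothetical placeholders, and the
#GCP calls only succeed once the project/bucket placeholders above are replaced
#and application default credentials are available.
if __name__ == "__main__":
    #Simulate the feed-search trigger: fetch a feed and publish its articles
    search_event = {'attributes': {'url': 'https://example.com/news/rss.xml'}}
    process_rss_feed_search(search_event, None)
    #Simulate the article trigger: scrape one article and upload it to storage
    article_event = {'attributes': {'url': 'https://example.com/news/article-1.html',
                                    'title': 'Example article'}}
    process_rss_feed_article(article_event, None)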