Skip to content

Instantly share code, notes, and snippets.

@IReese
Created February 22, 2019 03:28
Show Gist options
  • Save IReese/4ed20d3b054ebdd192801e7f76ab0dcf to your computer and use it in GitHub Desktop.
Save IReese/4ed20d3b054ebdd192801e7f76ab0dcf to your computer and use it in GitHub Desktop.
search_xml
import csv
import psycopg2
from bs4 import BeautifulSoup
from itertools import zip_longest
#list ids of sql infile
def get_ids():
    """Return the ids of every row in public.metadata as a list.

    Opens its own connection to the local geonetwork database and
    guarantees it is closed again, even if the query raises.
    """
    connection = psycopg2.connect(user="postgres", password="postgres",
                                  host="localhost", port="5432",
                                  database="geonetwork")
    try:
        cursor = connection.cursor()
        cursor.execute("SELECT id FROM public.metadata")
        return [row[0] for row in cursor.fetchall()]
    finally:
        # Original leaked the connection; always release it.
        connection.close()
# Main: for each metadata id, fetch the record's XML from Postgres, parse
# the fields of interest with BeautifulSoup, and append exactly one value
# per record to each of the parallel per-column lists below ("null" when a
# field is absent, to keep the CSV columns aligned).
id_nums = get_ids()  # renamed: the original bound this to `list`, shadowing the builtin
#LIST ID CHECK
print(id_nums)

#SET EMPTY LISTS — one per output CSV column
orgnamelist = []
datasetlist = []
topiclist = []
contactlist = []
emaillist = []
dateofcreation = []
webaddresslist = []

# One connection reused for every id; the original reconnected on each
# loop iteration and never closed anything.
connection = psycopg2.connect(user="postgres", password="postgres",
                              host="localhost", port="5432",
                              database="geonetwork")
cursor = connection.cursor()
for xml_data in id_nums:
    # Parameterized query — never interpolate values into SQL with `%`
    # string formatting (SQL-injection-prone and breaks on odd values).
    cursor.execute("SELECT data FROM public.metadata WHERE id=%s", (xml_data,))
    rows = cursor.fetchall()
    if not rows:
        # No record for this id: nothing to parse (original produced an
        # empty document here via its str()/strip round-trip).
        continue
    # Use the fetched string directly instead of the original's fragile
    # str(list).strip("['']") round-trip, and drop literal CR/LF pairs.
    xml_clean = rows[0][0].replace('\r\n', '')
    # Keep the raw-XML scratch file on disk (side effect preserved).
    with open("/home/ireese/TESTING/POSTGRES_Testing/xml_output.xml", "w+") as xml_output_doc:
        xml_output_doc.write(xml_clean)
    #FORMAT OUTPUT SUITABLE FOR BEAUTIFUL SOUP TO READ
    tempsoup = BeautifulSoup(xml_clean, 'xml')
    # BUG FIX: prettify is a method. The original wrote the bound-method
    # repr (`<bound method ...>`) to this file and then re-parsed THAT,
    # so every later find_all searched garbage.
    pretty_xml = tempsoup.prettify()
    with open("/home/ireese/TESTING/POSTGRES_Testing/xml_output_for_reading.xml", "w") as pretty_read:
        print(pretty_xml, file=pretty_read)
    soup = BeautifulSoup(pretty_xml, 'xml')

    #DATA OWNER
    orgnames = soup.find_all('organisationName', limit=1)
    if orgnames:
        for orgname in orgnames:
            print("Title: ", orgname.text)
            orgnamelist.append(orgname.text)
    else:
        orgnamelist.append("null")

    #LAYER NAME FIND
    # NOTE(review): a record with several identificationInfo/citation/title
    # elements appends several values here, which can misalign the CSV
    # columns — behavior kept from the original; confirm with real data.
    ident_blocks = soup.find_all('identificationInfo')
    if ident_blocks:
        for identification_find in ident_blocks:
            for citation in identification_find.find_all('citation'):
                for title in citation.find_all('title'):
                    print("Layer Name: ", title.text)
                    datasetlist.append(title.text)
    else:
        datasetlist.append("null")

    #DATA TYPE FIND
    topics = soup.find_all('topicCategory', limit=1)
    if topics:
        for topic in topics:
            for md_topic in topic.find_all('MD_TopicCategoryCode'):
                print("Category: ", md_topic.text)
                topiclist.append(md_topic.text)
    else:
        topiclist.append("null")

    #POINT OF CONTACT NAME
    poc_blocks = soup.find_all('pointOfContact', limit=1)
    if poc_blocks:
        for pointofcontact in poc_blocks:
            names = pointofcontact.find_all('individualName', limit=1)
            if names:
                for indivname in names:
                    print("Contact: ", indivname.text)
                    contactlist.append(indivname.text)
            else:
                contactlist.append("null")
    else:
        contactlist.append("null")

    #POINT OF CONTACT EMAIL
    if poc_blocks:
        for poc_email_find in poc_blocks:
            emails = poc_email_find.find_all('electronicMailAddress', limit=1)
            if emails:
                for poc_email in emails:
                    # BUG FIX: original appended poc_email_find.text (the
                    # whole contact block) instead of the email element.
                    print("Email: ", poc_email.text)
                    emaillist.append(poc_email.text)
            else:
                emaillist.append("null")
    else:
        emaillist.append("null")

    # DATE OF CREATION FIND
    dates = soup.find_all('date', limit=1)
    if dates:
        for doc in dates:
            dt_nodes = doc.find_all('DateTime', limit=1)
            if dt_nodes:
                for dt_node in dt_nodes:
                    print("Date of Creation: ", dt_node.text)
                    dateofcreation.append(dt_node.text)
            else:
                dateofcreation.append("null")
    else:
        dateofcreation.append("null")

    #WEB ADDRESS FIND
    transfer_opts = soup.find_all('MD_DigitalTransferOptions', limit=1)
    if transfer_opts:
        for webaddress in transfer_opts:
            urls = webaddress.find_all('URL', limit=1)
            if urls:
                for webaddress_text in urls:
                    print("webaddress_raw: ", webaddress_text.text)
                    webaddresslist.append(webaddress_text.text)
            else:
                webaddresslist.append("null")
    else:
        webaddresslist.append("null")

# Release database resources once all records are processed.
cursor.close()
connection.close()
#write CSV row from list — sanity-print the column lengths/contents first
print(len(orgnamelist))
print(len(datasetlist))
print(len(topiclist))
print(len(contactlist))
print(len(emaillist))
print(len(dateofcreation))
print(len(webaddresslist))
print(datasetlist)
print(topiclist)
print(contactlist)
print(emaillist)
print(dateofcreation)
print(webaddresslist)
print(orgnamelist)

#WRITE TO CSV FORMAT
# zip_longest pads any column that ended up shorter with '' so every row
# still has all seven fields.
cols = [orgnamelist, datasetlist, topiclist, contactlist, emaillist,
        dateofcreation, webaddresslist]
export_data = zip_longest(*cols, fillvalue='')
csvfile = "/home/ireese/TESTING/POSTGRES_Testing/csv_iii_format.csv"
# `with` guarantees the output file is flushed and closed even if a write
# raises (original left the handle open on error).
with open(csvfile, 'w', newline='') as outfile:
    writer = csv.writer(outfile, delimiter="|")
    writer.writerow(("organization", "dataset", "topic", "contact", "email",
                     "date of creation", "possible web address"))
    writer.writerows(export_data)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment