Skip to content

Instantly share code, notes, and snippets.

@suatatan
Last active December 22, 2020 14:50
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save suatatan/7b8d2cd2f20812cb3caa0c4bf41b8558 to your computer and use it in GitHub Desktop.
Save suatatan/7b8d2cd2f20812cb3caa0c4bf41b8558 to your computer and use it in GitHub Desktop.
Convert the JSON files fetched by the news-please package into a SQLite database:
import json
import os
import pandas as pd
import datetime
import sqlite3
import string
import time
import timeit
import pathlib
import os
import textwrap
import inspect
class NewsPleaseParser:
    """Import news-please article JSON files into a SQLite table.

    Each article becomes one row holding its headline, text, publication
    date, source URL and source name. The source URL acts as the
    de-duplication key: an article whose URL is already stored is skipped.

    Parameters
    ----------
    CONFIG_DEFAULT_TABLE_NAME : str, optional
        Table used whenever a method is called without ``table_name``.
    CONFIG_SQLITEDB_URL : str, optional
        Path of the SQLite file used whenever a method is called without
        ``sqlite_db_url``.
    CONFIG_EASY_DEBUG : bool, optional
        When true, ``easydebug`` prints its messages.
    CONFIG_TEST_VAR : str, optional
        Stored on the instance; not read by any method of this class.
    CONFIG_TEST_MODE : bool, optional
        Stored on the instance; not read by any method of this class.
    CONFIG_FULL_TEXT_MODE : bool, optional
        Stored on the instance; not read by any method of this class.
    """

    def __init__(self,
                 CONFIG_DEFAULT_TABLE_NAME='tab_headline',
                 CONFIG_SQLITEDB_URL="newspleasearchive.sqlite",
                 CONFIG_EASY_DEBUG=True,
                 CONFIG_TEST_VAR="suatatan",
                 CONFIG_TEST_MODE=False,
                 CONFIG_FULL_TEXT_MODE=True,
                 ):
        self.CONFIG_DEFAULT_TABLE_NAME = CONFIG_DEFAULT_TABLE_NAME
        self.CONFIG_SQLITEDB_URL = CONFIG_SQLITEDB_URL
        self.CONFIG_EASY_DEBUG = CONFIG_EASY_DEBUG  # if true, all debug messages are shown
        self.CONFIG_TEST_MODE = CONFIG_TEST_MODE
        self.CONFIG_FULL_TEXT_MODE = CONFIG_FULL_TEXT_MODE
        self.CONFIG_TEST_VAR = CONFIG_TEST_VAR
        # Echo the effective configuration (only printed when debugging is on).
        self.easydebug(
            "CONFIGURATION:\n"
            f"CONFIG_DEFAULT_TABLE_NAME = {CONFIG_DEFAULT_TABLE_NAME},\n"
            f"CONFIG_SQLITEDB_URL = {CONFIG_SQLITEDB_URL}\n"
            f"CONFIG_EASY_DEBUG = {CONFIG_EASY_DEBUG}\n"
            f"CONFIG_TEST_VAR = {CONFIG_TEST_VAR}\n"
            f"CONFIG_TEST_MODE = {CONFIG_TEST_MODE}\n"
            f"CONFIG_FULL_TEXT_MODE = {CONFIG_FULL_TEXT_MODE}"
        )

    def easydebug(self, msg=None, show=None):
        """Print ``msg`` when debugging is enabled.

        Parameters
        ----------
        msg : object, optional
            The message to print.
        show : bool, optional
            Overrides ``CONFIG_EASY_DEBUG`` for this call when not ``None``.
        """
        show = self.CONFIG_EASY_DEBUG if show is None else show
        if show:
            print(msg)

    def _private_generate_table_if_dont_exists(self, table_name=None,
                                               sqlite_db_url=None):
        """Create the headline table if it does not exist yet.

        Parameters
        ----------
        table_name : str, optional
            Table to create; defaults to ``CONFIG_DEFAULT_TABLE_NAME``.
        sqlite_db_url : str, optional
            Path of the SQLite file; defaults to ``CONFIG_SQLITEDB_URL``.

        Returns
        -------
        bool
            ``True`` when the statement ran, ``False`` when any error
            occurred (the error is reported through ``easydebug``).
        """
        sqlite_db_url = self.CONFIG_SQLITEDB_URL if sqlite_db_url is None else sqlite_db_url
        table_name = self.CONFIG_DEFAULT_TABLE_NAME if table_name is None else table_name
        # Identifiers cannot be bound as SQL parameters, so the table name
        # is interpolated; it must come from trusted configuration.
        sql = (
            f"CREATE TABLE IF NOT EXISTS {table_name}(\n"
            "headline_id INTEGER PRIMARY KEY AUTOINCREMENT,\n"
            "headline TEXT,\n"
            "fulltext TEXT,\n"
            "date TEXT,\n"
            "source TEXT,\n"
            "source_name TEXT,\n"
            "timestamp DATETIME DEFAULT CURRENT_TIMESTAMP);"
        )
        try:
            vt = sqlite3.connect(sqlite_db_url)
            try:
                vt.execute(sql)
                vt.commit()
            finally:
                # Always release the connection, even when the statement fails.
                vt.close()
            return True
        except Exception as ex:
            # Deliberate best-effort: report the problem and signal failure.
            self.easydebug("Error: Giardino " + str(ex))
            return False

    def check_whether_headline_exist_in_archive(self, source,
                                                table_name=None,
                                                sqlite_db_url=None):
        """Tell whether a row with the given source URL is already stored.

        Parameters
        ----------
        source : str
            The article URL to look up.
        table_name : str, optional
            Table to search; defaults to ``CONFIG_DEFAULT_TABLE_NAME``.
        sqlite_db_url : str, optional
            Path of the SQLite file; defaults to ``CONFIG_SQLITEDB_URL``.

        Returns
        -------
        bool
            ``True`` if at least one matching row exists.

        Raises
        ------
        sqlite3.Error
            If the query fails, e.g. because the table does not exist.
        """
        sqlite_db_url = self.CONFIG_SQLITEDB_URL if sqlite_db_url is None else sqlite_db_url
        table_name = self.CONFIG_DEFAULT_TABLE_NAME if table_name is None else table_name
        self.easydebug("check_whether_headline_exist_in_archive runned")
        # Bind the URL as a parameter: URLs may contain quotes, which would
        # break (or inject into) a string-built statement.
        sql = f"select * from {table_name} where source = ?"
        vt = sqlite3.connect(sqlite_db_url)
        try:
            result = vt.execute(sql, (source,)).fetchone()
        finally:
            vt.close()
        ret = result is not None
        if ret:
            msg = f"item is exists {source}"
        else:
            msg = f"item is not exists {source}"
        self.easydebug(msg)
        return ret

    def save_news_item_to_sqlite(self, headline,
                                 fulltext=None,
                                 date=None,
                                 source=None,
                                 source_name="unknown_source",
                                 table_name=None,
                                 sqlite_db_url=None):
        """Store one article unless its source URL is already archived.

        The target table is created first if it is missing.

        Parameters
        ----------
        headline : str
            Article title.
        fulltext : str, optional
            Article text.
        date : str, optional
            Publication date, stored as text.
        source : str, optional
            Article URL; used as the de-duplication key.
        source_name : str, optional
            Name of the publishing site.
        table_name : str, optional
            Target table; defaults to ``CONFIG_DEFAULT_TABLE_NAME``.
        sqlite_db_url : str, optional
            Path of the SQLite file; defaults to ``CONFIG_SQLITEDB_URL``.

        Returns
        -------
        bool
            ``True`` if a row was inserted; ``False`` if the source was
            already present or any error occurred.
        """
        sqlite_db_url = self.CONFIG_SQLITEDB_URL if sqlite_db_url is None else sqlite_db_url
        table_name = self.CONFIG_DEFAULT_TABLE_NAME if table_name is None else table_name
        try:
            # Forward the resolved table and database so that creation, the
            # duplicate check and the insert all act on the same target.
            self._private_generate_table_if_dont_exists(
                table_name=table_name, sqlite_db_url=sqlite_db_url)
            already_archived = self.check_whether_headline_exist_in_archive(
                source, table_name=table_name, sqlite_db_url=sqlite_db_url)
            if already_archived:
                self.easydebug(f"Source is already exists, passed: {source}")
                return False

            sql = f"insert into {table_name}(headline,fulltext,date,source,source_name) values(?,?,?,?,?)"
            veri = (headline, fulltext, date, source, source_name)
            vt = sqlite3.connect(sqlite_db_url)
            try:
                vt.execute(sql, veri)
                vt.commit()
            finally:
                vt.close()
            self.easydebug(f"Item saved: Title: {headline}")
            return True
        except Exception as ex:
            # Deliberate best-effort: report the problem and signal failure.
            self.easydebug(
                "Error: Aeroplane,\n"
                "Hey where is your db and table check it!:\n"
                f"sqlite_db_url: {sqlite_db_url} " + str(ex))
            return False

    def collect_json(self, directory=".", source_name="haberler.com",
                     table_name=None):
        """Import every ``*.html.json*`` file of a directory into the archive.

        A file is picked up when its name contains ``.html.json``. From each
        file the keys ``title``, ``description``, ``date_publish`` and ``url``
        are read, converted to ``str`` and saved as one row.

        Parameters
        ----------
        directory : str, optional
            Directory to scan; defaults to the current working directory.
        source_name : str, optional
            Source name recorded for every imported article.
        table_name : str, optional
            Target table; defaults to ``CONFIG_DEFAULT_TABLE_NAME``.

        Raises
        ------
        KeyError
            If a file lacks one of the expected keys.
        json.JSONDecodeError
            If a file does not contain valid JSON.
        """
        table_name = self.CONFIG_DEFAULT_TABLE_NAME if table_name is None else table_name
        for dosya in os.listdir(directory):
            if ".html.json" in dosya:
                # NOTE(review): the file is decoded with the platform default
                # encoding, as before — confirm how the JSON files are written.
                with open(os.path.join(directory, dosya), 'r') as myfile:
                    obj = json.load(myfile)
                baslik = str(obj['title'])
                fulltext = str(obj['description'])
                date = str(obj['date_publish'])
                source = str(obj['url'])
                self.save_news_item_to_sqlite(headline=baslik,
                                              table_name=table_name,
                                              fulltext=fulltext,
                                              date=date,
                                              source=source,
                                              source_name=source_name)
                self.easydebug(baslik)
# Script run (not a unit test): build a parser with the default configuration
# and import the matching JSON files via collect_json().
npp = NewsPleaseParser()
npp.collect_json()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment