@jmkim · Created December 5, 2021
NIFS crawler
import bs4
import requests
import collections
import copy
import time
URL = 'https://www.nifs.go.kr/page?id=aq_seafood_2_7&type=tot&from=totList&fim_col_id=2018-MF0006689-6-D01'
#URL = 'https://www.nifs.go.kr/page?id=aq_seafood_2_7&fim_col_id=2018-MF0006689-Z-D33&mode=all'
# Section-name prefixes to crawl, mapped to how many consecutive HTML
# tables hold that section's data on the page
RULES = {
    "일반성분": 1,  # proximate composition
    "비타민": 1,    # vitamins
    "지방산": 1,    # fatty acids
    "아미노산": 2,  # amino acids (split across two tables)
    "무기질": 1,    # minerals
    "핵산": 1,      # nucleic acids
    "베타인류": 1   # betaines
}
# Number of leading characters compared when matching a section name
# against the RULES keys
RULES_NUM_CHARS_COMPARE = 2
CSV_PATH = "nifs-" + time.strftime("%Y%m%d-%H%M%S", time.localtime()) + ".csv"
# HTML Tabular Data Manager
# Manages HTML tabular data (th and td cells) as a rectangular table.
#
# Description:
#   Fill the data table with cell values one by one, horizontally
#   (to the right; function: add_col). When one line is done, commit
#   it and continue to the next line (function: newrow).
#   When the data table is logically broken (i.e. one logical table
#   is divided into multiple tables), first create one
#   HTMLTabularDataManager for each broken piece (th or td), then
#   merge them into one (function: merge).
#
# Terms:
#   Tabular data: the data inside a table row (tr), namely th and td
#     (table header and table data, respectively)
#   Data table: the internal table that accumulates the tabular data
#   Cursor: the current write position; there is a vertical cursor
#     and a horizontal cursor
#
# Functions:
#   add_col: add a cell value, horizontally (to the right)
#   newrow: commit the current tabular data line and move the cursor
#     to the leftmost empty position of the next line
#   merge: merge with another HTMLTabularDataManager
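#
# For example (a minimal sketch, not part of the crawler), the two
# header rows
#   <tr><th rowspan="2">Name</th><th colspan="2">Vitamin</th></tr>
#   <tr><th>A</th><th>B</th></tr>
# would be fed in as:
#   mgr = HTMLTabularDataManager()
#   mgr.newrow()
#   mgr.add_col("Name", "2", None)     # rowspan copies "Name" downward
#   mgr.add_col("Vitamin", None, "2")  # colspan copies "Vitamin" rightward
#   mgr.newrow()
#   mgr.add_col("A", None, None)       # lands in column 1: column 0 of this
#   mgr.add_col("B", None, None)       # row is already filled by the rowspan
# leaving mgr.data as:
#   {0: {0: "Name", 1: "Vitamin", 2: "Vitamin"},
#    1: {0: "Name", 1: "A", 2: "B"}}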
class HTMLTabularDataManager:
    def __init__(self):
        self.data = {}     # Tabular data table (aka the data table)
        self.data[0] = {}  # Initialize the data table's first row
        self.vert_cur = 0  # Vertical cursor (current write row) of the data table
        self.hori_cur = 0  # Horizontal cursor (current write column) of the data table
    # Add a column value, moving horizontally (to the right)
    def add_col(self, colname, rowspan, colspan):
        # If the cursor position is already written (e.g. by an earlier
        # rowspan), skip right to the nearest empty position
        while self.data[self.vert_cur].get(self.hori_cur) is not None:
            self.hori_cur += 1
        # Write the column value into the data table
        self.data[self.vert_cur][self.hori_cur] = colname
        # Do the row-wise spanning
        self.span_row(rowspan, colspan)
        # Do the column-wise spanning
        self.span_col(colspan, False)
        # Advance the cursor past the cell just written
        self.hori_cur += 1
    # Rowspan procedure
    # Copy the value vertically (downward) in the data table, "rowspan" rows deep
    def span_row(self, rowspan, colspan=0):
        if type(rowspan) is str:  # If str, convert to int
            rowspan = int(rowspan)
        if type(rowspan) is int:  # Deal only with int (None means no rowspan)
            colname = self.data[self.vert_cur][self.hori_cur]
            for row_i in range(1, rowspan):
                if self.data.get(self.vert_cur + row_i) is None:
                    self.data[self.vert_cur + row_i] = {}
                self.data[self.vert_cur + row_i][self.hori_cur] = colname
                # Do the column-wise spanning for each spanned row
                self.span_col(colspan, True, row_i)
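    # For example, with the cursor at (row 0, column 2) and rowspan "3",
    # span_row copies the value to (1, 2) and (2, 2), creating those rows
    # on demand; the cursor itself does not move.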
    # Colspan procedure
    # Copy the value horizontally (to the right) in the data table, "colspan" columns wide
    def span_col(self, colspan, is_dummy=False, row_i=0):
        if type(colspan) is str:  # If str, convert to int
            colspan = int(colspan)
        if type(colspan) is int:  # Deal only with int (None means no colspan)
            colname = self.data[self.vert_cur][self.hori_cur]
            if is_dummy:
                # Copy into a spanned row, without updating the cursor
                for col_i in range(1, colspan):
                    self.data[self.vert_cur + row_i][self.hori_cur + col_i] = colname
            else:
                # Copy into the current row, updating the cursor
                for _ in range(1, colspan):
                    self.hori_cur += 1
                    self.data[self.vert_cur][self.hori_cur] = colname
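    # For example, with the cursor at (0, 1) and colspan "3", the non-dummy
    # branch copies the value to (0, 2) and (0, 3) and leaves the cursor at
    # column 3; add_col then advances it once more, past the spanned cells.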
    # Commit the current tabular data line and move the cursor to the
    # start of the next line (down, leftmost position)
    def newrow(self):
        if self.data.get(self.vert_cur):  # Only advance past a non-empty row
            self.vert_cur += 1
        if self.data.get(self.vert_cur) is None:
            self.data[self.vert_cur] = {}
        self.hori_cur = 0
    # Return the vertical size of the data table
    def vert_size(self):
        return len(self.data)
    # Return the horizontal size of the data table
    def hori_size(self):
        hori_size_arr = [0] * self.vert_size()
        for row_i in range(self.vert_size()):
            if type(self.data[row_i]) is dict and len(self.data[row_i]) > 0:
                # Column indices are 0-based, so the size is max index + 1
                hori_size_arr[row_i] = max(self.data[row_i]) + 1
            else:
                hori_size_arr[row_i] = 0  # Empty row
        return max(hori_size_arr)
    # Return the horizontal keys of the data table
    def hori_keys(self):
        hori_keys_arr = []
        for row_i in range(self.vert_size()):
            hori_keys_arr.extend(self.data[row_i].keys())
        return set(hori_keys_arr)
    # Merge with another HTMLTabularDataManager, appending its columns
    # to the right of this table's columns
    def merge(self, right, left_keys=None, padding=0):
        if left_keys is None:
            left_keys = self.hori_keys()
        if self.vert_size() != right.vert_size():
            raise ValueError("Size mismatch (%d != %d)" % (self.vert_size(), right.vert_size()))
        left_hori_size = len(left_keys)
        for row_i in range(self.vert_size()):
            # Fill dummy values for keys in left_keys not yet in this row
            for left_k in left_keys:
                if self.data[row_i].get(left_k) is None:
                    self.data[row_i][left_k] = ""
            # Append the right table's columns after the left ones
            for right_col_k, right_col_v in right.data[row_i].items():
                self.data[row_i][left_hori_size + right_col_k + padding] = right_col_v
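    # For example (a small sketch, not from the crawler), merging a
    # two-column left table with a one-column right table of the same
    # height appends the right column after the left ones:
    #   left.data  = {0: {0: "a", 1: "b"}}
    #   right.data = {0: {0: "x"}}
    #   left.merge(right)  ->  left.data == {0: {0: "a", 1: "b", 2: "x"}}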
    # Return the data table as a JSON string
    def to_json(self):
        ret_str = "[\n"
        sep_outer = " "
        for row in self.data.values():
            ret_str += sep_outer
            ret_str += "[ "
            sep_inner = ""
            for col_k, col_v in collections.OrderedDict(sorted(row.items())).items():
                ret_str += sep_inner
                ret_str += "\""
                ret_str += str(col_v)
                ret_str += "\""
                sep_inner = ", "
            ret_str += " ]"
            sep_outer = ",\n "
        ret_str += "\n]"
        return ret_str
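    # For the small example in the class comment, to_json returns:
    #   [
    #    [ "Name", "Vitamin", "Vitamin" ],
    #    [ "Name", "A", "B" ]
    #   ]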
    # Return the data table as a JSON string (with keys)
    def to_json_with_key(self):
        ret_str = "[\n"
        sep_outer = " "
        for row in self.data.values():
            ret_str += sep_outer
            ret_str += "{ "
            sep_inner = ""
            for col_k, col_v in collections.OrderedDict(sorted(row.items())).items():
                ret_str += sep_inner
                ret_str += "\""
                ret_str += str(col_k)
                ret_str += "\": \""
                ret_str += str(col_v)
                ret_str += "\""
                sep_inner = ", "
            ret_str += " }"
            sep_outer = ",\n "
        ret_str += "\n]"
        return ret_str
    # Return the data table as a CSV string (one line per row)
    def to_csv(self, hori_keys=None):
        if hori_keys is None:
            hori_keys = self.hori_keys()
        ret_str = ""
        for row in self.data.values():
            sep = ""
            for col_k in sorted(hori_keys):  # Sort: set iteration order is not guaranteed
                ret_str += sep
                ret_str += "\""
                if row.get(col_k) is not None:
                    ret_str += str(row[col_k])
                ret_str += "\""
                sep = ","
            ret_str += "\n"  # End each row on its own line
        return ret_str
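    # For example, a 2x2 table {0: {0: "a", 1: "b"}, 1: {0: "c", 1: "d"}}
    # comes out as:
    #   "a","b"
    #   "c","d"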
    # Return the data table as a CSV string, aggregated into one line.
    # e.g. if the data table has three rows:
    #   "A","B"
    #   "C","D"
    #   "E","F"
    # then, with separator "-", the result is: "A-C-E","B-D-F"
    def to_csv_single_line(self, separator="-", hori_keys=None):
        if hori_keys is None:
            hori_keys = self.hori_keys()
        colnames_arr = {}
        for row in self.data.values():
            for col_k in hori_keys:
                if col_k not in colnames_arr:
                    colnames_arr[col_k] = []
                if row.get(col_k) is not None:
                    colnames_arr[col_k].append(row[col_k])
        ret_str = ""
        sep_outer = ""
        for colname in collections.OrderedDict(sorted(colnames_arr.items())).values():
            prev = ""
            ret_str += sep_outer
            ret_str += "\""
            sep_inner = ""
            for col in colname:
                if prev == col:  # Skip consecutive duplicates (from row/colspans)
                    continue
                prev = col
                ret_str += sep_inner
                ret_str += col
                sep_inner = separator
            ret_str += "\""
            sep_outer = ","
        return ret_str
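    # For the header example in the class comment ("Name" spanning two
    # rows, "Vitamin" over "A" and "B"), this yields:
    #   "Name","Vitamin-A","Vitamin-B"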
    # Internal interface for Python: str
    def __str__(self):
        return self.to_json()
    # Internal interface for Python: repr
    def __repr__(self):
        return self.to_json_with_key()
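# The NIFS detail pages appear to render one wide logical table as
# alternating header (th) and data (td) row blocks inside a single
# <table>. nifs_parse_table parses each header/data block into its own
# HTMLTabularDataManager, then merges the blocks side by side, so a
# later block's columns continue to the right of the earlier ones.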
def nifs_parse_table(table):
    if not isinstance(table, bs4.element.Tag):
        raise ValueError("table is not a bs4.element.Tag")
    head_arr = {}
    data_arr = {}
    is_data_row = False
    table_seq = 0
    head_arr[0] = HTMLTabularDataManager()
    data_arr[0] = HTMLTabularDataManager()
    rows = table.find_all('tr')
    for row in rows:
        cols = row.find_all('th')
        if len(cols) == 0:
            is_data_row = True
        elif is_data_row:
            # A header row after data rows starts a new logical table
            is_data_row = False
            table_seq += 1
            head_arr[table_seq] = HTMLTabularDataManager()
            data_arr[table_seq] = HTMLTabularDataManager()
        if not is_data_row:
            # Crawl the table head (th) of the table_seq-th logical table
            head_arr[table_seq].newrow()
            for col in cols:
                head_arr[table_seq].add_col(col.get_text(), col.get("rowspan"), col.get("colspan"))
        else:
            # Crawl the table data (td) of the table_seq-th logical table
            cols = row.find_all('td')
            data_arr[table_seq].newrow()
            for col in cols:
                data_arr[table_seq].add_col(col.get_text(), col.get("rowspan"), col.get("colspan"))
    # Join all logical tables side by side
    head_merged = copy.deepcopy(head_arr[0])
    data_merged = copy.deepcopy(data_arr[0])
    for i in range(1, len(head_arr)):
        head_merged.merge(head_arr[i])
        data_merged.merge(data_arr[i], head_arr[i - 1].hori_keys())
    return head_merged, data_merged
# Same as nifs_parse_table, but for a section whose data spans several
# separate <table> elements
def nifs_parse_table_splitted(tables_arr):
    head_arr = {}
    data_arr = {}
    table_seq = 0
    for table in tables_arr:
        head_arr[table_seq], data_arr[table_seq] = nifs_parse_table(table)
        table_seq += 1
    # Join the per-table results side by side
    head_merged = copy.deepcopy(head_arr[0])
    data_merged = copy.deepcopy(data_arr[0])
    for i in range(1, len(head_arr)):
        head_merged.merge(head_arr[i])
        data_merged.merge(data_arr[i], head_arr[i - 1].hori_keys())
    return head_merged, data_merged
# Find which tables hold the sections named in rules. The matching cell
# carries the section title in the table just before the data, hence
# the table_seq + 1 below.
def nifs_identify_table_position(tables, rules, num_chars_compare):
    nifs_table_keys = {}
    for table_seq in range(len(tables)):
        for row in tables[table_seq].find_all('tr'):
            for col in row.find_all('td'):
                table_name = col.get_text().strip()
                for rule_name, num_of_tables in rules.items():
                    if table_name[:num_chars_compare] == rule_name[:num_chars_compare]:
                        nifs_table_keys[table_seq + 1] = (table_name, num_of_tables)
    return nifs_table_keys
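# For example, a cell whose text starts with "아미노산" (amino acids)
# matches that rule on its first RULES_NUM_CHARS_COMPARE characters, and
# the recorded entry says the section's data spans the next two tables.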
resp = requests.get(URL, timeout=30)
soup = bs4.BeautifulSoup(resp.text, 'html.parser')
tables = soup.find_all('table', attrs={'class': 'table02'})
nifs_table_keys = nifs_identify_table_position(tables, RULES, RULES_NUM_CHARS_COMPARE)
for table_id, table_info in nifs_table_keys.items():
    table_name = table_info[0]
    num_of_tables = table_info[1]
    csv_str = ""
    head = None
    data = None
    if num_of_tables == 1:
        head, data = nifs_parse_table(tables[table_id])
    elif num_of_tables > 1:
        head, data = nifs_parse_table_splitted(tables[table_id:table_id + num_of_tables])
    # Section name plus the aggregated header on the first line, then the
    # data rows, each prefixed with an empty field to stay aligned
    csv_str += "\"" + table_name + "\"," + head.to_csv_single_line() + "\n"
    for line in data.to_csv().splitlines():
        csv_str += "\"\"," + line + "\n"
    csv_str += "\n"
    print(csv_str)
    with open(CSV_PATH, "a", encoding="utf-8") as csv_file:
        csv_file.write(csv_str)
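# Running the script fetches URL once, locates the sections listed in
# RULES, and appends one CSV block per section to CSV_PATH (a new
# timestamped file on each run).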