NIFS crawler
import bs4
import requests
import collections
import copy
import time
URL = 'https://www.nifs.go.kr/page?id=aq_seafood_2_7&type=tot&from=totList&fim_col_id=2018-MF0006689-6-D01'
#URL = 'https://www.nifs.go.kr/page?id=aq_seafood_2_7&fim_col_id=2018-MF0006689-Z-D33&mode=all'

# Section rules: map a section title (as it appears on the page) to the number of
# consecutive tables that hold its data (e.g. "아미노산", amino acids, spans 2 tables)
RULES = {
    "일반성분": 1,
    "비타민": 1,
    "지방산": 1,
    "아미노산": 2,
    "무기질": 1,
    "핵산": 1,
    "베타인류": 1
}
# Number of leading characters compared when matching a title against RULES
RULES_NUM_CHARS_COMPARE = 2

# Output CSV path, timestamped per run
CSV_PATH = "nifs-" + time.strftime("%Y%m%d-%H%M%S", time.localtime()) + ".csv"
# HTML Tabular Data Manager
# Manages HTML tabular data (th and td cells) as a table.
#
# Concept:
#   Fill the data table with cell values.
#   Add values one by one, horizontally (to the right).           -> add_col
#   When a line is done, commit it and continue on the next line. -> newrow
#   When the data is logically split across multiple tables,
#   first build one HTMLTabularDataManager per fragment (th or td),
#   then merge them into a single manager.                        -> merge
#
# Terms:
#   Tabular data: the cells inside a table row (tr), i.e. th and td
#                 (table header and table data, respectively)
#   Data table:   the internal table that stores the collected cells
#   Cursor:       the current write position; there are two cursors,
#                 a vertical one (row) and a horizontal one (column)
#
# Functions:
#   add_col: add a cell value, moving horizontally (to the right)
#   newrow:  commit the current line and move the cursor to the start of the
#            next line (down, to the leftmost empty position)
#   merge:   merge with another HTMLTabularDataManager
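#
# Illustrative sketch (not part of the original crawler): a header such as
#   <tr><th rowspan="2">A</th><th colspan="2">B</th></tr>
#   <tr><th>C</th><th>D</th></tr>
# is laid out by add_col/newrow as follows. "A" is written at (0,0) and copied
# downwards by the rowspan; "B" is written at (0,1) and copied right by the
# colspan; after newrow, the cursor skips the already-filled (1,0) cell, so
# "C" and "D" land at (1,1) and (1,2):
#   row 0: A B B
#   row 1: A C D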
class HTMLTabularDataManager:
    def __init__(self):
        self.data = {}      # Tabular data table (aka. the data table)
        self.data[0] = {}   # Initialize the data table's first row
        self.vert_cur = 0   # Vertical cursor (current write position) of the data table
        self.hori_cur = 0   # Horizontal cursor (current write position) of the data table

    # Add a cell value, moving horizontally (to the right)
    def add_col(self, colname, rowspan, colspan):
        # If the cursor position is already written (e.g. by an earlier rowspan),
        # skip right to the nearest empty position and write there
        while self.data[self.vert_cur].get(self.hori_cur) is not None:
            self.hori_cur += 1
        # Add the cell value to the data table
        self.data[self.vert_cur][self.hori_cur] = colname
        # Do the row-wise spanning
        self.span_row(rowspan, colspan)
        # Do the column-wise spanning
        self.span_col(colspan, False)
        # Mark the cursor position as written; move the cursor to the right
        self.hori_cur += 1
    # Rowspan procedure
    # Copy the cell value vertically (downwards) in the data table, "rowspan" times
    def span_row(self, rowspan, colspan=0):
        if type(rowspan) is str:  # If str, convert to int
            rowspan = int(rowspan)
        if type(rowspan) is int:  # Deal only with int
            colname = self.data[self.vert_cur][self.hori_cur]
            # Rowspan procedure starts
            for row_i in range(1, rowspan):
                if self.data.get(self.vert_cur + row_i) is None:
                    self.data[self.vert_cur + row_i] = {}
                self.data[self.vert_cur + row_i][self.hori_cur] = colname
                #print("vs", self.data, "\n")
                # Do the column-wise spanning for each spanned row
                self.span_col(colspan, True, row_i)
            # Rowspan procedure ends
    # Colspan procedure
    # Copy the cell value horizontally (to the right) in the data table, "colspan" times
    def span_col(self, colspan, is_dummy=False, row_i=0):
        if type(colspan) is str:  # If str, convert to int
            colspan = int(colspan)
        if type(colspan) is int:  # Deal only with int
            colname = self.data[self.vert_cur][self.hori_cur]
            if is_dummy:
                # Colspan procedure starts (without updating the cursor)
                for col_i in range(1, colspan):
                    self.data[self.vert_cur + row_i][self.hori_cur + col_i] = colname
                # Colspan procedure ends
            else:
                # Colspan procedure starts (with updating the cursor)
                for _ in range(1, colspan):
                    self.hori_cur += 1
                    self.data[self.vert_cur][self.hori_cur] = colname
                # Colspan procedure ends
    # Commit the current tabular data line
    # Move the cursor to the start of the next line (down, to the leftmost empty position)
    def newrow(self):
        if len(self.data.get(self.vert_cur)) > 0:
            self.vert_cur += 1
        if self.data.get(self.vert_cur) is None:
            self.data[self.vert_cur] = {}
        self.hori_cur = 0
    # Return the vertical size (number of rows) of the data table
    def vert_size(self):
        return len(self.data)

    # Return the horizontal size (number of columns) of the data table
    def hori_size(self):
        hori_size_arr = [0] * self.vert_size()
        for row_i in range(self.vert_size()):
            if type(self.data[row_i]) is dict and len(self.data[row_i]) > 0:
                hori_size_arr[row_i] = max(self.data[row_i]) + 1  # Column indices start at 0, so the row size is max index + 1
            else:
                hori_size_arr[row_i] = 0  # Empty row
        return max(hori_size_arr)

    # Return the set of horizontal (column) keys used anywhere in the data table
    def hori_keys(self):
        hori_keys_arr = []
        for row_i in range(self.vert_size()):
            hori_keys_arr.extend(self.data[row_i].keys())
        return set(hori_keys_arr)
    # Merge with another HTMLTabularDataManager
    def merge(self, right, left_keys=None, padding=0):
        if left_keys is None:
            left_keys = self.hori_keys()
        if self.vert_size() != right.vert_size():
            raise ValueError("Size mismatch (" + str(self.vert_size()) + " != " + str(right.vert_size()) + ")")
        left_hori_size = len(left_keys)
        for row_i in range(self.vert_size()):
            # Fill the left dummy column data, which is in left_keys but not yet in the data table
            for left_k in left_keys:
                if self.data[row_i].get(left_k) is None:
                    self.data[row_i][left_k] = ""
            # Append the right column data
            for right_col_k, right_col_v in right.data[row_i].items():
                self.data[row_i][left_hori_size + right_col_k + padding] = right_col_v
    # Return the data table as a JSON string
    def to_json(self):
        ret_str = "[\n"
        sep_outer = " "
        for row in self.data.values():
            ret_str += sep_outer
            ret_str += "[ "
            sep_inner = ""
            for col_k, col_v in collections.OrderedDict(sorted(row.items())).items():
                ret_str += sep_inner
                ret_str += "\""
                ret_str += str(col_v)
                ret_str += "\""
                sep_inner = ", "
            ret_str += " ]"
            sep_outer = ",\n "
        ret_str += "\n]"
        return ret_str

    # Return the data table as a JSON string (with keys)
    def to_json_with_key(self):
        ret_str = "[\n"
        sep_outer = " "
        for row in self.data.values():
            ret_str += sep_outer
            ret_str += "{ "
            sep_inner = ""
            for col_k, col_v in collections.OrderedDict(sorted(row.items())).items():
                ret_str += sep_inner
                ret_str += "\""
                ret_str += str(col_k)
                ret_str += "\": \""
                ret_str += str(col_v)
                ret_str += "\""
                sep_inner = ", "
            ret_str += " }"
            sep_outer = ",\n "
        ret_str += "\n]"
        return ret_str
    # Return the data table as a CSV string (one line per row)
    def to_csv(self, hori_keys=None):
        if hori_keys is None:
            hori_keys = self.hori_keys()
        ret_str = ""
        row_sep = ""
        for row in self.data.values():
            ret_str += row_sep
            sep = ""
            for col_k in sorted(hori_keys):  # Sort keys so the column order matches the header
                ret_str += sep
                ret_str += "\""
                if row.get(col_k) is not None:
                    ret_str += str(row[col_k])
                ret_str += "\""
                sep = ","
            row_sep = "\n"  # Separate rows with a newline
        return ret_str
    # Return the data table as a CSV string, aggregated into one line
    # e.g. if the data table has 3 rows: "A","B"
    #                                    "C","D"
    #                                    "E","F"
    # then, with separator "-", the output is: "A-C-E","B-D-F"
    def to_csv_single_line(self, seperator="-", hori_keys=None):
        if hori_keys is None:
            hori_keys = self.hori_keys()
        colnames_arr = {}
        for row in self.data.values():
            for col_k in hori_keys:
                if col_k not in colnames_arr:
                    colnames_arr[col_k] = []
                if row.get(col_k) is not None:
                    colnames_arr[col_k].append(row[col_k])
        ret_str = ""
        sep_outer = ""
        for colname in collections.OrderedDict(sorted(colnames_arr.items())).values():
            prev = ""
            ret_str += sep_outer
            ret_str += "\""
            sep_inner = ""
            for col in colname:
                if prev == col:  # Skip consecutive duplicates (produced by rowspan copies)
                    continue
                prev = col
                ret_str += sep_inner
                ret_str += col
                sep_inner = seperator
            ret_str += "\""
            sep_outer = ","
        return ret_str
    # Internal interface for Python: str
    def __str__(self):
        return self.to_json()

    # Internal interface for Python: repr
    def __repr__(self):
        return self.to_json_with_key()
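
# Illustrative usage sketch (assumption, not part of the original crawler;
# the column names below are made up for illustration):
#
#   head = HTMLTabularDataManager()
#   head.newrow()
#   head.add_col("성분", "2", None)          # th with rowspan="2"
#   head.add_col("100g당 함량", None, "2")   # th with colspan="2"
#   head.newrow()
#   head.add_col("평균", None, None)
#   head.add_col("표준편차", None, None)
#   print(head.to_csv_single_line())
#   # -> "성분","100g당 함량-평균","100g당 함량-표준편차"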
# Parse a single NIFS result table (a bs4 Tag) into a (header, data) pair of
# HTMLTabularDataManager objects. A table may repeat th/td sections, so each
# th/td block is collected separately and then merged horizontally.
def nifs_parse_table(table):
    if type(table) is not bs4.element.Tag:
        raise ValueError("table is not a type of bs4.element.Tag")
    head_arr = {}
    data_arr = {}
    is_data_row = False
    table_seq = 0
    head_arr[0] = HTMLTabularDataManager()
    data_arr[0] = HTMLTabularDataManager()
    rows = table.find_all('tr')
    for row in rows:
        cols = row.find_all('th')
        if len(cols) == 0:
            # No th cells: this is a data (td) row
            is_data_row = True
        elif is_data_row:
            # A th row following data rows: a new logical sub-table starts
            is_data_row = False
            table_seq += 1
            head_arr[table_seq] = HTMLTabularDataManager()
            data_arr[table_seq] = HTMLTabularDataManager()
        if not is_data_row:
            # Table head (th) crawling procedure, for the table_seq-th sub-table
            head_arr[table_seq].newrow()
            for col in cols:
                head_arr[table_seq].add_col(col.get_text(), col.get("rowspan"), col.get("colspan"))
        else:
            # Table data (td) crawling procedure, for the table_seq-th sub-table
            cols = row.find_all('td')
            data_arr[table_seq].newrow()
            for col in cols:
                data_arr[table_seq].add_col(col.get_text(), col.get("rowspan"), col.get("colspan"))
    # Merge all sub-tables horizontally into one header and one data manager
    head_merged = copy.deepcopy(head_arr[0])
    data_merged = copy.deepcopy(data_arr[0])
    for i in range(1, len(head_arr)):
        head_merged.merge(head_arr[i])
        data_merged.merge(data_arr[i], head_arr[i - 1].hori_keys())
    return head_merged, data_merged
# Parse a section whose data is split across several consecutive tables
# (e.g. amino acids), then merge the results horizontally.
def nifs_parse_table_splitted(tables_arr):
    head_arr = {}
    data_arr = {}
    table_seq = 0
    for table in tables_arr:
        head_arr[table_seq], data_arr[table_seq] = nifs_parse_table(table)
        table_seq += 1
    head_merged = copy.deepcopy(head_arr[0])
    data_merged = copy.deepcopy(data_arr[0])
    for i in range(1, len(head_arr)):
        head_merged.merge(head_arr[i])
        data_merged.merge(data_arr[i], head_arr[i - 1].hori_keys())
    return head_merged, data_merged
# Find which of the page's tables hold the sections listed in RULES.
# A section title appears as a td inside one table; its data sits in the
# table(s) that follow, so the key stored here is table_seq + 1.
# Note: this reads the module-level 'tables' list built below.
def nifs_identify_table_position(rules, num_chars_compare):
    nifs_table_keys = {}
    for table_seq in range(len(tables)):
        for row in tables[table_seq].find_all('tr'):
            for col in row.find_all('td'):
                table_name = col.get_text().strip()
                for rule_name, num_of_tables in rules.items():
                    # Compare only the first num_chars_compare characters of the title
                    if table_name[:num_chars_compare] == rule_name[:num_chars_compare]:
                        nifs_table_keys[table_seq + 1] = (table_name, num_of_tables)
    return nifs_table_keys
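
# Illustrative example of the matching (an assumption about the page text, kept
# only to show the prefix comparison): with RULES_NUM_CHARS_COMPARE = 2, a title
# cell whose text starts with "아미" matches the rule "아미노산"; the index of the
# next table is recorded, and because that rule's value is 2, the two following
# tables are parsed together by nifs_parse_table_splitted.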
# Fetch the NIFS page and collect all result tables
tt = requests.get(URL)
soup = bs4.BeautifulSoup(tt.text, 'html.parser')
tables = soup.find_all('table', attrs={'class': 'table02'})

# Locate the sections listed in RULES and export each one as CSV
nifs_table_keys = nifs_identify_table_position(RULES, RULES_NUM_CHARS_COMPARE)
for table_id, table_info in nifs_table_keys.items():
    table_name = table_info[0]
    num_of_tables = table_info[1]
    csv_str = ""
    head = None
    data = None
    if num_of_tables == 1:
        head, data = nifs_parse_table(tables[table_id])
    elif num_of_tables > 1:
        head, data = nifs_parse_table_splitted(tables[table_id:table_id + num_of_tables])
    # First line: section name + merged header; second line: empty cell + data
    csv_str += "\"" + table_name + "\"," + head.to_csv_single_line() + "\n"
    csv_str += "\"" + "\"," + data.to_csv() + "\n\n"
    print(csv_str)
    with open(CSV_PATH, "a") as csv_file:
        csv_file.write(csv_str)
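
# Expected output (derived from the code above): for each matched section the
# script prints, and appends to CSV_PATH, two CSV lines such as
#   "<section name>","<merged header 1>","<merged header 2>",...
#   "","<value 1>","<value 2>",...
# followed by a blank line.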