Last active
March 2, 2022 14:22
-
-
Save ciaranchen/ac8dd8084134a626006e8cdb765662db to your computer and use it in GitHub Desktop.
json filter
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json, os | |
import requests | |
# Location of the JSON document that the script below loads and filters.
# Presumably a local copy of
# https://lod-cloud.net/versions/2021-05-05/lod-data.json -- TODO confirm.
json_path = './lod.json'
class JsonFilter(object):
    """Filter the entries of a nested JSON document by one of their fields.

    Paths are lists of string components (dict keys or list indices). The
    component ``'%'`` is a wildcard that visits every member of a dict or
    list. Three paths drive a search:

    * ``same_path``   -- walked from each child of the root (all members of
      the root itself are always visited) to reach the candidate entries;
    * ``data_path``   -- relative to an entry, the value that is tested;
    * ``output_path`` -- relative to an entry, the value that is reported.

    An empty ``data_path`` / ``output_path`` designates the entry itself.
    An entry is passed to :meth:`output` when
    ``operator(data_value, threshold)`` is truthy.
    """

    WILDCARD = '%'

    def __init__(self, operator, threshold):
        """Store the two-argument predicate and the value it compares against."""
        self.operator = operator
        self.threshold = threshold

    @staticmethod
    def _split_path(path):
        """Split a dotted path string into components.

        Leading and trailing dots are ignored; an empty path yields ``[]``.
        """
        stripped = path.strip('.')
        return stripped.split('.') if stripped else []

    @classmethod
    def _as_path(cls, path):
        """Return *path* as a component list, splitting it when it is a string."""
        return cls._split_path(path) if isinstance(path, str) else path

    def set_2p(self, data_path, output_path):
        """Configure the filter from two dotted path strings.

        ``output_path`` has the form ``'<shared>,<relative output>'``; the
        part after the comma may be omitted, in which case the matching entry
        itself is reported. ``data_path`` is a full path that must begin with
        the components of ``<shared>``; the remainder becomes the relative
        data path.

        Returns the tuple ``(same_path, data_path, output_path)`` of component
        lists. Raises ``AssertionError`` when ``data_path`` does not start
        with the shared prefix or a relative path contains the wildcard.
        """
        opaths = output_path.split(',')
        shared = self._split_path(opaths[0])
        # A missing ",<relative output>" part means "report the entry itself".
        relative_output = self._split_path(opaths[1]) if len(opaths) > 1 else []
        data_parts = self._split_path(data_path)
        # Compare whole components so that 'a.bc' is not accepted as lying
        # under the prefix 'a.b'.
        assert data_parts[:len(shared)] == shared, \
            'data_path must start with the shared prefix of output_path'
        relative_data = data_parts[len(shared):]
        assert (self.WILDCARD not in relative_data
                and self.WILDCARD not in relative_output), \
            "'%' is only allowed in the shared prefix"
        self.same_path = shared
        self.output_path = relative_output
        self.data_path = relative_data
        return self.same_path, self.data_path, self.output_path

    def set_3p(self, same_path, data_path, output_path):
        """Configure the filter from three separately given paths.

        Each path may be a list of components or a dotted string. Returns the
        tuple ``(same_path, data_path, output_path)``. Raises
        ``AssertionError`` when ``data_path`` or ``output_path`` contains the
        wildcard.
        """
        same_path = self._as_path(same_path)
        data_path = self._as_path(data_path)
        output_path = self._as_path(output_path)
        # Validate the arguments themselves: the attributes do not exist yet
        # on a freshly created instance.
        assert (self.WILDCARD not in data_path
                and self.WILDCARD not in output_path), \
            "'%' is only allowed in same_path"
        self.same_path = same_path
        self.data_path = data_path
        self.output_path = output_path
        return self.same_path, self.data_path, self.output_path

    def _get_value_all(self, data, past_path=None):
        """Visit every member of a dict or list and collect the matches.

        ``past_path`` is the list of components leading to ``data``. Values
        that are neither dict nor list contribute nothing. Returns a list of
        ``(path, data_value, output_value)`` tuples.
        """
        past_path = [] if past_path is None else past_path
        if isinstance(data, dict):
            members = data.items()
        elif isinstance(data, list):
            # List positions are recorded as strings so paths can be joined.
            members = ((str(i), value) for i, value in enumerate(data))
        else:
            return []
        res = []
        for key, value in members:
            res.extend(self._get_value(value, past_path + [key]))
        return res

    def _get_value(self, data, past_path, diff=None):
        """Advance one step along the configured paths.

        With ``diff=None`` the walk follows ``same_path``; once ``past_path``
        is longer than ``same_path`` the current ``data`` is an entry and a
        one-element list ``[(past_path, data_value, output_value)]`` is
        returned. With ``diff`` set to a relative path, the walk follows it
        below the entry and returns the raw value found at its end.

        A missing key or index raises the underlying lookup error
        (``KeyError``, ``IndexError`` or ``TypeError``).
        """
        # Number of components of past_path that lie at or below the entry.
        depth = len(past_path) - len(self.same_path)
        if diff is None:
            if depth > 0:
                tested = self._get_value(data, past_path, diff=self.data_path)
                reported = self._get_value(data, past_path, diff=self.output_path)
                return [(past_path, tested, reported)]
            index = self.same_path[len(past_path) - 1]
        else:
            # `diff is not None` (not truthiness) so that an empty relative
            # path resolves to the entry itself.
            if depth > len(diff):
                return data
            index = diff[depth - 1]
        if index == self.WILDCARD:
            return self._get_value_all(data, past_path)
        # JSON object keys are always strings, so only convert the component
        # to an integer when indexing something that is not a dict.
        if not isinstance(data, dict) and index.isdecimal():
            child = data[int(index)]
        else:
            child = data[index]
        return self._get_value(child, past_path + [index], diff)

    def filter(self, data):
        """Walk ``data`` and call :meth:`output` for every entry that passes.

        Prints the number of candidate entries before reporting matches.
        """
        res = self._get_value_all(data)
        print(len(res))
        for path, jv, ov in res:
            if self.operator(jv, self.threshold):
                self.output(path, jv, ov)

    def output(self, path, jv, ov):
        """Print the dotted path, the tested value and the reported value.

        Subclasses may override this to handle matches differently.
        """
        print('.'.join(path), jv, ov)
def lt(value, threshold):
    """Return the result of ``value < threshold``."""
    is_below = value < threshold
    return is_below
def gt(value, threshold):
    """Return the result of ``value > threshold``."""
    is_above = value > threshold
    return is_above
def eq(value, threshold):
    """Return the result of ``value == threshold``."""
    is_equal = value == threshold
    return is_equal
# Print every `full_download` entry whose `status` equals 'OK', together with
# its `download_url`.
data_path = r'full_download.%.status'
output_path = r'full_download.%,download_url'
jf = JsonFilter(eq, 'OK')
jf.set_2p(data_path, output_path)
# Open the document in a context manager so the handle is closed after
# parsing, and decode it as UTF-8 rather than the platform's locale encoding.
with open(json_path, encoding='utf-8') as json_file:
    jf.filter(json.load(json_file))
class Downloader(JsonFilter):
    """JsonFilter that downloads the reported URL of every matching entry.

    Each match is saved as ``./data/<first path component>/<last path
    component>.xml``, with spaces in the first component replaced by
    underscores. Files that already exist are not downloaded again.
    """

    def __init__(self, op, value):
        """Create the ``./data`` directory and store the predicate and threshold."""
        # os.path.join keeps the layout identical on Windows while also
        # producing a real directory tree on other platforms.
        os.makedirs(os.path.join('.', 'data'), exist_ok=True)
        super().__init__(op, value)

    def output(self, path, jv, ov):
        """Download ``ov`` (the value found at the output path) for one match.

        ``path`` is the list of components leading to the matching entry and
        ``jv`` is the tested value (unused here). Download and write errors
        are printed, not raised, so one failure does not stop the run.
        """
        print(path)
        folder = os.path.join('.', 'data', path[0].replace(' ', '_'))
        fname = os.path.join(folder, '{}.xml'.format(path[-1]))
        if os.path.exists(fname):
            return
        os.makedirs(folder, exist_ok=True)
        try:
            # `ov` already holds the value at the output path, so no second
            # lookup is needed. The timeout keeps a stalled server from
            # hanging the run.
            req = requests.get(ov, timeout=60)
            # Do not store an HTTP error page as if it were the dataset; the
            # existence check above would then never retry it.
            req.raise_for_status()
            with open(fname, 'wb') as f:
                f.write(req.content)
        except Exception as e:
            # Deliberately broad: downloading is best-effort per entry.
            print(e)
# jf = Downloader(eq, 'OK') | |
# jf.set_2p(data_path, output_path) | |
# jf.filter(json.load(open(json_path))) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment