Created
March 19, 2024 03:11
-
-
Save syusuke9999/2003ef76e561db43357fb42d2c511f03 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import pytz | |
import json | |
import logging | |
from tkinter import filedialog | |
from tkinter import Tk | |
from datetime import datetime | |
from tqdm import tqdm | |
from typing import Optional | |
# Time zone name handed to pytz.timezone(); it is also appended verbatim to
# every timestamp string produced by decode_unicode_escaped().
time_zone_name = 'Asia/Tokyo'
# tzinfo object for time_zone_name, built once at import time and used as the
# target zone when epoch values are converted.
time_zone: Optional[pytz.tzinfo.BaseTzInfo] = pytz.timezone(time_zone_name)
def safe_unicode_str(s):
    """Round-trip ``s`` through UTF-16 and return the decoded text.

    Encoding uses the ``surrogatepass`` error handler, so surrogate code
    points in ``s`` are written out as UTF-16 code units instead of raising.
    Decoding is strict, which turns a high/low surrogate pair back into the
    single character it represents.

    Args:
        s (str): The text to normalise.

    Returns:
        str: The text after the UTF-16 encode/decode round trip.
    """
    utf16_bytes = s.encode('utf-16', errors='surrogatepass')
    return utf16_bytes.decode('utf-16')
def get_filename_without_extension(file_path: str) -> str:
    """
    Strip the directory part and the last extension from a path.

    Args:
        file_path (str): The path of the file.

    Returns:
        str: The final path component with its last extension removed
            (only the last one: ``a.tar.gz`` becomes ``a.tar``).
    """
    # basename drops the directories; splitext then separates the extension.
    stem = os.path.splitext(os.path.basename(file_path))[0]
    return stem
def remove_u0000(item):
    """Recursively strip NUL (U+0000) characters from a decoded JSON value.

    Dict values and list elements are cleaned recursively (dict keys are left
    as they are). A string that itself contains a JSON document is parsed,
    cleaned and serialized again, so NULs that were escaped inside the nested
    document are removed as well.

    Args:
        item: Any value produced by ``json.load`` (dict, list, str, number,
            bool or None).

    Returns:
        The same structure with NUL characters removed from string values.
        Non-string scalars are returned unchanged.
    """
    if isinstance(item, dict):
        return {key: remove_u0000(value) for key, value in item.items()}
    if isinstance(item, list):
        return [remove_u0000(element) for element in item]
    if not isinstance(item, str):
        return item

    # Check whether the string is itself a JSON document.
    try:
        parsed = json.loads(item)
    except json.JSONDecodeError:
        return item.replace('\u0000', '')

    # Only containers and strings can carry a NUL. Re-serializing a string
    # that merely looks like a number / true / false / null would rewrite the
    # user's data (e.g. "1e5" -> "100000.0"), so such strings are returned
    # untouched.
    if isinstance(parsed, (dict, list, str)):
        return json.dumps(remove_u0000(parsed))
    return item
# Maps the JSON keys whose numeric values are epoch timestamps to the divisor
# that turns the stored value into seconds (1 = seconds, 1000 = milliseconds,
# 1000000 = microseconds).
_EPOCH_DIVISORS = {
    'timestamp': 1,
    'create_time': 1,
    'update_time': 1,
    'feedback_start_time': 1000,
    'compare_step_start_time': 1000,
    'new_completion_load_end_time': 1000,
    'frontend_submission_time': 1000,
    'new_completion_load_start_time': 1000000,
    'createdAt': 1000,
}


def decode_unicode_escaped(json_obj, key=None):
    """Walk a decoded JSON value and render known epoch fields as local time.

    Dicts and lists are traversed recursively. Numbers stored directly under
    one of the keys in ``_EPOCH_DIVISORS`` are replaced by a string of the
    form ``YYYY-MM-DD HH:MM:SS.mmm <time_zone_name>`` in the module's
    ``time_zone``. Numbers inside lists are never converted, because list
    elements are visited without a key.

    Args:
        json_obj: Any value produced by ``json.load``.
        key: The dict key ``json_obj`` was stored under, or None.

    Returns:
        The converted structure. A string that cannot be encoded as UTF-8
        (for example one holding a lone surrogate) becomes None. A timestamp
        that cannot be converted is returned as the original number.
    """
    if isinstance(json_obj, dict):
        return {k: decode_unicode_escaped(v, k) for k, v in json_obj.items()}
    if isinstance(json_obj, list):
        return [decode_unicode_escaped(element) for element in json_obj]
    if isinstance(json_obj, str):
        # Encodable strings pass through unchanged; strings that are not
        # valid UTF-8 are dropped so the later file write cannot fail on them.
        try:
            json_obj.encode('utf-8')
        except UnicodeEncodeError as err:
            logging.debug(f"Error: {err}")
            return None
        return json_obj

    # bool is a subclass of int; a JSON true/false is never a timestamp.
    if isinstance(json_obj, bool) or not isinstance(json_obj, (int, float)):
        return json_obj

    divisor = _EPOCH_DIVISORS.get(key)
    if divisor is None:
        return json_obj

    try:
        utc_time = datetime.fromtimestamp(json_obj / divisor, tz=pytz.utc)
        local_time = utc_time.astimezone(time_zone)
    except (OverflowError, OSError, ValueError) as err:
        # Out-of-range, NaN or infinite values: keep the raw number instead
        # of aborting the whole file.
        logging.debug("Error: %s", err, exc_info=True)
        return json_obj
    # %f yields microseconds; trimming three digits leaves milliseconds.
    return local_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3] + " " + time_zone_name
def process_json_file(input_file: str):
    """Clean one JSON file and write the result next to it as ``*_decoded.json``.

    The file is read as UTF-8 (a leading BOM is accepted), passed through
    ``remove_u0000`` and ``decode_unicode_escaped``, and written with
    ``ensure_ascii=False`` and a 4-space indent to
    ``<base name>_decoded.json`` in the same directory.

    Args:
        input_file (str): Path of the JSON file to process.

    Returns:
        None. Files whose base name already ends with ``_decoded`` and files
        that cannot be parsed are skipped.
    """
    base_name_of_file = get_filename_without_extension(input_file)
    # Skip files this tool produced itself (base name ends with "_decoded").
    if base_name_of_file.endswith("_decoded"):
        return

    # The with-block closes the input as soon as parsing is done. Content that
    # is not valid UTF-8 is treated like any other unparsable file so one bad
    # file does not abort a whole batch.
    try:
        with open(input_file, 'r', encoding='utf-8-sig') as f:
            json_data = json.load(f)
    except (json.JSONDecodeError, UnicodeDecodeError):
        logging.debug(f"Error: {input_file} is not a valid JSON file.")
        return

    cleaned_data = remove_u0000(json_data)
    decoded_json = decode_unicode_escaped(cleaned_data)

    # Build the complete text before opening the output, so a serialization
    # failure cannot leave an empty or truncated file behind.
    json_str = json.dumps(decoded_json, ensure_ascii=False, indent=4)
    safe_json_str = safe_unicode_str(json_str)

    output_file = os.path.join(os.path.dirname(input_file), base_name_of_file + "_decoded.json")
    with open(output_file, 'w', encoding='utf-8-sig') as out:
        out.write(safe_json_str)
def get_all_file_paths(directory: str):
    """Collect the path of every file found under ``directory``.

    The tree is traversed with ``os.walk``, so files in nested
    sub-directories are included; directories themselves are not listed.

    Args:
        directory (str): Root directory to search.

    Returns:
        list: File paths, each built by joining the walked directory with the
            file name, in the order ``os.walk`` yields them.
    """
    return [
        os.path.join(parent, name)
        for parent, _subdirs, names in os.walk(directory)
        for name in names
    ]
if __name__ == "__main__":
    # A Tk root is required for the dialog; keep it hidden so only the folder
    # picker is shown.
    root = Tk()
    root.withdraw()
    try:
        # Display folder selection dialog box
        folder_path = filedialog.askdirectory()
    finally:
        # The root is only needed for the dialog; release it even if the
        # dialog raises, and before the long-running processing starts.
        root.destroy()

    if folder_path:
        # Filter first so the progress bar counts only the files that are
        # actually handed to process_json_file.
        file_paths = [
            path for path in get_all_file_paths(folder_path)
            if path.endswith(".json")
        ]
        for filepath in tqdm(file_paths):
            process_json_file(filepath)
    else:
        logging.debug("No folder selected.")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment