Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save syusuke9999/2003ef76e561db43357fb42d2c511f03 to your computer and use it in GitHub Desktop.
Save syusuke9999/2003ef76e561db43357fb42d2c511f03 to your computer and use it in GitHub Desktop.
import os
import pytz
import json
import logging
from tkinter import filedialog
from tkinter import Tk
from datetime import datetime
from tqdm import tqdm
from typing import Optional
# Name of the target time zone; it is also appended verbatim to every
# formatted timestamp string produced by decode_unicode_escaped.
time_zone_name = 'Asia/Tokyo'
# pytz tzinfo object built from the name above; UTC datetimes are converted
# to this zone before being formatted.
time_zone: Optional[pytz.tzinfo.BaseTzInfo] = pytz.timezone(time_zone_name)
def safe_unicode_str(s):
    """
    Returns a copy of the string that contains no UTF-16 surrogate code points.
    Valid high/low surrogate pairs are merged into the single character they
    represent; unpaired surrogates are replaced with U+FFFD so the result can
    always be encoded (for example when it is written to a UTF-8 file).
    Args:
        s (str): The string to normalise.
    Returns:
        str: The normalised string.
    """
    # 'surrogatepass' lets surrogate code points through the encoder. Decoding
    # the resulting UTF-16 bytes joins well-formed pairs, and 'replace' turns
    # an unpaired surrogate into U+FFFD instead of raising UnicodeDecodeError.
    return s.encode('utf-16', 'surrogatepass').decode('utf-16', 'replace')
def get_filename_without_extension(file_path: str) -> str:
    """
    Strips the directory part and the final extension from a file path.
    Args:
        file_path (str): The path of the file.
    Returns:
        str: The last path component with its final extension removed.
    """
    # basename drops the directories, splitext separates the last extension
    stem, _extension = os.path.splitext(os.path.basename(file_path))
    return stem
def remove_u0000(item):
    """
    Recursively removes NUL (U+0000) characters from a JSON-like structure.
    Dict values and list elements are processed recursively (dict keys are
    left as they are). A string that itself holds an encoded JSON object,
    array or string is parsed, cleaned and re-serialised with json.dumps so
    that escaped NULs inside it are removed as well.
    Args:
        item: A value produced by json.load (dict, list, str, number, bool or None).
    Returns:
        The same structure with NUL characters removed from its strings.
    """
    if isinstance(item, dict):
        return {key: remove_u0000(value) for key, value in item.items()}
    if isinstance(item, list):
        return [remove_u0000(element) for element in item]
    if not isinstance(item, str):
        return item
    # Check whether the string is itself a JSON document
    try:
        parsed = json.loads(item)
    except json.JSONDecodeError:
        return item.replace('\u0000', '')
    if isinstance(parsed, (dict, list, str)):
        return json.dumps(remove_u0000(parsed))
    # Numbers, booleans and null cannot contain NUL. Re-serialising them would
    # only rewrite ordinary text such as "1.50" or "1e5", so keep it verbatim.
    return item
# Keys whose numeric values are epoch timestamps, mapped to the divisor that
# scales the value to seconds: 1 for 10-digit (second) values, 1000 for
# 13-digit (millisecond) values and 1000000 for 16-digit (microsecond) values.
_EPOCH_DIVISORS = {
    'timestamp': 1,
    'create_time': 1,
    'update_time': 1,
    'feedback_start_time': 1000,
    'compare_step_start_time': 1000,
    'new_completion_load_end_time': 1000,
    'frontend_submission_time': 1000,
    'new_completion_load_start_time': 1000000,
    'createdAt': 1000,
}


def decode_unicode_escaped(json_obj, key=None):
    """
    Recursively normalises a JSON-like structure for human-readable output.
    Strings are returned unchanged unless they contain surrogate code points
    that cannot be encoded as UTF-8; those are repaired (pairs merged, unpaired
    surrogates replaced with U+FFFD). Numbers stored under a known timestamp
    key are converted to a formatted date-time string in the module's time
    zone. Everything else is returned as is.
    Args:
        json_obj: The value to process (dict, list, str, number, bool or None).
        key: The dict key under which json_obj was found, if any. List
            elements are processed without a key.
    Returns:
        The processed value; containers are rebuilt, never modified in place.
    """
    if isinstance(json_obj, dict):
        return {k: decode_unicode_escaped(v, k) for k, v in json_obj.items()}
    if isinstance(json_obj, list):
        return [decode_unicode_escaped(element) for element in json_obj]
    if isinstance(json_obj, str):
        try:
            json_obj.encode('utf-8')
        except UnicodeEncodeError as en:
            logging.debug("Error: %s", en)
            # Keep the text instead of discarding the whole value: merge valid
            # surrogate pairs and replace unpaired surrogates with U+FFFD.
            return json_obj.encode('utf-16', 'surrogatepass').decode('utf-16', 'replace')
        return json_obj
    # bool is a subclass of int; a flag is never an epoch value
    if isinstance(json_obj, bool) or not isinstance(json_obj, (int, float)):
        return json_obj
    divisor = _EPOCH_DIVISORS.get(key)
    if divisor is None:
        return json_obj
    try:
        utc_time = datetime.fromtimestamp(json_obj / divisor, tz=pytz.utc)
        jst_time = utc_time.astimezone(time_zone)
    except (OverflowError, OSError, ValueError) as e:
        # Out-of-range or non-finite values cannot be converted; keep the raw number
        logging.debug("Error: %s", e, exc_info=True)
        return json_obj
    # %f yields microseconds; dropping the last three digits leaves milliseconds
    return jst_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3] + " " + time_zone_name
def process_json_file(input_file: str):
    """
    Reads a JSON file, cleans it and writes the result next to the input.
    The parsed data is passed through remove_u0000 and decode_unicode_escaped,
    serialised with non-ASCII characters kept as is, and written to
    "<name>_decoded.json" in the same directory as the input file.
    Args:
        input_file (str): The path of the JSON file to process.
    Returns:
        None. Files whose name already ends in "_decoded" and files that
        cannot be parsed as UTF-8 JSON are skipped.
    """
    base_name_of_file = get_filename_without_extension(input_file)
    # Skip files whose base name ends with _decoded (output of a previous run)
    if base_name_of_file.endswith("_decoded"):
        return
    try:
        with open(input_file, 'r', encoding='utf-8-sig') as f:
            json_data = json.load(f)
    except (json.JSONDecodeError, UnicodeDecodeError):
        # A badly-encoded file is skipped like a malformed one, so a single
        # bad file does not abort the whole batch.
        logging.debug(f"Error: {input_file} is not a valid JSON file.")
        return
    cleaned_data = remove_u0000(json_data)
    decoded_json = decode_unicode_escaped(cleaned_data)
    # Build the complete output text before opening the destination, so a
    # failure during serialisation cannot leave an empty or truncated file.
    json_str = json.dumps(decoded_json, ensure_ascii=False, indent=4)
    safe_json_str = safe_unicode_str(json_str)
    output_file = os.path.join(os.path.dirname(input_file), base_name_of_file + "_decoded.json")
    with open(output_file, 'w', encoding='utf-8-sig') as out:
        out.write(safe_json_str)
def get_all_file_paths(directory: str):
    """
    Collects the paths of all files found under a directory tree.
    Args:
        directory (str): The directory to walk.
    Returns:
        list: One path per file, each built by joining the directory that
        os.walk reported with the file name, in os.walk order.
    """
    return [
        os.path.join(parent_dir, name)
        for parent_dir, _subdirs, names in os.walk(directory)
        for name in names
    ]
if __name__ == "__main__":
    # Create the Tk root and withdraw it before showing the dialog
    root = Tk()
    root.withdraw()
    # Ask the user which folder to process
    folder_path = filedialog.askdirectory()
    if not folder_path:
        # askdirectory returned an empty value, so there is nothing to do
        logging.debug("No folder selected.")
    else:
        file_paths = get_all_file_paths(folder_path)
        # The progress bar counts every file found; only .json files are processed
        for filepath in tqdm(file_paths):
            if not filepath.endswith(".json"):
                continue
            process_json_file(filepath)
    root.destroy()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment