Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save JupyterJones/1e2d05c0fb6cdb1d103581a539761b66 to your computer and use it in GitHub Desktop.
Save JupyterJones/1e2d05c0fb6cdb1d103581a539761b66 to your computer and use it in GitHub Desktop.
Convert ChatGPT conversations.json into per-session TXT and HTML files and an SQLite database for searching
Display the source blob
Display the rendered blob
Raw
{
"cells": [
{
"cell_type": "markdown",
"id": "e0df4f99",
"metadata": {},
"source": [
"# ChatGPT:\n",
"#### conversations2/conversation_json_to_sessions_txt_html_sqlite_data"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "6aa9c155",
"metadata": {},
"outputs": [],
"source": [
"\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "77ab72f5",
"metadata": {},
"outputs": [],
"source": [
"#!/home/jack/miniconda3/envs/cloned_base/bin/python\n",
"import json\n",
"import logging\n",
"import os\n",
"import glob\n",
"import subprocess\n",
"\n",
"def split_and_save_and_convert(conversations_file, output_folder):\n",
" try:\n",
" with open(conversations_file, 'r', encoding='utf-8') as file:\n",
" data = json.load(file)\n",
" \n",
" for conversation in data:\n",
" title = conversation.get('title', 'Unknown_Title')\n",
" title_with_underscores = title.replace(' ', '_')\n",
" title_with_underscores = title_with_underscores.replace(':', '_')\n",
" title_with_underscores = title_with_underscores.replace(\"'\", \"_\")\n",
" title_with_underscores = title_with_underscores.replace(\"&\", \"_\")\n",
" title_with_underscores = title_with_underscores.replace(\"*\", \"_\")\n",
" title_with_underscores = title_with_underscores.replace(\"(\", \"_\")\n",
" title_with_underscores = title_with_underscores.replace(\")\", \"_\")\n",
" chapter_filename = f\"{title_with_underscores}.json\"\n",
" chapter_filepath = os.path.join(output_folder, chapter_filename)\n",
" \n",
" logging.info(f\"Saving data for conversation '{title}' to {chapter_filepath}\")\n",
" \n",
" with open(chapter_filepath, 'w', encoding='utf-8') as chapter_file:\n",
" json.dump([conversation], chapter_file, indent=2)\n",
"\n",
" # Convert JSON to HTML\n",
" html_output_file = os.path.join(output_folder, f\"{title_with_underscores}.html\")\n",
" convert_to_html(chapter_filepath, html_output_file)\n",
"\n",
" # Convert JSON to TXT\n",
" txt_output_file = os.path.join(output_folder, f\"{title_with_underscores}.txt\")\n",
" convert_to_txt(chapter_filepath, txt_output_file)\n",
"\n",
" except FileNotFoundError:\n",
" logging.error(f\"File not found: {conversations_file}\")\n",
" except json.JSONDecodeError:\n",
" logging.error(f\"Error decoding JSON in file: {conversations_file}\")\n",
" except Exception as e:\n",
" logging.error(f\"An unexpected error occurred: {e}\")\n",
"\n",
"def convert_to_html(json_file, html_output_file):\n",
" with open(json_file, 'r', encoding='utf-8') as file:\n",
" json_data = json.load(file)\n",
"\n",
" result_str = get_conversation_result(json_data)\n",
"\n",
" with open(html_output_file, \"w\", encoding='utf-8') as html_output:\n",
" result_html = result_str.replace(\"/n\", \"XXXXXXX\\n\")\n",
" result_html = result_html.replace(\"<\", \"&lt;\")\n",
" result_html = result_html.replace(\">\", \"&gt;\")\n",
" for line in result_html.split(\"XXXXXXX\"):\n",
" line = line.replace(\"\\n\", \"<br />\\n\")\n",
" html_output.write(line)\n",
"\n",
"def convert_to_txt(json_file, txt_output_file):\n",
" with open(json_file, 'r', encoding='utf-8') as file:\n",
" json_data = json.load(file)\n",
"\n",
" result_str = get_conversation_result(json_data)\n",
"\n",
" with open(txt_output_file, \"w\", encoding='utf-8') as txt_output:\n",
" result_txt = result_str.replace(\"/n\", \"XXXXXXX\\n\")\n",
" for line in result_txt.split(\"XXXXXXX\"):\n",
" txt_output.write(line)\n",
"\n",
"def get_conversation_result(json_data):\n",
" result_str = \"\"\n",
" for conversation in json_data:\n",
" title = conversation.get('title', '')\n",
" messages = get_conversation_messages(conversation)\n",
"\n",
" result_str += title + '\\n'\n",
" for message in messages:\n",
" result_str += message['author'] + '\\n' + message['text'] + '\\n'\n",
" result_str += '\\n'\n",
"\n",
" return result_str\n",
"\n",
"def get_conversation_messages(conversation):\n",
" messages = []\n",
" current_node = conversation.get('current_node')\n",
" while current_node:\n",
" node = conversation['mapping'][current_node]\n",
" message = node.get('message')\n",
" if (message and message.get('content') and message['content'].get('content_type') == 'text' and\n",
" len(message['content'].get('parts', [])) > 0 and len(message['content']['parts'][0]) > 0 and\n",
" (message['author']['role'] != 'system' or message.get('metadata', {}).get('is_user_system_message'))):\n",
" author = message['author']['role']\n",
" if author == 'assistant':\n",
" author = 'ChatGPT'\n",
" elif author == 'system' and message['metadata'].get('is_user_system_message'):\n",
" author = 'Custom user info'\n",
" messages.append({'author': author, 'text': message['content']['parts'][0]})\n",
" current_node = node.get('parent')\n",
" return messages[::-1]\n",
"\n",
"# Example usage\n",
"conversations_file_path = 'CHATGPT/conversations.json'\n",
"output_folder = 'CHATGPT/output_all_in_one'\n",
"\n",
"# Ensure the output folder exists\n",
"os.makedirs(output_folder, exist_ok=True)\n",
"\n",
"# Configure logging\n",
"logging.basicConfig(level=logging.INFO)\n",
"\n",
"# Call the split, save, and convert function\n",
"split_and_save_and_convert(conversations_file_path, output_folder)\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "99a4779c",
"metadata": {},
"outputs": [],
"source": [
"import sqlite3\n",
"import os\n",
"import hashlib\n",
"\n",
"# Connect to SQLite database (creates a new database if it doesn't exist)\n",
"db_path = 'chat_database.db'\n",
"conn = sqlite3.connect(db_path)\n",
"cursor = conn.cursor()\n",
"\n",
"# Create a table to store file information\n",
"cursor.execute('''\n",
" CREATE TABLE IF NOT EXISTS files (\n",
" id INTEGER PRIMARY KEY,\n",
" filename TEXT NOT NULL,\n",
" content BLOB NOT NULL,\n",
" text_content TEXT NOT NULL,\n",
" hash_value TEXT NOT NULL,\n",
" format TEXT NOT NULL\n",
" )\n",
"''')\n",
"\n",
"# Commit changes and close the connection\n",
"conn.commit()\n",
"conn.close()\n",
"\n",
"# Function to calculate SHA-256 hash of a file\n",
"def calculate_hash(file_path):\n",
" sha256 = hashlib.sha256()\n",
" with open(file_path, 'rb') as file:\n",
" while chunk := file.read(8192): # Read in 8KB chunks\n",
" sha256.update(chunk)\n",
" return sha256.hexdigest()\n",
"\n",
"# Function to insert a file into the database\n",
"def insert_file(filename, content, text_content, hash_value, file_format):\n",
" conn = sqlite3.connect(db_path)\n",
" cursor = conn.cursor()\n",
" cursor.execute('INSERT INTO files (filename, content, text_content, hash_value, format) VALUES (?, ?, ?, ?, ?)',\n",
" (filename, content, text_content, hash_value, file_format))\n",
" conn.commit()\n",
" conn.close()\n",
"\n",
"# Function to insert HTML files recursively\n",
"def insert_html_files(directory):\n",
" for root, _, files in os.walk(directory):\n",
" for file_name in files:\n",
" if file_name.endswith('.html'):\n",
" file_path = os.path.join(root, file_name)\n",
" with open(file_path, 'rb') as file:\n",
" file_content = file.read()\n",
" text_content = file_content.decode('utf-8', errors='ignore') # Convert bytes to string\n",
" hash_value = calculate_hash(file_path)\n",
" insert_file(file_name, file_content, text_content, hash_value, 'html')\n",
" print(f\"Inserted: {file_name}\")\n",
"\n",
"# Example: Insert HTML files recursively from the specified directory\n",
"insert_html_files('CHATGPT/')\n",
"\n",
"print('Insertion process completed.')\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "a484992e",
"metadata": {},
"outputs": [],
"source": [
"import sqlite3\n",
"import uuid\n",
"\n",
"# Connect to SQLite database\n",
"db_path = 'chat_database.db'\n",
"conn = sqlite3.connect(db_path)\n",
"cursor = conn.cursor()\n",
"\n",
"def retrieve_file_content(filename):\n",
" cursor.execute('SELECT content FROM files WHERE filename = ?', (filename,))\n",
" result = cursor.fetchone()\n",
" return result[0] if result else None\n",
"\n",
"def search_and_print_fourth_file(search_terms):\n",
" Data = \"\"\n",
"\n",
" # Prepare the SQL query for searching files based on the given terms\n",
" query = '''\n",
" SELECT filename\n",
" FROM files\n",
" WHERE {}\n",
" '''.format(' AND '.join(['text_content LIKE ?' for _ in search_terms]))\n",
"\n",
" # Add % around search terms for a partial match with spaces\n",
" search_terms = ['% {} %'.format(term) for term in search_terms]\n",
"\n",
" # Execute the query and retrieve matching files\n",
" cursor.execute(query, search_terms)\n",
" matching_files = cursor.fetchall()\n",
"\n",
" # Check if there are at least 2 matching files\n",
" if matching_files and len(matching_files) >= 2:\n",
" fourth_file = matching_files[1][0] # Get the filename of the second matching file\n",
" print(fourth_file)\n",
"\n",
" # Retrieve the content of the matching file\n",
" content = retrieve_file_content(fourth_file)\n",
" \n",
" if content:\n",
" # Decode the content and append it to the Data variable\n",
" Data = Data + f'{content.decode(\"utf-8\", errors=\"ignore\")}'\n",
" print(Data)\n",
" return Data\n",
" else:\n",
" print(f'Error: Content not found for {fourth_file}')\n",
" else:\n",
" print('Error: No matching files found or less than two matching files.')\n",
"\n",
"# Example: Search for files containing 'Cephalux' and 'Morpholux'\n",
"search_terms = ['Cephalux','Morpholux']\n",
"DATA = search_and_print_fourth_file(search_terms)\n",
"\n",
"# Close the connection to the database\n",
"conn.close()\n",
"\n",
"# If data is found, create a unique filename and write the content to an HTML file\n",
"if len(DATA) > 2:\n",
" uid = str(uuid.uuid4()) # Generate a unique ID using uuid\n",
" FileName = \"_\".join(search_terms) + \"_\" + uid + \".html\"\n",
" print(FileName)\n",
"\n",
" # Open the file for writing\n",
" with open(FileName, \"w\") as IN:\n",
" # Split the data into lines and write each line to the file with \"<br />\" appended\n",
" ndata = DATA.split(\"<br />\\n\")\n",
" for line in ndata:\n",
" print(line)\n",
" IN.write(line + \"<br />\\n\")\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "0c007963",
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"gist": {
"data": {
"description": "conversation_json_to_sessions_txt_html_sqlite_data.ipynb",
"public": true
},
"id": ""
},
"kernelspec": {
"display_name": "cloned-base",
"language": "python",
"name": "cloned-base"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.9.12"
}
},
"nbformat": 4,
"nbformat_minor": 5
}
@JupyterJones
Copy link
Author

Create a main directory containing the subdirectory CHATGPT.

Run your notebook from the main directory and place conversations.json in CHATGPT.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment