import os

import psycopg2
from langgraph.graph import Graph, END

from .doc2kg4 import doc2chunk2kg
from .summary import doc2sumarize
from .upload_processors.file2text import process_uploaded_file

# Expected to be a standard Postgres connection string,
# e.g. postgres://user:password@host:5432/dbname
GRAPHILE_WORKER_CONNECTION_STRING = os.environ['GRAPHILE_WORKER_CONNECTION_STRING']
def error_node(task_data):
    """Mark the task as failed in the database and end the workflow."""
    errored_at, error_json, task_id = (
        task_data['errored_at'],
        task_data['error_json'],
        task_data['task_id'],
    )
    # Record the error payload and the time it occurred on the task row.
    with psycopg2.connect(GRAPHILE_WORKER_CONNECTION_STRING) as conn:
        with conn.cursor() as cursor:
            cursor.execute(
                """UPDATE fmx.task SET status = 'failed', errored_at = %s, error = %s WHERE id = %s;""",
                (errored_at, error_json, task_id),
            )
        conn.commit()
    return 'end'
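# Example of the task_data shape error_node expects (a sketch; the field
# names are inferred from the function body, and the values below are
# hypothetical):
#
#   error_node({
#       "task_id": 42,
#       "errored_at": "2024-03-22T00:36:00Z",
#       "error_json": '{"message": "processing failed"}',
#   })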
def start_node(arg):
    """Entry node: pass the task through unchanged."""
    print("start_node called with arg:", arg)
    return arg

def should_continue(task_data):
    # Always route to the knowledge-graph node for now. An earlier draft
    # dispatched on task_data['payload']['type'] ('text', 'summarize', ...)
    # instead.
    return 'kg'

def handle_tools(task_data):
    """Currently unused tool-handling node: pass the task through unchanged."""
    print("handle_tools called with arg:", task_data)
    return task_data
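# Sketch of how should_continue could drive routing, condensed from the
# conditional wiring that was previously commented out inside router()
# (the mapping targets must be node names registered on the graph):
#
#   workflow.add_conditional_edges(
#       "text",
#       should_continue,
#       {"kg": "kg", "summary": "summary", "finish": END},
#   )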
async def router(task_data):
    workflow = Graph()

    workflow.add_node("start", start_node)
    workflow.set_entry_point("start")

    # workflow.add_node("kg", process_knowledge_graph)  # version 1, from .doc2kg
    workflow.add_node("kg", doc2chunk2kg)  # version 2
    workflow.add_node("summary", doc2sumarize)
    workflow.add_node(
        "text",
        lambda task_data: process_uploaded_file(
            task_data,
            uploaded_file_id=task_data['payload']['uploaded_file_id'],
        ),
    )

    # Linear pipeline: extract text, summarize, then build the knowledge graph.
    workflow.add_edge('start', 'text')
    workflow.add_edge('text', 'summary')
    workflow.add_edge('summary', 'kg')
    workflow.add_conditional_edges(
        "kg",
        lambda task_data: 'finish',
        {
            "finish": END,
        },
    )

    chain = workflow.compile()
    await chain.ainvoke(task_data)
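
if __name__ == "__main__":
    # Smoke-test driver (a sketch, not part of the original gist). The
    # task_data shape is an assumption: the "text" node reads
    # payload['uploaded_file_id'] and error_node reads 'task_id'; the
    # values below are hypothetical.
    import asyncio

    asyncio.run(router({
        "task_id": 1,
        "payload": {"uploaded_file_id": "example-file-id"},
    }))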