-
-
Save fullstackwebdev/4f8fc4931bd4dfba4231c8caf578e15e to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import chunk | |
import datetime | |
import json | |
import os | |
import psycopg2 | |
from .doc2kg import process_knowledge_graph | |
from .upload_processors.pdf2ocr2md import process_upload_pdf_with_ocr | |
# from .process_upload_llama_index import process_uploaded_file | |
from .upload_processors.file2text import process_uploaded_file | |
from .upload_processors.pdf2epub2md import process_uploaded_file as process_uploaded_file_pdf2epub2md | |
from .utils.tasks import create_task | |
from .summary import doc2sumarize | |
from .doc2kg4 import doc2chunk2kg | |
from langgraph.graph import Graph, END | |
# Postgres DSN for the Graphile Worker queue; raises KeyError at import time
# if the environment variable is not set (fail-fast by design, presumably).
GRAPHILE_WORKER_CONNECTION_STRING = os.environ['GRAPHILE_WORKER_CONNECTION_STRING']
# NOTE(review): this module-level Graph looks unused — router() builds its own
# local `workflow` that shadows it; confirm before removing.
workflow = Graph()
def error_node(task_data):
    """Record a task failure in the fmx.task table and end the flow.

    Expects ``task_data`` to carry ``errored_at``, ``error_json`` and
    ``task_id``. Marks the row as ``'failed'`` with the error payload and
    timestamp, then returns the sentinel string ``'end'``.
    """
    failed_at = task_data['errored_at']
    error_payload = task_data['error_json']
    row_id = task_data['task_id']
    # Parameterized UPDATE — values are bound by the driver, never interpolated.
    with psycopg2.connect(GRAPHILE_WORKER_CONNECTION_STRING) as conn:
        with conn.cursor() as cur:
            cur.execute(
                """UPDATE fmx.task SET status = 'failed', errored_at = %s, error = %s WHERE id = %s;""",
                (failed_at, error_payload, row_id),
            )
            conn.commit()
    return 'end'
def start_node(arg):
    """Entry node of the workflow graph: log the payload and pass it through unchanged."""
    print(f"start_node called with arg: {arg}")
    return arg
# async def process_kg_node(task_data): | |
# return await process_knowledge_graph(task_data) | |
def should_continue(task_data):
    """Routing hook: always send the flow to the 'kg' node.

    The 'summary' branch is disabled for now; ``task_data`` is ignored.
    """
    next_node = 'kg'
    return next_node
def handle_tools(task_data):
    """Debug pass-through node: echo the incoming task payload and hand it on unchanged."""
    print(f"handle_tools called with arg: {task_data}")
    return task_data
async def router(task_data):
    """Build and run the document-processing pipeline for one task.

    Pipeline (linear):
        start -> text    (extract text from the uploaded file)
              -> summary (summarize the document)
              -> kg      (build the knowledge graph, version 2)
              -> END

    Args:
        task_data: task dict; must contain ``payload['uploaded_file_id']``
            for the text-extraction step.

    Returns:
        The final state produced by the compiled graph (previously the
        awaited result was discarded and callers always received ``None``).
    """
    workflow = Graph()

    workflow.add_node("start", start_node)
    workflow.set_entry_point("start")

    # Version 1 used process_knowledge_graph; version 2 chunks the doc first.
    workflow.add_node("kg", doc2chunk2kg)
    workflow.add_node("summary", doc2sumarize)
    workflow.add_node(
        "text",
        lambda data: process_uploaded_file(
            data,
            uploaded_file_id=data['payload']['uploaded_file_id'],
        ),
    )

    # Conditional routing via should_continue() is disabled for now; the
    # flow is a fixed linear chain.
    workflow.add_edge('start', 'text')
    workflow.add_edge('text', 'summary')
    workflow.add_edge('summary', 'kg')
    workflow.add_conditional_edges(
        "kg",
        lambda data: 'finish',  # always terminate after the KG step
        {"finish": END},
    )

    chain = workflow.compile()
    # Fix: propagate the graph's final state to the caller instead of
    # dropping it on the floor.
    return await chain.ainvoke(task_data)
# import chunk | |
# import datetime | |
# import json | |
# import os | |
# import psycopg2 | |
# from .doc2kg import process_knowledge_graph | |
# from .upload_processors.pdf2ocr2md import process_upload_pdf_with_ocr | |
# # from .process_upload_llama_index import process_uploaded_file | |
# from .upload_processors.file2text import process_uploaded_file | |
# from .upload_processors.pdf2epub2md import process_uploaded_file as process_uploaded_file_pdf2epub2md | |
# from .utils.tasks import create_task | |
# # from .summary import doc2sumarize | |
# from .doc2kg4 import doc2chunk2kg | |
# from langgraph.graph import Graph, END | |
# GRAPHILE_WORKER_CONNECTION_STRING = os.environ['GRAPHILE_WORKER_CONNECTION_STRING'] | |
# workflow = Graph() | |
# def error_handling_node(task_data): | |
# errored_at , error_json, task_id = task_data['errored_at'], task_data['error_json'], task_data['task_id'] | |
# # # Update the database with the error and the errored time | |
# with psycopg2.connect(GRAPHILE_WORKER_CONNECTION_STRING) as conn: | |
# with conn.cursor() as cursor: | |
# cursor.execute( | |
# """UPDATE fmx.task SET status = 'failed', errored_at = %s, error = %s WHERE id = %s;""", | |
# (errored_at, error_json, task_id) | |
# ) | |
# conn.commit() | |
# return 'end' | |
# async def router(task_data): | |
# def start_node(arg): | |
# # print("start_node called with arg:", arg) | |
# # print("Starting the workflow...") | |
# # Randomly set the condition to simulate different outcomes | |
# return arg | |
# # def process_node_1(arg): | |
# # print("process_node_1 called with arg:", arg) | |
# # print("Executing Process Node 1") | |
# # return {"data": arg} | |
# # def process_node_2(arg): | |
# # # print("process_node_2 called with arg:", arg) | |
# # # print("Executing Process Node 2") | |
# # return {"data": arg} | |
# # Copy code | |
# workflow = Graph() | |
# workflow.add_node("start", start_node) | |
# # workflow.add_node("step2", process_node_1) | |
# # workflow.add_node("step3", process_node_2) | |
# workflow.add_node("text", | |
# lambda task_data: process_uploaded_file(task_data, | |
# uploaded_file_id=task_data['payload']['uploaded_file_id'] | |
# ) | |
# ) | |
# async def process_kg_node(task_data): | |
# return await process_knowledge_graph(task_data) | |
# # When adding the node to the LangGraph | |
# workflow.add_node("kg", process_kg_node) # version 1 | |
# workflow.add_node("finish", END) | |
# # workflow.add_node("kg", doc2chunk2kg) # version 2 | |
# # workflow.add_node("summarize", doc2sumarize) | |
# # workflow.add_node("finish", doc2sumarize) | |
# workflow.set_entry_point("start") | |
# def should_continue(task_data): | |
# # print("should_continue called with arg:", task_data) | |
# # print("Starting the workflow...") | |
# type = task_data['payload']['type'] | |
# if type == 'summarize': | |
# return 'finish' | |
# # Randomly set the condition to simulate different outcomes | |
# # return task_data | |
# # get type | |
# if type == 'text': | |
# return 'text' | |
# workflow.add_conditional_edges( | |
# "start", | |
# should_continue, | |
# # lambda task_data: task_data, # Assume task_data contains the next node key | |
# { | |
# # "ocr": "ocr", | |
# "text": "text", | |
# # "kg": "kg", | |
# # "pdf2epub2md": "pdf2epub2md", | |
# "finish": "finish", | |
# # "kg": "kg", | |
# # "gpt": "gpt", | |
# # "error": "error_handling", | |
# # "final_step": END | |
# } | |
# ) | |
# # workflow.add_edge('step2', 'step3') | |
# # workflow.add_edge('text', 'kg') | |
# workflow.add_edge('text', 'kg') | |
# # workflow.add_edge('finish', 'start') | |
# workflow.add_edge('kg', 'finish') | |
# # workflow.add_edge('summarize', 'step3') | |
# # workflow.add_conditional_edges( | |
# # "kg", | |
# # should_continue, | |
# # # lambda task_data: task_data, # Assume task_data contains the next node key | |
# # { | |
# # # "ocr": "ocr", | |
# # "text": "text", | |
# # "summarize": "summarize", | |
# # # "pdf2epub2md": "pdf2epub2md", | |
# # # "kg": "kg", | |
# # # "gpt": "gpt", | |
# # # "error": "error_handling", | |
# # # "final_step": END | |
# # } | |
# # ) | |
# # workflow.set_finish_point('step3') | |
# chain = workflow.compile() | |
# print(f"##########################################") | |
# # chain.invoke(task_data) | |
# await chain.ainvoke(task_data) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment