Skip to content

Instantly share code, notes, and snippets.

@yaniv-aknin
Last active July 1, 2024 11:46
Show Gist options
  • Save yaniv-aknin/00669984a6ed4d052a65fbe7ae678993 to your computer and use it in GitHub Desktop.
Save yaniv-aknin/00669984a6ed4d052a65fbe7ae678993 to your computer and use it in GitHub Desktop.
Langfuse through child processes

I made this while working on a Langfuse integration, prompted by langfuse/langfuse#2495.

tl;dr: I couldn't get Langfuse to flush() an observation while it is still running, which is a problem for long-running observations and for observations that may terminate unexpectedly.

In this example, I replace the observe() decorator with a version that first populates the trace from the environment or from explicitly passed arguments, and only then invokes the decorator with the pre-populated trace. This way there's no need to call update_current_trace() after the fact, and no need to flush() the observation while it is still running.

#!/usr/bin/env python3
import subprocess
import sys
import os
import functools
import dotenv
from langfuse import Langfuse
from langfuse.decorators import observe, langfuse_context
# Load key/value pairs from a .env file into os.environ at import time, before
# any of the functions below read the environment.
# NOTE(review): presumably this is where the credentials/host used by
# Langfuse() come from -- confirm.
dotenv.load_dotenv()
def observe_with_custom_trace(func):
    """Wrap ``func`` in ``observe()`` on top of a trace that is created and flushed first.

    On every call the wrapper:

    1. Builds ``Langfuse.trace()`` keyword arguments from the
       ``LANGFUSE_*_HINT`` environment variables listed in ``env_mapping``,
       then overlays the dict passed by the caller as the
       ``langfuse_trace_kwargs`` keyword argument (explicit values win over
       the environment). That keyword is removed before ``func`` is called.
    2. Creates the trace with those arguments and flushes the client, so the
       trace is sent before ``func`` (possibly long-running) starts.
    3. Calls ``func`` through ``observe(name=<trace name>)``, passing the
       trace's id as the ``langfuse_observation_id`` keyword, which is
       intended to make ``observe()`` reuse the pre-created trace.

    Returns whatever ``func`` returns.
    """
    # (environment variable, Langfuse.trace() keyword) pairs, read on every call.
    env_mapping = (
        ("LANGFUSE_TRACE_ID_HINT", "id"),
        ("LANGFUSE_TRACE_NAME_HINT", "name"),
        ("LANGFUSE_USER_ID_HINT", "user_id"),
        ("LANGFUSE_SESSION_ID_HINT", "session_id"),
    )
    # Created on the first call and reused afterwards, instead of building a
    # fresh Langfuse() client on every invocation and never shutting it down.
    client = None

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        nonlocal client
        if client is None:
            client = Langfuse()
        trace_kwargs = {
            kwarg: os.environ[env_var]
            for env_var, kwarg in env_mapping
            if env_var in os.environ
        }
        # Explicit caller-supplied values override environment hints; an
        # explicit None is treated like "not given".
        trace_kwargs.update(kwargs.pop("langfuse_trace_kwargs", None) or {})
        trace = client.trace(**trace_kwargs)
        # Send the trace now so it exists even if func runs long or dies.
        client.flush()
        kwargs["langfuse_observation_id"] = trace.id
        return observe(name=trace_kwargs.get("name"))(func)(*args, **kwargs)

    return wrapper
@observe()
def child_span():
    """Run inside its own observe() observation and block until Enter is pressed.

    Whatever the user types at the prompt is read and discarded; returns None.
    """
    prompt = "Press Enter to exit"
    input(prompt)
@observe_with_custom_trace
def child_main():
    """Child-side entry point: report this process's PID, then run ``child_span``.

    Decorated with ``observe_with_custom_trace``, so its trace is built from
    any LANGFUSE_*_HINT environment variables (parent_main sets them when it
    spawns the child).
    """
    pid = os.getpid()
    print("Child process", pid)
    child_span()
@observe_with_custom_trace
def parent_main():
    """Run the parent side of the demo and spawn this script again as the child.

    Prints this process's PID and the current Langfuse trace URL, then runs
    ``<this script> child`` in a subprocess and waits for it to finish. The
    child receives the current trace id and the trace name "greatname"
    through the LANGFUSE_*_HINT environment variables that
    ``observe_with_custom_trace`` reads.
    """
    print("Parent process", os.getpid())
    print("URL:", langfuse_context.get_current_trace_url())
    hints = {
        "LANGFUSE_TRACE_ID_HINT": langfuse_context.get_current_trace_id(),
        "LANGFUSE_TRACE_NAME_HINT": "greatname",
    }
    # Start from the inherited environment and let the explicit hints win.
    # With ``hints | os.environ``, any pre-existing LANGFUSE_*_HINT value
    # (e.g. one loaded from .env) silently replaced them. None values are
    # skipped because subprocess environments accept only strings.
    child_env = dict(os.environ)
    child_env.update(
        {name: value for name, value in hints.items() if value is not None}
    )
    # sys.executable re-runs the child under the same interpreter (and
    # virtualenv) as the parent, not whatever "python3" is first on PATH.
    subprocess.run([sys.executable, sys.argv[0], "child"], env=child_env)
def main():
    """Dispatch to the parent or child role based on the single CLI argument.

    Returns:
        1 after printing usage to stderr when the arguments are not exactly
        one of ``parent`` or ``child``; otherwise None once the selected
        role has finished.
    """
    # Previously any argument other than "parent" silently ran the child role.
    if len(sys.argv) != 2 or sys.argv[1] not in ("parent", "child"):
        print("Usage: lfml parent|child", file=sys.stderr)
        return 1
    if sys.argv[1] == "parent":
        return parent_main(langfuse_trace_kwargs={"name": "awesomename"})
    return child_main()


# Without this guard, running the file directly (as parent_main does when it
# spawns the child) never calls main().
if __name__ == "__main__":
    raise SystemExit(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment