Skip to content

Instantly share code, notes, and snippets.

Created March 21, 2024 20:57
Show Gist options
  • Save Peilun-Li/a0f26847812e177383a3dc7f17b3d84b to your computer and use it in GitHub Desktop.
Save Peilun-Li/a0f26847812e177383a3dc7f17b3d84b to your computer and use it in GitHub Desktop.
Deepeval async bulk evaluate
# Note: this is developed and tested against deepeval 0.21.00
import asyncio
import inspect
from collections.abc import Coroutine
from typing import Any, List

import pandas as pd
from deepeval.metrics import BaseMetric
from deepeval.test_case import LLMTestCase
async def gather_limit(
    *tasks: Coroutine[None, None, Any],
    return_exceptions: bool = False,
    max_con: int = 100,
) -> Any:
    """Like ``asyncio.gather`` but with a cap on concurrently running tasks.

    Args:
        *tasks: Coroutines to await.
        return_exceptions: Forwarded to ``asyncio.gather``; when true,
            exceptions are returned in the result list instead of raised.
        max_con: Maximum number of coroutines awaited at the same time.

    Returns:
        The list produced by ``asyncio.gather``, in the order of ``tasks``.

    Raises:
        ValueError: If ``max_con`` is smaller than 1.
    """
    # A zero-valued semaphore would make every task wait forever.
    if max_con < 1:
        raise ValueError("max_con must be at least 1")

    semaphore = asyncio.Semaphore(max_con)

    async def sem_task(task: Coroutine[None, None, Any]) -> Any:
        # Each wrapped coroutine only starts once a semaphore slot is free.
        async with semaphore:
            return await task

    return await asyncio.gather(
        *(sem_task(task) for task in tasks),
        return_exceptions=return_exceptions,
    )
def copy_metric(metric):
    """Build a new instance of ``metric``'s class from its constructor arguments.

    Every named ``__init__`` parameter that also exists as an attribute on
    ``metric`` is read from the instance and passed back to the constructor
    as a keyword argument. Parameters without a same-named attribute are
    omitted, so the constructor's own defaults apply to them.

    Args:
        metric: The object to copy.

    Returns:
        A new object of the same class as ``metric``.
    """
    # Init parameter names and their kinds.
    init_params = inspect.signature(metric.__class__.__init__).parameters
    # Fields that we don't need to copy.
    exclude_keys = {"self"}
    # *args / **kwargs cannot be passed back by their parameter name.
    variadic_kinds = (
        inspect.Parameter.VAR_POSITIONAL,
        inspect.Parameter.VAR_KEYWORD,
    )

    copy_kwargs = {}
    for key, param in init_params.items():
        if key in exclude_keys or param.kind in variadic_kinds:
            continue
        if hasattr(metric, key):
            # Attribute values are passed by reference (no deep copy), so
            # fields such as a model object are shared with the original.
            copy_kwargs[key] = getattr(metric, key)
    return metric.__class__(**copy_kwargs)
async def bulk_eval(test_cases: List[LLMTestCase], metrics: List[BaseMetric], parllelism: int=10):
    """Evaluate every (test case, metric) pair concurrently.

    Each pair gets its own copy of the metric (via ``copy_metric``) so that
    concurrently running evaluations do not share one metric object.

    Args:
        test_cases: Test cases to evaluate.
        metrics: Metrics to apply to every test case. A metric whose
            ``evaluation_steps`` attribute is ``None`` has it populated in
            place before being copied.
        parllelism: Maximum number of evaluations awaited at the same time
            (the parameter name's spelling is kept for caller compatibility).

    Returns:
        A ``pandas.DataFrame`` with one row per (test case, metric) pair and
        the columns ``test_case``, ``metric_name``, ``metric_obj`` (the copied
        metric used for that row) and ``score`` (the value returned by
        ``a_measure``).
    """
    # Prepare eval tuples (test_case, metric), with the metric object copied.
    eval_tuples = []
    for metric in metrics:
        # For g-eval, populate evaluation steps first so that the behavior of
        # the metric stays consistent across its copies.
        # This might be better moved to the __init__ of the GEval metric class?
        if hasattr(metric, "evaluation_steps") and metric.evaluation_steps is None:
            print(f"Populating evaluation_steps for {type(metric).__name__}")
            metric.evaluation_steps = metric._generate_evaluation_steps()
        eval_tuples.extend(
            (test_case, copy_metric(metric)) for test_case in test_cases
        )

    tasks = [
        metric.a_measure(test_case, _show_indicator=True)
        for test_case, metric in eval_tuples
    ]

    # Now run in parallel/async, capped at `parllelism` concurrent evaluations.
    gather_result = await gather_limit(*tasks, max_con=parllelism)

    # Pair each result with its copied metric object and build the dataframe.
    # NOTE(review): assumes every metric exposes a `name` attribute - confirm
    # against the installed deepeval version.
    res = {
        "test_case": [test_case for test_case, _ in eval_tuples],
        "metric_name": [metric.name for _, metric in eval_tuples],
        "metric_obj": [metric for _, metric in eval_tuples],
        "score": list(gather_result),
    }
    return pd.DataFrame(res)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment