@Peilun-Li
Created March 21, 2024 20:57
Deepeval async bulk evaluate
# Note: this is developed and tested against deepeval 0.21.00
import asyncio
import inspect
from collections.abc import Coroutine
from typing import Any, List

import pandas as pd
from deepeval.metrics import BaseMetric
from deepeval.test_case import LLMTestCase

# https://gist.github.com/Guiforge/6d82df3e01d343eac960f0f12c0ecdfa
async def gather_limit(
    *tasks: Coroutine[None, None, Any],
    return_exceptions: bool = False,
    max_con: int = 100,
) -> Any:
    """Like asyncio.gather, but with a cap on the number of concurrent tasks."""
    semaphore = asyncio.Semaphore(max_con)

    async def sem_task(task: Coroutine[None, None, Any]) -> Any:
        # Each coroutine waits for a semaphore slot before being awaited,
        # so at most max_con tasks run concurrently.
        async with semaphore:
            return await task

    return await asyncio.gather(
        *(sem_task(task) for task in tasks),
        return_exceptions=return_exceptions,
    )
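
# Example usage (sketch, not part of the original gist; `fetch` and `urls` are
# hypothetical names for any coroutine factory and its inputs):
#   results = await gather_limit(*(fetch(url) for url in urls), max_con=10)
# This awaits all coroutines with at most 10 in flight at any time.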

def copy_metric(metric):
    """Create a new metric instance sharing the original's init configuration."""
    # Init parameter field names
    init_keys = inspect.signature(metric.__class__.__init__).parameters.keys()
    # Fields that we don't need to copy
    exclude_keys = {"self"}
    copy_kwargs = {}
    for key in init_keys:
        if key in exclude_keys:
            continue
        if hasattr(metric, key):
            # Some fields, such as model, are implicitly copied by reference here
            copy_kwargs[key] = getattr(metric, key)
    return metric.__class__(**copy_kwargs)
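
# Why copy? Each metric instance stores its result state (e.g. score, reason) on itself,
# so concurrent a_measure() calls on a shared instance would overwrite each other.
# Example (sketch; AnswerRelevancyMetric is one of deepeval's built-in metrics):
#   relevancy = AnswerRelevancyMetric(threshold=0.7)
#   relevancy_copy = copy_metric(relevancy)  # same config, independent result fields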

async def bulk_eval(
    test_cases: List[LLMTestCase],
    metrics: List[BaseMetric],
    parallelism: int = 10,
) -> pd.DataFrame:
    """Evaluate every (test_case, metric) pair concurrently and return a results DataFrame."""
    # Prepare eval tuples (test_case, metric), with the metric object copied per pair
    eval_tuples = []
    for metric in metrics:
        # For G-Eval, populate evaluation steps first so that the metric's behavior
        # stays consistent across copies.
        # This might be better moved into the __init__ of the GEval metric class?
        if hasattr(metric, "evaluation_steps") and getattr(metric, "evaluation_steps") is None:
            print(f"Populating evaluation_steps for {metric.name}")
            metric.evaluation_steps = metric._generate_evaluation_steps()
        for test_case in test_cases:
            eval_tuples.append((test_case, copy_metric(metric)))
    tasks = [
        metric.a_measure(test_case, _show_indicator=True)
        for test_case, metric in eval_tuples
    ]
    # Now run in parallel/async with bounded concurrency
    gather_result = await gather_limit(*tasks, max_con=parallelism)
    # Patch the result with the copied metric objects and build a final dataframe
    res = {
        "test_case": [x[0] for x in eval_tuples],
        "metric_name": [x[1].name for x in eval_tuples],
        "metric_obj": [x[1] for x in eval_tuples],
        "score": list(gather_result),
    }
    return pd.DataFrame(res)
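
# --- Example usage (a minimal sketch, not part of the original gist) ---
# Assumes deepeval's AnswerRelevancyMetric and a configured LLM judge
# (e.g. OPENAI_API_KEY set in the environment); the test case contents are illustrative.
if __name__ == "__main__":
    from deepeval.metrics import AnswerRelevancyMetric

    example_cases = [
        LLMTestCase(input="What is the capital of France?", actual_output="Paris."),
        LLMTestCase(input="Summarize the plot of Hamlet.", actual_output="A prince seeks revenge."),
    ]
    example_metrics = [AnswerRelevancyMetric(threshold=0.7)]

    # Run all (test_case, metric) pairs concurrently, capped at 5 in-flight evaluations
    df = asyncio.run(bulk_eval(example_cases, example_metrics, parallelism=5))
    print(df[["metric_name", "score"]])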