Benchmark
The Benchmark class is the core orchestrator for running multi-agent system experiments. It manages the complete execution lifecycle—from setting up environments and agents to running tasks and evaluating results. By implementing a few abstract methods, you define your benchmark's specific logic while the framework handles task iteration, data collection, and reproducibility.
Think of it as the experiment controller: you describe what to set up and how agents should run, and the framework handles when and how many times everything executes.
Each method below is documented with its formal definition first, followed by a How to use section that is more tutorial in style and intended to help you get started.
Benchmark
Bases: ABC
Abstract base class for orchestrating multi-agent system execution and evaluation.
Benchmark provides a structured framework for running reproducible agent experiments across a collection of tasks. It manages the complete execution lifecycle: environment initialization, agent instantiation, task execution, and performance evaluation. The class enforces a three-stage pattern (setup, run, evaluate) while handling task iteration, callback orchestration, message history collection, and result aggregation.
How to use
- Subclass Benchmark and implement the abstract methods to define your benchmark logic
- Implement setup methods to specify how environments, agents, and evaluators are created
- Implement run_agents to define how your multi-agent system solves a single task
- Instantiate your benchmark with agent configuration
- Call benchmark.run(tasks) to execute the complete benchmark
Example workflow:
class MyBenchmark(Benchmark):
    # Signatures follow the abstract method definitions documented below,
    # which include the seed_generator argument.
    def setup_environment(self, agent_data, task, seed_generator):
        return MyEnvironment(task.environment_data)

    def setup_agents(self, agent_data, environment, task, user, seed_generator):
        agent = MyAgent(model=agent_data["model"])
        agent_adapter = AgentAdapter(agent, "agent")
        return [agent_adapter], {"agent": agent_adapter}

    def run_agents(self, agents, task, environment, query):
        return agents[0].run(query)

    # ... implement other abstract methods

# Run the benchmark
config = {"model": "gpt-4", "temperature": 0.7}
benchmark = MyBenchmark()
reports = benchmark.run(tasks=my_tasks, agent_data=config)

# Retry failed tasks elegantly (graceful task failure handling by default)
failed_tasks = benchmark.get_failed_tasks()
if len(failed_tasks) > 0:
    retry_reports = benchmark.run(tasks=failed_tasks, agent_data=config)

# Parallel execution for I/O-bound workloads
benchmark = MyBenchmark(num_workers=4)
reports = benchmark.run(tasks=my_tasks, agent_data=config)

# Or use strict mode for debugging (fail fast)
benchmark = MyBenchmark(
    fail_on_task_error=True,
    fail_on_evaluation_error=True,
    fail_on_setup_error=True,
)
reports = benchmark.run(tasks=my_tasks, agent_data=config)
The framework handles task iteration, repetitions for statistical robustness, callback notifications, and result collection. You focus on defining agent behavior and evaluation criteria for your specific domain.
Note
Task timeout (via TaskProtocol.timeout_seconds) is checked at phase boundaries
(after setup, before execution, before evaluation). Timeout detection during agent
execution is not yet supported.
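The boundary-only behaviour in the note above can be pictured with a small standalone sketch. It is not the framework's implementation: `run_phases`, `PhaseTimeout`, and the injectable `clock` are hypothetical names, and the sketch checks the budget before every phase, which is a simplification of the boundaries listed in the note.

```python
import time
from typing import Callable, List, Optional, Sequence


class PhaseTimeout(Exception):
    """Hypothetical error: the budget was found exhausted at a phase boundary."""


def run_phases(
    phases: Sequence[Callable[[], None]],
    timeout_seconds: Optional[float],
    clock: Callable[[], float] = time.monotonic,
) -> int:
    """Run phases in order; the deadline is only checked between phases."""
    start = clock()
    completed = 0
    for phase in phases:
        # Boundary check: runs before a phase starts, never while it executes,
        # so a long-running phase is not interrupted.
        if timeout_seconds is not None and clock() - start > timeout_seconds:
            raise PhaseTimeout(f"budget exceeded after {completed} phase(s)")
        phase()
        completed += 1
    return completed


# A fake clock that advances one second per call keeps the demo deterministic.
ticks = iter(range(100))
log: List[str] = []
phases = [
    lambda: log.append("setup"),
    lambda: log.append("run"),
    lambda: log.append("evaluate"),
]

try:
    run_phases(phases, timeout_seconds=1.5, clock=lambda: float(next(ticks)))
    timed_out = False
except PhaseTimeout:
    timed_out = True
```

With this fake clock the first phase starts inside the budget and completes, and the overrun is only noticed at the next boundary.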
seed_generator
property
seed_generator: SeedGenerator
The seed generator for this benchmark.
The seed generator is configured at benchmark initialization via the seed
or seed_generator parameters. When seed=None (the default), the generator's
derive_seed() method returns None, effectively disabling seeding while
maintaining a uniform interface.
| RETURNS | DESCRIPTION |
|---|---|
| SeedGenerator | The root |
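To illustrate the "uniform interface" idea, here is a minimal sketch of a generator whose `derive_seed()` returns `None` when no seed was configured. `SketchSeedGenerator` is a hypothetical stand-in rather than the framework's `SeedGenerator`; the hashing scheme, the `repetition` attribute, and the `per_repetition=True` default are all assumptions made for the example.

```python
import hashlib
from typing import Optional


class SketchSeedGenerator:
    """Hypothetical stand-in; not the framework's SeedGenerator."""

    def __init__(self, seed: Optional[int] = None, path: str = "", repetition: int = 0) -> None:
        self.seed = seed
        self.path = path
        self.repetition = repetition

    def child(self, name: str) -> "SketchSeedGenerator":
        # Extend the logical path, e.g. "agents" -> "agents/workers".
        new_path = f"{self.path}/{name}" if self.path else name
        return SketchSeedGenerator(self.seed, new_path, self.repetition)

    def derive_seed(self, name: str, per_repetition: bool = True) -> Optional[int]:
        if self.seed is None:
            return None  # seeding disabled, but callers use the same code path
        rep = self.repetition if per_repetition else 0
        key = f"{self.seed}:{self.path}/{name}:{rep}".encode()
        # Hash the root seed and path so the result is stable across runs.
        return int.from_bytes(hashlib.sha256(key).digest()[:4], "big")


disabled = SketchSeedGenerator(seed=None)
first = SketchSeedGenerator(seed=42).child("agents").derive_seed("baseline")
second = SketchSeedGenerator(seed=42).child("agents").derive_seed("baseline")
```

Callers can pass the derived value straight to a component that accepts an optional seed, without branching on whether seeding is enabled.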
usage
property
usage: Usage
Running usage total across all task repetitions.
Queryable at any time, including while the benchmark is still running. Returns the grand total of all usage collected so far.
usage_by_component
property
usage_by_component: Dict[str, Usage]
Per-component running usage totals across all repetitions.
Keys are registry keys (e.g., "models:main_model").
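The relationship between the grand total and the per-component totals can be sketched as follows. The snapshot layout and the `input_tokens` / `output_tokens` field names are assumptions for illustration only; the real `Usage` type is not described on this page.

```python
from collections import Counter
from typing import Dict

# Hypothetical per-repetition usage snapshots, keyed by registry key.
repetitions = [
    {"models:main_model": {"input_tokens": 120, "output_tokens": 30}},
    {
        "models:main_model": {"input_tokens": 80, "output_tokens": 20},
        "models:judge": {"input_tokens": 50, "output_tokens": 5},
    },
]

by_component: Dict[str, Counter] = {}
total: Counter = Counter()

for snapshot in repetitions:
    for key, usage in snapshot.items():
        # Accumulate into the component's running total and the grand total.
        by_component.setdefault(key, Counter()).update(usage)
        total.update(usage)
```

Because both totals are updated after every repetition, they can be read at any point during a run and reflect everything collected so far.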
__init__
__init__(
callbacks: Optional[List[BenchmarkCallback]] = None,
n_task_repeats: int = 1,
max_invocations: int = 1,
num_workers: int = 1,
fail_on_setup_error: bool = False,
fail_on_task_error: bool = False,
fail_on_evaluation_error: bool = False,
progress_bar: bool | str = True,
seed: Optional[int] = None,
seed_generator: Optional[SeedGenerator] = None,
)
Initialize a benchmark with execution configuration.
| PARAMETER | TYPE | DESCRIPTION |
|---|---|---|
| callbacks | Optional[List[BenchmarkCallback]] | Optional list of callback handlers for monitoring execution, tracing messages, or collecting custom metrics during the benchmark run. |
| n_task_repeats | int | Number of times to repeat each task. Useful for measuring variance in stochastic agent behaviors. Must be at least 1. |
| max_invocations | int | Maximum number of agent invocations per task in the execution loop. For simple benchmarks, the default (1) means agents run once per task. For interactive benchmarks with user feedback loops, set higher (e.g., 5 for MACS) to allow multiple agent-user interaction rounds. |
| num_workers | int | Number of parallel task executions. Default 1 (sequential). Set higher for I/O-bound workloads (e.g., LLM API calls). This controls the ThreadPoolExecutor worker count for concurrent task processing. |
| fail_on_setup_error | bool | If True, raise exceptions when setup fails (environment, agents, evaluators). If False (default), catch exceptions during setup and record them in the report with status SETUP_FAILED. This allows the benchmark to continue running remaining tasks even if setup fails. |
| fail_on_task_error | bool | If True, raise exceptions when task execution fails. If False (default), catch exceptions during task execution and record them in the report with status TASK_EXECUTION_FAILED. This allows the benchmark to continue running remaining tasks. |
| fail_on_evaluation_error | bool | If True, raise exceptions when evaluation fails. If False (default), catch exceptions during evaluation and record them in the report with status EVALUATION_FAILED. This allows the benchmark to continue even if evaluation logic has errors. |
| progress_bar | bool \| str | Controls the progress bar during benchmark runs. When enabled, a ProgressBarCallback implementation is added. Options: True (default): Automatically adds a TqdmProgressBarCallback if no ProgressBarCallback is already present in the |
| seed | Optional[int] | Global seed for reproducible benchmark runs. When provided, a |
| seed_generator | Optional[SeedGenerator] | Custom seed generator for advanced use cases. If provided, takes precedence over the |

| RAISES | DESCRIPTION |
|---|---|
| ValueError | If n_task_repeats is less than 1. |
| ValueError | If both |
How to use
Configure execution settings at initialization:
# Sequential execution (default)
benchmark = MyBenchmark()
# Parallel execution for faster I/O-bound workloads
benchmark = MyBenchmark(num_workers=4)
# Strict mode - fail fast on any error (useful for debugging)
benchmark = MyBenchmark(
    fail_on_task_error=True,
    fail_on_evaluation_error=True,
    fail_on_setup_error=True,
)
# Progress bar configuration
benchmark = MyBenchmark() # Default: adds TqdmProgressBarCallback
benchmark = MyBenchmark(progress_bar=True) # Explicit: TqdmProgressBarCallback
benchmark = MyBenchmark(progress_bar="rich") # Uses RichProgressBarCallback
benchmark = MyBenchmark(progress_bar=False) # No automatic callback
# Custom callbacks
benchmark = MyBenchmark(callbacks=[MyCustomProgressBarCallback()])
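Why `num_workers` helps for I/O-bound workloads can be seen in a small standalone sketch built on the standard library's `ThreadPoolExecutor`. The `fake_task` and `run_all` helpers are hypothetical and only imitate a task that spends its time waiting; they do not reflect how the framework schedules tasks internally.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Dict, Iterable, List


def fake_task(task_id: int) -> Dict[str, object]:
    # The sleep stands in for waiting on a remote LLM API call.
    time.sleep(0.05)
    return {"task_id": task_id, "status": "success"}


def run_all(task_ids: Iterable[int], num_workers: int = 1) -> List[Dict[str, object]]:
    with ThreadPoolExecutor(max_workers=num_workers) as pool:
        # map() yields results in input order even if tasks finish out of order.
        return list(pool.map(fake_task, task_ids))


reports = run_all(range(8), num_workers=4)
```

Threads overlap the waiting time, so the gain is largest when tasks are dominated by network latency; CPU-bound work would see little benefit from this approach.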
add_callback
add_callback(callback: BenchmarkCallback) -> None
Register a callback handler to monitor benchmark execution.
| PARAMETER | TYPE | DESCRIPTION |
|---|---|---|
| callback | BenchmarkCallback | A BenchmarkCallback instance that will receive execution events. |
How to use
Callbacks receive notifications at key lifecycle points for tracing, progress tracking,
or custom metrics collection. See BenchmarkCallback
for available hooks and their signatures.
from maseval.core.callbacks import MessageTracingCallback

# tasks and agent_data are passed to run(), not to the constructor
benchmark = MyBenchmark()
benchmark.add_callback(MessageTracingCallback(output_dir="logs"))
results = benchmark.run(tasks=tasks, agent_data=config)
clear_registry
clear_registry() -> None
Clear the component registry after a task repetition completes.
This method is called automatically by run() after each task repetition
to ensure components are not carried over between repetitions. The
reports list persists across all repetitions for aggregated analysis.
collect_all_configs
collect_all_configs() -> Dict[str, Any]
Collect configuration from all registered components for the current task repetition.
This method is called automatically by run() after each task repetition completes
and before evaluation begins. It gathers comprehensive configuration from all registered
components (agents, models, tools, simulators, callbacks, etc.) for that specific
repetition. After collection, the registry is cleared for the next repetition.
The collected configs are stored in benchmark.reports list along with traces
for persistent access across all task repetitions.
Output fields:
- metadata - Collection timestamp and thread info
- agents - Dict mapping agent names to their config (settings, parameters)
- models - Dict mapping model names to their config (model IDs, parameters)
- tools - Dict mapping tool names to their config (specifications, settings)
- simulators - Dict mapping simulator names to their config (parameters, templates)
- callbacks - Dict mapping callback names to their config (settings)
- environment - Direct config from the environment (not nested), or None if not present
- user - Direct config from the user simulator (not nested), or None if not present
- other - Dict for any other registered components
- benchmark - Benchmark-level configuration (git, system, packages)
| RETURNS | DESCRIPTION |
|---|---|
| Dict[str, Any] | Structured dictionary containing configuration from all registered components. |
How to use
This method is called automatically by run() after each task repetition:
# Automatic collection (recommended)
results = benchmark.run(tasks=tasks, agent_data=config)

# Access all collected reports (traces + configs) across repetitions
for report in benchmark.reports:
    print(f"Task {report['task_id']}, Repeat {report['repeat_idx']}")
    # Agents is a dict: agent_name -> config
    print(f"Agent config: {report['config']['agents']['my_agent']}")
    # Environment and user are direct (not nested)
    print(f"Environment config: {report['config']['environment']}")
    print(f"User config: {report['config']['user']}")
    # Benchmark-level config
    print(f"Git commit: {report['config']['benchmark']['git']['commit_hash']}")
The collected configs are available in the results for reproducibility analysis.
collect_all_traces
collect_all_traces() -> Dict[str, Any]
Collect execution traces from all registered components for the current task repetition.
This method is called automatically by run() after each task repetition completes
and before evaluation begins. It gathers comprehensive traces from all registered
components (agents, models, tools, simulators, callbacks, etc.) for that specific
repetition. After collection, the registry is cleared for the next repetition.
The collected traces are stored in benchmark.reports list along with configs
for persistent access across all task repetitions.
Output fields:
- metadata - Collection timestamp and thread info
- agents - Dict mapping agent names to their traces (messages, execution data)
- models - Dict mapping model names to their traces (API calls, timing, errors)
- tools - Dict mapping tool names to their traces (invocations, parameters)
- simulators - Dict mapping simulator names to their traces (attempts, outcomes)
- callbacks - Dict mapping callback names to their traces (custom data)
- environment - Direct traces from the environment (not nested), or None if not present
- user - Direct traces from the user simulator (not nested), or None if not present
- other - Dict for any other registered components
| RETURNS | DESCRIPTION |
|---|---|
| Dict[str, Any] | Structured dictionary containing execution traces from all registered components. |
How to use
This method is called automatically by run() after each task repetition:
# Automatic collection (recommended)
results = benchmark.run(tasks=tasks, agent_data=config)

# Access all collected reports (traces + configs) across repetitions
for report in benchmark.reports:
    print(f"Task {report['task_id']}, Repeat {report['repeat_idx']}")
    # Agents is a dict: agent_name -> traces
    print(f"Agent messages: {report['traces']['agents']['my_agent']}")
    # Environment and user are direct (not nested)
    print(f"Environment state: {report['traces']['environment']}")
    print(f"User interactions: {report['traces']['user']}")
The collected traces are passed to the evaluator's evaluate() method
and stored in benchmark.reports for later analysis.
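Because some categories are nested dicts while environment and user are direct entries that may be None, code that walks a traces dictionary has to treat the two shapes differently. The helper below is a minimal sketch over a hand-written sample; `summarize_traces` and the sample values are hypothetical, and only the top-level field names come from the output fields listed above.

```python
from typing import Any, Dict, List

NESTED = ("agents", "models", "tools", "simulators", "callbacks", "other")
DIRECT = ("environment", "user")


def summarize_traces(traces: Dict[str, Any]) -> List[str]:
    """List which components contributed traces, one entry per component."""
    found: List[str] = []
    for category in NESTED:
        # Nested categories map component names to their trace data.
        for name in sorted(traces.get(category) or {}):
            found.append(f"{category}/{name}")
    for category in DIRECT:
        # Direct categories hold the trace data itself, or None if absent.
        if traces.get(category) is not None:
            found.append(category)
    return found


sample = {
    "metadata": {"timestamp": "hypothetical"},
    "agents": {"my_agent": {"messages": []}},
    "models": {"main_model": {}},
    "tools": {},
    "environment": {"state": {}},
    "user": None,
}
summary = summarize_traces(sample)
```

The same traversal applies to the config dictionaries, which use the same nested and direct layout.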
collect_all_usage
collect_all_usage() -> Dict[str, Any]
Collect usage from all registered components for the current task repetition.
This method is called automatically by run() after each task repetition
completes. It gathers usage from all registered UsageTrackableMixin
components and also accumulates into persistent running totals accessible
via usage and usage_by_component.
| RETURNS | DESCRIPTION |
|---|---|
| Dict[str, Any] | Structured dictionary containing usage from all registered components. |
evaluate
abstractmethod
evaluate(
evaluators: Sequence[Evaluator],
agents: Dict[str, AgentAdapter],
final_answer: Any,
traces: Dict[str, Any],
) -> List[Dict[str, Any]]
Execute evaluators to score agent performance on the task.
This method calls each evaluator with the collected execution data to produce evaluation results. The framework automatically collects comprehensive execution traces from all registered components before calling this method.
| PARAMETER | TYPE | DESCRIPTION |
|---|---|---|
| evaluators | Sequence[Evaluator] | The evaluator instances created by setup_evaluators(). |
| agents | Dict[str, AgentAdapter] | Dictionary of all agent instances, useful if evaluators need to query agent state beyond message history. |
| final_answer | Any | The return value from run_agents(), typically the final output, answer, or result from the agent system's execution. |
| traces | Dict[str, Any] | Comprehensive execution traces from all registered components, including: agents (dict mapping agent names to their traces: messages, execution metadata); models (dict mapping model names to traces: API calls, timing, token usage, errors); tools (dict mapping tool names to traces: invocations with parameters and results); simulators (dict mapping simulator names to traces: attempts and outcomes); callbacks (dict mapping callback names to traces: custom callback data); environment (direct environment traces: state, tools; not nested in a dict); user (direct user simulator traces: interactions; not nested in a dict); metadata (collection timestamp and thread info). Plus any other registered component categories. |

| RETURNS | DESCRIPTION |
|---|---|
| List[Dict[str, Any]] | List of evaluation result dictionaries, typically one per evaluator. Each dict should contain metrics, scores, or judgments about the agent's performance. |
How to use
Evaluators can access comprehensive execution data including final answers and traces:
def evaluate(self, evaluators, agents, final_answer, traces):
    results = []
    for evaluator in evaluators:
        # Access agent message histories from traces
        agent_traces = {
            name: trace_data.get('messages', MessageHistory())
            for name, trace_data in traces.get('agents', {}).items()
        }

        # Access additional execution data
        model_traces = traces.get("models", {})
        tool_traces = traces.get("tools", {})
        environment_traces = traces.get("environment")  # Direct, not nested
        user_traces = traces.get("user")  # Direct, not nested

        # Pass comprehensive data to evaluator
        result = evaluator(
            final_answer=final_answer,
            traces=agent_traces,
            model_calls=model_traces,
            tool_usage=tool_traces,
            environment_state=environment_traces,
            user_interactions=user_traces,
        )
        results.append(result)
    return results
execution_loop
execution_loop(
agents: Sequence[AgentAdapter],
task: Task,
environment: Environment,
user: Optional[User],
) -> Any
Execute agents with optional user interaction loop.
This method orchestrates the agent-user interaction pattern. When a user is
present, the user initiates the conversation using user.get_initial_query().
If no user is present, task.query is used as the initial query.
Interaction Flow
By default, agents execute once (max_invocations=1). For multi-turn
interaction, set self.max_invocations > 1 in your benchmark's __init__.
The loop continues until max_invocations is reached or user.is_done()
returns True (e.g., max turns reached or stop token detected).
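The interaction flow can be approximated with the standalone sketch below. It is an assumption-laden illustration rather than the framework's code: `ScriptedUser`, its `respond()` method, and `execution_loop_sketch` are hypothetical, and only `get_initial_query()`, `is_done()`, and `max_invocations` are names taken from the description above.

```python
from typing import Callable, List, Optional


class ScriptedUser:
    """Hypothetical user stub that replies from a fixed script."""

    def __init__(self, initial_query: str, replies: List[str]) -> None:
        self._initial = initial_query
        self._replies = list(replies)

    def get_initial_query(self) -> str:
        return self._initial

    def is_done(self) -> bool:
        return not self._replies  # done once the script is exhausted

    def respond(self, agent_answer: str) -> str:  # hypothetical method name
        return self._replies.pop(0)


def execution_loop_sketch(
    run_agents: Callable[[str], str],
    task_query: str,
    user: Optional[ScriptedUser] = None,
    max_invocations: int = 1,
) -> Optional[str]:
    # The user opens the conversation when present; otherwise use the task query.
    query = user.get_initial_query() if user is not None else task_query
    final_answer = None
    for _ in range(max_invocations):
        final_answer = run_agents(query)
        if user is None or user.is_done():
            break
        query = user.respond(final_answer)
    return final_answer


calls: List[str] = []


def echo_agent(query: str) -> str:
    calls.append(query)
    return f"answer to {query}"


multi_turn = execution_loop_sketch(
    echo_agent, "unused", ScriptedUser("hi", ["more", "thanks"]), max_invocations=5
)
```

In this run the loop stops after three agent invocations because the scripted user reports that it is done, well before the limit of five is reached.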
Note
Override this method in your benchmark subclass to implement custom interaction patterns (e.g., agent-initiated conversations, different termination conditions, or specialized query routing).
| PARAMETER | TYPE | DESCRIPTION |
|---|---|---|
| agents | Sequence[AgentAdapter] | Agents to execute (typically the orchestrator). |
| task | Task | The task being solved. |
| environment | Environment | The environment providing tools and state. |
| user | Optional[User] | Optional user simulator. If provided, the user initiates and drives the conversation. If None, a single agent execution with |

| RETURNS | DESCRIPTION |
|---|---|
| Any | Final answer from the last agent execution. |
Example
For interactive benchmarks, enable multi-turn interaction:
def __init__(self, **kwargs):
    # Forward any configuration (callbacks, num_workers, ...) to Benchmark
    super().__init__(**kwargs)
    self.max_invocations = 5  # Up to 5 agent-user exchanges
get_failed_tasks
get_failed_tasks(
status_filter: Optional[
Union[
TaskExecutionStatus, List[TaskExecutionStatus]
]
] = None,
reports: Optional[List[Dict[str, Any]]] = None,
) -> SequentialTaskQueue
Get tasks that failed during benchmark execution.
This method retrieves failed tasks based on their execution status, useful for debugging, retry logic, or failure analysis.
| PARAMETER | TYPE | DESCRIPTION |
|---|---|---|
| status_filter | Optional[Union[TaskExecutionStatus, List[TaskExecutionStatus]]] | Filter by specific failure status(es). If None, returns all failed tasks (any status except SUCCESS). Can be a single TaskExecutionStatus or a list of them. Examples: TaskExecutionStatus.TASK_EXECUTION_FAILED (only tasks that failed during execution); TaskExecutionStatus.EVALUATION_FAILED (only tasks where evaluation failed); [TaskExecutionStatus.TASK_EXECUTION_FAILED, TaskExecutionStatus.SETUP_FAILED] (tasks that failed during execution or setup). |
| reports | Optional[List[Dict[str, Any]]] | Optional list of reports to analyze. If None, uses the reports from the last run() call. This allows analyzing externally stored or modified reports. |

| RETURNS | DESCRIPTION |
|---|---|
| SequentialTaskQueue | SequentialTaskQueue containing the failed tasks. Empty if no failures match the filter. |

| RAISES | DESCRIPTION |
|---|---|
| RuntimeError | If reports is None and run() has not been executed yet. |
How to use
# Run benchmark
benchmark = MyBenchmark()
reports = benchmark.run(tasks=tasks, agent_data=config)

# Get all failed tasks (from internal state)
failed = benchmark.get_failed_tasks()
print(f"Failed: {len(failed)}/{len(benchmark.tasks)} tasks")

# Or work with returned reports (safe from internal state changes)
failed = benchmark.get_failed_tasks(reports=reports)

# Get only tasks that failed during execution (not evaluation)
execution_failures = benchmark.get_failed_tasks(
    TaskExecutionStatus.TASK_EXECUTION_FAILED,
    reports=reports,
)

# Get setup and execution failures
critical_failures = benchmark.get_failed_tasks(
    status_filter=[
        TaskExecutionStatus.SETUP_FAILED,
        TaskExecutionStatus.TASK_EXECUTION_FAILED,
    ],
    reports=reports,
)

# Retry failed tasks elegantly - this is the key use case!
# (run() takes agent_data as well as tasks)
if len(failed) > 0:
    retry_reports = benchmark.run(tasks=failed, agent_data=config)

# Or more concisely
reports = benchmark.run(tasks=tasks, agent_data=config)
retry_reports = benchmark.run(tasks=benchmark.get_failed_tasks(), agent_data=config)
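The filtering semantics (None means "anything except SUCCESS"; a single status or a list narrows the match) can be sketched without the framework. Everything here is hypothetical: the `Status` enum only mirrors the status names mentioned on this page, and the assumption that each report carries `status` and `task_id` keys is made for the example.

```python
from enum import Enum
from typing import Any, Dict, Iterable, List, Optional, Union


class Status(Enum):  # hypothetical mirror of TaskExecutionStatus
    SUCCESS = "success"
    SETUP_FAILED = "setup_failed"
    TASK_EXECUTION_FAILED = "task_execution_failed"
    EVALUATION_FAILED = "evaluation_failed"


def failed_task_ids(
    reports: Iterable[Dict[str, Any]],
    status_filter: Optional[Union[Status, List[Status]]] = None,
) -> List[str]:
    if status_filter is None:
        wanted = None  # accept every non-success status
    elif isinstance(status_filter, Status):
        wanted = {status_filter}
    else:
        wanted = set(status_filter)

    ids: List[str] = []
    for report in reports:
        status = report["status"]
        if status is Status.SUCCESS:
            continue
        if wanted is not None and status not in wanted:
            continue
        if report["task_id"] not in ids:  # a task may fail in several repeats
            ids.append(report["task_id"])
    return ids


sample_reports = [
    {"task_id": "t1", "status": Status.SUCCESS},
    {"task_id": "t2", "status": Status.SETUP_FAILED},
    {"task_id": "t3", "status": Status.TASK_EXECUTION_FAILED},
    {"task_id": "t3", "status": Status.TASK_EXECUTION_FAILED},
    {"task_id": "t4", "status": Status.EVALUATION_FAILED},
]
```

Deduplicating by task id matters when tasks are repeated, since one task can produce several failed reports but should be retried once.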
get_model_adapter
abstractmethod
get_model_adapter(
model_id: str, **kwargs: Any
) -> ModelAdapter
Provide a ModelAdapter for benchmark components that require LLM access.
Many benchmark components beyond the agents themselves require access to language models. Common examples include:
- Tool simulators: Simulating tool responses when real APIs aren't available
- User simulators: Generating realistic user responses in multi-turn dialogues
- Judges/Evaluators: Using LLMs to assess agent performance against criteria
- Reward models: Computing scores for reinforcement learning
This method centralizes model provisioning, giving you control over which models are used throughout the benchmark. Implement this to return a configured ModelAdapter for the requested model.
| PARAMETER | TYPE | DESCRIPTION |
|---|---|---|
| model_id | str | The model identifier to use (e.g., "gemini-2.5-flash", "openrouter/google/gemini-2.5-flash", "gpt-4o"). This is passed by the benchmark when setting up components that need model access. |
| **kwargs | Any | Additional arguments for adapter creation or registration. Common kwargs: register_category, the category for trace registration (e.g., "models"); register_name, the name for trace registration (e.g., "evaluator_user_gsr"). |

| RETURNS | DESCRIPTION |
|---|---|
| ModelAdapter | A ModelAdapter instance configured for the specified model. For proper tracing, return a fresh adapter for each call rather than reusing instances. You can still share the underlying API client for efficiency. |
How to use
For proper tracing, register the adapter after creation using the kwargs:
def get_model_adapter(self, model_id: str, **kwargs: Any) -> ModelAdapter:
    adapter = GoogleGenAIModelAdapter(self.client, model_id=model_id)

    # Register for tracing, falling back to defaults when the caller
    # does not provide registration info
    category = kwargs.get("register_category", "models")
    name = kwargs.get("register_name", model_id)
    self.register(category, name, adapter)
    return adapter
The benchmark calls this method when setting up tools, user simulators, and evaluators. Each call creates a fresh adapter with its own trace log.
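The "fresh adapter, shared client" pattern can be demonstrated with stubs. `FakeClient`, `SketchAdapter`, and `SketchBenchmark` are hypothetical classes written for this sketch, and the `category:name` registry key format is assumed from the example key shown for usage_by_component.

```python
from typing import Any, Dict, List


class FakeClient:
    """Hypothetical API client; counts calls instead of contacting a service."""

    def __init__(self) -> None:
        self.calls = 0

    def complete(self, model_id: str, prompt: str) -> str:
        self.calls += 1
        return f"[{model_id}] {prompt}"


class SketchAdapter:
    """Hypothetical adapter that keeps its own per-instance call log."""

    def __init__(self, client: FakeClient, model_id: str) -> None:
        self.client = client
        self.model_id = model_id
        self.log: List[str] = []

    def generate(self, prompt: str) -> str:
        self.log.append(prompt)
        return self.client.complete(self.model_id, prompt)


class SketchBenchmark:
    def __init__(self) -> None:
        self.client = FakeClient()  # created once and shared by all adapters
        self.registry: Dict[str, SketchAdapter] = {}

    def get_model_adapter(self, model_id: str, **kwargs: Any) -> SketchAdapter:
        adapter = SketchAdapter(self.client, model_id)  # fresh per call
        category = kwargs.get("register_category", "models")
        name = kwargs.get("register_name", model_id)
        self.registry[f"{category}:{name}"] = adapter
        return adapter


bench = SketchBenchmark()
judge = bench.get_model_adapter("gpt-4o", register_name="judge")
user_sim = bench.get_model_adapter("gpt-4o", register_name="user_simulator")
judge.generate("score this answer")
```

Each adapter's log stays separate even though both use the same model and client, which is what keeps per-component traces attributable.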
register
register(
category: str,
name: str,
component: RegisterableComponent,
) -> RegisterableComponent
Register a component for comprehensive trace and configuration collection.
All core MASEval components (AgentAdapter, ModelAdapter, Environment, User, LLMSimulator, BenchmarkCallback) inherit from TraceableMixin and/or ConfigurableMixin, and are automatically registered for both trace and configuration collection before evaluation.
Note: Most components are automatically registered when returned from
setup methods (setup_environment, setup_user, setup_agents). You only
need to manually register additional components like models, simulators, or
tools that aren't automatically captured.
| PARAMETER | TYPE | DESCRIPTION |
|---|---|---|
| category | str | Component category (e.g., "agents", "models", "tools", "simulators", "callbacks", "user", "environment", "seeding"). Use plural form to match the structure in collect_all_traces() and collect_all_configs(). |
| name | str | Unique identifier for this component within its category. |
| component | RegisterableComponent | Any object inheriting from TraceableMixin and/or ConfigurableMixin. |

| RETURNS | DESCRIPTION |
|---|---|
| RegisterableComponent | The component (for chaining convenience). |

| RAISES | DESCRIPTION |
|---|---|
| ValueError | If the component is already registered under a different name. |
How to use
Most components are auto-registered. Manual registration is only needed for additional components:
def setup_agents(self, agent_data, environment, task, user, seed_generator):
    # Create model (needs manual registration)
    model = MyModelAdapter(...)
    self.register("models", "main_model", model)

    # Create agent (auto-registered when returned)
    agent = MyAgent(model=model)
    agent_adapter = AgentAdapter(agent, "agent1")

    # Environment and user are also auto-registered
    return [agent_adapter], {"agent1": agent_adapter}
Traces and configs are automatically collected before evaluation via
collect_all_traces() and collect_all_configs() which are called
internally by the run() method.
run
run(
tasks: Union[
Task, BaseTaskQueue, Iterable[Union[Task, dict]]
],
agent_data: Dict[str, Any] | Iterable[Dict[str, Any]],
) -> List[Dict[str, Any]]
Initialize and execute the complete benchmark loop across all tasks.
| PARAMETER | TYPE | DESCRIPTION |
|---|---|---|
| tasks | Union[Task, BaseTaskQueue, Iterable[Union[Task, dict]]] | Task source for execution. Can be: a single Task object; a BaseTaskQueue (SequentialTaskQueue, PriorityTaskQueue, or custom AdaptiveTaskQueue); or an iterable of Task objects or dicts that will be converted to Tasks. When a BaseTaskQueue is provided, it controls the task ordering. AdaptiveTaskQueue subclasses are automatically registered as callbacks to receive task completion notifications. |
| agent_data | Dict[str, Any] \| Iterable[Dict[str, Any]] | Configuration for agents. Either a single dict applied to all tasks, or an iterable of dicts with one configuration per task. Agent data typically includes model parameters, agent architecture details, and tool specifications. |

| RETURNS | DESCRIPTION |
|---|---|
| List[Dict[str, Any]] | List of report dictionaries, one per task repetition. Each report contains: |

| RAISES | DESCRIPTION |
|---|---|
| ValueError | If agent_data length doesn't match number of tasks (when agent_data is an iterable). |
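The two accepted shapes of agent_data, and the length check behind the ValueError above, can be sketched as a small normalization helper. `normalize_agent_data` is a hypothetical function written for illustration; how the framework performs this step internally is not shown on this page.

```python
from typing import Any, Dict, Iterable, List, Sequence, Union

AgentData = Union[Dict[str, Any], Iterable[Dict[str, Any]]]


def normalize_agent_data(tasks: Sequence[Any], agent_data: AgentData) -> List[Dict[str, Any]]:
    """Return exactly one config per task."""
    if isinstance(agent_data, dict):
        # A single dict is applied to every task.
        return [agent_data for _ in tasks]
    configs = list(agent_data)
    if len(configs) != len(tasks):
        raise ValueError(
            f"agent_data has {len(configs)} entries but there are {len(tasks)} tasks"
        )
    return configs


tasks = ["task-a", "task-b"]
shared = normalize_agent_data(tasks, {"model": "gpt-4"})
per_task = normalize_agent_data(
    tasks,
    [{"model": "gpt-4", "difficulty": "easy"}, {"model": "gpt-4", "difficulty": "hard"}],
)

try:
    normalize_agent_data(tasks, [{"model": "gpt-4"}])
    mismatch_detected = False
except ValueError:
    mismatch_detected = True
```

Checking the length up front surfaces a misaligned configuration before any task runs, rather than partway through the benchmark.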
How to use
This is the framework's main orchestration method that runs your entire benchmark. It iterates through all tasks, handles repetitions, and manages the three-stage lifecycle for each execution. You don't implement this method—instead, you call it to start the benchmark after implementing the setup and execution methods.
By default, the benchmark will continue executing remaining tasks even if some fail.
You can change this behavior by setting fail_on_task_error=True,
fail_on_evaluation_error=True, or fail_on_setup_error=True when instantiating
the benchmark. Each task execution returns a status indicating success or the specific
failure type (see TaskExecutionStatus).
For each task execution, the framework:
- Calls your setup methods to initialize components
- Calls your run_agents() method to execute the task
- Collects message histories and calls evaluators
- Stores results and triggers callbacks
Pseudocode structure:
for task in tasks:
    for repeat in range(n_task_repeats):
        # Setup stage (seed_generator arguments omitted for brevity)
        environment = setup_environment(agent_data, task)
        user = setup_user(agent_data, environment, task)
        agents_to_run, agents_dict = setup_agents(agent_data, environment, task, user)
        evaluators = setup_evaluators(environment, task, agents_to_run, user)

        # Run stage (execution_loop handles multi-turn if user exists)
        final_answer = execution_loop(agents_to_run, task, environment, user)

        # Evaluate stage (argument order matches the evaluate() signature)
        traces = collect_all_traces()
        eval_results = evaluate(evaluators, agents_dict, final_answer, traces)

        # Store results
        store_result(task_id, traces, eval_results)
Callback hooks are triggered at these points:
- on_run_start: Before processing any tasks
- on_task_start: Before processing a task (once per task, not per repeat)
- on_task_repeat_start: Before each repetition of a task
- on_task_repeat_end: After each repetition completes
- on_task_end: After all repetitions of a task complete
- on_run_end: After all tasks complete
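The order in which these hooks fire can be checked with a recording stub. The hook names come from the list above; the `RecordingCallback` class, the `drive` function, and the arguments passed to each hook are assumptions, since the real signatures are documented under BenchmarkCallback rather than here.

```python
from typing import List, Sequence


class RecordingCallback:
    """Hypothetical callback that records every hook it receives."""

    def __init__(self) -> None:
        self.events: List[str] = []

    def on_run_start(self) -> None:
        self.events.append("on_run_start")

    def on_task_start(self, task_id: str) -> None:
        self.events.append(f"on_task_start:{task_id}")

    def on_task_repeat_start(self, task_id: str, repeat_idx: int) -> None:
        self.events.append(f"on_task_repeat_start:{task_id}:{repeat_idx}")

    def on_task_repeat_end(self, task_id: str, repeat_idx: int) -> None:
        self.events.append(f"on_task_repeat_end:{task_id}:{repeat_idx}")

    def on_task_end(self, task_id: str) -> None:
        self.events.append(f"on_task_end:{task_id}")

    def on_run_end(self) -> None:
        self.events.append("on_run_end")


def drive(callback: RecordingCallback, task_ids: Sequence[str], n_task_repeats: int) -> None:
    """Fire hooks in the documented order for a sequential run."""
    callback.on_run_start()
    for task_id in task_ids:
        callback.on_task_start(task_id)  # once per task, not per repeat
        for repeat_idx in range(n_task_repeats):
            callback.on_task_repeat_start(task_id, repeat_idx)
            callback.on_task_repeat_end(task_id, repeat_idx)
        callback.on_task_end(task_id)
    callback.on_run_end()


recorder = RecordingCallback()
drive(recorder, ["t1"], n_task_repeats=2)
```

With one task and two repeats, the task-level hooks each fire once while the repeat-level hooks fire twice.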
# Typical usage
benchmark = MyBenchmark()
reports = benchmark.run(tasks=tasks, agent_data=config)

# Analyze results
for report in reports:
    print(f"Task {report['task_id']}, Repeat {report['repeat_idx']}: {report['eval']}")
    print(f"Config: {report['config']}")
    print(f"Traces: {report['traces']}")

# Parallel execution with 4 workers
benchmark = MyBenchmark(num_workers=4)
reports = benchmark.run(tasks=tasks, agent_data=config)

# Single agent config for all tasks
reports = benchmark.run(tasks=tasks, agent_data={"model": "gpt-4"})

# Task-specific agent configs (must match task count)
reports = benchmark.run(
    tasks=tasks,
    agent_data=[
        {"model": "gpt-4", "difficulty": "easy"},
        {"model": "gpt-4", "difficulty": "hard"},
    ],
)

# Priority-based execution
from maseval.core.task import PriorityTaskQueue

for task in tasks:
    task.protocol.priority = compute_priority(task)
queue = PriorityTaskQueue(tasks)
reports = benchmark.run(tasks=queue, agent_data=config)

# Adaptive queue (auto-registered as callback)
queue = MyAdaptiveTaskQueue(tasks)
reports = benchmark.run(tasks=queue, agent_data=config)  # queue receives on_task_complete callbacks
run_agents
abstractmethod
run_agents(
agents: Sequence[AgentAdapter],
task: Task,
environment: Environment,
query: str,
) -> Any
Execute the agent system to solve a single task instance.
This method is called once per task repetition by the framework's run() loop.
| PARAMETER | TYPE | DESCRIPTION |
|---|---|---|
| agents | Sequence[AgentAdapter] | Sequence of agents to execute (typically just the orchestrator or main agent). |
| task | Task | The Task object with the query and any metadata needed for execution. |
| environment | Environment | The environment instance providing tools and state. |
| query | str | The query string to pass to agents. For single-turn benchmarks this is typically task.query. For multi-turn with users, this may be an initial prompt or simulated user response. |

| RETURNS | DESCRIPTION |
|---|---|
| Any | The final answer or result from the agent system's execution. This could be: |

Note
Message traces are captured automatically through the tracing system and passed to evaluate() via the traces parameter. Do NOT return message histories here.
How to use
This is where you implement your multi-agent system logic for solving one task, one time. You define how your agents interact, communicate, and collaborate to complete the task.
The agents parameter contains only the primary agents to run (from agents_to_run in setup_agents). Worker agents called indirectly through an orchestrator do not appear here.
Note: This method handles a single task execution. The framework's run() method manages
task iteration, repetitions, and the complete benchmark lifecycle.
def run_agents(self, agents, task, environment, query):
    # Simple single-agent execution - returns final answer string
    orchestrator = agents[0]
    final_answer = orchestrator.run(query)
    return final_answer

# Or for multiple agents returning a list of answers:
def run_agents(self, agents, task, environment, query):
    answers = []
    for agent in agents:
        answer = agent.run(query)
        answers.append(answer)
    return answers
setup_agents
abstractmethod
setup_agents(
agent_data: Dict[str, Any],
environment: Environment,
task: Task,
user: Optional[User],
seed_generator: SeedGenerator,
) -> Tuple[Sequence[AgentAdapter], Dict[str, AgentAdapter]]
Instantiate and configure the agent system for a task.
Note: All agents in the returned agents_dict are automatically registered
for tracing. You don't need to manually call register() for them. However, you
should manually register models, simulators, or other components used by agents.
| PARAMETER | TYPE | DESCRIPTION |
|---|---|---|
| agent_data | Dict[str, Any] | Configuration dict containing agent specifications, model parameters, and tool assignments for this task. |
| environment | Environment | The initialized environment providing tools to the agents. |
| task | Task | The Task object with query and metadata. |
| user | Optional[User] | Optional user simulator for agent-user interactions. |
| seed_generator | SeedGenerator | Seed generator for deriving deterministic seeds for agents and their models. Use |

| RETURNS | DESCRIPTION |
|---|---|
| Tuple[Sequence[AgentAdapter], Dict[str, AgentAdapter]] | A tuple of (agents_to_run, agents_dict) where: |
How to use
This method constructs your agent architecture—single agent, multiple collaborative agents, or an orchestrator managing workers. Each agent is wrapped in AgentAdapter for uniform message history tracking.
The dual return structure serves different purposes:
- agents_to_run: Only agents directly invoked in run_agents() (typically the orchestrator)
- agents_dict: All agents in the system for message history collection from workers called indirectly through the orchestrator
def setup_agents(self, agent_data, environment, task, user, seed_generator):
    # Use child() for logical paths like "agents/experimental"
    # derive_seed() returns None if seeding is disabled
    agent_gen = seed_generator.child("agents")

    # Vary experimental agent per rep, keep baseline constant
    experimental_seed = agent_gen.derive_seed("experimental", per_repetition=True)
    baseline_seed = agent_gen.derive_seed("baseline", per_repetition=False)

    # For worker agents, nest further: "agents/workers/analyst"
    worker_gen = agent_gen.child("workers")
    analyst_seed = worker_gen.derive_seed("analyst")

    # Create agents with seeds (model adapters accept Optional[int])
    model = self.get_model_adapter(model_id, seed=experimental_seed)
    # ... create agents ...
    return [orchestrator_adapter], all_agents
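To make the path-based seeding in the example above more concrete, here is a minimal sketch of how such a generator could behave. SketchSeedGenerator is hypothetical: the hashing scheme, the constructor arguments, and the default per_repetition=True are all assumptions for illustration, and the real SeedGenerator may derive seeds differently.

```python
import hashlib
from typing import Optional


class SketchSeedGenerator:
    """Hypothetical stand-in; not the framework's SeedGenerator."""

    def __init__(self, base_seed: Optional[int], path: str = "", repetition: int = 0) -> None:
        self.base_seed = base_seed  # None means seeding is disabled
        self.path = path
        self.repetition = repetition

    def child(self, name: str) -> "SketchSeedGenerator":
        # Extend the logical path, e.g. "agents" -> "agents/workers".
        new_path = f"{self.path}/{name}" if self.path else name
        return SketchSeedGenerator(self.base_seed, new_path, self.repetition)

    def derive_seed(self, name: str, per_repetition: bool = True) -> Optional[int]:
        if self.base_seed is None:
            return None
        full_path = f"{self.path}/{name}" if self.path else name
        key = f"{self.base_seed}:{full_path}"
        if per_repetition:
            # Mixing in the repetition index makes the seed vary between repetitions.
            key += f":rep{self.repetition}"
        digest = hashlib.sha256(key.encode("utf-8")).digest()
        return int.from_bytes(digest[:4], "big")  # 32-bit seed


rep0 = SketchSeedGenerator(42, repetition=0).child("agents")
rep1 = SketchSeedGenerator(42, repetition=1).child("agents")
disabled = SketchSeedGenerator(None).child("agents")
```

In this sketch a seed depends only on the base seed, the logical path, and optionally the repetition index, so adding a new component under a different path does not shift the seeds of existing ones.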
setup_environment
abstractmethod
setup_environment(
agent_data: Dict[str, Any],
task: Task,
seed_generator: SeedGenerator,
) -> Environment
Create and initialize the environment for a task.
This method is called once per task execution to create a fresh environment with the task's initial conditions.
Note: The returned environment is automatically registered for tracing.
You don't need to manually call register() for it.
| PARAMETER | DESCRIPTION |
|---|---|
| agent_data | Configuration dict containing agent specifications, model parameters, and other settings that may influence environment setup (e.g., framework type). TYPE: Dict[str, Any] |
| task | The Task object containing environment_data, query, and metadata needed to construct the environment. TYPE: Task |
| seed_generator | Seed generator for deriving deterministic seeds for environment components (tools, simulators, etc.). TYPE: SeedGenerator |

| RETURNS | DESCRIPTION |
|---|---|
| Environment | An Environment instance with initialized state and tools for this specific task. |
How to use
The environment encapsulates task state and provides tools/APIs that agents can use. Your implementation should:
- Extract environment state from task.environment_data
- Initialize any databases, simulators, or API clients
- Create and configure tools that agents can invoke
- Set up domain-specific state (inventory, user profiles, etc.)
- Optionally use agent_data for framework-specific tool initialization
- Use seed_generator to derive seeds for tool simulators
def setup_environment(self, agent_data, task, seed_generator):
    env = TravelEnvironment(task.environment_data)

    # Use nested child() for logical paths like "environment/tools/weather"
    # derive_seed() returns None if seeding is disabled
    env_gen = seed_generator.child("environment")
    tools_gen = env_gen.child("tools")
    for tool in env.tools:
        tool_seed = tools_gen.derive_seed(tool.name)  # Optional[int]
        tool_model = self.get_model_adapter(model_id, seed=tool_seed)
        tool.set_simulator(tool_model)
    return env
The environment is automatically registered for tracing when returned.
setup_evaluators
abstractmethod
setup_evaluators(
environment: Environment,
task: Task,
agents: Sequence[AgentAdapter],
user: Optional[User],
seed_generator: SeedGenerator,
) -> Sequence[Evaluator]
Create evaluators to assess agent performance on a task.
| PARAMETER | DESCRIPTION |
|---|---|
| environment | The environment instance with final state after agent execution. TYPE: Environment |
| task | The Task object with evaluation criteria in evaluation_data. TYPE: Task |
| agents | The agents that will execute the task (useful for context in evaluation). TYPE: Sequence[AgentAdapter] |
| user | Optional user simulator, if evaluation needs user interaction data. TYPE: Optional[User] |
| seed_generator | Seed generator for deriving deterministic seeds for evaluators that use LLMs (e.g., LLM judges). TYPE: SeedGenerator |

| RETURNS | DESCRIPTION |
|---|---|
| Sequence[Evaluator] | A sequence of Evaluator instances that will be called with execution traces. |
How to use
Evaluators judge whether agents successfully completed the task or satisfied specific criteria. Multiple evaluators can measure different performance aspects (accuracy, efficiency, conversation quality, etc.).
If an evaluator encounters an unexpected condition, prefer raising an exception rather than handling it silently. The benchmark runner then applies the policy configured through fail_on_evaluation_error: fail fast when True; mark the task as evaluation_failed and continue when False.
def setup_evaluators(self, environment, task, agents, user, seed_generator):
    # Use child() to create logical namespace - results in "evaluators/judge"
    # derive_seed() returns None if seeding is disabled
    eval_gen = seed_generator.child("evaluators")
    judge_seed = eval_gen.derive_seed("judge")  # Optional[int]
    return [
        SuccessEvaluator(task.evaluation_data["gold_answer"]),
        LLMJudgeEvaluator(model=self.get_model_adapter(model_id, seed=judge_seed)),
    ]
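The fail_on_evaluation_error policy described above can be sketched as a small wrapper around the evaluator calls. This is not the benchmark runner's actual code: run_evaluators, the callable-style evaluators, and the shape of the returned dict are hypothetical and serve only to show the two behaviours.

```python
from typing import Any, Callable, Dict, List, Sequence

EvaluatorFn = Callable[[Dict[str, Any]], Dict[str, Any]]


def run_evaluators(
    evaluators: Sequence[EvaluatorFn],
    traces: Dict[str, Any],
    fail_on_evaluation_error: bool,
) -> Dict[str, Any]:
    """Call each evaluator; on an exception either re-raise or record the failure."""
    results: List[Dict[str, Any]] = []
    try:
        for evaluate in evaluators:
            results.append(evaluate(traces))
    except Exception as exc:
        if fail_on_evaluation_error:
            raise  # fail fast: the whole run stops here
        # Otherwise mark this task and let the caller move on to the next one.
        return {"status": "evaluation_failed", "error": repr(exc), "results": results}
    return {"status": "success", "results": results}


def exact_match(traces: Dict[str, Any]) -> Dict[str, Any]:
    return {"correct": traces["answer"] == "42"}


def broken_judge(traces: Dict[str, Any]) -> Dict[str, Any]:
    # Raises instead of returning a made-up score.
    raise ValueError("judge returned unparseable output")


report = run_evaluators(
    [exact_match, broken_judge], {"answer": "42"}, fail_on_evaluation_error=False
)
```

Because the evaluator raises rather than inventing a fallback score, the failure stays visible in the report and the policy decision remains with the runner.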
setup_user
setup_user(
agent_data: Dict[str, Any],
environment: Environment,
task: Task,
seed_generator: SeedGenerator,
) -> Optional[User]
Create an optional user simulator for interactive tasks.
This method is optional. Return None if your benchmark does not require user simulation.
Note: The returned user is automatically registered for tracing.
You don't need to manually call register() for it.
| PARAMETER | DESCRIPTION |
|---|---|
| agent_data | Configuration dict containing agent specifications and settings that may influence user simulator setup (e.g., framework type for creating compatible tools). TYPE: Dict[str, Any] |
| environment | The environment instance created for this task. TYPE: Environment |
| task | The Task object with user profile data or scenario information. TYPE: Task |
| seed_generator | Seed generator for deriving deterministic seeds for the user simulator. TYPE: SeedGenerator |

| RETURNS | DESCRIPTION |
|---|---|
| Optional[User] | A User instance that can respond to agent queries, or None if not needed. |
How to use
User simulators enable agent-user interactions by responding to queries with preferences, clarifications, or feedback. Useful for benchmarks testing conversational agents or systems requiring user input during execution.
def setup_user(self, agent_data, environment, task, seed_generator):
    # Use child() to create logical namespace - results in "simulators/user"
    # derive_seed() returns None if seeding is disabled
    sim_gen = seed_generator.child("simulators")
    user_seed = sim_gen.derive_seed("user")  # Optional[int]
    user_model = self.get_model_adapter(model_id, seed=user_seed)
    return LLMUser(model=user_model, ...)

# Or skip user simulation entirely
def setup_user(self, agent_data, environment, task, seed_generator):
    return None
The user is automatically registered for tracing when returned.
TaskExecutionStatus
Bases: Enum
Status of task execution and evaluation.
This enum tracks the execution state of a task through the benchmark lifecycle, enabling graceful failure handling and comprehensive result reporting.
The status distinguishes between errors caused by the agent (agent's fault) and errors caused by the evaluation infrastructure (environment, user simulator). This enables fair scoring by excluding infrastructure failures.
| ATTRIBUTE | DESCRIPTION |
|---|---|
| SUCCESS | Task executed and evaluated successfully. |
| AGENT_ERROR | Agent violated contract at a boundary (agent's fault, counts against score). |
| ENVIRONMENT_ERROR | Environment/tool infrastructure failed (not agent's fault, exclude from scoring). |
| USER_ERROR | User simulator failed (not agent's fault, exclude from scoring). |
| TASK_TIMEOUT | Task execution exceeded configured timeout (resource constraint). |
| UNKNOWN_EXECUTION_ERROR | Unclassified execution error (e.g., agent framework internal failure). |
| EVALUATION_FAILED | Task executed but evaluation raised an exception. |
| SETUP_FAILED | Setup phase (environment, agents, evaluators) raised an exception. |
Scoring Guidance
- Include in agent score: SUCCESS, AGENT_ERROR
- Exclude from agent score: ENVIRONMENT_ERROR, USER_ERROR, TASK_TIMEOUT, UNKNOWN_EXECUTION_ERROR
- Handle separately: EVALUATION_FAILED, SETUP_FAILED