Skip to content

Benchmark

The Benchmark class is the core orchestrator for running multi-agent system experiments. It manages the complete execution lifecycle—from setting up environments and agents to running tasks and evaluating results. By implementing a few abstract methods, you define your benchmark's specific logic while the framework handles task iteration, data collection, and reproducibility.

Think of it as the experiment controller: you describe what to set up and how agents should run, and the framework handles when and how many times everything executes.

Each method below is documented starting with the formal method definition and a following How to use section that is more educational and should help getting started.

View source

Benchmark

Bases: ABC

Abstract base class for orchestrating multi-agent system execution and evaluation.

Benchmark provides a structured framework for running reproducible agent experiments across a collection of tasks. It manages the complete execution lifecycle: environment initialization, agent instantiation, task execution, and performance evaluation. The class enforces a three-stage pattern (setup, run, evaluate) while handling task iteration, callback orchestration, message history collection, and result aggregation.

How to use
  1. Subclass Benchmark and implement the abstract methods to define your benchmark logic
  2. Implement setup methods to specify how environments, agents, and evaluators are created
  3. Implement run_agents to define how your multi-agent system solves a single task
  4. Instantiate your benchmark with agent configuration
  5. Call benchmark.run(tasks) to execute the complete benchmark

Example workflow:

class MyBenchmark(Benchmark):
    def setup_environment(self, agent_data, task):
        return MyEnvironment(task.environment_data)

    def setup_agents(self, agent_data, environment, task, user):
        agent = MyAgent(model=agent_data["model"])
        agent_adapter = AgentAdapter(agent, "agent")
        return [agent_adapter], {"agent": agent_adapter}

    def run_agents(self, agents, task, environment, query):
        return agents[0].run(query)

    # ... implement other abstract methods

# Run the benchmark
config = {"model": "gpt-4", "temperature": 0.7}
benchmark = MyBenchmark()
reports = benchmark.run(tasks=my_tasks, agent_data=config)

# Retry failed tasks elegantly (graceful task failure handling by default)
failed_tasks = benchmark.get_failed_tasks()
if len(failed_tasks) > 0:
    retry_reports = benchmark.run(tasks=failed_tasks, agent_data=config)

# Parallel execution for I/O-bound workloads
benchmark = MyBenchmark(num_workers=4)
reports = benchmark.run(tasks=my_tasks, agent_data=config)

# Or use strict mode for debugging (fail fast)
benchmark = MyBenchmark(
    fail_on_task_error=True,
    fail_on_evaluation_error=True,
    fail_on_setup_error=True
)
reports = benchmark.run(tasks=my_tasks, agent_data=config)

The framework handles task iteration, repetitions for statistical robustness, callback notifications, and result collection. You focus on defining agent behavior and evaluation criteria for your specific domain.

Note

Task timeout (via TaskProtocol.timeout_seconds) is checked at phase boundaries (after setup, before execution, before evaluation). Timeout detection during agent execution is not yet supported.

seed_generator property

seed_generator: SeedGenerator

The seed generator for this benchmark.

The seed generator is configured at benchmark initialization via the seed or seed_generator parameters. When seed=None (the default), the generator's derive_seed() method returns None, effectively disabling seeding while maintaining a uniform interface.

RETURNS DESCRIPTION
SeedGenerator

The root SeedGenerator instance.

usage property

usage: Usage

Running usage total across all task repetitions.

Queryable at any time, including while the benchmark is still running. Returns the grand total of all usage collected so far.

usage_by_component property

usage_by_component: Dict[str, Usage]

Per-component running usage totals across all repetitions.

Keys are registry keys (e.g., "models:main_model").

__init__

__init__(
    callbacks: Optional[List[BenchmarkCallback]] = None,
    n_task_repeats: int = 1,
    max_invocations: int = 1,
    num_workers: int = 1,
    fail_on_setup_error: bool = False,
    fail_on_task_error: bool = False,
    fail_on_evaluation_error: bool = False,
    progress_bar: bool | str = True,
    seed: Optional[int] = None,
    seed_generator: Optional[SeedGenerator] = None,
)

Initialize a benchmark with execution configuration.

PARAMETER DESCRIPTION
callbacks

Optional list of callback handlers for monitoring execution, tracing messages, or collecting custom metrics during the benchmark run.

TYPE: Optional[List[BenchmarkCallback]] DEFAULT: None

n_task_repeats

Number of times to repeat each task. Useful for measuring variance in stochastic agent behaviors. Must be at least 1.

TYPE: int DEFAULT: 1

max_invocations

Maximum number of agent invocations per task in the execution loop. For simple benchmarks, the default (1) means agents run once per task. For interactive benchmarks with user feedback loops, set higher (e.g., 5 for MACS) to allow multiple agent-user interaction rounds.

TYPE: int DEFAULT: 1

num_workers

Number of parallel task executions. Default 1 (sequential). Set higher for I/O-bound workloads (e.g., LLM API calls). This controls the ThreadPoolExecutor worker count for concurrent task processing.

TYPE: int DEFAULT: 1

fail_on_setup_error

If True, raise exceptions when setup fails (environment, agents, evaluators). If False (default), catch exceptions during setup and record them in the report with status SETUP_FAILED. This allows the benchmark to continue running remaining tasks even if setup fails.

TYPE: bool DEFAULT: False

fail_on_task_error

If True, raise exceptions when task execution fails. If False (default), catch exceptions during task execution and record them in the report with status TASK_EXECUTION_FAILED. This allows the benchmark to continue running remaining tasks.

TYPE: bool DEFAULT: False

fail_on_evaluation_error

If True, raise exceptions when evaluation fails. If False (default), catch exceptions during evaluation and record them in the report with status EVALUATION_FAILED. This allows the benchmark to continue even if evaluation logic has errors.

TYPE: bool DEFAULT: False

progress_bar

Controls progress bar during benchmark runs. When enabled, one of the ProgressBarCallback will be added. Options: - True (default): Automatically adds a TqdmProgressBarCallback if no ProgressBarCallback is already present in the callbacks argument. - False: Does not automatically add a progress bar callback. Manually provided ProgressBarCallback instances in the callbacks list will still work normally. - "tqdm": Automatically adds a TqdmProgressBarCallback (same as True) - "rich": Automatically adds a RichProgressBarCallback (uses the rich library)

TYPE: bool | str DEFAULT: True

seed

Global seed for reproducible benchmark runs. When provided, a DefaultSeedGenerator is created and passed to setup methods, enabling deterministic seed derivation for all components (agents, tools, user simulators, etc.). Seeds cascade: changing the global seed changes all derived seeds.

TYPE: Optional[int] DEFAULT: None

seed_generator

Custom seed generator for advanced use cases. If provided, takes precedence over the seed parameter. Use this to provide a custom SeedGenerator implementation (e.g., database-backed seeds, different hash algorithms).

TYPE: Optional[SeedGenerator] DEFAULT: None

RAISES DESCRIPTION
ValueError

If n_task_repeats is less than 1.

ValueError

If both seed and seed_generator are provided.

How to use

Configure execution settings at initialization:

# Sequential execution (default)
benchmark = MyBenchmark()

# Parallel execution for faster I/O-bound workloads
benchmark = MyBenchmark(num_workers=4)

# Strict mode - fail fast on any error (useful for debugging)
benchmark = MyBenchmark(
    fail_on_task_error=True,
    fail_on_evaluation_error=True,
    fail_on_setup_error=True
)

# Progress bar configuration
benchmark = MyBenchmark()  # Default: adds TqdmProgressBarCallback
benchmark = MyBenchmark(progress_bar=True)  # Explicit: TqdmProgressBarCallback
benchmark = MyBenchmark(progress_bar="rich")  # Uses RichProgressBarCallback
benchmark = MyBenchmark(progress_bar=False)  # No automatic callback

# Custom callbacks
benchmark = MyBenchmark(callbacks=[MyCustomProgressBarCallback()])

add_callback

add_callback(callback: BenchmarkCallback) -> None

Register a callback handler to monitor benchmark execution.

PARAMETER DESCRIPTION
callback

A BenchmarkCallback instance that will receive execution events.

TYPE: BenchmarkCallback

How to use

Callbacks receive notifications at key lifecycle points for tracing, progress tracking, or custom metrics collection. See BenchmarkCallback for available hooks and their signatures.

from maseval.core.callbacks import MessageTracingCallback

benchmark = MyBenchmark(tasks=tasks, agent_data=config)
benchmark.add_callback(MessageTracingCallback(output_dir="logs"))
results = benchmark.run()

clear_registry

clear_registry() -> None

Clear the component registry after a task repetition completes.

This method is called automatically by run() after each task repetition to ensure components are not carried over between repetitions. The reports list persists across all repetitions for aggregated analysis.

collect_all_configs

collect_all_configs() -> Dict[str, Any]

Collect configuration from all registered components for the current task repetition.

This method is called automatically by run() after each task repetition completes and before evaluation begins. It gathers comprehensive configuration from all registered components (agents, models, tools, simulators, callbacks, etc.) for that specific repetition. After collection, the registry is cleared for the next repetition.

The collected configs are stored in benchmark.reports list along with traces for persistent access across all task repetitions.

Output fields:

  • metadata - Collection timestamp and thread info
  • agents - Dict mapping agent names to their config (settings, parameters)
  • models - Dict mapping model names to their config (model IDs, parameters)
  • tools - Dict mapping tool names to their config (specifications, settings)
  • simulators - Dict mapping simulator names to their config (parameters, templates)
  • callbacks - Dict mapping callback names to their config (settings)
  • environment - Direct config from the environment (not nested), or None if not present
  • user - Direct config from the user simulator (not nested), or None if not present
  • other - Dict for any other registered components
  • benchmark - Benchmark-level configuration (git, system, packages)
RETURNS DESCRIPTION
Dict[str, Any]

Structured dictionary containing configuration from all registered components.

How to use

This method is called automatically by run() after each task repetition:

# Automatic collection (recommended)
results = benchmark.run()

# Access all collected reports (traces + configs) across repetitions
for report in benchmark.reports:
    print(f"Task {report['task_id']}, Repeat {report['repeat_idx']}")
    # Agents is a dict: agent_name -> config
    print(f"Agent config: {report['config']['agents']['my_agent']}")
    # Environment and user are direct (not nested)
    print(f"Environment config: {report['config']['environment']}")
    print(f"User config: {report['config']['user']}")
    # Benchmark-level config
    print(f"Git commit: {report['config']['benchmark']['git']['commit_hash']}")

The collected configs are available in the results for reproducibility analysis.

collect_all_traces

collect_all_traces() -> Dict[str, Any]

Collect execution traces from all registered components for the current task repetition.

This method is called automatically by run() after each task repetition completes and before evaluation begins. It gathers comprehensive traces from all registered components (agents, models, tools, simulators, callbacks, etc.) for that specific repetition. After collection, the registry is cleared for the next repetition.

The collected traces are stored in benchmark.reports list along with configs for persistent access across all task repetitions.

Output fields:

  • metadata - Collection timestamp and thread info
  • agents - Dict mapping agent names to their traces (messages, execution data)
  • models - Dict mapping model names to their traces (API calls, timing, errors)
  • tools - Dict mapping tool names to their traces (invocations, parameters)
  • simulators - Dict mapping simulator names to their traces (attempts, outcomes)
  • callbacks - Dict mapping callback names to their traces (custom data)
  • environment - Direct traces from the environment (not nested), or None if not present
  • user - Direct traces from the user simulator (not nested), or None if not present
  • other - Dict for any other registered components
RETURNS DESCRIPTION
Dict[str, Any]

Structured dictionary containing execution traces from all registered components.

How to use

This method is called automatically by run() after each task repetition:

# Automatic collection (recommended)
results = benchmark.run()

# Access all collected reports (traces + configs) across repetitions
for report in benchmark.reports:
    print(f"Task {report['task_id']}, Repeat {report['repeat_idx']}")
    # Agents is a dict: agent_name -> traces
    print(f"Agent messages: {report['traces']['agents']['my_agent']}")
    # Environment and user are direct (not nested)
    print(f"Environment state: {report['traces']['environment']}")
    print(f"User interactions: {report['traces']['user']}")

The collected traces are passed to the evaluator's evaluate() method and stored in benchmark.reports for later analysis.

collect_all_usage

collect_all_usage() -> Dict[str, Any]

Collect usage from all registered components for the current task repetition.

This method is called automatically by run() after each task repetition completes. It gathers usage from all registered UsageTrackableMixin components and also accumulates into persistent running totals accessible via usage and usage_by_component.

RETURNS DESCRIPTION
Dict[str, Any]

Structured dictionary containing usage from all registered components.

evaluate abstractmethod

evaluate(
    evaluators: Sequence[Evaluator],
    agents: Dict[str, AgentAdapter],
    final_answer: Any,
    traces: Dict[str, Any],
) -> List[Dict[str, Any]]

Execute evaluators to score agent performance on the task.

This method calls each evaluator with the collected execution data to produce evaluation results. The framework automatically collects comprehensive execution traces from all registered components before calling this method.

PARAMETER DESCRIPTION
evaluators

The evaluator instances created by setup_evaluators().

TYPE: Sequence[Evaluator]

agents

Dictionary of all agent instances, useful if evaluators need to query agent state beyond message history.

TYPE: Dict[str, AgentAdapter]

final_answer

The return value from run_agents() - typically the final output, answer, or result from the agent system's execution.

TYPE: Any

traces

Comprehensive execution traces from all registered components including: - agents: Dict mapping agent names to their traces (messages, execution metadata) - models: Dict mapping model names to traces (API calls, timing, token usage, errors) - tools: Dict mapping tool names to traces (invocations with parameters and results) - simulators: Dict mapping simulator names to traces (attempts and outcomes) - callbacks: Dict mapping callback names to traces (custom callback data) - environment: Direct environment traces (state, tools) - not nested in a dict - user: Direct user simulator traces (interactions) - not nested in a dict - metadata: Collection timestamp and thread info Plus any other registered component categories.

TYPE: Dict[str, Any]

RETURNS DESCRIPTION
List[Dict[str, Any]]

List of evaluation result dictionaries, typically one per evaluator. Each dict should

List[Dict[str, Any]]

contain metrics, scores, or judgments about the agent's performance.

How to use

Evaluators can access comprehensive execution data including final answers and traces:

def evaluate(self, evaluators, agents, final_answer, traces):
    results = []
    for evaluator in evaluators:
        # Access agent message histories from traces
        agent_traces = {name: trace_data.get('messages', MessageHistory())
                       for name, trace_data in traces.get('agents', {}).items()}

        # Access additional execution data
        model_traces = traces.get("models", {})
        tool_traces = traces.get("tools", {})
        environment_traces = traces.get("environment")  # Direct, not nested
        user_traces = traces.get("user")  # Direct, not nested

        # Pass comprehensive data to evaluator
        result = evaluator(
            final_answer=final_answer,
            traces=agent_traces,
            model_calls=model_traces,
            tool_usage=tool_traces,
            environment_state=environment_traces,
            user_interactions=user_traces
        )
        results.append(result)
    return results

execution_loop

execution_loop(
    agents: Sequence[AgentAdapter],
    task: Task,
    environment: Environment,
    user: Optional[User],
) -> Any

Execute agents with optional user interaction loop.

This method orchestrates the agent-user interaction pattern. When a user is present, the user initiates the conversation using user.get_initial_query(). If no user is present, task.query is used as the initial query.

Interaction Flow

By default, agents execute once (max_invocations=1). For multi-turn interaction, set self.max_invocations > 1 in your benchmark's __init__. The loop continues until max_invocations is reached or user.is_done() returns True (e.g., max turns reached or stop token detected).

Note

Override this method in your benchmark subclass to implement custom interaction patterns (e.g., agent-initiated conversations, different termination conditions, or specialized query routing).

PARAMETER DESCRIPTION
agents

Agents to execute (typically the orchestrator).

TYPE: Sequence[AgentAdapter]

task

The task being solved.

TYPE: Task

environment

The environment providing tools and state.

TYPE: Environment

user

Optional user simulator. If provided, the user initiates and drives the conversation. If None, a single agent execution with task.query.

TYPE: Optional[User]

RETURNS DESCRIPTION
Any

Final answer from the last agent execution.

Example

For interactive benchmarks, enable multi-turn interaction::

def __init__(self, ...):
    super().__init__(...)
    self.max_invocations = 5  # Up to 5 agent-user exchanges

get_failed_tasks

get_failed_tasks(
    status_filter: Optional[
        Union[
            TaskExecutionStatus, List[TaskExecutionStatus]
        ]
    ] = None,
    reports: Optional[List[Dict[str, Any]]] = None,
) -> SequentialTaskQueue

Get tasks that failed during benchmark execution.

This method retrieves failed tasks based on their execution status, useful for debugging, retry logic, or failure analysis.

PARAMETER DESCRIPTION
status_filter

Filter by specific failure status(es). If None, returns all failed tasks (any status except SUCCESS). Can be a single TaskExecutionStatus or a list of them. Examples: - TaskExecutionStatus.TASK_EXECUTION_FAILED: Only tasks that failed during execution - TaskExecutionStatus.EVALUATION_FAILED: Only tasks where evaluation failed - [TaskExecutionStatus.TASK_EXECUTION_FAILED, TaskExecutionStatus.SETUP_FAILED]: Tasks that failed during execution or setup

TYPE: Optional[Union[TaskExecutionStatus, List[TaskExecutionStatus]]] DEFAULT: None

reports

Optional list of reports to analyze. If None, uses the reports from the last run() call. This allows analyzing externally stored or modified reports.

TYPE: Optional[List[Dict[str, Any]]] DEFAULT: None

RETURNS DESCRIPTION
SequentialTaskQueue

SequentialTaskQueue containing the failed tasks. Empty if no failures match the filter.

RAISES DESCRIPTION
RuntimeError

If reports is None and run() has not been executed yet.

How to use
# Run benchmark
benchmark = MyBenchmark()
reports = benchmark.run(tasks=tasks, agent_data=config)

# Get all failed tasks (from internal state)
failed = benchmark.get_failed_tasks()
print(f"Failed: {len(failed)}/{len(benchmark.tasks)} tasks")

# Or work with returned reports (safe from internal state changes)
failed = benchmark.get_failed_tasks(reports=reports)

# Get only tasks that failed during execution (not evaluation)
execution_failures = benchmark.get_failed_tasks(
    TaskExecutionStatus.TASK_EXECUTION_FAILED,
    reports=reports
)

# Get setup and execution failures
critical_failures = benchmark.get_failed_tasks(
    status_filter=[
        TaskExecutionStatus.SETUP_FAILED,
        TaskExecutionStatus.TASK_EXECUTION_FAILED
    ],
    reports=reports
)

# Retry failed tasks elegantly - this is the key use case!
if len(failed) > 0:
    retry_reports = benchmark.run(tasks=failed)

# Or more concisely
reports = benchmark.run(tasks=tasks)
retry_reports = benchmark.run(tasks=benchmark.get_failed_tasks())

get_model_adapter abstractmethod

get_model_adapter(
    model_id: str, **kwargs: Any
) -> ModelAdapter

Provide a ModelAdapter for benchmark components that require LLM access.

Many benchmark components beyond the agents themselves require access to language models. Common examples include:

  • Tool simulators: Simulating tool responses when real APIs aren't available
  • User simulators: Generating realistic user responses in multi-turn dialogues
  • Judges/Evaluators: Using LLMs to assess agent performance against criteria
  • Reward models: Computing scores for reinforcement learning

This method centralizes model provisioning, giving you control over which models are used throughout the benchmark. Implement this to return a configured ModelAdapter for the requested model.

PARAMETER DESCRIPTION
model_id

The model identifier to use (e.g., "gemini-2.5-flash", "openrouter/google/gemini-2.5-flash", "gpt-4o"). This is passed by the benchmark when setting up components that need model access.

TYPE: str

**kwargs

Additional arguments for adapter creation or registration. Common kwargs: - register_category: Category for trace registration (e.g., "models") - register_name: Name for trace registration (e.g., "evaluator_user_gsr")

TYPE: Any DEFAULT: {}

RETURNS DESCRIPTION
ModelAdapter

A ModelAdapter instance configured for the specified model. For proper tracing,

ModelAdapter

return a fresh adapter for each call rather than reusing instances. You can

ModelAdapter

still share the underlying API client for efficiency.

How to use

For proper tracing, register the adapter after creation using the kwargs:

def get_model_adapter(self, model_id: str, **kwargs: Any) -> ModelAdapter:
    adapter = GoogleGenAIModelAdapter(self.client, model_id=model_id)

    # Register for tracing if registration info provided
    category = kwargs.get("register_category", "models")
    name = kwargs.get("register_name", model_id)
    self.register(category, name, adapter)

    return adapter

The benchmark calls this method when setting up tools, user simulators, and evaluators. Each call creates a fresh adapter with its own trace log.

register

register(
    category: str,
    name: str,
    component: RegisterableComponent,
) -> RegisterableComponent

Register a component for comprehensive trace and configuration collection.

All core MASEval components (AgentAdapter, ModelAdapter, Environment, User, LLMSimulator, BenchmarkCallback) inherit from TraceableMixin and/or ConfigurableMixin, and are automatically registered for both trace and configuration collection before evaluation.

Note: Most components are automatically registered when returned from setup methods (setup_environment, setup_user, setup_agents). You only need to manually register additional components like models, simulators, or tools that aren't automatically captured.

PARAMETER DESCRIPTION
category

Component category (e.g., "agents", "models", "tools", "simulators", "callbacks", "user", "environment", "seeding"). Use plural form to match the structure in collect_all_traces() and collect_all_configs().

TYPE: str

name

Unique identifier for this component within its category

TYPE: str

component

Any object inheriting from TraceableMixin and/or ConfigurableMixin

TYPE: RegisterableComponent

RETURNS DESCRIPTION
RegisterableComponent

The component (for chaining convenience)

RAISES DESCRIPTION
ValueError

If the component is already registered under a different name

How to use

Most components are auto-registered. Manual registration is only needed for additional components:

def setup_agents(self, agent_data, environment, task, user):
    # Create model (needs manual registration)
    model = MyModelAdapter(...)
    self.register("models", "main_model", model)

    # Create agent (auto-registered when returned)
    agent = MyAgent(model=model)
    agent_adapter = AgentAdapter(agent, "agent1")

    # Environment and user are also auto-registered
    return [agent_adapter], {"agent1": agent_adapter}

Traces and configs are automatically collected before evaluation via collect_all_traces() and collect_all_configs() which are called internally by the run() method.

run

run(
    tasks: Union[
        Task, BaseTaskQueue, Iterable[Union[Task, dict]]
    ],
    agent_data: Dict[str, Any] | Iterable[Dict[str, Any]],
) -> List[Dict[str, Any]]

Initialize and execute the complete benchmark loop across all tasks.

PARAMETER DESCRIPTION
tasks

Task source for execution. Can be: - A single Task object - A BaseTaskQueue (SequentialTaskQueue, PriorityTaskQueue, or custom AdaptiveTaskQueue) - An iterable of Task objects or dicts that will be converted to Tasks

When a BaseTaskQueue is provided, it controls the task ordering. AdaptiveTaskQueue subclasses are automatically registered as callbacks to receive task completion notifications.

TYPE: Union[Task, BaseTaskQueue, Iterable[Union[Task, dict]]]

agent_data

Configuration for agents. Either a single dict applied to all tasks, or an iterable of dicts with one configuration per task. Agent data typically includes model parameters, agent architecture details, and tool specifications.

TYPE: Dict[str, Any] | Iterable[Dict[str, Any]]

RETURNS DESCRIPTION
List[Dict[str, Any]]

List of report dictionaries, one per task repetition. Each report contains:

List[Dict[str, Any]]
  • task_id: Task identifier (UUID)
List[Dict[str, Any]]
  • repeat_idx: Repetition index (0 to n_task_repeats-1)
List[Dict[str, Any]]
  • status: Execution status (one of TaskExecutionStatus enum values)
List[Dict[str, Any]]
  • traces: Execution traces from all registered components
List[Dict[str, Any]]
  • config: Configuration from all registered components and benchmark level
List[Dict[str, Any]]
  • eval: Evaluation results (None if task or evaluation failed)
List[Dict[str, Any]]
  • error: Error details dict (only present if status is not SUCCESS), containing:
  • error_type: Exception class name
  • error_message: Exception message
  • traceback: Full traceback string
RAISES DESCRIPTION
ValueError

If agent_data length doesn't match number of tasks (when agent_data is an iterable).

How to use

This is the framework's main orchestration method that runs your entire benchmark. It iterates through all tasks, handles repetitions, and manages the three-stage lifecycle for each execution. You don't implement this method—instead, you call it to start the benchmark after implementing the setup and execution methods.

By default, the benchmark will continue executing remaining tasks even if some fail. You can change this behavior by setting fail_on_task_error=True, fail_on_evaluation_error=True, or fail_on_setup_error=True when instantiating the benchmark. Each task execution returns a status indicating success or the specific failure type (see TaskExecutionStatus).

For each task execution, the framework:

  1. Calls your setup methods to initialize components
  2. Calls your run_agents() method to execute the task
  3. Collects message histories and calls evaluators
  4. Stores results and triggers callbacks

Pseudocode structure:

for task in tasks:
    for repeat in range(n_task_repeats):
        # Setup stage
        environment = setup_environment(agent_data, task)
        user = setup_user(agent_data, environment, task)
        agents_to_run, agents_dict = setup_agents(agent_data, environment, task, user)
        evaluators = setup_evaluators(environment, task, agents_to_run, user)

        # Run stage (execution_loop handles multi-turn if user exists)
        agents_output = execution_loop(agents_to_run, task, environment, user)

        # Evaluate stage
        traces = collect_message_histories(agents_dict)
        eval_results = evaluate(evaluators, traces, agents_dict)

        # Store results
        store_result(task_id, traces, eval_results)

Callback hooks are triggered at these points:

  • on_run_start: Before processing any tasks
  • on_task_start: Before processing a task (once per task, not per repeat)
  • on_task_repeat_start: Before each repetition of a task
  • on_task_repeat_end: After each repetition completes
  • on_task_end: After all repetitions of a task complete
  • on_run_end: After all tasks complete
# Typical usage
benchmark = MyBenchmark()
reports = benchmark.run(tasks=tasks, agent_data=config)

# Analyze results
for report in reports:
    print(f"Task {report['task_id']}, Repeat {report['repeat_idx']}: {report['eval']}")
    print(f"Config: {report['config']}")
    print(f"Traces: {report['traces']}")

# Parallel execution with 4 workers
benchmark = MyBenchmark(num_workers=4)
reports = benchmark.run(tasks=tasks, agent_data=config)

# Single agent config for all tasks
reports = benchmark.run(tasks=tasks, agent_data={"model": "gpt-4"})

# Task-specific agent configs (must match task count)
reports = benchmark.run(
    tasks=tasks,
    agent_data=[
        {"model": "gpt-4", "difficulty": "easy"},
        {"model": "gpt-4", "difficulty": "hard"},
    ]
)

# Priority-based execution
from maseval.core.task import PriorityTaskQueue
for task in tasks:
    task.protocol.priority = compute_priority(task)
queue = PriorityTaskQueue(tasks)
reports = benchmark.run(tasks=queue, agent_data=config)

# Adaptive queue (auto-registered as callback)
queue = MyAdaptiveTaskQueue(tasks)
reports = benchmark.run(tasks=queue)  # queue receives on_task_complete callbacks

run_agents abstractmethod

run_agents(
    agents: Sequence[AgentAdapter],
    task: Task,
    environment: Environment,
    query: str,
) -> Any

Execute the agent system to solve a single task instance.

This method is called once per task repetition by the framework's run() loop.

PARAMETER DESCRIPTION
agents

Sequence of agents to execute (typically just the orchestrator or main agent).

TYPE: Sequence[AgentAdapter]

task

The Task object with the query and any metadata needed for execution.

TYPE: Task

environment

The environment instance providing tools and state.

TYPE: Environment

query

The query string to pass to agents. For single-turn benchmarks this is typically task.query. For multi-turn with users, this may be an initial prompt or simulated user response.

TYPE: str

RETURNS DESCRIPTION
Any

The final answer or result from the agent system's execution. This could be:

Any
  • A string containing the final answer
Any
  • A dict with structured output
Any
  • A list of answers from multiple agents
Any
  • Any other representation of the task solution
Note

Message traces are captured automatically through the tracing system and

TYPE: Any

Any

passed to evaluate() via the traces parameter. Do NOT return message histories here.

How to use

This is where you implement your multi-agent system logic for solving one task, one time. You define how your agents interact, communicate, and collaborate to complete the task.

The agents parameter contains only the primary agents to run (from agents_to_run in setup_agents). Worker agents called indirectly through an orchestrator do not appear here.

Note: This method handles a single task execution. The framework's run() method manages task iteration, repetitions, and the complete benchmark lifecycle.

def run_agents(self, agents, task, environment, query):
    # Simple single-agent execution - returns final answer string
    orchestrator = agents[0]
    final_answer = orchestrator.run(query)
    return final_answer

# Or for multiple agents returning a list of answers:
def run_agents(self, agents, task, environment, query):
    answers = []
    for agent in agents:
        answer = agent.run(query)
        answers.append(answer)
    return answers

setup_agents abstractmethod

setup_agents(
    agent_data: Dict[str, Any],
    environment: Environment,
    task: Task,
    user: Optional[User],
    seed_generator: SeedGenerator,
) -> Tuple[Sequence[AgentAdapter], Dict[str, AgentAdapter]]

Instantiate and configure the agent system for a task.

Note: All agents in the returned agents_dict are automatically registered for tracing. You don't need to manually call register() for them. However, you should manually register models, simulators, or other components used by agents.

PARAMETER DESCRIPTION
agent_data

Configuration dict containing agent specifications, model parameters, and tool assignments for this task.

TYPE: Dict[str, Any]

environment

The initialized environment providing tools to the agents.

TYPE: Environment

task

The Task object with query and metadata.

TYPE: Task

user

Optional user simulator for agent-user interactions.

TYPE: Optional[User]

seed_generator

Seed generator for deriving deterministic seeds for agents and their models. Use per_repetition=True for agents that should vary across repetitions, or per_repetition=False for baseline agents that should remain constant. derive_seed() returns None if seeding is disabled (global_seed=None).

TYPE: SeedGenerator

RETURNS DESCRIPTION
Sequence[AgentAdapter]

A tuple of (agents_to_run, agents_dict) where:

Dict[str, AgentAdapter]
  • agents_to_run: Sequence of agents to invoke in run_agents() (typically 1 orchestrator)
Tuple[Sequence[AgentAdapter], Dict[str, AgentAdapter]]
  • agents_dict: Dictionary mapping agent names/IDs to all agent instances for monitoring
How to use

This method constructs your agent architecture—single agent, multiple collaborative agents, or an orchestrator managing workers. Each agent is wrapped in AgentAdapter for uniform message history tracking.

The dual return structure serves different purposes:

  • agents_to_run: Only agents directly invoked in run_agents() (typically the orchestrator)
  • agents_dict: All agents in the system for message history collection from workers called indirectly through the orchestrator
def setup_agents(self, agent_data, environment, task, user, seed_generator):
    # Use child() for logical paths like "agents/experimental"
    # derive_seed() returns None if seeding is disabled
    agent_gen = seed_generator.child("agents")

    # Vary experimental agent per rep, keep baseline constant
    experimental_seed = agent_gen.derive_seed("experimental", per_repetition=True)
    baseline_seed = agent_gen.derive_seed("baseline", per_repetition=False)

    # For worker agents, nest further: "agents/workers/analyst"
    worker_gen = agent_gen.child("workers")
    analyst_seed = worker_gen.derive_seed("analyst")

    # Create agents with seeds (model adapters accept Optional[int])
    model = self.get_model_adapter(model_id, seed=experimental_seed)
    # ... create agents ...

    return [orchestrator_adapter], all_agents

setup_environment abstractmethod

setup_environment(
    agent_data: Dict[str, Any],
    task: Task,
    seed_generator: SeedGenerator,
) -> Environment

Create and initialize the environment for a task.

This method is called once per task execution to create a fresh environment with the task's initial conditions.

Note: The returned environment is automatically registered for tracing. You don't need to manually call register() for it.

PARAMETER DESCRIPTION
agent_data

Configuration dict containing agent specifications, model parameters, and other settings that may influence environment setup (e.g., framework type).

TYPE: Dict[str, Any]

task

The Task object containing environment_data, query, and metadata needed to construct the environment.

TYPE: Task

seed_generator

Seed generator for deriving deterministic seeds for environment components (tools, simulators, etc.). Use derive_seed() to get seeds for individual components. Returns None if seeding is disabled (global_seed=None).

TYPE: SeedGenerator

RETURNS DESCRIPTION
Environment

An Environment instance with initialized state and tools for this specific task.

How to use

The environment encapsulates task state and provides tools/APIs that agents can use. Your implementation should:

  • Extract environment state from task.environment_data
  • Initialize any databases, simulators, or API clients
  • Create and configure tools that agents can invoke
  • Set up domain-specific state (inventory, user profiles, etc.)
  • Optionally use agent_data for framework-specific tool initialization
  • Use seed_generator to derive seeds for tool simulators
def setup_environment(self, agent_data, task, seed_generator):
    env = TravelEnvironment(task.environment_data)

    # Use nested child() for logical paths like "environment/tools/weather"
    # derive_seed() returns None if seeding is disabled
    env_gen = seed_generator.child("environment")
    tools_gen = env_gen.child("tools")
    for tool in env.tools:
        tool_seed = tools_gen.derive_seed(tool.name)  # Optional[int]
        tool_model = self.get_model_adapter(model_id, seed=tool_seed)
        tool.set_simulator(tool_model)

    return env

The environment is automatically registered for tracing when returned.

setup_evaluators abstractmethod

setup_evaluators(
    environment: Environment,
    task: Task,
    agents: Sequence[AgentAdapter],
    user: Optional[User],
    seed_generator: SeedGenerator,
) -> Sequence[Evaluator]

Create evaluators to assess agent performance on a task.

PARAMETER DESCRIPTION
environment

The environment instance with final state after agent execution.

TYPE: Environment

task

The Task object with evaluation criteria in evaluation_data.

TYPE: Task

agents

The agents that will execute the task (useful for context in evaluation).

TYPE: Sequence[AgentAdapter]

user

Optional user simulator, if evaluation needs user interaction data.

TYPE: Optional[User]

seed_generator

Seed generator for deriving deterministic seeds for evaluators that use LLMs (e.g., LLM judges). derive_seed() returns None if seeding is disabled (global_seed=None).

TYPE: SeedGenerator

RETURNS DESCRIPTION
Sequence[Evaluator]

A sequence of Evaluator instances that will be called with execution traces.

How to use

Evaluators judge whether agents successfully completed the task or satisfied specific criteria. Multiple evaluators can measure different performance aspects (accuracy, efficiency, conversation quality, etc.).

If an evaluator encounters an unexpected condition, prefer raising the exception. The benchmark runner will enforce the configured policy through fail_on_evaluation_error (fail-fast when True, mark task as evaluation_failed and continue when False).

def setup_evaluators(self, environment, task, agents, user, seed_generator):
    # Use child() to create logical namespace - results in "evaluators/judge"
    # derive_seed() returns None if seeding is disabled
    eval_gen = seed_generator.child("evaluators")
    judge_seed = eval_gen.derive_seed("judge")  # Optional[int]

    return [
        SuccessEvaluator(task.evaluation_data["gold_answer"]),
        LLMJudgeEvaluator(model=self.get_model_adapter(model_id, seed=judge_seed))
    ]

setup_user

setup_user(
    agent_data: Dict[str, Any],
    environment: Environment,
    task: Task,
    seed_generator: SeedGenerator,
) -> Optional[User]

Create an optional user simulator for interactive tasks.

This method is optional. Return None if your benchmark does not require user simulation.

Note: The returned user is automatically registered for tracing. You don't need to manually call register() for it.

PARAMETER DESCRIPTION
agent_data

Configuration dict containing agent specifications and settings that may influence user simulator setup (e.g., framework type for creating compatible tools).

TYPE: Dict[str, Any]

environment

The environment instance created for this task.

TYPE: Environment

task

The Task object with user profile data or scenario information.

TYPE: Task

seed_generator

Seed generator for deriving deterministic seeds for the user simulator. derive_seed() returns None if seeding is disabled (global_seed=None).

TYPE: SeedGenerator

RETURNS DESCRIPTION
Optional[User]

A User instance that can respond to agent queries, or None if not needed.

How to use

User simulators enable agent-user interactions by responding to queries with preferences, clarifications, or feedback. Useful for benchmarks testing conversational agents or systems requiring user input during execution.

def setup_user(self, agent_data, environment, task, seed_generator):
    # Use child() to create logical namespace - results in "simulators/user"
    # derive_seed() returns None if seeding is disabled
    sim_gen = seed_generator.child("simulators")
    user_seed = sim_gen.derive_seed("user")  # Optional[int]

    user_model = self.get_model_adapter(model_id, seed=user_seed)
    return LLMUser(model=user_model, ...)

# Or skip user simulation entirely
def setup_user(self, agent_data, environment, task, seed_generator):
    return None

The user is automatically registered for tracing when returned.

TaskExecutionStatus

Bases: Enum

Status of task execution and evaluation.

This enum tracks the execution state of a task through the benchmark lifecycle, enabling graceful failure handling and comprehensive result reporting.

The status distinguishes between errors caused by the agent (agent's fault) and errors caused by the evaluation infrastructure (environment, user simulator). This enables fair scoring by excluding infrastructure failures.

ATTRIBUTE DESCRIPTION
SUCCESS

Task executed and evaluated successfully.

AGENT_ERROR

Agent violated contract at a boundary (agent's fault, counts against score).

ENVIRONMENT_ERROR

Environment/tool infrastructure failed (not agent's fault, exclude from scoring).

USER_ERROR

User simulator failed (not agent's fault, exclude from scoring).

TASK_TIMEOUT

Task execution exceeded configured timeout (resource constraint).

UNKNOWN_EXECUTION_ERROR

Unclassified execution error (e.g., agent framework internal failure).

EVALUATION_FAILED

Task executed but evaluation raised an exception.

SETUP_FAILED

Setup phase (environment, agents, evaluators) raised an exception.

Scoring Guidance
  • Include in agent score: SUCCESS, AGENT_ERROR
  • Exclude from agent score: ENVIRONMENT_ERROR, USER_ERROR, TASK_TIMEOUT, UNKNOWN_EXECUTION_ERROR
  • Handle separately: EVALUATION_FAILED, SETUP_FAILED