GAIA2: Dynamic Multi-Step Scenario Benchmark (Beta)

Beta

This benchmark has been implemented carefully, but it is highly complex and we have not yet validated the results against the original implementation. Use with caution when comparing with existing results or the original paper's numbers. Contributions and compute donations welcome!

The GAIA2 Benchmark evaluates LLM-based agents on dynamic, multi-step scenarios using Meta's ARE (Agent Research Environments) platform. It tests agents across multiple capability dimensions in a simulated mobile environment.

Overview

GAIA2 is designed to evaluate agents in realistic, time-sensitive scenarios. The benchmark features:

  • ARE simulation environment with real-time dynamics and event scheduling
  • Tool-based time control via wait_for_notification() for temporal reasoning
  • 5 capability dimensions: execution, search, adaptability, time, ambiguity
  • Deterministic evaluation via GraphPerEventJudge comparing completed vs expected events
  • 12 app tools: Calendar, Email, Messaging, Contacts, Shopping, Cab, City, FileSystem, Browser, ChatsApp, SystemApp, Timer

Reference Paper: "GAIA-2: A Controllable Multi-Turn Conversational Benchmark for Agents"

Check out the BENCHMARKS.md file for more information including licenses.

Installation

GAIA2 requires additional dependencies:

pip install maseval[gaia2]

Or with uv:

uv add maseval --extra gaia2

Quick Start

from maseval.benchmark.gaia2 import (
    Gaia2Benchmark, Gaia2Environment, Gaia2Evaluator,
    load_tasks, configure_model_ids, compute_gaia2_metrics,
)

# Load tasks (downloads from HuggingFace automatically)
tasks = load_tasks(capability="execution", limit=5)

# Optionally configure LLM-based judge
configure_model_ids(tasks, evaluator_model_id="gpt-4o")

# Create your framework-specific benchmark subclass
class MyGaia2Benchmark(Gaia2Benchmark):
    def setup_agents(self, agent_data, environment, task, user, seed_generator):
        tools = environment.create_tools()
        # Create your agent with these tools
        ...

    def get_model_adapter(self, model_id, **kwargs):
        adapter = MyModelAdapter(model_id)
        if "register_name" in kwargs:
            self.register("models", kwargs["register_name"], adapter)
        return adapter

# Run benchmark
benchmark = MyGaia2Benchmark()
results = benchmark.run(tasks)

# Compute metrics
metrics = compute_gaia2_metrics(results)
print(f"GSR: {metrics['gsr']:.2%}")
print(f"By capability: {metrics['by_capability']}")

For baseline comparisons, use DefaultAgentGaia2Benchmark which provides a ReAct-style reference agent:

from maseval.benchmark.gaia2 import DefaultAgentGaia2Benchmark

# Note: You must subclass to provide get_model_adapter()
class MyDefaultGaia2Benchmark(DefaultAgentGaia2Benchmark):
    def get_model_adapter(self, model_id, **kwargs):
        adapter = MyModelAdapter(model_id)
        if "register_name" in kwargs:
            self.register("models", kwargs["register_name"], adapter)
        return adapter

benchmark = MyDefaultGaia2Benchmark(
    agent_data={"model_id": "gpt-4o"},
)
results = benchmark.run(tasks)

Capabilities

GAIA2 tasks are organized by capability dimension:

Capability      Description
execution       Basic task execution
search          Information retrieval tasks
adaptability    Adapting to changing requirements
time            Temporal reasoning tasks
ambiguity       Handling ambiguous instructions

Load specific capabilities:

# Load only time-related tasks
tasks = load_tasks(capability="time", limit=10)

# Load all capabilities
tasks = load_tasks(limit=50)

Multi-Turn Notification Loop

GAIA2 uses an event-driven multi-turn architecture. Scenarios have scheduled events (e.g., "calendar events added at t=240s", "friend replies at t=300s") that the agent must wait for and react to.

The benchmark invokes the agent once. The agent handles multi-turn internally via the notification loop:

  1. Agent calls SystemApp__wait_for_notification(timeout=N) as a normal tool.
  2. The ARE environment processes scheduled events, advances simulation time, and queues resulting notifications — all synchronously during the tool call.
  3. The tool returns. The agent's loop continues (it does not terminate).
  4. Before the next LLM call, the agent polls environment.poll_notifications() to retrieve messages that arrived during the wait.
  5. The agent injects those messages into its context and continues reasoning.
  6. Eventually the agent calls AgentUserInterface__send_message_to_user — the only termination signal.

What custom agents must implement

The ARE tools handle all environment-side mechanics automatically (event processing, time advancement, notification queuing). No callbacks or hooks required. Custom agents must handle two things:

1. Do not terminate on wait_for_notification. Treat it as a regular tool call. Only terminate on AgentUserInterface__send_message_to_user.

2. Poll notifications between steps. After wait_for_notification returns, new messages are in the queue. Call environment.poll_notifications() to drain them:

# Between agent steps (e.g., before each LLM call):
user_msgs, env_notifs, has_stop = environment.poll_notifications()

# Inject into agent context (format matches ARE's convention):
if user_msgs:
    content = "\n".join(user_msgs)
    messages.append({"role": "user", "content": f"User messages updates:\n***\n{content}\n***\n"})
if env_notifs:
    content = "\n".join(env_notifs)
    messages.append({"role": "user", "content": f"Environment notifications updates:\n***\n{content}\n***\n"})
if has_stop:
    # Environment signalled simulation end — stop the agent loop
    break

See DefaultGaia2Agent source for the canonical single-loop implementation.
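The two requirements above can be sketched as a single loop. This is a minimal, self-contained illustration and not the DefaultGaia2Agent code: `StubEnvironment`, `run_agent`, `scripted_policy`, and the stub tools are hypothetical stand-ins, and the only things taken from the text are the two tool names and the three-value return of `poll_notifications()`.

```python
from typing import Any, Callable, Dict, List, Tuple

TERMINATE_TOOL = "AgentUserInterface__send_message_to_user"
WAIT_TOOL = "SystemApp__wait_for_notification"
Message = Dict[str, str]
Policy = Callable[[List[Message]], Tuple[str, Dict[str, Any]]]


class StubEnvironment:
    """Hypothetical stand-in for Gaia2Environment; only poll_notifications() is modelled."""

    def __init__(self) -> None:
        self.queue: List[str] = []

    def poll_notifications(self) -> Tuple[List[str], List[str], bool]:
        # Drain the queue: (user messages, environment notifications, stop flag).
        env_notifs, self.queue = self.queue, []
        return [], env_notifs, False


def run_agent(policy: Policy, tools: Dict[str, Callable[..., str]],
              environment: StubEnvironment, max_steps: int = 20) -> List[Message]:
    messages: List[Message] = []
    for _ in range(max_steps):
        # Poll before every "LLM call" so messages that arrived during a wait are visible.
        user_msgs, env_notifs, has_stop = environment.poll_notifications()
        if user_msgs:
            content = "\n".join(user_msgs)
            messages.append({"role": "user", "content": f"User messages updates:\n***\n{content}\n***\n"})
        if env_notifs:
            content = "\n".join(env_notifs)
            messages.append({"role": "user", "content": f"Environment notifications updates:\n***\n{content}\n***\n"})
        if has_stop:
            break
        tool_name, tool_args = policy(messages)
        result = tools[tool_name](**tool_args)
        messages.append({"role": "tool", "content": f"{tool_name} -> {result}"})
        # wait_for_notification is an ordinary tool call; only this tool ends the loop.
        if tool_name == TERMINATE_TOOL:
            break
    return messages


env = StubEnvironment()


def wait_for_notification(timeout: int) -> str:
    # In ARE, events are processed during this call; the stub fakes one arrival.
    env.queue.append("New email from Alice")
    return "waited"


def send_message_to_user(content: str) -> str:
    return "sent"


def scripted_policy(messages: List[Message]) -> Tuple[str, Dict[str, Any]]:
    # Stand-in for an LLM: wait first, answer once a notification is in context.
    if any("Environment notifications" in m["content"] for m in messages):
        return TERMINATE_TOOL, {"content": "Done"}
    return WAIT_TOOL, {"timeout": 60}


tools = {WAIT_TOOL: wait_for_notification, TERMINATE_TOOL: send_message_to_user}
history = run_agent(scripted_policy, tools, env)
```

In this sketch the wait call does not end the loop; the notification it produces only becomes visible to the policy after the next poll.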

API Reference

Gaia2Benchmark

Bases: Benchmark

MASEval wrapper for Gaia2/ARE benchmark.

Hybrid approach: Uses ARE for simulation and evaluation while providing MASEval orchestration, tracing, and agent flexibility.

The ARE simulation runs internally; agents interact purely via tool calls. Time control happens through SystemApp__wait_for_notification.

Subclasses must implement:

  • setup_agents() - Create agents for the task
  • get_model_adapter() - Provide model adapters

Multi-Turn Architecture

GAIA2 uses ARE's two-level loop architecture:

  • Outer loop (turns): drains the notification queue, formats user messages as [TASK], re-queues environment notifications, then runs the inner step loop.
  • Inner loop (steps): ReAct cycle. Terminates on send_message_to_user (TERMINATED — turn complete) or wait_for_notification (PAUSED — outer loop continues).

Source: ARE are_simulation_main.py:agent_loop()

What custom agents must do:

  • Terminate inner loop on both send_message_to_user and wait_for_notification. The former completes a turn; the latter pauses the agent while ARE processes events.
  • Between turns (outer loop): drain notifications via environment.get_turn_notifications() which re-queues environment notifications and returns user messages for [TASK] formatting.
  • Within turns (inner loop pre-step): poll notifications via environment.poll_notifications() to pick up re-queued environment notifications and new messages.

See the default agent implementation for the reference two-level loop approach.

seed_generator property

seed_generator: SeedGenerator

The seed generator for this benchmark.

The seed generator is configured at benchmark initialization via the seed or seed_generator parameters. When seed=None (the default), the generator's derive_seed() method returns None, effectively disabling seeding while maintaining a uniform interface.
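The "disabled but uniform" behaviour can be illustrated with a small sketch. This is not MASEval's SeedGenerator: the class name `SketchSeedGenerator`, the `name` argument to `derive_seed()`, and the hash-based derivation are assumptions made for illustration. Only the rule that `derive_seed()` returns None when no seed is set comes from the text.

```python
import hashlib
from typing import Optional


class SketchSeedGenerator:
    """Illustrative only; not MASEval's SeedGenerator."""

    def __init__(self, seed: Optional[int] = None) -> None:
        self.seed = seed

    def derive_seed(self, name: str) -> Optional[int]:
        # With no root seed, seeding is disabled: every derived seed is None.
        if self.seed is None:
            return None
        # Hash the root seed together with the component name (assumed scheme).
        digest = hashlib.sha256(f"{self.seed}:{name}".encode()).digest()
        # Use the first 4 bytes as an unsigned 32-bit seed.
        return int.from_bytes(digest[:4], "big")


unseeded = SketchSeedGenerator()
seeded = SketchSeedGenerator(seed=42)
agent_seed = seeded.derive_seed("agent")
```

Callers can pass the result straight to a component without branching: a seeded run gets a stable per-component integer, and an unseeded run gets None.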

RETURNS DESCRIPTION
SeedGenerator

The root SeedGenerator instance.

usage property

usage: Usage

Running usage total across all task repetitions.

Queryable at any time, including while the benchmark is still running. Returns the grand total of all usage collected so far.

usage_by_component property

usage_by_component: Dict[str, Usage]

Per-component running usage totals across all repetitions.

Keys are registry keys (e.g., "models:main_model").

__init__

__init__(
    callbacks: Optional[List[BenchmarkCallback]] = None,
    n_task_repeats: int = 1,
    max_invocations: int = MAX_INVOCATIONS,
    num_workers: int = 1,
    fail_on_setup_error: bool = False,
    fail_on_task_error: bool = False,
    fail_on_evaluation_error: bool = False,
    progress_bar: bool | str = True,
    seed: Optional[int] = None,
    seed_generator: Optional[SeedGenerator] = None,
)

Initialize benchmark with Gaia2-specific defaults.

PARAMETER DESCRIPTION
callbacks

Optional list of callback handlers for monitoring execution.

TYPE: Optional[List[BenchmarkCallback]] DEFAULT: None

n_task_repeats

Number of times to repeat each task. Default 1.

TYPE: int DEFAULT: 1

max_invocations

Maximum agent invocations (default: 1 for single-turn).

TYPE: int DEFAULT: MAX_INVOCATIONS

num_workers

Number of parallel task executions. Default 1 (sequential).

TYPE: int DEFAULT: 1

fail_on_setup_error

If True, raise on setup errors. Default False.

TYPE: bool DEFAULT: False

fail_on_task_error

If True, raise on task execution errors. Default False.

TYPE: bool DEFAULT: False

fail_on_evaluation_error

If True, raise on evaluation errors. Default False.

TYPE: bool DEFAULT: False

progress_bar

Progress display. True (default) for tqdm, "rich" for Rich, or False to disable.

TYPE: bool | str DEFAULT: True

seed

Global seed for reproducible benchmark runs.

TYPE: Optional[int] DEFAULT: None

seed_generator

Custom seed generator (takes precedence over seed).

TYPE: Optional[SeedGenerator] DEFAULT: None

add_callback

add_callback(callback: BenchmarkCallback) -> None

Register a callback handler to monitor benchmark execution.

PARAMETER DESCRIPTION
callback

A BenchmarkCallback instance that will receive execution events.

TYPE: BenchmarkCallback

How to use

Callbacks receive notifications at key lifecycle points for tracing, progress tracking, or custom metrics collection. See BenchmarkCallback for available hooks and their signatures.

from maseval.core.callbacks import MessageTracingCallback

benchmark = MyBenchmark()
benchmark.add_callback(MessageTracingCallback(output_dir="logs"))
results = benchmark.run(tasks=tasks, agent_data=config)

clear_registry

clear_registry() -> None

Clear the component registry after a task repetition completes.

This method is called automatically by run() after each task repetition to ensure components are not carried over between repetitions. The reports list persists across all repetitions for aggregated analysis.

collect_all_configs

collect_all_configs() -> Dict[str, Any]

Collect configuration from all registered components for the current task repetition.

This method is called automatically by run() after each task repetition completes and before evaluation begins. It gathers comprehensive configuration from all registered components (agents, models, tools, simulators, callbacks, etc.) for that specific repetition. After collection, the registry is cleared for the next repetition.

The collected configs are stored in benchmark.reports list along with traces for persistent access across all task repetitions.

Output fields:

  • metadata - Collection timestamp and thread info
  • agents - Dict mapping agent names to their config (settings, parameters)
  • models - Dict mapping model names to their config (model IDs, parameters)
  • tools - Dict mapping tool names to their config (specifications, settings)
  • simulators - Dict mapping simulator names to their config (parameters, templates)
  • callbacks - Dict mapping callback names to their config (settings)
  • environment - Direct config from the environment (not nested), or None if not present
  • user - Direct config from the user simulator (not nested), or None if not present
  • other - Dict for any other registered components
  • benchmark - Benchmark-level configuration (git, system, packages)

RETURNS DESCRIPTION
Dict[str, Any]

Structured dictionary containing configuration from all registered components.

How to use

This method is called automatically by run() after each task repetition:

# Automatic collection (recommended)
results = benchmark.run(tasks=tasks, agent_data=config)

# Access all collected reports (traces + configs) across repetitions
for report in benchmark.reports:
    print(f"Task {report['task_id']}, Repeat {report['repeat_idx']}")
    # Agents is a dict: agent_name -> config
    print(f"Agent config: {report['config']['agents']['my_agent']}")
    # Environment and user are direct (not nested)
    print(f"Environment config: {report['config']['environment']}")
    print(f"User config: {report['config']['user']}")
    # Benchmark-level config
    print(f"Git commit: {report['config']['benchmark']['git']['commit_hash']}")

The collected configs are available in the results for reproducibility analysis.

collect_all_traces

collect_all_traces() -> Dict[str, Any]

Collect execution traces from all registered components for the current task repetition.

This method is called automatically by run() after each task repetition completes and before evaluation begins. It gathers comprehensive traces from all registered components (agents, models, tools, simulators, callbacks, etc.) for that specific repetition. After collection, the registry is cleared for the next repetition.

The collected traces are stored in benchmark.reports list along with configs for persistent access across all task repetitions.

Output fields:

  • metadata - Collection timestamp and thread info
  • agents - Dict mapping agent names to their traces (messages, execution data)
  • models - Dict mapping model names to their traces (API calls, timing, errors)
  • tools - Dict mapping tool names to their traces (invocations, parameters)
  • simulators - Dict mapping simulator names to their traces (attempts, outcomes)
  • callbacks - Dict mapping callback names to their traces (custom data)
  • environment - Direct traces from the environment (not nested), or None if not present
  • user - Direct traces from the user simulator (not nested), or None if not present
  • other - Dict for any other registered components

RETURNS DESCRIPTION
Dict[str, Any]

Structured dictionary containing execution traces from all registered components.

How to use

This method is called automatically by run() after each task repetition:

# Automatic collection (recommended)
results = benchmark.run(tasks=tasks, agent_data=config)

# Access all collected reports (traces + configs) across repetitions
for report in benchmark.reports:
    print(f"Task {report['task_id']}, Repeat {report['repeat_idx']}")
    # Agents is a dict: agent_name -> traces
    print(f"Agent messages: {report['traces']['agents']['my_agent']}")
    # Environment and user are direct (not nested)
    print(f"Environment state: {report['traces']['environment']}")
    print(f"User interactions: {report['traces']['user']}")

The collected traces are passed to the evaluator's evaluate() method and stored in benchmark.reports for later analysis.

collect_all_usage

collect_all_usage() -> Dict[str, Any]

Collect usage from all registered components for the current task repetition.

This method is called automatically by run() after each task repetition completes. It gathers usage from all registered UsageTrackableMixin components and also accumulates into persistent running totals accessible via usage and usage_by_component.

RETURNS DESCRIPTION
Dict[str, Any]

Structured dictionary containing usage from all registered components.

evaluate

evaluate(
    evaluators: Sequence[Evaluator],
    agents: Dict[str, AgentAdapter],
    final_answer: Any,
    traces: Dict[str, Any],
) -> List[Dict[str, Any]]

Evaluate using Gaia2 evaluators.

Uses each evaluator's filter_traces() method to extract relevant data, then calls the evaluator with the filtered traces.

Returns Gaia2 format:

  • gsr: Goal Success Rate
  • partial_gsr: Partial success rate
  • passed: Boolean
  • rationale: Judge rationale (if available)

PARAMETER DESCRIPTION
evaluators

List of evaluators

TYPE: Sequence[Evaluator]

agents

Dict of agents

TYPE: Dict[str, AgentAdapter]

final_answer

Final answer from agents

TYPE: Any

traces

Execution traces

TYPE: Dict[str, Any]

RETURNS DESCRIPTION
List[Dict[str, Any]]

List of evaluation result dicts
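As an illustration of how result dicts with a `gsr` field might be rolled up, here is a minimal sketch. It is not `compute_gaia2_metrics`: the `summarize` helper and the record layout (a `capability` key next to an `eval` list) are assumptions chosen for the example.

```python
from collections import defaultdict
from typing import Any, Dict, List


def summarize(records: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Average `gsr` overall and per capability (hypothetical record layout)."""
    per_cap: Dict[str, List[float]] = defaultdict(list)
    for record in records:
        # `eval` may be None when the task or its evaluation failed; skip those.
        for result in record["eval"] or []:
            per_cap[record["capability"]].append(float(result["gsr"]))
    all_scores = [s for scores in per_cap.values() for s in scores]
    return {
        "gsr": sum(all_scores) / len(all_scores) if all_scores else 0.0,
        "by_capability": {cap: sum(s) / len(s) for cap, s in per_cap.items()},
    }


records = [
    {"capability": "time", "eval": [{"gsr": 1.0, "partial_gsr": 1.0, "passed": True}]},
    {"capability": "time", "eval": [{"gsr": 0.0, "partial_gsr": 0.5, "passed": False}]},
    {"capability": "search", "eval": [{"gsr": 1.0, "partial_gsr": 1.0, "passed": True}]},
    {"capability": "search", "eval": None},  # failed run, no evaluation
]
summary = summarize(records)
```

The sketch drops failed runs from the average. A stricter convention would count them as zero, which lowers the reported rate, so whichever choice is made should be stated alongside the numbers.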

execution_loop

execution_loop(
    agents: Sequence[AgentAdapter],
    task: Task,
    environment: Environment,
    user: Optional[User],
) -> Any

Execute agents with optional user interaction loop.

This method orchestrates the agent-user interaction pattern. When a user is present, the user initiates the conversation using user.get_initial_query(). If no user is present, task.query is used as the initial query.

Interaction Flow

By default, agents execute once (max_invocations=1). For multi-turn interaction, set self.max_invocations > 1 in your benchmark's __init__. The loop continues until max_invocations is reached or user.is_done() returns True (e.g., max turns reached or stop token detected).

Note

Override this method in your benchmark subclass to implement custom interaction patterns (e.g., agent-initiated conversations, different termination conditions, or specialized query routing).

PARAMETER DESCRIPTION
agents

Agents to execute (typically the orchestrator).

TYPE: Sequence[AgentAdapter]

task

The task being solved.

TYPE: Task

environment

The environment providing tools and state.

TYPE: Environment

user

Optional user simulator. If provided, the user initiates and drives the conversation. If None, a single agent execution with task.query.

TYPE: Optional[User]

RETURNS DESCRIPTION
Any

Final answer from the last agent execution.

Example

For interactive benchmarks, enable multi-turn interaction:

def __init__(self, **kwargs):
    super().__init__(**kwargs)
    self.max_invocations = 5  # Up to 5 agent-user exchanges

get_failed_tasks

get_failed_tasks(
    status_filter: Optional[
        Union[
            TaskExecutionStatus, List[TaskExecutionStatus]
        ]
    ] = None,
    reports: Optional[List[Dict[str, Any]]] = None,
) -> SequentialTaskQueue

Get tasks that failed during benchmark execution.

This method retrieves failed tasks based on their execution status, useful for debugging, retry logic, or failure analysis.

PARAMETER DESCRIPTION
status_filter

Filter by specific failure status(es). If None, returns all failed tasks (any status except SUCCESS). Can be a single TaskExecutionStatus or a list of them. Examples:

  • TaskExecutionStatus.TASK_EXECUTION_FAILED: Only tasks that failed during execution
  • TaskExecutionStatus.EVALUATION_FAILED: Only tasks where evaluation failed
  • [TaskExecutionStatus.TASK_EXECUTION_FAILED, TaskExecutionStatus.SETUP_FAILED]: Tasks that failed during execution or setup

TYPE: Optional[Union[TaskExecutionStatus, List[TaskExecutionStatus]]] DEFAULT: None

reports

Optional list of reports to analyze. If None, uses the reports from the last run() call. This allows analyzing externally stored or modified reports.

TYPE: Optional[List[Dict[str, Any]]] DEFAULT: None

RETURNS DESCRIPTION
SequentialTaskQueue

SequentialTaskQueue containing the failed tasks. Empty if no failures match the filter.

RAISES DESCRIPTION
RuntimeError

If reports is None and run() has not been executed yet.

How to use

# Run benchmark
benchmark = MyBenchmark()
reports = benchmark.run(tasks=tasks, agent_data=config)

# Get all failed tasks (from internal state)
failed = benchmark.get_failed_tasks()
print(f"Failed: {len(failed)}/{len(benchmark.tasks)} tasks")

# Or work with returned reports (safe from internal state changes)
failed = benchmark.get_failed_tasks(reports=reports)

# Get only tasks that failed during execution (not evaluation)
execution_failures = benchmark.get_failed_tasks(
    TaskExecutionStatus.TASK_EXECUTION_FAILED,
    reports=reports
)

# Get setup and execution failures
critical_failures = benchmark.get_failed_tasks(
    status_filter=[
        TaskExecutionStatus.SETUP_FAILED,
        TaskExecutionStatus.TASK_EXECUTION_FAILED
    ],
    reports=reports
)

# Retry failed tasks elegantly - this is the key use case!
if len(failed) > 0:
    retry_reports = benchmark.run(tasks=failed)

# Or more concisely
reports = benchmark.run(tasks=tasks)
retry_reports = benchmark.run(tasks=benchmark.get_failed_tasks())

get_model_adapter abstractmethod

get_model_adapter(
    model_id: str, **kwargs: Any
) -> ModelAdapter

Provide a ModelAdapter for benchmark components that require LLM access.

Many benchmark components beyond the agents themselves require access to language models. Common examples include:

  • Tool simulators: Simulating tool responses when real APIs aren't available
  • User simulators: Generating realistic user responses in multi-turn dialogues
  • Judges/Evaluators: Using LLMs to assess agent performance against criteria
  • Reward models: Computing scores for reinforcement learning

This method centralizes model provisioning, giving you control over which models are used throughout the benchmark. Implement this to return a configured ModelAdapter for the requested model.

PARAMETER DESCRIPTION
model_id

The model identifier to use (e.g., "gemini-2.5-flash", "openrouter/google/gemini-2.5-flash", "gpt-4o"). This is passed by the benchmark when setting up components that need model access.

TYPE: str

**kwargs

Additional arguments for adapter creation or registration. Common kwargs:

  • register_category: Category for trace registration (e.g., "models")
  • register_name: Name for trace registration (e.g., "evaluator_user_gsr")

TYPE: Any DEFAULT: {}

RETURNS DESCRIPTION
ModelAdapter

A ModelAdapter instance configured for the specified model. For proper tracing, return a fresh adapter for each call rather than reusing instances. You can still share the underlying API client for efficiency.

How to use

For proper tracing, register the adapter after creation using the kwargs:

def get_model_adapter(self, model_id: str, **kwargs: Any) -> ModelAdapter:
    adapter = GoogleGenAIModelAdapter(self.client, model_id=model_id)

    # Register for tracing if registration info provided
    category = kwargs.get("register_category", "models")
    name = kwargs.get("register_name", model_id)
    self.register(category, name, adapter)

    return adapter

The benchmark calls this method when setting up tools, user simulators, and evaluators. Each call creates a fresh adapter with its own trace log.

register

register(
    category: str,
    name: str,
    component: RegisterableComponent,
) -> RegisterableComponent

Register a component for comprehensive trace and configuration collection.

All core MASEval components (AgentAdapter, ModelAdapter, Environment, User, LLMSimulator, BenchmarkCallback) inherit from TraceableMixin and/or ConfigurableMixin, and are automatically registered for both trace and configuration collection before evaluation.

Note: Most components are automatically registered when returned from setup methods (setup_environment, setup_user, setup_agents). You only need to manually register additional components like models, simulators, or tools that aren't automatically captured.

PARAMETER DESCRIPTION
category

Component category (e.g., "agents", "models", "tools", "simulators", "callbacks", "user", "environment", "seeding"). Use plural form to match the structure in collect_all_traces() and collect_all_configs().

TYPE: str

name

Unique identifier for this component within its category

TYPE: str

component

Any object inheriting from TraceableMixin and/or ConfigurableMixin

TYPE: RegisterableComponent

RETURNS DESCRIPTION
RegisterableComponent

The component (for chaining convenience)

RAISES DESCRIPTION
ValueError

If the component is already registered under a different name
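The duplicate-registration rule can be pictured with a toy registry. This sketch is an assumption about the general shape, not MASEval's code: `SketchRegistry`, its internal dictionaries, and the error message are hypothetical, and the "category:name" key format is borrowed from the usage_by_component description.

```python
from typing import Any, Dict


class SketchRegistry:
    """Toy component registry; illustrative only."""

    def __init__(self) -> None:
        self._components: Dict[str, Any] = {}   # "category:name" -> component
        self._key_by_id: Dict[int, str] = {}    # id(component) -> "category:name"

    def register(self, category: str, name: str, component: Any) -> Any:
        key = f"{category}:{name}"
        existing = self._key_by_id.get(id(component))
        # The same object under a second name would be traced twice, so reject it.
        if existing is not None and existing != key:
            raise ValueError(f"component already registered as {existing!r}")
        self._components[key] = component
        self._key_by_id[id(component)] = key
        return component  # returned for chaining

    def clear(self) -> None:
        # Called between task repetitions so nothing carries over.
        self._components.clear()
        self._key_by_id.clear()


registry = SketchRegistry()
model = registry.register("models", "main_model", object())
```

Registering the same object again under the same key is a no-op here, while a different name raises ValueError.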

How to use

Most components are auto-registered. Manual registration is only needed for additional components:

def setup_agents(self, agent_data, environment, task, user, seed_generator):
    # Create model (needs manual registration)
    model = MyModelAdapter(...)
    self.register("models", "main_model", model)

    # Create agent (auto-registered when returned)
    agent = MyAgent(model=model)
    agent_adapter = AgentAdapter(agent, "agent1")

    # Environment and user are also auto-registered
    return [agent_adapter], {"agent1": agent_adapter}

Traces and configs are automatically collected before evaluation via collect_all_traces() and collect_all_configs() which are called internally by the run() method.

run

run(
    tasks: Union[
        Task, BaseTaskQueue, Iterable[Union[Task, dict]]
    ],
    agent_data: Dict[str, Any] | Iterable[Dict[str, Any]],
) -> List[Dict[str, Any]]

Initialize and execute the complete benchmark loop across all tasks.

PARAMETER DESCRIPTION
tasks

Task source for execution. Can be:

  • A single Task object
  • A BaseTaskQueue (SequentialTaskQueue, PriorityTaskQueue, or custom AdaptiveTaskQueue)
  • An iterable of Task objects or dicts that will be converted to Tasks

When a BaseTaskQueue is provided, it controls the task ordering. AdaptiveTaskQueue subclasses are automatically registered as callbacks to receive task completion notifications.

TYPE: Union[Task, BaseTaskQueue, Iterable[Union[Task, dict]]]

agent_data

Configuration for agents. Either a single dict applied to all tasks, or an iterable of dicts with one configuration per task. Agent data typically includes model parameters, agent architecture details, and tool specifications.

TYPE: Dict[str, Any] | Iterable[Dict[str, Any]]

RETURNS DESCRIPTION
List[Dict[str, Any]]

List of report dictionaries, one per task repetition. Each report contains:

  • task_id: Task identifier (UUID)
  • repeat_idx: Repetition index (0 to n_task_repeats-1)
  • status: Execution status (one of TaskExecutionStatus enum values)
  • traces: Execution traces from all registered components
  • config: Configuration from all registered components and benchmark level
  • eval: Evaluation results (None if task or evaluation failed)
  • error: Error details dict (only present if status is not SUCCESS), containing:
      • error_type: Exception class name
      • error_message: Exception message
      • traceback: Full traceback string

RAISES DESCRIPTION
ValueError

If agent_data length doesn't match number of tasks (when agent_data is an iterable).
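The length check described above can be sketched as a small normalization step. The helper name `expand_agent_data` and the error message are hypothetical, and this is not the framework's own code. It only illustrates the rule that a single dict applies to every task, while an iterable must supply exactly one config per task.

```python
from typing import Any, Dict, Iterable, List, Union

AgentData = Dict[str, Any]


def expand_agent_data(
    agent_data: Union[AgentData, Iterable[AgentData]], n_tasks: int
) -> List[AgentData]:
    """Return one agent config per task, or raise ValueError on a length mismatch."""
    if isinstance(agent_data, dict):
        # A single dict is shared by all tasks.
        return [agent_data] * n_tasks
    configs = list(agent_data)
    if len(configs) != n_tasks:
        raise ValueError(
            f"agent_data has {len(configs)} entries but there are {n_tasks} tasks"
        )
    return configs


shared = expand_agent_data({"model": "gpt-4"}, n_tasks=3)
per_task = expand_agent_data([{"model": "gpt-4"}, {"model": "gpt-4o"}], n_tasks=2)
```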

How to use

This is the framework's main orchestration method that runs your entire benchmark. It iterates through all tasks, handles repetitions, and manages the three-stage lifecycle for each execution. You don't implement this method—instead, you call it to start the benchmark after implementing the setup and execution methods.

By default, the benchmark will continue executing remaining tasks even if some fail. You can change this behavior by setting fail_on_task_error=True, fail_on_evaluation_error=True, or fail_on_setup_error=True when instantiating the benchmark. Each task execution returns a status indicating success or the specific failure type (see TaskExecutionStatus).

For each task execution, the framework:

  1. Calls your setup methods to initialize components
  2. Calls your run_agents() method to execute the task
  3. Collects message histories and calls evaluators
  4. Stores results and triggers callbacks

Pseudocode structure:

for task in tasks:
    for repeat in range(n_task_repeats):
        # Setup stage
        environment = setup_environment(agent_data, task)
        user = setup_user(agent_data, environment, task)
        agents_to_run, agents_dict = setup_agents(agent_data, environment, task, user)
        evaluators = setup_evaluators(environment, task, agents_to_run, user)

        # Run stage (execution_loop handles multi-turn if user exists)
        agents_output = execution_loop(agents_to_run, task, environment, user)

        # Evaluate stage
        traces = collect_message_histories(agents_dict)
        eval_results = evaluate(evaluators, agents_dict, agents_output, traces)

        # Store results
        store_result(task_id, traces, eval_results)

Callback hooks are triggered at these points:

  • on_run_start: Before processing any tasks
  • on_task_start: Before processing a task (once per task, not per repeat)
  • on_task_repeat_start: Before each repetition of a task
  • on_task_repeat_end: After each repetition completes
  • on_task_end: After all repetitions of a task complete
  • on_run_end: After all tasks complete
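The hook order listed above can be traced with a recording stub. The hook names come from the list; the argument lists, the `RecordingCallback` class, and the `drive` function are assumptions for illustration (see BenchmarkCallback for the real signatures), and the sketch models sequential execution only.

```python
from typing import List


class RecordingCallback:
    """Hypothetical callback that logs each hook invocation in order."""

    def __init__(self) -> None:
        self.events: List[str] = []

    def on_run_start(self) -> None:
        self.events.append("on_run_start")

    def on_task_start(self, task_id: str) -> None:
        self.events.append(f"on_task_start:{task_id}")

    def on_task_repeat_start(self, task_id: str, repeat_idx: int) -> None:
        self.events.append(f"on_task_repeat_start:{task_id}:{repeat_idx}")

    def on_task_repeat_end(self, task_id: str, repeat_idx: int) -> None:
        self.events.append(f"on_task_repeat_end:{task_id}:{repeat_idx}")

    def on_task_end(self, task_id: str) -> None:
        self.events.append(f"on_task_end:{task_id}")

    def on_run_end(self) -> None:
        self.events.append("on_run_end")


def drive(callback: RecordingCallback, task_ids: List[str], n_task_repeats: int) -> None:
    """Fire the hooks in the documented order for a sequential run."""
    callback.on_run_start()
    for task_id in task_ids:
        callback.on_task_start(task_id)  # once per task, not per repeat
        for repeat_idx in range(n_task_repeats):
            callback.on_task_repeat_start(task_id, repeat_idx)
            callback.on_task_repeat_end(task_id, repeat_idx)
        callback.on_task_end(task_id)
    callback.on_run_end()


recorder = RecordingCallback()
drive(recorder, ["t1"], n_task_repeats=2)
```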

# Typical usage
benchmark = MyBenchmark()
reports = benchmark.run(tasks=tasks, agent_data=config)

# Analyze results
for report in reports:
    print(f"Task {report['task_id']}, Repeat {report['repeat_idx']}: {report['eval']}")
    print(f"Config: {report['config']}")
    print(f"Traces: {report['traces']}")

# Parallel execution with 4 workers
benchmark = MyBenchmark(num_workers=4)
reports = benchmark.run(tasks=tasks, agent_data=config)

# Single agent config for all tasks
reports = benchmark.run(tasks=tasks, agent_data={"model": "gpt-4"})

# Task-specific agent configs (must match task count)
reports = benchmark.run(
    tasks=tasks,
    agent_data=[
        {"model": "gpt-4", "difficulty": "easy"},
        {"model": "gpt-4", "difficulty": "hard"},
    ]
)

# Priority-based execution
from maseval.core.task import PriorityTaskQueue
for task in tasks:
    task.protocol.priority = compute_priority(task)
queue = PriorityTaskQueue(tasks)
reports = benchmark.run(tasks=queue, agent_data=config)

# Adaptive queue (auto-registered as callback)
queue = MyAdaptiveTaskQueue(tasks)
reports = benchmark.run(tasks=queue)  # queue receives on_task_complete callbacks

run_agents

run_agents(
    agents: Sequence[AgentAdapter],
    task: Task,
    environment: Gaia2Environment,
    query: str = "",
) -> Any

Execute agents and ensure environment cleanup.

PARAMETER DESCRIPTION
agents

Agent instances to run

TYPE: Sequence[AgentAdapter]

task

Current task

TYPE: Task

environment

Gaia2Environment

TYPE: Gaia2Environment

query

Query/prompt for agents

TYPE: str DEFAULT: ''

RETURNS DESCRIPTION
Any

Final answer from agents

setup_agents abstractmethod

setup_agents(
    agent_data: Dict[str, Any],
    environment: Gaia2Environment,
    task: Task,
    user: Optional[User],
    seed_generator: SeedGenerator,
) -> Tuple[Sequence[AgentAdapter], Dict[str, AgentAdapter]]

Create agents for this task. Must be implemented by subclass.

PARAMETER DESCRIPTION
agent_data

Agent configuration

TYPE: Dict[str, Any]

environment

Gaia2Environment with ARE tools

TYPE: Gaia2Environment

task

Current task

TYPE: Task

user

Optional user simulator (always None for Gaia2)

TYPE: Optional[User]

seed_generator

Seed generator for reproducibility

TYPE: SeedGenerator

RETURNS DESCRIPTION
Tuple[Sequence[AgentAdapter], Dict[str, AgentAdapter]]

Tuple of (ordered agent list, agent dict keyed by ID)

setup_environment

setup_environment(
    agent_data: Dict[str, Any],
    task: Task,
    seed_generator: SeedGenerator,
) -> Gaia2Environment

Create Gaia2 environment wrapping ARE simulation.

PARAMETER DESCRIPTION
agent_data

Agent configuration

TYPE: Dict[str, Any]

task

Current task

TYPE: Task

seed_generator

Seed generator for reproducibility

TYPE: SeedGenerator

RETURNS DESCRIPTION
Gaia2Environment

Gaia2Environment instance

setup_evaluators

setup_evaluators(
    environment: Gaia2Environment,
    task: Task,
    agents: Sequence[AgentAdapter],
    user: Optional[User],
    seed_generator: SeedGenerator,
) -> Sequence[Evaluator]

Create Gaia2 evaluator using ARE's judge.

PARAMETER DESCRIPTION
environment

Gaia2Environment instance

TYPE: Gaia2Environment

task

Current task with evaluation data

TYPE: Task

agents

Agent instances

TYPE: Sequence[AgentAdapter]

user

Optional user simulator (always None)

TYPE: Optional[User]

seed_generator

Seed generator for reproducibility

TYPE: SeedGenerator

RETURNS DESCRIPTION
Sequence[Evaluator]

List with single Gaia2Evaluator instance

setup_user

setup_user(
    agent_data: Dict[str, Any],
    environment: Gaia2Environment,
    task: Task,
    seed_generator: SeedGenerator,
) -> Optional[User]

Gaia2 uses event-based simulation, not turn-based user simulation.

User interactions in Gaia2 happen through scheduled events (e.g., "user sends message at t=30s") rather than synchronous turn-taking.

PARAMETER DESCRIPTION
agent_data

Agent configuration

TYPE: Dict[str, Any]

environment

Gaia2Environment instance

TYPE: Gaia2Environment

task

Current task

TYPE: Task

seed_generator

Seed generator for reproducibility

TYPE: SeedGenerator

RETURNS DESCRIPTION
Optional[User]

None (no user simulator needed)
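As a rough illustration of why no user simulator is returned, the sketch below models user interaction as timestamped events that a simulation clock releases once they are due. `ScheduledEvent` and `due_events` are hypothetical stand-ins invented for this example; they are not ARE or MASEval APIs.

```python
import heapq
from dataclasses import dataclass, field
from typing import List


@dataclass(order=True)
class ScheduledEvent:
    """Hypothetical stand-in for a scheduled scenario event."""
    time: float
    message: str = field(compare=False)


def due_events(pending: List[ScheduledEvent], now: float) -> List[str]:
    """Pop every event whose scheduled time is <= now, in time order."""
    delivered = []
    while pending and pending[0].time <= now:
        delivered.append(heapq.heappop(pending).message)
    return delivered


events: List[ScheduledEvent] = []
heapq.heappush(events, ScheduledEvent(90.0, "user: also book a cab"))
heapq.heappush(events, ScheduledEvent(30.0, "user: move the meeting to 3pm"))

early = due_events(events, now=45.0)   # only the t=30s message is due
late = due_events(events, now=120.0)   # the t=90s message arrives later
```

The agent never asks the user for a turn; messages simply appear when simulation time passes their scheduled timestamp.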

Gaia2Environment

Bases: Environment

MASEval Environment wrapping ARE's simulation.

The ARE simulation runs its own internal event loop. Agent interaction happens purely through tool calls - including time control via SystemApp.wait_for_notification(). No special execution loop needed.

Exposes all ARE app tools (Calendar, Email, Messaging, Contacts, Shopping, Cab, City, FileSystem, Browser, ChatsApp, SystemApp, Timer) to agents.

Key Features
  • Wraps ARE's simulation environment
  • Provides MASEval-compatible tool wrappers with tracing
  • Exposes simulation time for temporal reasoning tasks
  • Handles proper cleanup of ARE resources

__init__

__init__(
    task_data: Dict[str, Any],
    callbacks: Optional[List[Any]] = None,
    judge_engine_config: Optional[Any] = None,
)

Initialize Gaia2 environment.

PARAMETER DESCRIPTION
task_data

Task data containing:

  • scenario: ARE BenchmarkScenario object
  • capability: Capability type (execution, search, etc.)
  • universe_id: Universe identifier

TYPE: Dict[str, Any]

callbacks

Optional callbacks

TYPE: Optional[List[Any]] DEFAULT: None

judge_engine_config

Optional Gaia2JudgeEngineConfig controlling which LLM model and provider the ARE judge uses for semantic comparison. Passed explicitly from setup_environment() (lives in evaluation_data).

TYPE: Optional[Any] DEFAULT: None

cleanup

cleanup() -> None

Stop ARE simulation when task completes.

Ensures proper resource cleanup and stops any running simulation.

create_tools

create_tools() -> Dict[str, Gaia2GenericTool]

Wrap ARE app tools for MASEval tracing.

Creates framework-agnostic Gaia2GenericTool instances that provide clean API with built-in tracing.

Filters out AgentUserInterface message-retrieval tools that ARE removes in remove_aui_irrelevant_tools(), and sets wait_for_user_response to False so the AUI does not block waiting for a response when the agent sends a message. User messages are delivered via the notification system instead.

ARE agents/default_agent/are_simulation_main.py:206-228

RETURNS DESCRIPTION
Dict[str, Gaia2GenericTool]

Dict mapping tool names to Gaia2GenericTool instances
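The idea of a tool wrapper with built-in tracing can be sketched as follows. `TracedTool` is a hypothetical class written for this example and is not the actual Gaia2GenericTool; the tool name and the `add_event` function are likewise illustrative assumptions.

```python
import time
from typing import Any, Callable, Dict, List


class TracedTool:
    """Hypothetical wrapper that records every call to an underlying tool function."""

    def __init__(self, name: str, fn: Callable[..., Any]) -> None:
        self.name = name
        self._fn = fn
        self.invocations: List[Dict[str, Any]] = []

    def __call__(self, **kwargs: Any) -> Any:
        record: Dict[str, Any] = {"tool": self.name, "inputs": kwargs, "started": time.time()}
        try:
            record["output"] = self._fn(**kwargs)
            record["status"] = "success"
            return record["output"]
        except Exception as exc:
            record["status"] = "error"
            record["error"] = str(exc)
            raise
        finally:
            # The record is stored whether the call succeeded or failed.
            self.invocations.append(record)


def add_event(title: str) -> str:
    """Illustrative tool body; a real tool would call into the simulation."""
    return f"created {title}"


tool = TracedTool("Calendar__add_event", add_event)
result = tool(title="standup")
```

Keeping the trace on the wrapper means the agent framework only needs a plain callable, while the benchmark can still collect every invocation afterwards.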

gather_config

gather_config() -> Dict[str, Any]

Gather environment configuration.

RETURNS DESCRIPTION
Dict[str, Any]

Configuration dictionary

gather_traces

gather_traces() -> Dict[str, Any]

Collect traces from environment and all tools.

RETURNS DESCRIPTION
Dict[str, Any]

Trace dictionary with scenario info and all tool traces

get_are_environment

get_are_environment() -> Any

Get the underlying ARE Environment.

Used by the evaluator to access completed events and judge.

RETURNS DESCRIPTION
Any

ARE Environment instance

get_notification_system

get_notification_system() -> Any

Get the ARE notification system.

Used by agents that need to poll for messages between iterations, matching ARE's pre-step notification polling behavior.

RETURNS DESCRIPTION
Any

ARE NotificationSystem instance, or None if not available

get_scenario

get_scenario() -> Any

Get the ARE scenario object.

RETURNS DESCRIPTION
Any

ARE BenchmarkScenario object

get_simulation_time

get_simulation_time() -> float

Get current simulation time in seconds.

RETURNS DESCRIPTION
float

Current simulation time in seconds since scenario start

get_start_time

get_start_time() -> Optional[float]

Get the scenario start time.

RETURNS DESCRIPTION
Optional[float]

Start time as Unix timestamp, or None if not available
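A minimal sketch of combining the two time accessors into an absolute timestamp, assuming the start time is a Unix timestamp and that interpreting it in UTC is acceptable. The `absolute_time` helper is hypothetical and not part of the environment.

```python
from datetime import datetime, timezone
from typing import Optional


def absolute_time(start_time: Optional[float], simulation_time: float) -> Optional[datetime]:
    """Combine a Unix start timestamp with elapsed simulation seconds."""
    if start_time is None:
        return None  # mirrors get_start_time() returning None
    return datetime.fromtimestamp(start_time + simulation_time, tz=timezone.utc)


now = absolute_time(start_time=1_700_000_000.0, simulation_time=90.0)
unknown = absolute_time(start_time=None, simulation_time=90.0)
```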

get_tool

get_tool(name: str) -> Optional[Any]

Get a tool by name.

PARAMETER DESCRIPTION
name

Tool name

TYPE: str

RETURNS DESCRIPTION
Optional[Any]

The tool, or None if not found

get_tools

get_tools() -> Dict[str, Any]

Get all tools as a dict.

get_turn_notifications

get_turn_notifications() -> Tuple[List[str], bool, bool]

Get notifications for turn transitions, re-queuing env notifications.

Matches ARE's get_notifications() in are_simulation_main.py:331-359: drains the notification queue, separates by type, re-queues environment notifications (so the inner loop's pre-step picks them up), and returns user messages and status flags.

RETURNS DESCRIPTION
Tuple[List[str], bool, bool]

Tuple of (user_messages, has_env_notifications, has_stop). user_messages are raw message strings for [TASK] formatting. has_env_notifications is True when env notifications were re-queued. has_stop is True when the environment signalled stop.
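The drain, separate, and re-queue behaviour can be sketched with a standard-library queue. The `Notification` dataclass and its `kind` values are assumptions made for this example; the real ARE notification types are not shown in this reference.

```python
import queue
from dataclasses import dataclass
from typing import List, Tuple


@dataclass
class Notification:
    """Hypothetical notification; kind is 'user', 'env', or 'stop'."""
    kind: str
    text: str


def turn_notifications(q: "queue.Queue[Notification]") -> Tuple[List[str], bool, bool]:
    """Drain q, re-queue env notifications, return (user_messages, has_env, has_stop)."""
    drained: List[Notification] = []
    while True:
        try:
            drained.append(q.get_nowait())
        except queue.Empty:
            break

    user_messages = [n.text for n in drained if n.kind == "user"]
    env = [n for n in drained if n.kind == "env"]
    has_stop = any(n.kind == "stop" for n in drained)

    for n in env:  # put env notifications back for the inner loop's pre-step
        q.put(n)
    return user_messages, bool(env), has_stop


q: "queue.Queue[Notification]" = queue.Queue()
q.put(Notification("user", "Please reschedule lunch"))
q.put(Notification("env", "New email received"))
result = turn_notifications(q)
```

After the call, only the environment notification remains in the queue, so a later poll can still pick it up.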

pause

pause() -> None

Pause the ARE simulation environment.

Stops time progression during LLM generation, matching ARE's simulated generation time behavior. ARE simulation/environment.py:262-272

No-op if environment is not available or not running.

poll_notifications

poll_notifications() -> Tuple[List[str], List[str], bool]

Poll pending notifications from the ARE notification system.

Drains all pending messages from the notification queue and returns them as pre-formatted strings. Call this between agent steps to receive messages that arrived during wait_for_notification() or from background simulation events.

GAIA2 uses an event-driven multi-turn architecture. When the agent calls SystemApp__wait_for_notification, the ARE environment processes scheduled events, advances simulation time, and queues notifications. After the tool returns, call this method to retrieve those notifications and inject them into the agent's context before the next LLM call.

ARE agents/default_agent/steps/are_simulation.py:26-62

RETURNS DESCRIPTION
Tuple[List[str], List[str], bool]

Tuple of (user_messages, env_notifications, has_stop_message). user_messages and env_notifications contain pre-formatted strings ready to inject into agent context. has_stop_message is True when the environment has signalled the simulation is over.

resume_with_offset

resume_with_offset(offset: float) -> None

Resume the ARE simulation environment with a time offset.

Advances simulation time by the given offset and resumes the event loop. ARE simulation/environment.py:286-298

PARAMETER DESCRIPTION
offset

Time in seconds to advance the simulation clock

TYPE: float
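One way pause() and resume_with_offset() might bracket an LLM call is sketched below. `FakeSimClock` is a hypothetical stand-in for the environment, and charging the measured wall-clock time as the offset is an assumption; a configured, simulated generation time could be used instead.

```python
import time
from typing import Callable


class FakeSimClock:
    """Hypothetical stand-in for the pause/resume part of the environment."""

    def __init__(self) -> None:
        self.sim_time = 0.0
        self.running = True

    def pause(self) -> None:
        self.running = False

    def resume_with_offset(self, offset: float) -> None:
        self.sim_time += offset
        self.running = True


def generate_with_paused_clock(clock: FakeSimClock, generate: Callable[[], str]) -> str:
    """Pause the clock, run generation, then resume and charge the elapsed time."""
    clock.pause()
    started = time.monotonic()
    try:
        return generate()
    finally:
        # Resume even if generation raised, so the simulation is never left paused.
        clock.resume_with_offset(time.monotonic() - started)


clock = FakeSimClock()
reply = generate_with_paused_clock(clock, lambda: "Thought: check the calendar")
```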

setup_state

setup_state(task_data: Dict[str, Any]) -> Dict[str, Any]

Initialize ARE scenario and start simulation.

Delegates to ARE's preprocess_scenario() for faithful preprocessing:

  1. Ensure SystemApp is present.
  2. Set scenario duration from ARE defaults (1800s standard, 420s for Time).
  3. Initialize the scenario (populates apps, events).
  4. Run oracle mode to generate expected event log.
  5. Soft-reset so app state is clean for agent run.
  6. Create judge and initialize turns with trigger conditions.
  7. Start the agent-mode simulation.

PARAMETER DESCRIPTION
task_data

Task data with scenario, capability, universe_id

TYPE: Dict[str, Any]

RETURNS DESCRIPTION
Dict[str, Any]

State dictionary with scenario metadata
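Step 2 of the preprocessing list chooses a duration by capability. A minimal sketch of that selection, using the two values quoted there; the `scenario_duration` helper and the lowercase "time" capability string are assumptions for illustration, since the actual defaults live inside ARE.

```python
DEFAULT_DURATION_S = 1800.0  # standard scenarios
TIME_DURATION_S = 420.0      # scenarios in the Time capability


def scenario_duration(capability: str) -> float:
    """Return the scenario duration in seconds for a capability name."""
    return TIME_DURATION_S if capability.lower() == "time" else DEFAULT_DURATION_S


durations = {cap: scenario_duration(cap) for cap in ("execution", "search", "time")}
```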

Gaia2Evaluator

Bases: Evaluator

Evaluates Gaia2 scenarios using ARE's judge system.

Uses ARE's GraphPerEventJudge which combines deterministic hard checks (exact value matching) with LLM-based soft checks (semantic comparison of content like email bodies and calendar descriptions).

The evaluator compares completed events in the simulation against oracle (expected) events to compute Goal Success Rate (GSR).

__call__

__call__(
    traces: Dict[str, Any],
    final_answer: Optional[str] = None,
) -> Dict[str, Any]

Evaluate using ARE's judge system.

Uses the judge created during preprocess_scenario() (attached to the scenario object) rather than creating a new one. This ensures turn initialization and judge state are consistent.

Exceptions return gsr=None (excluded from scoring), matching ARE's behavior where exceptions/no_validation get score=None. ARE benchmark/hf_upload_utils.py:33-52, benchmark/report_stats.py

PARAMETER DESCRIPTION
traces

Filtered execution traces

TYPE: Dict[str, Any]

final_answer

Final answer from agent (not used in Gaia2)

TYPE: Optional[str] DEFAULT: None

RETURNS DESCRIPTION
Dict[str, Any]

Dict with evaluation results. gsr is None for evaluation errors (excluded from scoring) or a float for valid results.
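To show what "excluded from scoring" means in practice, the sketch below averages gsr over a list of result dicts while skipping None entries. The `mean_gsr` helper is hypothetical and is not the library's own metrics function.

```python
from typing import Dict, List, Optional


def mean_gsr(results: List[Dict[str, Optional[float]]]) -> Optional[float]:
    """Average gsr over results, skipping entries whose gsr is None."""
    scored = [r["gsr"] for r in results if r.get("gsr") is not None]
    if not scored:
        return None  # nothing could be scored
    return sum(scored) / len(scored)


results = [{"gsr": 1.0}, {"gsr": 0.0}, {"gsr": None}, {"gsr": 1.0}]
score = mean_gsr(results)          # averages three scored runs, not four
empty = mean_gsr([{"gsr": None}])  # no valid results at all
```

Treating an errored evaluation as 0.0 instead would lower the reported rate for reasons unrelated to agent behaviour, which is why such runs are dropped from the denominator.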

__init__

__init__(
    task: Task,
    environment: Gaia2Environment,
    user: Optional[Any] = None,
    use_llm_judge: bool = False,
    model: Optional[Any] = None,
)

Initialize the evaluator.

PARAMETER DESCRIPTION
task

Task being evaluated

TYPE: Task

environment

Gaia2Environment instance

TYPE: Gaia2Environment

user

Optional user simulator (not used in Gaia2)

TYPE: Optional[Any] DEFAULT: None

use_llm_judge

Whether to use LLM-based judge

TYPE: bool DEFAULT: False

model

Optional ModelAdapter for LLM-based evaluation

TYPE: Optional[Any] DEFAULT: None

filter_traces

filter_traces(traces: Dict[str, Any]) -> Dict[str, Any]

Extract tool invocations and environment state for evaluation.

PARAMETER DESCRIPTION
traces

Full execution traces

TYPE: Dict[str, Any]

RETURNS DESCRIPTION
Dict[str, Any]

Dict with:

  • tool_invocations: List of all tool calls with timing
  • simulation_time: Final simulation time
  • scenario_id: For correlation

DefaultAgentGaia2Benchmark

Bases: Gaia2Benchmark

Gaia2 benchmark with default agent implementation.

Provides a ready-to-use benchmark matching ARE's reference agent behavior. Uses text-based ReAct format with JSON actions, matching ARE's implementation.

Default parameters (matching ARE):

  • max_iterations: 80
  • temperature: 0.5
  • max_tokens: 16384
  • invalid_format_retries: 10

Example

from maseval.benchmark.gaia2 import DefaultAgentGaia2Benchmark, load_tasks

tasks = load_tasks(capability="execution", limit=5)

benchmark = DefaultAgentGaia2Benchmark(
    agent_data={"model_id": "gpt-4o"},
)
results = benchmark.run(tasks)

seed_generator property

seed_generator: SeedGenerator

The seed generator for this benchmark.

The seed generator is configured at benchmark initialization via the seed or seed_generator parameters. When seed=None (the default), the generator's derive_seed() method returns None, effectively disabling seeding while maintaining a uniform interface.

RETURNS DESCRIPTION
SeedGenerator

The root SeedGenerator instance.

usage property

usage: Usage

Running usage total across all task repetitions.

Queryable at any time, including while the benchmark is still running. Returns the grand total of all usage collected so far.

usage_by_component property

usage_by_component: Dict[str, Usage]

Per-component running usage totals across all repetitions.

Keys are registry keys (e.g., "models:main_model").

__init__

__init__(
    agent_data: Optional[Dict[str, Any]] = None,
    **kwargs: Any,
)

Initialize benchmark.

PARAMETER DESCRIPTION
agent_data

Agent configuration with:

  • model_id: Required model identifier
  • llm_args: Optional model call arguments (temperature, max_tokens, etc.)
  • max_iterations: Max iterations per task (default: 80)
  • invalid_format_retries: Max retries for invalid format (default: 10)
  • simulated_generation_time_config: Optional SimulatedGenerationTimeConfig for simulating LLM generation time in the simulation (default: None)
  • verbose: Verbosity level (default: 0)

TYPE: Optional[Dict[str, Any]] DEFAULT: None

**kwargs

Additional Benchmark arguments

TYPE: Any DEFAULT: {}

add_callback

add_callback(callback: BenchmarkCallback) -> None

Register a callback handler to monitor benchmark execution.

PARAMETER DESCRIPTION
callback

A BenchmarkCallback instance that will receive execution events.

TYPE: BenchmarkCallback

How to use

Callbacks receive notifications at key lifecycle points for tracing, progress tracking, or custom metrics collection. See BenchmarkCallback for available hooks and their signatures.

from maseval.core.callbacks import MessageTracingCallback

benchmark = MyBenchmark(tasks=tasks, agent_data=config)
benchmark.add_callback(MessageTracingCallback(output_dir="logs"))
results = benchmark.run()

clear_registry

clear_registry() -> None

Clear the component registry after a task repetition completes.

This method is called automatically by run() after each task repetition to ensure components are not carried over between repetitions. The reports list persists across all repetitions for aggregated analysis.

collect_all_configs

collect_all_configs() -> Dict[str, Any]

Collect configuration from all registered components for the current task repetition.

This method is called automatically by run() after each task repetition completes and before evaluation begins. It gathers comprehensive configuration from all registered components (agents, models, tools, simulators, callbacks, etc.) for that specific repetition. After collection, the registry is cleared for the next repetition.

The collected configs are stored in benchmark.reports list along with traces for persistent access across all task repetitions.

Output fields:

  • metadata - Collection timestamp and thread info
  • agents - Dict mapping agent names to their config (settings, parameters)
  • models - Dict mapping model names to their config (model IDs, parameters)
  • tools - Dict mapping tool names to their config (specifications, settings)
  • simulators - Dict mapping simulator names to their config (parameters, templates)
  • callbacks - Dict mapping callback names to their config (settings)
  • environment - Direct config from the environment (not nested), or None if not present
  • user - Direct config from the user simulator (not nested), or None if not present
  • other - Dict for any other registered components
  • benchmark - Benchmark-level configuration (git, system, packages)

RETURNS DESCRIPTION
Dict[str, Any]

Structured dictionary containing configuration from all registered components.

How to use

This method is called automatically by run() after each task repetition:

# Automatic collection (recommended)
results = benchmark.run()

# Access all collected reports (traces + configs) across repetitions
for report in benchmark.reports:
    print(f"Task {report['task_id']}, Repeat {report['repeat_idx']}")
    # Agents is a dict: agent_name -> config
    print(f"Agent config: {report['config']['agents']['my_agent']}")
    # Environment and user are direct (not nested)
    print(f"Environment config: {report['config']['environment']}")
    print(f"User config: {report['config']['user']}")
    # Benchmark-level config
    print(f"Git commit: {report['config']['benchmark']['git']['commit_hash']}")

The collected configs are available in the results for reproducibility analysis.

collect_all_traces

collect_all_traces() -> Dict[str, Any]

Collect execution traces from all registered components for the current task repetition.

This method is called automatically by run() after each task repetition completes and before evaluation begins. It gathers comprehensive traces from all registered components (agents, models, tools, simulators, callbacks, etc.) for that specific repetition. After collection, the registry is cleared for the next repetition.

The collected traces are stored in benchmark.reports list along with configs for persistent access across all task repetitions.

Output fields:

  • metadata - Collection timestamp and thread info
  • agents - Dict mapping agent names to their traces (messages, execution data)
  • models - Dict mapping model names to their traces (API calls, timing, errors)
  • tools - Dict mapping tool names to their traces (invocations, parameters)
  • simulators - Dict mapping simulator names to their traces (attempts, outcomes)
  • callbacks - Dict mapping callback names to their traces (custom data)
  • environment - Direct traces from the environment (not nested), or None if not present
  • user - Direct traces from the user simulator (not nested), or None if not present
  • other - Dict for any other registered components

RETURNS DESCRIPTION
Dict[str, Any]

Structured dictionary containing execution traces from all registered components.

How to use

This method is called automatically by run() after each task repetition:

# Automatic collection (recommended)
results = benchmark.run()

# Access all collected reports (traces + configs) across repetitions
for report in benchmark.reports:
    print(f"Task {report['task_id']}, Repeat {report['repeat_idx']}")
    # Agents is a dict: agent_name -> traces
    print(f"Agent messages: {report['traces']['agents']['my_agent']}")
    # Environment and user are direct (not nested)
    print(f"Environment state: {report['traces']['environment']}")
    print(f"User interactions: {report['traces']['user']}")

The collected traces are passed to the evaluator's evaluate() method and stored in benchmark.reports for later analysis.

collect_all_usage

collect_all_usage() -> Dict[str, Any]

Collect usage from all registered components for the current task repetition.

This method is called automatically by run() after each task repetition completes. It gathers usage from all registered UsageTrackableMixin components and also accumulates into persistent running totals accessible via usage and usage_by_component.

RETURNS DESCRIPTION
Dict[str, Any]

Structured dictionary containing usage from all registered components.

evaluate

evaluate(
    evaluators: Sequence[Evaluator],
    agents: Dict[str, AgentAdapter],
    final_answer: Any,
    traces: Dict[str, Any],
) -> List[Dict[str, Any]]

Evaluate using Gaia2 evaluators.

Uses each evaluator's filter_traces() method to extract relevant data, then calls the evaluator with the filtered traces.

Returns Gaia2 format
  • gsr: Goal Success Rate
  • partial_gsr: Partial success rate
  • passed: Boolean
  • rationale: Judge rationale (if available)

PARAMETER DESCRIPTION
evaluators

List of evaluators

TYPE: Sequence[Evaluator]

agents

Dict of agents

TYPE: Dict[str, AgentAdapter]

final_answer

Final answer from agents

TYPE: Any

traces

Execution traces

TYPE: Dict[str, Any]

RETURNS DESCRIPTION
List[Dict[str, Any]]

List of evaluation result dicts
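The filter-then-call pattern described for evaluate() can be sketched with a toy evaluator. `CountingEvaluator` and `evaluate_all` are hypothetical names for this example; the real Gaia2Evaluator delegates to ARE's judge rather than counting tool calls.

```python
from typing import Any, Dict, List, Optional, Sequence


class CountingEvaluator:
    """Hypothetical evaluator: passes when at least one tool call was traced."""

    def filter_traces(self, traces: Dict[str, Any]) -> Dict[str, Any]:
        # Keep only the part of the traces this evaluator needs.
        return {"tool_invocations": traces.get("tools", [])}

    def __call__(self, traces: Dict[str, Any], final_answer: Optional[str] = None) -> Dict[str, Any]:
        passed = len(traces["tool_invocations"]) > 0
        return {"gsr": 1.0 if passed else 0.0, "passed": passed}


def evaluate_all(evaluators: Sequence[Any], final_answer: Any, traces: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Filter the full traces per evaluator, then call it with the filtered view."""
    return [ev(ev.filter_traces(traces), final_answer) for ev in evaluators]


full_traces = {"tools": [{"name": "send_email"}], "agents": {}}
out = evaluate_all([CountingEvaluator()], None, full_traces)
```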

execution_loop

execution_loop(
    agents: Sequence[AgentAdapter],
    task: Task,
    environment: Environment,
    user: Optional[User],
) -> Any

Execute agents with optional user interaction loop.

This method orchestrates the agent-user interaction pattern. When a user is present, the user initiates the conversation using user.get_initial_query(). If no user is present, task.query is used as the initial query.

Interaction Flow

By default, agents execute once (max_invocations=1). For multi-turn interaction, set self.max_invocations > 1 in your benchmark's __init__. The loop continues until max_invocations is reached or user.is_done() returns True (e.g., max turns reached or stop token detected).

Note

Override this method in your benchmark subclass to implement custom interaction patterns (e.g., agent-initiated conversations, different termination conditions, or specialized query routing).

PARAMETER DESCRIPTION
agents

Agents to execute (typically the orchestrator).

TYPE: Sequence[AgentAdapter]

task

The task being solved.

TYPE: Task

environment

The environment providing tools and state.

TYPE: Environment

user

Optional user simulator. If provided, the user initiates and drives the conversation. If None, a single agent execution with task.query.

TYPE: Optional[User]

RETURNS DESCRIPTION
Any

Final answer from the last agent execution.

Example

For interactive benchmarks, enable multi-turn interaction:

def __init__(self, *args, **kwargs):
    super().__init__(*args, **kwargs)
    self.max_invocations = 5  # Up to 5 agent-user exchanges
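The interaction flow described above can be sketched as a small loop. `ScriptedUser`, its `respond` method, and `interaction_loop` are hypothetical names for this example; only get_initial_query() and is_done() are mentioned in this reference.

```python
from typing import Callable, List, Optional


class ScriptedUser:
    """Hypothetical user simulator that replies from a fixed script."""

    def __init__(self, script: List[str]) -> None:
        self._script = list(script)

    def get_initial_query(self) -> str:
        return self._script.pop(0)

    def respond(self, answer: str) -> str:  # assumed method name
        return self._script.pop(0)

    def is_done(self) -> bool:
        return not self._script


def interaction_loop(agent: Callable[[str], str], task_query: str,
                     user: Optional[ScriptedUser], max_invocations: int = 1) -> str:
    """Run the agent up to max_invocations times, stopping when the user is done."""
    query = user.get_initial_query() if user else task_query
    answer = ""
    for _ in range(max_invocations):
        answer = agent(query)
        if user is None or user.is_done():
            break
        query = user.respond(answer)
    return answer


def echo_agent(query: str) -> str:
    return f"handled: {query}"


single = interaction_loop(echo_agent, "book a cab", user=None)
multi = interaction_loop(echo_agent, "", ScriptedUser(["hi", "and a table for two"]), max_invocations=5)
```

With no user, the agent runs once on the task query; with a user, the loop ends as soon as the script is exhausted, even though five invocations were allowed.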

get_failed_tasks

get_failed_tasks(
    status_filter: Optional[
        Union[
            TaskExecutionStatus, List[TaskExecutionStatus]
        ]
    ] = None,
    reports: Optional[List[Dict[str, Any]]] = None,
) -> SequentialTaskQueue

Get tasks that failed during benchmark execution.

This method retrieves failed tasks based on their execution status, useful for debugging, retry logic, or failure analysis.

PARAMETER DESCRIPTION
status_filter

Filter by specific failure status(es). If None, returns all failed tasks (any status except SUCCESS). Can be a single TaskExecutionStatus or a list of them. Examples:

  • TaskExecutionStatus.TASK_EXECUTION_FAILED: Only tasks that failed during execution
  • TaskExecutionStatus.EVALUATION_FAILED: Only tasks where evaluation failed
  • [TaskExecutionStatus.TASK_EXECUTION_FAILED, TaskExecutionStatus.SETUP_FAILED]: Tasks that failed during execution or setup

TYPE: Optional[Union[TaskExecutionStatus, List[TaskExecutionStatus]]] DEFAULT: None

reports

Optional list of reports to analyze. If None, uses the reports from the last run() call. This allows analyzing externally stored or modified reports.

TYPE: Optional[List[Dict[str, Any]]] DEFAULT: None

RETURNS DESCRIPTION
SequentialTaskQueue

SequentialTaskQueue containing the failed tasks. Empty if no failures match the filter.

RAISES DESCRIPTION
RuntimeError

If reports is None and run() has not been executed yet.

How to use

# Run benchmark
benchmark = MyBenchmark()
reports = benchmark.run(tasks=tasks, agent_data=config)

# Get all failed tasks (from internal state)
failed = benchmark.get_failed_tasks()
print(f"Failed: {len(failed)}/{len(benchmark.tasks)} tasks")

# Or work with returned reports (safe from internal state changes)
failed = benchmark.get_failed_tasks(reports=reports)

# Get only tasks that failed during execution (not evaluation)
execution_failures = benchmark.get_failed_tasks(
    TaskExecutionStatus.TASK_EXECUTION_FAILED,
    reports=reports
)

# Get setup and execution failures
critical_failures = benchmark.get_failed_tasks(
    status_filter=[
        TaskExecutionStatus.SETUP_FAILED,
        TaskExecutionStatus.TASK_EXECUTION_FAILED
    ],
    reports=reports
)

# Retry only the failed tasks (the main use case)
if len(failed) > 0:
    retry_reports = benchmark.run(tasks=failed)

# Or more concisely
reports = benchmark.run(tasks=tasks)
retry_reports = benchmark.run(tasks=benchmark.get_failed_tasks())
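The filtering behind status_filter can be sketched over plain report dicts. This is a simplified illustration: statuses are plain strings here rather than TaskExecutionStatus members, the string values are assumptions, and `failed_task_ids` is a hypothetical helper that returns ids instead of a task queue.

```python
from typing import Any, Dict, Iterable, List, Optional, Union


def failed_task_ids(reports: List[Dict[str, Any]],
                    status_filter: Optional[Union[str, Iterable[str]]] = None) -> List[str]:
    """Return ids of non-success reports, optionally restricted to given statuses."""
    if isinstance(status_filter, str):
        wanted = {status_filter}
    elif status_filter is not None:
        wanted = set(status_filter)
    else:
        wanted = None  # no filter: any non-success status counts

    ids: List[str] = []
    for report in reports:
        status = report["status"]
        if status == "success":
            continue
        if (wanted is None or status in wanted) and report["task_id"] not in ids:
            ids.append(report["task_id"])
    return ids


reports = [
    {"task_id": "a", "status": "success"},
    {"task_id": "b", "status": "setup_failed"},
    {"task_id": "c", "status": "evaluation_failed"},
]
all_failed = failed_task_ids(reports)
setup_only = failed_task_ids(reports, "setup_failed")
```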

get_model_adapter abstractmethod

get_model_adapter(
    model_id: str, **kwargs: Any
) -> ModelAdapter

Get or create model adapter. Must be implemented by subclass.

PARAMETER DESCRIPTION
model_id

Model identifier

TYPE: str

**kwargs

Additional arguments (e.g., register_name)

TYPE: Any DEFAULT: {}

RETURNS DESCRIPTION
ModelAdapter

ModelAdapter instance

register

register(
    category: str,
    name: str,
    component: RegisterableComponent,
) -> RegisterableComponent

Register a component for comprehensive trace and configuration collection.

All core MASEval components (AgentAdapter, ModelAdapter, Environment, User, LLMSimulator, BenchmarkCallback) inherit from TraceableMixin and/or ConfigurableMixin, and are automatically registered for both trace and configuration collection before evaluation.

Note: Most components are automatically registered when returned from setup methods (setup_environment, setup_user, setup_agents). You only need to manually register additional components like models, simulators, or tools that aren't automatically captured.

PARAMETER DESCRIPTION
category

Component category (e.g., "agents", "models", "tools", "simulators", "callbacks", "user", "environment", "seeding"). Use plural form to match the structure in collect_all_traces() and collect_all_configs().

TYPE: str

name

Unique identifier for this component within its category

TYPE: str

component

Any object inheriting from TraceableMixin and/or ConfigurableMixin

TYPE: RegisterableComponent

RETURNS DESCRIPTION
RegisterableComponent

The component (for chaining convenience)

RAISES DESCRIPTION
ValueError

If the component is already registered under a different name

How to use

Most components are auto-registered. Manual registration is only needed for additional components:

def setup_agents(self, agent_data, environment, task, user, seed_generator):
    # Create model (needs manual registration)
    model = MyModelAdapter(...)
    self.register("models", "main_model", model)

    # Create agent (auto-registered when returned)
    agent = MyAgent(model=model)
    agent_adapter = AgentAdapter(agent, "agent1")

    # Environment and user are also auto-registered
    return [agent_adapter], {"agent1": agent_adapter}

Traces and configs are automatically collected before evaluation via collect_all_traces() and collect_all_configs() which are called internally by the run() method.
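The registration contract (return the component for chaining, reject the same object under a second name) can be sketched as follows. `Registry` is a hypothetical class for illustration; the "category:name" key format follows the registry keys mentioned for usage_by_component, and everything else is an assumption.

```python
from typing import Any, Dict


class Registry:
    """Hypothetical component registry keyed by 'category:name'."""

    def __init__(self) -> None:
        self._by_key: Dict[str, Any] = {}
        self._key_of: Dict[int, str] = {}  # id(component) -> registry key

    def register(self, category: str, name: str, component: Any) -> Any:
        key = f"{category}:{name}"
        existing = self._key_of.get(id(component))
        if existing is not None and existing != key:
            raise ValueError(f"component already registered as {existing!r}")
        self._by_key[key] = component
        self._key_of[id(component)] = key
        return component  # returned so calls can be chained

    def clear(self) -> None:
        """Forget all components, as happens between task repetitions."""
        self._by_key.clear()
        self._key_of.clear()


registry = Registry()
model = registry.register("models", "main_model", object())
try:
    registry.register("models", "other_name", model)
    duplicate_rejected = False
except ValueError:
    duplicate_rejected = True
```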

run

run(
    tasks: Union[
        Task, BaseTaskQueue, Iterable[Union[Task, dict]]
    ],
    agent_data: Dict[str, Any] | Iterable[Dict[str, Any]],
) -> List[Dict[str, Any]]

Initialize and execute the complete benchmark loop across all tasks.

PARAMETER DESCRIPTION
tasks

Task source for execution. Can be:

  • A single Task object
  • A BaseTaskQueue (SequentialTaskQueue, PriorityTaskQueue, or custom AdaptiveTaskQueue)
  • An iterable of Task objects or dicts that will be converted to Tasks

When a BaseTaskQueue is provided, it controls the task ordering. AdaptiveTaskQueue subclasses are automatically registered as callbacks to receive task completion notifications.

TYPE: Union[Task, BaseTaskQueue, Iterable[Union[Task, dict]]]

agent_data

Configuration for agents. Either a single dict applied to all tasks, or an iterable of dicts with one configuration per task. Agent data typically includes model parameters, agent architecture details, and tool specifications.

TYPE: Dict[str, Any] | Iterable[Dict[str, Any]]

RETURNS DESCRIPTION
List[Dict[str, Any]]

List of report dictionaries, one per task repetition. Each report contains:

  • task_id: Task identifier (UUID)
  • repeat_idx: Repetition index (0 to n_task_repeats-1)
  • status: Execution status (one of TaskExecutionStatus enum values)
  • traces: Execution traces from all registered components
  • config: Configuration from all registered components and benchmark level
  • eval: Evaluation results (None if task or evaluation failed)
  • error: Error details dict (only present if status is not SUCCESS), containing:
      • error_type: Exception class name
      • error_message: Exception message
      • traceback: Full traceback string

RAISES DESCRIPTION
ValueError

If agent_data length doesn't match number of tasks (when agent_data is an iterable).
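The two accepted shapes of agent_data, and the length check that leads to the ValueError, can be sketched like this. `expand_agent_data` is a hypothetical helper and the error message is invented for the example.

```python
from typing import Any, Dict, Iterable, List, Union


def expand_agent_data(agent_data: Union[Dict[str, Any], Iterable[Dict[str, Any]]],
                      n_tasks: int) -> List[Dict[str, Any]]:
    """Return one config per task, broadcasting a single dict to all tasks."""
    if isinstance(agent_data, dict):
        return [agent_data] * n_tasks
    configs = list(agent_data)
    if len(configs) != n_tasks:
        raise ValueError(f"got {len(configs)} agent configs for {n_tasks} tasks")
    return configs


shared = expand_agent_data({"model": "gpt-4"}, n_tasks=2)
try:
    expand_agent_data([{"model": "gpt-4"}], n_tasks=2)
    mismatch_error = None
except ValueError as exc:
    mismatch_error = str(exc)
```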

How to use

This is the framework's main orchestration method that runs your entire benchmark. It iterates through all tasks, handles repetitions, and manages the three-stage lifecycle (setup, run, evaluate) for each execution. You don't implement this method; instead, you call it to start the benchmark after implementing the setup and execution methods.

By default, the benchmark will continue executing remaining tasks even if some fail. You can change this behavior by setting fail_on_task_error=True, fail_on_evaluation_error=True, or fail_on_setup_error=True when instantiating the benchmark. Each task execution returns a status indicating success or the specific failure type (see TaskExecutionStatus).

For each task execution, the framework:

  1. Calls your setup methods to initialize components
  2. Calls your run_agents() method to execute the task
  3. Collects message histories and calls evaluators
  4. Stores results and triggers callbacks

Pseudocode structure:

for task in tasks:
    for repeat in range(n_task_repeats):
        # Setup stage
        environment = setup_environment(agent_data, task, seed_generator)
        user = setup_user(agent_data, environment, task, seed_generator)
        agents_to_run, agents_dict = setup_agents(agent_data, environment, task, user, seed_generator)
        evaluators = setup_evaluators(environment, task, agents_to_run, user, seed_generator)

        # Run stage (execution_loop handles multi-turn if user exists)
        agents_output = execution_loop(agents_to_run, task, environment, user)

        # Evaluate stage
        traces = collect_all_traces()
        eval_results = evaluate(evaluators, agents_dict, agents_output, traces)

        # Store results
        store_result(task_id, traces, eval_results)

Callback hooks are triggered at these points:

  • on_run_start: Before processing any tasks
  • on_task_start: Before processing a task (once per task, not per repeat)
  • on_task_repeat_start: Before each repetition of a task
  • on_task_repeat_end: After each repetition completes
  • on_task_end: After all repetitions of a task complete
  • on_run_end: After all tasks complete

# Typical usage
benchmark = MyBenchmark()
reports = benchmark.run(tasks=tasks, agent_data=config)

# Analyze results
for report in reports:
    print(f"Task {report['task_id']}, Repeat {report['repeat_idx']}: {report['eval']}")
    print(f"Config: {report['config']}")
    print(f"Traces: {report['traces']}")

# Parallel execution with 4 workers
benchmark = MyBenchmark(num_workers=4)
reports = benchmark.run(tasks=tasks, agent_data=config)

# Single agent config for all tasks
reports = benchmark.run(tasks=tasks, agent_data={"model": "gpt-4"})

# Task-specific agent configs (must match task count)
reports = benchmark.run(
    tasks=tasks,
    agent_data=[
        {"model": "gpt-4", "difficulty": "easy"},
        {"model": "gpt-4", "difficulty": "hard"},
    ]
)

# Priority-based execution
from maseval.core.task import PriorityTaskQueue
for task in tasks:
    task.protocol.priority = compute_priority(task)
queue = PriorityTaskQueue(tasks)
reports = benchmark.run(tasks=queue, agent_data=config)

# Adaptive queue (auto-registered as callback)
queue = MyAdaptiveTaskQueue(tasks)
reports = benchmark.run(tasks=queue)  # queue receives on_task_complete callbacks

run_agents

run_agents(
    agents: Sequence[AgentAdapter],
    task: Task,
    environment: Gaia2Environment,
    query: str = "",
) -> Any

Execute agents and ensure environment cleanup.

PARAMETER DESCRIPTION
agents

Agent instances to run

TYPE: Sequence[AgentAdapter]

task

Current task

TYPE: Task

environment

Gaia2Environment

TYPE: Gaia2Environment

query

Query/prompt for agents

TYPE: str DEFAULT: ''

RETURNS DESCRIPTION
Any

Final answer from agents

setup_agents

setup_agents(
    agent_data: Dict[str, Any],
    environment: Gaia2Environment,
    task: Task,
    user: Optional[User],
    seed_generator: SeedGenerator,
) -> Tuple[Sequence[AgentAdapter], Dict[str, AgentAdapter]]

Create default Gaia2 agent.

PARAMETER DESCRIPTION
agent_data

Agent configuration

TYPE: Dict[str, Any]

environment

Gaia2Environment with ARE tools

TYPE: Gaia2Environment

task

Current task

TYPE: Task

user

Optional user (always None)

TYPE: Optional[User]

seed_generator

Seed generator for reproducibility

TYPE: SeedGenerator

RETURNS DESCRIPTION
Tuple[Sequence[AgentAdapter], Dict[str, AgentAdapter]]

Tuple of (agent list, agent dict)

setup_environment

setup_environment(
    agent_data: Dict[str, Any],
    task: Task,
    seed_generator: SeedGenerator,
) -> Gaia2Environment

Create Gaia2 environment wrapping ARE simulation.

PARAMETER DESCRIPTION
agent_data

Agent configuration

TYPE: Dict[str, Any]

task

Current task

TYPE: Task

seed_generator

Seed generator for reproducibility

TYPE: SeedGenerator

RETURNS DESCRIPTION
Gaia2Environment

Gaia2Environment instance

setup_evaluators

setup_evaluators(
    environment: Gaia2Environment,
    task: Task,
    agents: Sequence[AgentAdapter],
    user: Optional[User],
    seed_generator: SeedGenerator,
) -> Sequence[Evaluator]

Create Gaia2 evaluator using ARE's judge.

PARAMETER DESCRIPTION
environment

Gaia2Environment instance

TYPE: Gaia2Environment

task

Current task with evaluation data

TYPE: Task

agents

Agent instances

TYPE: Sequence[AgentAdapter]

user

Optional user simulator (always None)

TYPE: Optional[User]

seed_generator

Seed generator for reproducibility

TYPE: SeedGenerator

RETURNS DESCRIPTION
Sequence[Evaluator]

List with single Gaia2Evaluator instance

setup_user

setup_user(
    agent_data: Dict[str, Any],
    environment: Gaia2Environment,
    task: Task,
    seed_generator: SeedGenerator,
) -> Optional[User]

Gaia2 uses event-based simulation, not turn-based user simulation.

User interactions in Gaia2 happen through scheduled events (e.g., "user sends message at t=30s") rather than synchronous turn-taking.
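The difference from turn-taking can be illustrated with a minimal scheduler sketch. It is not ARE's implementation: the `EventSchedule` class and its method names are hypothetical, and it only shows the idea that events carry a simulated timestamp and fire once simulated time passes it.

```python
import heapq
from typing import List, Tuple

# (time in seconds, insertion order, description); all names are illustrative.
Event = Tuple[float, int, str]


class EventSchedule:
    """Hypothetical stand-in for an event-based simulation clock."""

    def __init__(self) -> None:
        self._heap: List[Event] = []
        self._counter = 0  # tie-breaker so equal timestamps keep insertion order
        self.now = 0.0

    def schedule(self, at: float, description: str) -> None:
        heapq.heappush(self._heap, (at, self._counter, description))
        self._counter += 1

    def advance(self, seconds: float) -> List[str]:
        """Move simulated time forward and return the events that became due."""
        self.now += seconds
        due: List[str] = []
        while self._heap and self._heap[0][0] <= self.now:
            _, _, description = heapq.heappop(self._heap)
            due.append(description)
        return due


schedule = EventSchedule()
schedule.schedule(30, "user sends message")
schedule.schedule(10, "email arrives")
first = schedule.advance(15)   # only the t=10s event is due
second = schedule.advance(15)  # the t=30s user message is now due
```

Because the user message is just another scheduled event, no user simulator object is needed to deliver it.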

PARAMETER DESCRIPTION
agent_data

Agent configuration

TYPE: Dict[str, Any]

environment

Gaia2Environment instance

TYPE: Gaia2Environment

task

Current task

TYPE: Task

seed_generator

Seed generator for reproducibility

TYPE: SeedGenerator

RETURNS DESCRIPTION
Optional[User]

None (no user simulator needed)

DefaultGaia2Agent

Default agent implementation for Gaia2 benchmark.

ReAct-style agent matching ARE's reference implementation. Uses text-based action parsing (Thought/Action/Observation cycle) rather than native function calling.

Uses ARE's two-level loop architecture:

  • Outer loop (_turn_loop): iterates over turns, matching are_simulation_main.py:agent_loop(). Between turns, it drains the notification queue, formats user messages as [TASK], and re-queues environment notifications for the inner loop's pre-step.
  • Inner loop (_step_loop): iterates over steps within a turn, matching base_agent.py:execute_agent_loop(). Terminates on BOTH send_message_to_user (TERMINATED) and wait_for_notification (PAUSED).
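The control flow of the two loops can be sketched with plain Python stand-ins. This is a simplified illustration, not the actual `_turn_loop`/`_step_loop` code: the `[ENV]` label, the status strings as return values, the scripted policy, and the `Calendar__add_event` tool name are assumptions made for the example, and environment notifications are simply appended to the history rather than re-queued for a pre-step.

```python
from collections import deque
from typing import Callable, Deque, List, Tuple

Notification = Tuple[str, str]       # (kind, text); kind is "user" or "env"
Policy = Callable[[List[str]], str]  # returns the name of the tool to call


def step_loop(policy: Policy, history: List[str], max_steps: int) -> str:
    """Inner loop: run steps until a terminating tool is called."""
    for _ in range(max_steps):
        tool = policy(history)
        history.append(f"called {tool}")
        if tool == "send_message_to_user":
            return "TERMINATED"
        if tool == "wait_for_notification":
            return "PAUSED"
    return "MAX_ITERATIONS"


def turn_loop(queue: Deque[Notification], policy: Policy,
              max_turns: int = 10, max_steps: int = 80) -> List[str]:
    """Outer loop: drain pending notifications, then run one turn."""
    history: List[str] = []
    for _ in range(max_turns):
        if not queue:
            break
        while queue:
            kind, text = queue.popleft()
            label = "[TASK]" if kind == "user" else "[ENV]"
            history.append(f"{label} {text}")
        status = step_loop(policy, history, max_steps)
        history.append(f"status={status}")
    return history


# Scripted demo: the "model" calls three tools in a fixed order.
queue: Deque[Notification] = deque([("user", "Book a meeting")])
script = iter(["Calendar__add_event", "wait_for_notification", "send_message_to_user"])


def scripted_policy(history: List[str]) -> str:
    tool = next(script)
    if tool == "wait_for_notification":
        queue.append(("env", "Email received"))  # an event arrives while paused
    return tool


history = turn_loop(queue, scripted_policy)
```

In the demo the first turn ends with PAUSED, the queued environment event starts a second turn, and that turn ends with TERMINATED.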

Key characteristics matching ARE (base_agent.py, are_simulation.py):

  • Text-based JSON action format with <end_action> token
  • Stop sequences: ["<end_action>", "Observation:"]
  • Default temperature: 0.5 (ARE llm_engine.py:17)
  • Default max_tokens: 16384 (ARE llm_engine.py:16)
  • Default max_iterations: 80 (ARE are_simulation_agent_config.py:36)
  • Invalid format retry: up to 10 times (ARE base_agent.py:347)
  • Iteration counter incremented EVERY loop (including errors) (ARE base_agent.py:849)
  • Terminates inner loop on send_message_to_user (TERMINATED) or wait_for_notification (PAUSED)
  • Max-iterations sends message to user via tool (ARE are_simulation.py:109-116)
  • Pre-step notification polling (ARE steps/are_simulation.py:26-62)
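As a rough illustration of the text-based action format, the sketch below truncates a completion at the stop sequences and extracts the JSON action. The `"action"`/`"action_input"` key names and the `Calendar__get_events` tool name are assumptions for this example; the real parser may differ in layout and error handling.

```python
import json
import re
from typing import Any, Dict, List, Tuple

STOP_SEQUENCES = ["<end_action>", "Observation:"]


def truncate_at_stop(text: str, stops: List[str] = STOP_SEQUENCES) -> str:
    """Cut the text at the earliest stop sequence, if any is present."""
    cut = len(text)
    for stop in stops:
        idx = text.find(stop)
        if idx != -1:
            cut = min(cut, idx)
    return text[:cut]


def parse_action(raw: str) -> Tuple[str, Dict[str, Any]]:
    """Return (tool name, arguments) from a Thought/Action completion.

    Raises ValueError when no well-formed action is found, which is the
    kind of failure an invalid-format retry would react to.
    """
    text = truncate_at_stop(raw)
    match = re.search(r"Action:\s*(\{.*\})", text, flags=re.DOTALL)
    if match is None:
        raise ValueError("no 'Action:' JSON block found")
    blob = json.loads(match.group(1))  # JSONDecodeError is a ValueError
    return blob["action"], blob.get("action_input", {})


completion = (
    "Thought: I should check the calendar.\n"
    "Action:\n"
    '{"action": "Calendar__get_events", "action_input": {"day": "2024-01-01"}}'
    "<end_action>\nObservation: text the model should not have written"
)
tool_name, tool_args = parse_action(completion)
```

Truncating before parsing means text generated past `<end_action>` never reaches the JSON decoder, even if the provider ignored the stop sequences.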

iteration_count property

iteration_count: int

Get current iteration count.

terminated property

terminated: bool

Get whether the agent has terminated.

__init__

__init__(
    tools: Dict[str, Callable],
    model: ModelAdapter,
    environment: Optional[Any] = None,
    llm_args: Optional[Dict[str, Any]] = None,
    max_iterations: int = _DEFAULT_MAX_ITERATIONS,
    invalid_format_retries: int = _DEFAULT_INVALID_FORMAT_RETRIES,
    simulated_generation_time_config: Optional[
        SimulatedGenerationTimeConfig
    ] = None,
    verbose: int = 0,
)

Initialize the agent.

PARAMETER DESCRIPTION
tools

Dict of tool name -> callable

TYPE: Dict[str, Callable]

model

ModelAdapter for LLM interactions

TYPE: ModelAdapter

environment

Optional Gaia2Environment for notification polling

TYPE: Optional[Any] DEFAULT: None

llm_args

Additional arguments for model calls, passed as kwargs to model.chat(). Defaults (from ARE source):

  • temperature: 0.5 (ARE llm_engine.py:17)
  • max_tokens: 16384 (ARE llm_engine.py:16)
  • stop: ["<end_action>", "Observation:"]

Stop-token handling: Client-side stop-token truncation (ARE litellm_engine.py:126-127) is always applied to the response, regardless of whether stop is also passed to the API. When stop is passed, the API enforces it for efficiency (saves tokens, precise cutoff). When stop is None, only client-side truncation runs — action parsing still works correctly.

None filtering: Parameters set to None are omitted from the API call entirely. Use this to disable parameters the model provider rejects:

llm_args={"stop": None, "temperature": None}

TYPE: Optional[Dict[str, Any]] DEFAULT: None

max_iterations

Maximum iterations before stopping. Default 80.

TYPE: int DEFAULT: _DEFAULT_MAX_ITERATIONS

invalid_format_retries

Max retries for invalid format. Default 10.

TYPE: int DEFAULT: _DEFAULT_INVALID_FORMAT_RETRIES

simulated_generation_time_config

Optional config for simulated generation time. When set, the simulation is paused during LLM generation and resumed with a time offset. Default None (disabled). (ARE agents/are_simulation_agent_config.py:28-30)

TYPE: Optional[SimulatedGenerationTimeConfig] DEFAULT: None

verbose

Verbosity level (0=quiet, 1=basic, 2=detailed)

TYPE: int DEFAULT: 0
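The llm_args defaults and None filtering described above can be sketched as follows. `build_call_kwargs` is a hypothetical helper, not part of the library, and the merge order (overrides on top of defaults) is an assumption; the default values are the ones listed in the parameter description.

```python
from typing import Any, Dict, Optional

# Defaults as listed under llm_args.
DEFAULT_LLM_ARGS: Dict[str, Any] = {
    "temperature": 0.5,
    "max_tokens": 16384,
    "stop": ["<end_action>", "Observation:"],
}


def build_call_kwargs(llm_args: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """Overlay user overrides on the defaults, then drop None-valued keys."""
    merged = {**DEFAULT_LLM_ARGS, **(llm_args or {})}
    return {key: value for key, value in merged.items() if value is not None}


# A provider that rejects both `stop` and `temperature`:
kwargs = build_call_kwargs({"stop": None, "temperature": None})
```

With both parameters set to None, only `max_tokens` would remain in the call.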

get_messages

get_messages() -> List[Dict[str, Any]]

Get message history.

RETURNS DESCRIPTION
List[Dict[str, Any]]

List of messages

reset

reset() -> None

Reset agent state.

run

run(query: str) -> str

Execute task and return final response.

GAIA2 is event-driven: the real task instruction is delivered via the notification system (first send_message_to_agent event). The outer turn loop (_turn_loop) drains the notification queue and formats user messages as [TASK], matching ARE's agent_loop().

When query is non-empty (e.g. standalone use), it is prepended as a [TASK] message before entering the turn loop.

PARAMETER DESCRIPTION
query

Task query/instructions (may be empty for GAIA2)

TYPE: str

RETURNS DESCRIPTION
str

Final text response from agent

Gaia2GenericTool

Bases: TraceableMixin, ConfigurableMixin

Framework-agnostic wrapper for ARE tools.

Provides clean API with built-in tracing for MASEval compatibility. Developers wrap this for their framework using composition.

Example for smolagents:

class MySmolagentsTool(smolagents.Tool):
    skip_forward_signature_validation = True

    def __init__(self, generic_tool: Gaia2GenericTool):
        self.generic_tool = generic_tool
        self.name = generic_tool.name
        self.description = generic_tool.description
        self.inputs = generic_tool.inputs
        self.output_type = generic_tool.output_type
        super().__init__()

    def forward(self, **kwargs) -> str:
        return self.generic_tool(**kwargs)

    def gather_traces(self):
        return self.generic_tool.gather_traces()

This wrapper preserves ARE's native return types while adding MASEval tracing capabilities and providing a framework-agnostic interface.
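To make "return the native result, record the call" concrete, here is a minimal stand-in. `TracedTool` is hypothetical and is not Gaia2GenericTool; the trace field names (`inputs`, `status`, `output`, `error`, `timestamp`) are assumptions for the example.

```python
import time
from typing import Any, Callable, Dict, List


class TracedTool:
    """Hypothetical wrapper that records every invocation of a callable."""

    def __init__(self, name: str, fn: Callable[..., Any]) -> None:
        self.name = name
        self._fn = fn
        self._invocations: List[Dict[str, Any]] = []

    def __call__(self, **kwargs: Any) -> Any:
        record: Dict[str, Any] = {"inputs": kwargs, "timestamp": time.time()}
        try:
            result = self._fn(**kwargs)
            record.update(status="success", output=result)
            return result  # returned unchanged, whatever its type
        except Exception as exc:
            record.update(status="error", error=str(exc))
            raise
        finally:
            self._invocations.append(record)  # recorded on success and on failure

    def gather_traces(self) -> Dict[str, Any]:
        return {"name": self.name, "invocations": list(self._invocations)}


add = TracedTool("add", lambda a, b: a + b)
total = add(a=1, b=2)
try:
    add(a=1)  # missing argument: the error is recorded, then re-raised
except TypeError:
    pass
traces = add.gather_traces()
```

A framework-specific class can then hold such a wrapper by composition and delegate both the call and `gather_traces()` to it, as in the smolagents example.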

__call__

__call__(**kwargs: Any) -> Any

Execute tool and record invocation.

PARAMETER DESCRIPTION
**kwargs

Tool arguments

TYPE: Any DEFAULT: {}

RETURNS DESCRIPTION
Any

Tool execution result (preserves ARE's native return type)

__init__

__init__(are_tool: Any, environment: Gaia2Environment)

Initialize the tool wrapper.

PARAMETER DESCRIPTION
are_tool

ARE AppTool instance to wrap

TYPE: Any

environment

The Gaia2Environment this tool belongs to

TYPE: Gaia2Environment

__repr__

__repr__() -> str

String representation of the tool.

gather_config

gather_config() -> Dict[str, Any]

Gather configuration from this tool.

RETURNS DESCRIPTION
Dict[str, Any]

Dictionary containing tool configuration

gather_traces

gather_traces() -> Dict[str, Any]

Gather execution traces from this tool.

RETURNS DESCRIPTION
Dict[str, Any]

Dictionary containing tool traces with invocation history

load_tasks

load_tasks(
    capability: Optional[str] = None,
    split: str = "validation",
    limit: Optional[int] = None,
    timeout_seconds: Optional[
        float
    ] = DEFAULT_TIMEOUT_SECONDS,
    max_retries: int = DEFAULT_MAX_RETRIES,
) -> TaskQueue

Load Gaia2 tasks from HuggingFace.

Each HuggingFace config corresponds to a capability (execution, search, adaptability, time, ambiguity). When capability is None, all capabilities are loaded and combined.

GAIA2 is event-driven: the task query is delivered to agents via the notification system at runtime (first send_message_to_agent event), not as a static field. task.query is left empty.

PARAMETER DESCRIPTION
capability

Filter by capability type. None loads all capabilities.

TYPE: Optional[str] DEFAULT: None

split

Dataset split (currently only "validation" available)

TYPE: str DEFAULT: 'validation'

limit

Maximum number of tasks to load (across all capabilities)

TYPE: Optional[int] DEFAULT: None

timeout_seconds

Maximum execution time per task. Default 1860 (31 minutes, matching ARE's DEFAULT_SCENARIO_TIMEOUT). Set to None to disable timeout.

TYPE: Optional[float] DEFAULT: DEFAULT_TIMEOUT_SECONDS

max_retries

Maximum retry attempts. Default 1 (skip on failure).

TYPE: int DEFAULT: DEFAULT_MAX_RETRIES

RETURNS DESCRIPTION
TaskQueue

TaskQueue with Task objects.

RAISES DESCRIPTION
ValueError

If capability or split is invalid

ImportError

If required dependencies are not installed

Example

>>> tasks = load_tasks(capability="execution", limit=5)
>>> len(tasks)
5

>>> # Load all capabilities
>>> tasks = load_tasks(limit=10)

configure_model_ids

configure_model_ids(
    tasks: Union[TaskQueue, List[Task]],
    *,
    evaluator_model_id: Optional[str] = None,
    judge_engine_config: Optional[
        Gaia2JudgeEngineConfig
    ] = None,
) -> Union[TaskQueue, List[Task]]

Configure model IDs and judge engine for benchmark components.

Gaia2's GraphPerEventJudge uses an LLM for semantic comparison of tool arguments (email content, calendar descriptions, etc.). By default it uses ARE's built-in defaults (meta-llama/Meta-Llama-3.3-70B-Instruct via HuggingFace). Pass judge_engine_config to override the model/provider.

Note: Gaia2 doesn't have a user simulator (interactions happen through scheduled events), so there's no user_model_id.

PARAMETER DESCRIPTION
tasks

TaskQueue or list of Tasks to configure.

TYPE: Union[TaskQueue, List[Task]]

evaluator_model_id

Optional model ID for LLM-based evaluation.

TYPE: Optional[str] DEFAULT: None

judge_engine_config

Optional judge engine configuration. Controls which LLM model and provider the ARE judge uses for semantic comparison. When None, ARE's defaults are used.

TYPE: Optional[Gaia2JudgeEngineConfig] DEFAULT: None

RETURNS DESCRIPTION
Union[TaskQueue, List[Task]]

The same collection (mutated in place for convenience).

Example

>>> tasks = load_tasks(capability="execution", limit=5)
>>> configure_model_ids(
...     tasks,
...     judge_engine_config=Gaia2JudgeEngineConfig(
...         provider="openrouter",
...     ),
... )

compute_gaia2_metrics

compute_gaia2_metrics(
    results: List[Dict[str, Any]],
) -> Dict[str, Any]

Compute summary metrics across all Gaia2 benchmark results.

Matches ARE's scoring logic:

  • Only validated runs (non-null GSR) count toward success rate
  • Exceptions and no_validation results are excluded from scoring
  • ARE benchmark/report_stats.py: success_rate calculated only from validated runs

PARAMETER DESCRIPTION
results

List of result dicts from benchmark.run()

TYPE: List[Dict[str, Any]]

RETURNS DESCRIPTION
Dict[str, Any]

Dict with metrics including total_tasks, scored_tasks, GSR, and per-capability breakdown.
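The "validated runs only" rule can be illustrated with a small sketch. The result shape used here (`{"capability": ..., "gsr": ...}`), the `by_capability` key, and the `summarize` function are hypothetical; the actual result dicts from benchmark.run() and the keys returned by compute_gaia2_metrics may differ.

```python
from typing import Any, Dict, List


def summarize(results: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Average GSR over validated runs only.

    Assumes gsr is None for exceptions and unvalidated runs, so those
    count toward total_tasks but not toward scored_tasks or the averages.
    """
    scored = [r for r in results if r.get("gsr") is not None]
    per_capability: Dict[str, List[float]] = {}
    for r in scored:
        per_capability.setdefault(r["capability"], []).append(r["gsr"])
    return {
        "total_tasks": len(results),
        "scored_tasks": len(scored),
        "gsr": sum(r["gsr"] for r in scored) / len(scored) if scored else 0.0,
        "by_capability": {cap: sum(v) / len(v) for cap, v in per_capability.items()},
    }


summary = summarize([
    {"capability": "execution", "gsr": 1.0},
    {"capability": "execution", "gsr": 0.0},
    {"capability": "time", "gsr": 1.0},
    {"capability": "time", "gsr": None},  # e.g. an exception: excluded from scoring
])
```

Here four tasks ran but only three are scored, so the unvalidated run neither helps nor hurts the success rate.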