GAIA2: Dynamic Multi-Step Scenario Benchmark (Beta)
Beta
This benchmark has been implemented carefully, but it is highly complex and we have not yet validated the results against the original implementation. Use with caution when comparing with existing results or the original paper's numbers. Contributions and compute donations welcome!
The GAIA2 Benchmark evaluates LLM-based agents on dynamic, multi-step scenarios using Meta's ARE (Agent Research Environments) platform. It tests agents across multiple capability dimensions in a simulated mobile environment.
Overview
GAIA2 is designed to evaluate agents in realistic, time-sensitive scenarios. The benchmark features:
- ARE simulation environment with real-time dynamics and event scheduling
- Tool-based time control via wait_for_notification() for temporal reasoning
- 5 capability dimensions: execution, search, adaptability, time, ambiguity
- Deterministic evaluation via GraphPerEventJudge comparing completed vs expected events
- 12 app tools: Calendar, Email, Messaging, Contacts, Shopping, Cab, City, FileSystem, Browser, ChatsApp, SystemApp, Timer
Reference Paper: "GAIA-2: A Controllable Multi-Turn Conversational Benchmark for Agents"
Check out the BENCHMARKS.md file for more information including licenses.
Installation
GAIA2 requires additional dependencies:
pip install maseval[gaia2]
Or with uv:
uv add maseval --extra gaia2
Quick Start
from maseval.benchmark.gaia2 import (
    Gaia2Benchmark, Gaia2Environment, Gaia2Evaluator,
    load_tasks, configure_model_ids, compute_gaia2_metrics,
)
# Load tasks (downloads from HuggingFace automatically)
tasks = load_tasks(capability="execution", limit=5)
# Optionally configure LLM-based judge
configure_model_ids(tasks, evaluator_model_id="gpt-4o")
# Create your framework-specific benchmark subclass
class MyGaia2Benchmark(Gaia2Benchmark):
    def setup_agents(self, agent_data, environment, task, user, seed_generator):
        tools = environment.create_tools()
        # Create your agent with these tools
        ...

    def get_model_adapter(self, model_id, **kwargs):
        adapter = MyModelAdapter(model_id)
        if "register_name" in kwargs:
            self.register("models", kwargs["register_name"], adapter)
        return adapter
# Run benchmark
benchmark = MyGaia2Benchmark()
results = benchmark.run(tasks)
# Compute metrics
metrics = compute_gaia2_metrics(results)
print(f"GSR: {metrics['gsr']:.2%}")
print(f"By capability: {metrics['by_capability']}")
For baseline comparisons, use DefaultAgentGaia2Benchmark which provides a ReAct-style reference agent:
from maseval.benchmark.gaia2 import DefaultAgentGaia2Benchmark
# Note: You must subclass to provide get_model_adapter()
class MyDefaultGaia2Benchmark(DefaultAgentGaia2Benchmark):
    def get_model_adapter(self, model_id, **kwargs):
        adapter = MyModelAdapter(model_id)
        if "register_name" in kwargs:
            self.register("models", kwargs["register_name"], adapter)
        return adapter

benchmark = MyDefaultGaia2Benchmark(
    agent_data={"model_id": "gpt-4o"},
)
results = benchmark.run(tasks)
Capabilities
GAIA2 tasks are organized by capability dimension:
| Capability | Description |
|---|---|
| execution | Basic task execution |
| search | Information retrieval tasks |
| adaptability | Adapting to changing requirements |
| time | Temporal reasoning tasks |
| ambiguity | Handling ambiguous instructions |
Load specific capabilities:
# Load only time-related tasks
tasks = load_tasks(capability="time", limit=10)
# Load all capabilities
tasks = load_tasks(limit=50)
Multi-Turn Notification Loop
GAIA2 uses an event-driven multi-turn architecture. Scenarios have scheduled events (e.g., "calendar events added at t=240s", "friend replies at t=300s") that the agent must wait for and react to.
The benchmark invokes the agent once. The agent handles multi-turn internally via the notification loop:
1. Agent calls SystemApp__wait_for_notification(timeout=N) as a normal tool.
2. The ARE environment processes scheduled events, advances simulation time, and queues resulting notifications, all synchronously during the tool call.
3. The tool returns. The agent's loop continues (it does not terminate).
4. Before the next LLM call, the agent polls environment.poll_notifications() to retrieve messages that arrived during the wait.
5. The agent injects those messages into its context and continues reasoning.
6. Eventually the agent calls AgentUserInterface__send_message_to_user, the only termination signal.
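To make the "events fire synchronously during the tool call" step concrete, here is a minimal toy model. It is only a sketch: `ToySimulation` and its methods are hypothetical and are not the ARE API, and the rule that a wait returns at the first due event is an assumption made for illustration.

```python
import heapq
from typing import List, Tuple


class ToySimulation:
    """Toy stand-in for an event-driven simulation (hypothetical, not the ARE API)."""

    def __init__(self) -> None:
        self.now = 0.0  # simulated seconds since scenario start
        self._events: List[Tuple[float, str]] = []  # min-heap ordered by due time
        self.notifications: List[str] = []  # queue the agent drains later

    def schedule(self, at: float, message: str) -> None:
        heapq.heappush(self._events, (at, message))

    def wait_for_notification(self, timeout: float) -> None:
        """Advance simulated time to the next due event or to the timeout."""
        deadline = self.now + timeout
        if self._events and self._events[0][0] <= deadline:
            # Jump to the earliest due event and fire everything due at that instant.
            self.now = max(self.now, self._events[0][0])
            while self._events and self._events[0][0] <= self.now:
                _, message = heapq.heappop(self._events)
                self.notifications.append(message)
        else:
            # Nothing is due within the window: the full timeout elapses.
            self.now = deadline


sim = ToySimulation()
sim.schedule(240.0, "calendar events added")
sim.schedule(300.0, "friend replies")

sim.wait_for_notification(timeout=100)  # no event due yet, time moves to t=100
sim.wait_for_notification(timeout=200)  # wakes early when the t=240 event fires
print(sim.now, sim.notifications)
```

The point of the sketch is that no wall-clock waiting is involved: the call itself moves simulated time forward and leaves the resulting notifications queued for the agent to poll.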
What custom agents must implement
The ARE tools handle all environment-side mechanics automatically (event processing, time advancement, notification queuing). No callbacks or hooks required. Custom agents must handle two things:
1. Do not terminate on wait_for_notification. Treat it as a regular tool call. Only terminate on AgentUserInterface__send_message_to_user.
2. Poll notifications between steps. After wait_for_notification returns, new messages are in the queue. Call environment.poll_notifications() to drain them:
# Between agent steps (e.g., before each LLM call):
user_msgs, env_notifs, has_stop = environment.poll_notifications()
# Inject into agent context (format matches ARE's convention):
if user_msgs:
    content = "\n".join(user_msgs)
    messages.append({"role": "user", "content": f"User messages updates:\n***\n{content}\n***\n"})
if env_notifs:
    content = "\n".join(env_notifs)
    messages.append({"role": "user", "content": f"Environment notifications updates:\n***\n{content}\n***\n"})
if has_stop:
    # Environment signalled simulation end: stop the agent loop
    break
See DefaultGaia2Agent source for the canonical single-loop implementation.
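The two rules above can be sketched as one self-contained loop. Everything here is illustrative: `StubEnvironment`, `run_agent`, and the scripted list of tool names (which replaces the LLM) are hypothetical, and only the tool names and the `poll_notifications()` return shape are taken from this page.

```python
from typing import Dict, List, Tuple

WAIT_TOOL = "SystemApp__wait_for_notification"
FINAL_TOOL = "AgentUserInterface__send_message_to_user"


class StubEnvironment:
    """Hypothetical stand-in that exposes only poll_notifications()."""

    def __init__(self) -> None:
        self.pending_user: List[str] = []
        self.pending_env: List[str] = []

    def poll_notifications(self) -> Tuple[List[str], List[str], bool]:
        # Drain both queues; this stub never signals a stop.
        user, env = self.pending_user, self.pending_env
        self.pending_user, self.pending_env = [], []
        return user, env, False


def run_agent(environment: StubEnvironment, planned_calls: List[str], max_steps: int = 10) -> List[Dict[str, str]]:
    messages: List[Dict[str, str]] = []
    calls = iter(planned_calls)  # stands in for the LLM choosing tools
    for _ in range(max_steps):
        # Rule 2: poll before every step and inject what arrived.
        user_msgs, env_notifs, has_stop = environment.poll_notifications()
        if user_msgs:
            content = "\n".join(user_msgs)
            messages.append({"role": "user", "content": f"User messages updates:\n***\n{content}\n***\n"})
        if env_notifs:
            content = "\n".join(env_notifs)
            messages.append({"role": "user", "content": f"Environment notifications updates:\n***\n{content}\n***\n"})
        if has_stop:
            break
        tool = next(calls, FINAL_TOOL)
        messages.append({"role": "assistant", "content": f"call {tool}"})
        if tool == WAIT_TOOL:
            # Rule 1: waiting is a regular tool call, so the loop keeps going.
            # The stub fakes an event that fired during the wait.
            environment.pending_env.append("friend replies")
        elif tool == FINAL_TOOL:
            break  # the only termination signal
    return messages


history = run_agent(StubEnvironment(), [WAIT_TOOL, FINAL_TOOL])
for message in history:
    print(message["role"], "|", message["content"].splitlines()[0])
```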
API Reference
Gaia2Benchmark
Bases: Benchmark
MASEval wrapper for Gaia2/ARE benchmark.
Hybrid approach: Uses ARE for simulation and evaluation while providing MASEval orchestration, tracing, and agent flexibility.
The ARE simulation runs internally; agents interact purely via tool calls.
Time control happens through SystemApp__wait_for_notification.
Subclasses must implement:
- setup_agents(): Create agents for the task
- get_model_adapter(): Provide model adapters
Multi-Turn Architecture
GAIA2 uses ARE's two-level loop architecture:
- Outer loop (turns): drains the notification queue, formats user messages as [TASK], re-queues environment notifications, then runs the inner step loop.
- Inner loop (steps): ReAct cycle. Terminates on send_message_to_user (TERMINATED, turn complete) or wait_for_notification (PAUSED, outer loop continues).
ARE are_simulation_main.py:agent_loop()
What custom agents must do:
- Terminate inner loop on both send_message_to_user and wait_for_notification. The former completes a turn; the latter pauses the agent while ARE processes events.
- Between turns (outer loop): drain notifications via environment.get_turn_notifications() which re-queues environment notifications and returns user messages for [TASK] formatting.
- Within turns (inner loop pre-step): poll notifications via environment.poll_notifications() to pick up re-queued environment notifications and new messages.
See the default agent implementation for the reference two-level loop approach.
seed_generator
property
seed_generator: SeedGenerator
The seed generator for this benchmark.
The seed generator is configured at benchmark initialization via the seed
or seed_generator parameters. When seed=None (the default), the generator's
derive_seed() method returns None, effectively disabling seeding while
maintaining a uniform interface.
| RETURNS | DESCRIPTION |
|---|---|
| SeedGenerator | The root |
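The "returns None when unseeded" behaviour can be illustrated with a small stand-in. This is a sketch only: `ToySeedGenerator` is hypothetical, the `name` argument to `derive_seed()` is an assumption, and the hash-based derivation is just one plausible scheme, not necessarily what SeedGenerator does.

```python
import hashlib
from typing import Optional


class ToySeedGenerator:
    """Hypothetical stand-in; the real SeedGenerator API may differ."""

    def __init__(self, seed: Optional[int] = None) -> None:
        self.seed = seed

    def derive_seed(self, name: str) -> Optional[int]:
        if self.seed is None:
            # Seeding disabled: callers get None and stay unseeded,
            # but they can still call derive_seed() unconditionally.
            return None
        digest = hashlib.sha256(f"{self.seed}:{name}".encode()).digest()
        return int.from_bytes(digest[:4], "big")  # stable 32-bit child seed


unseeded = ToySeedGenerator()
seeded = ToySeedGenerator(seed=42)
print(unseeded.derive_seed("agent"))
print(seeded.derive_seed("agent") == seeded.derive_seed("agent"))
```

Because the unseeded generator still answers `derive_seed()`, component code does not need a separate branch for the "no seed configured" case.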
usage
property
usage: Usage
Running usage total across all task repetitions.
Queryable at any time, including while the benchmark is still running. Returns the grand total of all usage collected so far.
usage_by_component
property
usage_by_component: Dict[str, Usage]
Per-component running usage totals across all repetitions.
Keys are registry keys (e.g., "models:main_model").
__init__
__init__(
callbacks: Optional[List[BenchmarkCallback]] = None,
n_task_repeats: int = 1,
max_invocations: int = MAX_INVOCATIONS,
num_workers: int = 1,
fail_on_setup_error: bool = False,
fail_on_task_error: bool = False,
fail_on_evaluation_error: bool = False,
progress_bar: bool | str = True,
seed: Optional[int] = None,
seed_generator: Optional[SeedGenerator] = None,
)
Initialize benchmark with Gaia2-specific defaults.
| PARAMETER | DESCRIPTION |
|---|---|
| callbacks | Optional list of callback handlers for monitoring execution. |
| n_task_repeats | Number of times to repeat each task. Default 1. |
| max_invocations | Maximum agent invocations (default: 1 for single-turn). |
| num_workers | Number of parallel task executions. Default 1 (sequential). |
| fail_on_setup_error | If True, raise on setup errors. Default False. |
| fail_on_task_error | If True, raise on task execution errors. Default False. |
| fail_on_evaluation_error | If True, raise on evaluation errors. Default False. |
| progress_bar | Progress display. True (default) for tqdm, "rich" for Rich, or False to disable. |
| seed | Global seed for reproducible benchmark runs. |
| seed_generator | Custom seed generator (takes precedence over seed). |
add_callback
add_callback(callback: BenchmarkCallback) -> None
Register a callback handler to monitor benchmark execution.
| PARAMETER | DESCRIPTION |
|---|---|
| callback | A BenchmarkCallback instance that will receive execution events. |
How to use
Callbacks receive notifications at key lifecycle points for tracing, progress tracking,
or custom metrics collection. See BenchmarkCallback
for available hooks and their signatures.
from maseval.core.callbacks import MessageTracingCallback
benchmark = MyBenchmark()
benchmark.add_callback(MessageTracingCallback(output_dir="logs"))
results = benchmark.run(tasks=tasks, agent_data=config)
clear_registry
clear_registry() -> None
Clear the component registry after a task repetition completes.
This method is called automatically by run() after each task repetition
to ensure components are not carried over between repetitions. The
reports list persists across all repetitions for aggregated analysis.
collect_all_configs
collect_all_configs() -> Dict[str, Any]
Collect configuration from all registered components for the current task repetition.
This method is called automatically by run() after each task repetition completes
and before evaluation begins. It gathers comprehensive configuration from all registered
components (agents, models, tools, simulators, callbacks, etc.) for that specific
repetition. After collection, the registry is cleared for the next repetition.
The collected configs are stored in benchmark.reports list along with traces
for persistent access across all task repetitions.
Output fields:
- metadata: Collection timestamp and thread info
- agents: Dict mapping agent names to their config (settings, parameters)
- models: Dict mapping model names to their config (model IDs, parameters)
- tools: Dict mapping tool names to their config (specifications, settings)
- simulators: Dict mapping simulator names to their config (parameters, templates)
- callbacks: Dict mapping callback names to their config (settings)
- environment: Direct config from the environment (not nested), or None if not present
- user: Direct config from the user simulator (not nested), or None if not present
- other: Dict for any other registered components
- benchmark: Benchmark-level configuration (git, system, packages)

| RETURNS | DESCRIPTION |
|---|---|
| Dict[str, Any] | Structured dictionary containing configuration from all registered components. |
How to use
This method is called automatically by run() after each task repetition:
# Automatic collection (recommended)
results = benchmark.run(tasks=tasks, agent_data=config)
# Access all collected reports (traces + configs) across repetitions
for report in benchmark.reports:
    print(f"Task {report['task_id']}, Repeat {report['repeat_idx']}")
    # Agents is a dict: agent_name -> config
    print(f"Agent config: {report['config']['agents']['my_agent']}")
    # Environment and user are direct (not nested)
    print(f"Environment config: {report['config']['environment']}")
    print(f"User config: {report['config']['user']}")
    # Benchmark-level config
    print(f"Git commit: {report['config']['benchmark']['git']['commit_hash']}")
The collected configs are available in the results for reproducibility analysis.
collect_all_traces
collect_all_traces() -> Dict[str, Any]
Collect execution traces from all registered components for the current task repetition.
This method is called automatically by run() after each task repetition completes
and before evaluation begins. It gathers comprehensive traces from all registered
components (agents, models, tools, simulators, callbacks, etc.) for that specific
repetition. After collection, the registry is cleared for the next repetition.
The collected traces are stored in benchmark.reports list along with configs
for persistent access across all task repetitions.
Output fields:
- metadata: Collection timestamp and thread info
- agents: Dict mapping agent names to their traces (messages, execution data)
- models: Dict mapping model names to their traces (API calls, timing, errors)
- tools: Dict mapping tool names to their traces (invocations, parameters)
- simulators: Dict mapping simulator names to their traces (attempts, outcomes)
- callbacks: Dict mapping callback names to their traces (custom data)
- environment: Direct traces from the environment (not nested), or None if not present
- user: Direct traces from the user simulator (not nested), or None if not present
- other: Dict for any other registered components

| RETURNS | DESCRIPTION |
|---|---|
| Dict[str, Any] | Structured dictionary containing execution traces from all registered components. |
How to use
This method is called automatically by run() after each task repetition:
# Automatic collection (recommended)
results = benchmark.run(tasks=tasks, agent_data=config)
# Access all collected reports (traces + configs) across repetitions
for report in benchmark.reports:
    print(f"Task {report['task_id']}, Repeat {report['repeat_idx']}")
    # Agents is a dict: agent_name -> traces
    print(f"Agent messages: {report['traces']['agents']['my_agent']}")
    # Environment and user are direct (not nested)
    print(f"Environment state: {report['traces']['environment']}")
    print(f"User interactions: {report['traces']['user']}")
The collected traces are passed to the evaluator's evaluate() method
and stored in benchmark.reports for later analysis.
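For later analysis it is often handy to index the stored reports by run. The helper below is a small sketch that relies only on the report keys shown above (`task_id`, `repeat_idx`, `traces["agents"]`); the function name and the sample data are made up for illustration.

```python
from typing import Any, Dict, List, Tuple


def agents_per_run(reports: List[Dict[str, Any]]) -> Dict[Tuple[str, int], List[str]]:
    """Map (task_id, repeat_idx) to the sorted names of agents that left traces."""
    return {
        (report["task_id"], report["repeat_idx"]): sorted(report["traces"]["agents"])
        for report in reports
    }


# Hypothetical reports with the same nesting as benchmark.reports.
sample_reports = [
    {"task_id": "t1", "repeat_idx": 0, "traces": {"agents": {"planner": {}, "executor": {}}}},
    {"task_id": "t1", "repeat_idx": 1, "traces": {"agents": {"executor": {}}}},
]
index = agents_per_run(sample_reports)
print(index)
```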
collect_all_usage
collect_all_usage() -> Dict[str, Any]
Collect usage from all registered components for the current task repetition.
This method is called automatically by run() after each task repetition
completes. It gathers usage from all registered UsageTrackableMixin
components and also accumulates into persistent running totals accessible
via usage and usage_by_component.
| RETURNS | DESCRIPTION |
|---|---|
| Dict[str, Any] | Structured dictionary containing usage from all registered components. |
evaluate
evaluate(
evaluators: Sequence[Evaluator],
agents: Dict[str, AgentAdapter],
final_answer: Any,
traces: Dict[str, Any],
) -> List[Dict[str, Any]]
Evaluate using Gaia2 evaluators.
Uses each evaluator's filter_traces() method to extract relevant data, then calls the evaluator with the filtered traces.
Returns Gaia2 format
- gsr: Goal Success Rate
- partial_gsr: Partial success rate
- passed: Boolean
- rationale: Judge rationale (if available)
| PARAMETER | DESCRIPTION |
|---|---|
| evaluators | List of evaluators |
| agents | Dict of agents |
| final_answer | Final answer from agents |
| traces | Execution traces |

| RETURNS | DESCRIPTION |
|---|---|
| List[Dict[str, Any]] | List of evaluation result dicts |
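As an illustration of how result dicts in this format could be aggregated, the sketch below averages `gsr` overall and per capability. It is not the implementation of compute_gaia2_metrics; `summarize_gsr` is a hypothetical helper, and the presence of a `capability` key on each result is an assumption.

```python
from collections import defaultdict
from typing import Any, Dict, List


def summarize_gsr(results: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Average the gsr field overall and per capability (illustrative only)."""
    per_capability: Dict[str, List[float]] = defaultdict(list)
    for result in results:
        per_capability[result["capability"]].append(float(result["gsr"]))
    all_scores = [score for scores in per_capability.values() for score in scores]
    return {
        "gsr": sum(all_scores) / len(all_scores) if all_scores else 0.0,
        "by_capability": {
            capability: sum(scores) / len(scores)
            for capability, scores in per_capability.items()
        },
    }


# Hypothetical evaluation results.
sample_results = [
    {"capability": "time", "gsr": 1.0, "passed": True},
    {"capability": "time", "gsr": 0.0, "passed": False},
    {"capability": "search", "gsr": 1.0, "passed": True},
]
summary = summarize_gsr(sample_results)
print(f"GSR: {summary['gsr']:.2%}")
print(summary["by_capability"])
```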
execution_loop
execution_loop(
agents: Sequence[AgentAdapter],
task: Task,
environment: Environment,
user: Optional[User],
) -> Any
Execute agents with optional user interaction loop.
This method orchestrates the agent-user interaction pattern. When a user is
present, the user initiates the conversation using user.get_initial_query().
If no user is present, task.query is used as the initial query.
Interaction Flow
By default, agents execute once (max_invocations=1). For multi-turn
interaction, set self.max_invocations > 1 in your benchmark's __init__.
The loop continues until max_invocations is reached or user.is_done()
returns True (e.g., max turns reached or stop token detected).
Note
Override this method in your benchmark subclass to implement custom interaction patterns (e.g., agent-initiated conversations, different termination conditions, or specialized query routing).
| PARAMETER | DESCRIPTION |
|---|---|
| agents | Agents to execute (typically the orchestrator). |
| task | The task being solved. |
| environment | The environment providing tools and state. |
| user | Optional user simulator. If provided, the user initiates and drives the conversation. If None, a single agent execution with |

| RETURNS | DESCRIPTION |
|---|---|
| Any | Final answer from the last agent execution. |
Example
For interactive benchmarks, enable multi-turn interaction:
def __init__(self, ...):
    super().__init__(...)
    self.max_invocations = 5  # Up to 5 agent-user exchanges
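The interaction flow described above can be sketched without any framework code. This is a simplified model, not the library's implementation: `ScriptedUser`, its `respond()` method, and `execution_loop_sketch` are hypothetical, while `get_initial_query()`, `is_done()`, and `max_invocations` follow the names used on this page.

```python
from typing import Callable, List, Optional


class ScriptedUser:
    """Hypothetical user stub: replies from a fixed script, then reports done."""

    def __init__(self, initial_query: str, replies: List[str]) -> None:
        self._initial = initial_query
        self._replies = list(replies)

    def get_initial_query(self) -> str:
        return self._initial

    def is_done(self) -> bool:
        return not self._replies

    def respond(self, agent_answer: str) -> str:  # hypothetical method name
        return self._replies.pop(0)


def execution_loop_sketch(
    agent: Callable[[str], str],
    task_query: str,
    user: Optional[ScriptedUser],
    max_invocations: int = 1,
) -> str:
    # The user opens the conversation when present, otherwise the task query is used.
    query = user.get_initial_query() if user is not None else task_query
    answer = ""
    for _ in range(max_invocations):
        answer = agent(query)
        if user is None or user.is_done():
            break
        query = user.respond(answer)
    return answer  # final answer from the last agent execution


def echo_agent(query: str) -> str:
    return f"echo:{query}"


print(execution_loop_sketch(echo_agent, "task", user=None))
print(execution_loop_sketch(echo_agent, "task", ScriptedUser("hi", ["more"]), max_invocations=5))
```

With Gaia2 the user is always None, so the first call above is the relevant path: one invocation, with multi-turn behaviour handled inside the agent.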
get_failed_tasks
get_failed_tasks(
status_filter: Optional[
Union[
TaskExecutionStatus, List[TaskExecutionStatus]
]
] = None,
reports: Optional[List[Dict[str, Any]]] = None,
) -> SequentialTaskQueue
Get tasks that failed during benchmark execution.
This method retrieves failed tasks based on their execution status, useful for debugging, retry logic, or failure analysis.
| PARAMETER | DESCRIPTION |
|---|---|
| status_filter | Filter by specific failure status(es). If None, returns all failed tasks (any status except SUCCESS). Can be a single TaskExecutionStatus or a list of them. Examples: TaskExecutionStatus.TASK_EXECUTION_FAILED (only tasks that failed during execution); TaskExecutionStatus.EVALUATION_FAILED (only tasks where evaluation failed); [TaskExecutionStatus.TASK_EXECUTION_FAILED, TaskExecutionStatus.SETUP_FAILED] (tasks that failed during execution or setup). |
| reports | Optional list of reports to analyze. If None, uses the reports from the last run() call. This allows analyzing externally stored or modified reports. |

| RETURNS | DESCRIPTION |
|---|---|
| SequentialTaskQueue | SequentialTaskQueue containing the failed tasks. Empty if no failures match the filter. |

| RAISES | DESCRIPTION |
|---|---|
| RuntimeError | If reports is None and run() has not been executed yet. |
How to use
# Run benchmark
benchmark = MyBenchmark()
reports = benchmark.run(tasks=tasks, agent_data=config)
# Get all failed tasks (from internal state)
failed = benchmark.get_failed_tasks()
print(f"Failed: {len(failed)}/{len(benchmark.tasks)} tasks")
# Or work with returned reports (safe from internal state changes)
failed = benchmark.get_failed_tasks(reports=reports)
# Get only tasks that failed during execution (not evaluation)
execution_failures = benchmark.get_failed_tasks(
    TaskExecutionStatus.TASK_EXECUTION_FAILED,
    reports=reports,
)
# Get setup and execution failures
critical_failures = benchmark.get_failed_tasks(
    status_filter=[
        TaskExecutionStatus.SETUP_FAILED,
        TaskExecutionStatus.TASK_EXECUTION_FAILED,
    ],
    reports=reports,
)
# Retry failed tasks elegantly - this is the key use case!
if len(failed) > 0:
    retry_reports = benchmark.run(tasks=failed)
# Or more concisely
reports = benchmark.run(tasks=tasks)
retry_reports = benchmark.run(tasks=benchmark.get_failed_tasks())
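The filtering rule (no filter means "anything except SUCCESS", a single status, or a list of statuses) can be sketched on plain report dicts. The names here are assumptions: `Status` is a hypothetical mirror of TaskExecutionStatus, and storing it under a `status` key in each report is not confirmed by this page.

```python
from enum import Enum
from typing import Any, Dict, Iterable, List, Optional, Union


class Status(Enum):  # hypothetical mirror of TaskExecutionStatus
    SUCCESS = "success"
    SETUP_FAILED = "setup_failed"
    TASK_EXECUTION_FAILED = "task_execution_failed"
    EVALUATION_FAILED = "evaluation_failed"


def failed_task_ids(
    reports: List[Dict[str, Any]],
    status_filter: Optional[Union[Status, Iterable[Status]]] = None,
) -> List[str]:
    """Return task ids whose report status matches the filter, in first-seen order."""
    if status_filter is None:
        wanted = {status for status in Status if status is not Status.SUCCESS}
    elif isinstance(status_filter, Status):
        wanted = {status_filter}
    else:
        wanted = set(status_filter)
    seen, failed = set(), []
    for report in reports:
        if report["status"] in wanted and report["task_id"] not in seen:
            seen.add(report["task_id"])  # a task may fail in several repeats
            failed.append(report["task_id"])
    return failed


# Hypothetical reports; the "status" key is an assumption.
sample = [
    {"task_id": "a", "status": Status.SUCCESS},
    {"task_id": "b", "status": Status.TASK_EXECUTION_FAILED},
    {"task_id": "c", "status": Status.EVALUATION_FAILED},
    {"task_id": "b", "status": Status.SETUP_FAILED},
]
print(failed_task_ids(sample))
print(failed_task_ids(sample, Status.EVALUATION_FAILED))
```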
get_model_adapter
abstractmethod
get_model_adapter(
model_id: str, **kwargs: Any
) -> ModelAdapter
Provide a ModelAdapter for benchmark components that require LLM access.
Many benchmark components beyond the agents themselves require access to language models. Common examples include:
- Tool simulators: Simulating tool responses when real APIs aren't available
- User simulators: Generating realistic user responses in multi-turn dialogues
- Judges/Evaluators: Using LLMs to assess agent performance against criteria
- Reward models: Computing scores for reinforcement learning
This method centralizes model provisioning, giving you control over which models are used throughout the benchmark. Implement this to return a configured ModelAdapter for the requested model.
| PARAMETER | DESCRIPTION |
|---|---|
| model_id | The model identifier to use (e.g., "gemini-2.5-flash", "openrouter/google/gemini-2.5-flash", "gpt-4o"). This is passed by the benchmark when setting up components that need model access. |
| **kwargs | Additional arguments for adapter creation or registration. Common kwargs: register_category (category for trace registration, e.g., "models"); register_name (name for trace registration, e.g., "evaluator_user_gsr"). |

| RETURNS | DESCRIPTION |
|---|---|
| ModelAdapter | A ModelAdapter instance configured for the specified model. For proper tracing, return a fresh adapter for each call rather than reusing instances. You can still share the underlying API client for efficiency. |
How to use
For proper tracing, register the adapter after creation using the kwargs:
def get_model_adapter(self, model_id: str, **kwargs: Any) -> ModelAdapter:
    adapter = GoogleGenAIModelAdapter(self.client, model_id=model_id)
    # Register for tracing if registration info provided
    category = kwargs.get("register_category", "models")
    name = kwargs.get("register_name", model_id)
    self.register(category, name, adapter)
    return adapter
The benchmark calls this method when setting up tools, user simulators, and evaluators. Each call creates a fresh adapter with its own trace log.
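The "fresh adapter per call, shared client" advice can be demonstrated with stand-ins. All classes below (`SharedClient`, `ToyAdapter`, `ToyBenchmark`) are hypothetical and only mimic the pattern; the `category:name` registry key format follows the "models:main_model" example given for usage_by_component.

```python
from typing import Any, Dict, List


class SharedClient:
    """Hypothetical API client that is expensive to create, so it is shared."""


class ToyAdapter:
    """Hypothetical adapter: one trace log per instance."""

    def __init__(self, client: SharedClient, model_id: str) -> None:
        self.client = client
        self.model_id = model_id
        self.logs: List[str] = []


class ToyBenchmark:
    def __init__(self) -> None:
        self.client = SharedClient()
        self.registry: Dict[str, Any] = {}

    def register(self, category: str, name: str, component: Any) -> Any:
        self.registry[f"{category}:{name}"] = component
        return component

    def get_model_adapter(self, model_id: str, **kwargs: Any) -> ToyAdapter:
        adapter = ToyAdapter(self.client, model_id)  # fresh adapter, shared client
        category = kwargs.get("register_category", "models")
        name = kwargs.get("register_name", model_id)
        return self.register(category, name, adapter)


bench = ToyBenchmark()
judge = bench.get_model_adapter("gpt-4o", register_name="judge")
agent = bench.get_model_adapter("gpt-4o", register_name="agent")
judge.logs.append("judge call")
print(sorted(bench.registry), agent.logs)
```

Because each caller gets its own adapter, the judge's calls do not show up in the agent's trace log even though both use the same model and client.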
register
register(
category: str,
name: str,
component: RegisterableComponent,
) -> RegisterableComponent
Register a component for comprehensive trace and configuration collection.
All core MASEval components (AgentAdapter, ModelAdapter, Environment, User, LLMSimulator, BenchmarkCallback) inherit from TraceableMixin and/or ConfigurableMixin, and are automatically registered for both trace and configuration collection before evaluation.
Note: Most components are automatically registered when returned from
setup methods (setup_environment, setup_user, setup_agents). You only
need to manually register additional components like models, simulators, or
tools that aren't automatically captured.
| PARAMETER | DESCRIPTION |
|---|---|
| category | Component category (e.g., "agents", "models", "tools", "simulators", "callbacks", "user", "environment", "seeding"). Use plural form to match the structure in collect_all_traces() and collect_all_configs(). |
| name | Unique identifier for this component within its category |
| component | Any object inheriting from TraceableMixin and/or ConfigurableMixin |

| RETURNS | DESCRIPTION |
|---|---|
| RegisterableComponent | The component (for chaining convenience) |

| RAISES | DESCRIPTION |
|---|---|
| ValueError | If the component is already registered under a different name |
How to use
Most components are auto-registered. Manual registration is only needed for additional components:
def setup_agents(self, agent_data, environment, task, user, seed_generator):
    # Create model (needs manual registration)
    model = MyModelAdapter(...)
    self.register("models", "main_model", model)
    # Create agent (auto-registered when returned)
    agent = MyAgent(model=model)
    agent_adapter = AgentAdapter(agent, "agent1")
    # Environment and user are also auto-registered
    return [agent_adapter], {"agent1": agent_adapter}
Traces and configs are automatically collected before evaluation via
collect_all_traces() and collect_all_configs() which are called
internally by the run() method.
run
run(
tasks: Union[
Task, BaseTaskQueue, Iterable[Union[Task, dict]]
],
agent_data: Dict[str, Any] | Iterable[Dict[str, Any]],
) -> List[Dict[str, Any]]
Initialize and execute the complete benchmark loop across all tasks.
| PARAMETER | DESCRIPTION |
|---|---|
| tasks | Task source for execution. Can be a single Task object; a BaseTaskQueue (SequentialTaskQueue, PriorityTaskQueue, or custom AdaptiveTaskQueue); or an iterable of Task objects or dicts that will be converted to Tasks. When a BaseTaskQueue is provided, it controls the task ordering. AdaptiveTaskQueue subclasses are automatically registered as callbacks to receive task completion notifications. |
| agent_data | Configuration for agents. Either a single dict applied to all tasks, or an iterable of dicts with one configuration per task. Agent data typically includes model parameters, agent architecture details, and tool specifications. |

| RETURNS | DESCRIPTION |
|---|---|
| List[Dict[str, Any]] | List of report dictionaries, one per task repetition. Each report contains: |

| RAISES | DESCRIPTION |
|---|---|
| ValueError | If agent_data length doesn't match number of tasks (when agent_data is an iterable). |
How to use
This is the framework's main orchestration method that runs your entire benchmark. It iterates through all tasks, handles repetitions, and manages the three-stage lifecycle for each execution. You don't implement this method—instead, you call it to start the benchmark after implementing the setup and execution methods.
By default, the benchmark will continue executing remaining tasks even if some fail.
You can change this behavior by setting fail_on_task_error=True,
fail_on_evaluation_error=True, or fail_on_setup_error=True when instantiating
the benchmark. Each task execution returns a status indicating success or the specific
failure type (see TaskExecutionStatus).
For each task execution, the framework:
1. Calls your setup methods to initialize components
2. Calls your run_agents() method to execute the task
3. Collects message histories and calls evaluators
4. Stores results and triggers callbacks
Pseudocode structure:
for task in tasks:
    for repeat in range(n_task_repeats):
        # Setup stage
        environment = setup_environment(agent_data, task)
        user = setup_user(agent_data, environment, task)
        agents_to_run, agents_dict = setup_agents(agent_data, environment, task, user)
        evaluators = setup_evaluators(environment, task, agents_to_run, user)
        # Run stage (execution_loop handles multi-turn if user exists)
        agents_output = execution_loop(agents_to_run, task, environment, user)
        # Evaluate stage (argument order follows evaluate(): evaluators, agents, final_answer, traces)
        traces = collect_message_histories(agents_dict)
        eval_results = evaluate(evaluators, agents_dict, agents_output, traces)
        # Store results
        store_result(task_id, traces, eval_results)
Callback hooks are triggered at these points:
- on_run_start: Before processing any tasks
- on_task_start: Before processing a task (once per task, not per repeat)
- on_task_repeat_start: Before each repetition of a task
- on_task_repeat_end: After each repetition completes
- on_task_end: After all repetitions of a task complete
- on_run_end: After all tasks complete
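The ordering of these hooks can be checked with a tiny recorder. This is a sketch under stated assumptions: `RecordingCallback` and `run_sketch` are hypothetical, the hook argument lists are invented for illustration, and the real BenchmarkCallback signatures may differ.

```python
from typing import List


class RecordingCallback:
    """Hypothetical callback that records hook names in call order."""

    def __init__(self) -> None:
        self.events: List[str] = []

    def on_run_start(self) -> None:
        self.events.append("on_run_start")

    def on_task_start(self, task_id: str) -> None:
        self.events.append(f"on_task_start:{task_id}")

    def on_task_repeat_start(self, task_id: str, repeat_idx: int) -> None:
        self.events.append(f"on_task_repeat_start:{task_id}:{repeat_idx}")

    def on_task_repeat_end(self, task_id: str, repeat_idx: int) -> None:
        self.events.append(f"on_task_repeat_end:{task_id}:{repeat_idx}")

    def on_task_end(self, task_id: str) -> None:
        self.events.append(f"on_task_end:{task_id}")

    def on_run_end(self) -> None:
        self.events.append("on_run_end")


def run_sketch(task_ids: List[str], n_task_repeats: int, callback: RecordingCallback) -> None:
    """Walk the task/repeat loop and fire hooks at the documented points."""
    callback.on_run_start()
    for task_id in task_ids:
        callback.on_task_start(task_id)  # once per task, not per repeat
        for repeat_idx in range(n_task_repeats):
            callback.on_task_repeat_start(task_id, repeat_idx)
            # setup, execution, and evaluation would happen here
            callback.on_task_repeat_end(task_id, repeat_idx)
        callback.on_task_end(task_id)
    callback.on_run_end()


recorder = RecordingCallback()
run_sketch(["t1"], n_task_repeats=2, callback=recorder)
print("\n".join(recorder.events))
```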
# Typical usage
benchmark = MyBenchmark()
reports = benchmark.run(tasks=tasks, agent_data=config)
# Analyze results
for report in reports:
    print(f"Task {report['task_id']}, Repeat {report['repeat_idx']}: {report['eval']}")
    print(f"Config: {report['config']}")
    print(f"Traces: {report['traces']}")
# Parallel execution with 4 workers
benchmark = MyBenchmark(num_workers=4)
reports = benchmark.run(tasks=tasks, agent_data=config)
# Single agent config for all tasks
reports = benchmark.run(tasks=tasks, agent_data={"model": "gpt-4"})
# Task-specific agent configs (must match task count)
reports = benchmark.run(
    tasks=tasks,
    agent_data=[
        {"model": "gpt-4", "difficulty": "easy"},
        {"model": "gpt-4", "difficulty": "hard"},
    ],
)
# Priority-based execution
from maseval.core.task import PriorityTaskQueue
for task in tasks:
    task.protocol.priority = compute_priority(task)
queue = PriorityTaskQueue(tasks)
reports = benchmark.run(tasks=queue, agent_data=config)
# Adaptive queue (auto-registered as callback)
queue = MyAdaptiveTaskQueue(tasks)
reports = benchmark.run(tasks=queue) # queue receives on_task_complete callbacks
run_agents
run_agents(
agents: Sequence[AgentAdapter],
task: Task,
environment: Gaia2Environment,
query: str = "",
) -> Any
Execute agents and ensure environment cleanup.
| PARAMETER | DESCRIPTION |
|---|---|
| agents | Agent instances to run |
| task | Current task |
| environment | Gaia2Environment |
| query | Query/prompt for agents |

| RETURNS | DESCRIPTION |
|---|---|
| Any | Final answer from agents |
setup_agents
abstractmethod
setup_agents(
agent_data: Dict[str, Any],
environment: Gaia2Environment,
task: Task,
user: Optional[User],
seed_generator: SeedGenerator,
) -> Tuple[Sequence[AgentAdapter], Dict[str, AgentAdapter]]
Create agents for this task. Must be implemented by subclass.
| PARAMETER | DESCRIPTION |
|---|---|
| agent_data | Agent configuration |
| environment | Gaia2Environment with ARE tools |
| task | Current task |
| user | Optional user simulator (always None for Gaia2) |
| seed_generator | Seed generator for reproducibility |

| RETURNS | DESCRIPTION |
|---|---|
| Tuple[Sequence[AgentAdapter], Dict[str, AgentAdapter]] | Tuple of (ordered agent list, agent dict keyed by ID) |
setup_environment
setup_environment(
agent_data: Dict[str, Any],
task: Task,
seed_generator: SeedGenerator,
) -> Gaia2Environment
Create Gaia2 environment wrapping ARE simulation.
| PARAMETER | DESCRIPTION |
|---|---|
| agent_data | Agent configuration |
| task | Current task |
| seed_generator | Seed generator for reproducibility |

| RETURNS | DESCRIPTION |
|---|---|
| Gaia2Environment | Gaia2Environment instance |
setup_evaluators
setup_evaluators(
environment: Gaia2Environment,
task: Task,
agents: Sequence[AgentAdapter],
user: Optional[User],
seed_generator: SeedGenerator,
) -> Sequence[Evaluator]
Create Gaia2 evaluator using ARE's judge.
| PARAMETER | DESCRIPTION |
|---|---|
| environment | Gaia2Environment instance |
| task | Current task with evaluation data |
| agents | Agent instances |
| user | Optional user simulator (always None) |
| seed_generator | Seed generator for reproducibility |

| RETURNS | DESCRIPTION |
|---|---|
| Sequence[Evaluator] | List with single Gaia2Evaluator instance |
setup_user
setup_user(
agent_data: Dict[str, Any],
environment: Gaia2Environment,
task: Task,
seed_generator: SeedGenerator,
) -> Optional[User]
Gaia2 uses event-based simulation, not turn-based user simulation.
User interactions in Gaia2 happen through scheduled events (e.g., "user sends message at t=30s") rather than synchronous turn-taking.
| PARAMETER | DESCRIPTION |
|---|---|
| agent_data | Agent configuration |
| environment | Gaia2Environment instance |
| task | Current task |
| seed_generator | Seed generator for reproducibility |

| RETURNS | DESCRIPTION |
|---|---|
| Optional[User] | None (no user simulator needed) |
Gaia2Environment
Bases: Environment
MASEval Environment wrapping ARE's simulation.
The ARE simulation runs its own internal event loop. Agent interaction happens purely through tool calls - including time control via SystemApp.wait_for_notification(). No special execution loop needed.
Exposes all ARE app tools (Calendar, Email, Messaging, Contacts, Shopping, Cab, City, FileSystem, Browser, ChatsApp, SystemApp, Timer) to agents.
Key Features
- Wraps ARE's simulation environment
- Provides MASEval-compatible tool wrappers with tracing
- Exposes simulation time for temporal reasoning tasks
- Handles proper cleanup of ARE resources
__init__
__init__(
task_data: Dict[str, Any],
callbacks: Optional[List[Any]] = None,
judge_engine_config: Optional[Any] = None,
)
Initialize Gaia2 environment.
| PARAMETER | DESCRIPTION |
|---|---|
| task_data | Task data containing: scenario (ARE BenchmarkScenario object); capability (capability type: execution, search, etc.); universe_id (universe identifier). |
| callbacks | Optional callbacks |
| judge_engine_config | Optional :class: |
cleanup
cleanup() -> None
Stop ARE simulation when task completes.
Ensures proper resource cleanup and stops any running simulation.
create_tools
create_tools() -> Dict[str, Gaia2GenericTool]
Wrap ARE app tools for MASEval tracing.
Creates framework-agnostic Gaia2GenericTool instances that provide a clean API with built-in tracing.
Filters out AgentUserInterface message-retrieval tools that ARE removes
in remove_aui_irrelevant_tools(), and sets wait_for_user_response
to False so the AUI does not block waiting for a response when the
agent sends a message. User messages are delivered via the notification
system instead.
ARE agents/default_agent/are_simulation_main.py:206-228
| RETURNS | DESCRIPTION |
|---|---|
Dict[str, Gaia2GenericTool]
|
Dict mapping tool names to Gaia2GenericTool instances |
gather_config
gather_config() -> Dict[str, Any]
Gather environment configuration.
| RETURNS | DESCRIPTION |
|---|---|
Dict[str, Any]
|
Configuration dictionary |
gather_traces
gather_traces() -> Dict[str, Any]
Collect traces from environment and all tools.
| RETURNS | DESCRIPTION |
|---|---|
Dict[str, Any]
|
Trace dictionary with scenario info and all tool traces |
get_are_environment
get_are_environment() -> Any
Get the underlying ARE Environment.
Used by the evaluator to access completed events and judge.
| RETURNS | DESCRIPTION |
|---|---|
Any
|
ARE Environment instance |
get_notification_system
get_notification_system() -> Any
Get the ARE notification system.
Used by agents that need to poll for messages between iterations, matching ARE's pre-step notification polling behavior.
| RETURNS | DESCRIPTION |
|---|---|
Any
|
ARE NotificationSystem instance, or None if not available |
get_scenario
get_scenario() -> Any
Get the ARE scenario object.
| RETURNS | DESCRIPTION |
|---|---|
Any
|
ARE BenchmarkScenario object |
get_simulation_time
get_simulation_time() -> float
Get current simulation time in seconds.
| RETURNS | DESCRIPTION |
|---|---|
float
|
Current simulation time in seconds since scenario start |
get_start_time
get_start_time() -> Optional[float]
Get the scenario start time.
| RETURNS | DESCRIPTION |
|---|---|
Optional[float]
|
Start time as Unix timestamp, or None if not available |
get_tool
get_tool(name: str) -> Optional[Any]
Get a tool by name.
| PARAMETER | DESCRIPTION |
|---|---|
name
|
Tool name
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
Optional[Any]
|
The tool, or None if not found |
get_tools
get_tools() -> Dict[str, Any]
Get all tools as a dict.
get_turn_notifications
get_turn_notifications() -> Tuple[List[str], bool, bool]
Get notifications for turn transitions, re-queuing env notifications.
Matches ARE's get_notifications() in are_simulation_main.py:331-359:
drains the notification queue, separates by type, re-queues environment
notifications (so the inner loop's pre-step picks them up), and returns
user messages and status flags.
| RETURNS | DESCRIPTION |
|---|---|
List[str]
|
Tuple of |
bool
|
|
bool
|
|
Tuple[List[str], bool, bool]
|
|
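The drain, separate, and re-queue behaviour described for get_turn_notifications() can be illustrated with a plain queue.Queue. This is only a sketch: the Notification dataclass, the "user" / "environment" / "stop" labels, the meaning given to the two booleans, and the function name drain_turn_notifications are all hypothetical and are not ARE's or MASEval's actual types.

```python
from dataclasses import dataclass
from queue import Empty, Queue
from typing import List, Tuple


@dataclass
class Notification:
    kind: str  # hypothetical labels: "user", "environment", or "stop"
    text: str


def drain_turn_notifications(q: Queue) -> Tuple[List[str], bool, bool]:
    """Drain the queue, re-queue environment notifications, return user messages and flags."""
    drained: List[Notification] = []
    while True:
        try:
            drained.append(q.get_nowait())
        except Empty:
            break

    user_messages = [n.text for n in drained if n.kind == "user"]
    env_notifications = [n for n in drained if n.kind == "environment"]
    stop_requested = any(n.kind == "stop" for n in drained)

    # Put environment notifications back so a later pre-step poll still sees them.
    for n in env_notifications:
        q.put(n)

    # Assumed flag meanings: (environment notifications pending, stop seen).
    return user_messages, bool(env_notifications), stop_requested
```

Re-queuing rather than returning the environment notifications keeps a single consumer (the inner loop's pre-step) responsible for formatting them.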
pause
pause() -> None
Pause the ARE simulation environment.
Stops time progression during LLM generation, matching ARE's simulated generation time behavior. ARE simulation/environment.py:262-272
No-op if environment is not available or not running.
poll_notifications
poll_notifications() -> Tuple[List[str], List[str], bool]
Poll pending notifications from the ARE notification system.
Drains all pending messages from the notification queue and returns
them as pre-formatted strings. Call this between agent steps to
receive messages that arrived during wait_for_notification() or
from background simulation events.
GAIA2 uses an event-driven multi-turn architecture. When the agent
calls SystemApp__wait_for_notification, the ARE environment
processes scheduled events, advances simulation time, and queues
notifications. After the tool returns, call this method to retrieve
those notifications and inject them into the agent's context before
the next LLM call.
ARE agents/default_agent/steps/are_simulation.py:26-62
| RETURNS | DESCRIPTION |
|---|---|
Tuple[List[str], List[str], bool]
|
Tuple of two lists of pre-formatted strings ready to inject into agent context, and a bool that is True when the environment has signalled the simulation is over. |
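A minimal sketch of how an agent loop might consume poll_notifications() between steps. FakeEnvironment is a stand-in that only mimics the documented return shape (two lists of strings and a bool); the helper name inject_notifications and the message dict layout are assumptions made for illustration.

```python
from typing import Dict, List, Tuple


class FakeEnvironment:
    """Stand-in for Gaia2Environment; only mimics the documented return shape."""

    def __init__(self, pending: Tuple[List[str], List[str], bool]):
        self._pending = pending

    def poll_notifications(self) -> Tuple[List[str], List[str], bool]:
        # Return what is pending and leave the queue empty, like a drain.
        pending, self._pending = self._pending, ([], [], False)
        return pending


def inject_notifications(env, messages: List[Dict[str, str]]) -> bool:
    """Append polled notifications to the history; return True if the simulation is over."""
    first, second, simulation_over = env.poll_notifications()
    for text in first + second:
        messages.append({"role": "user", "content": text})
    return simulation_over


history: List[Dict[str, str]] = []
env = FakeEnvironment((["[TASK] Book a cab"], ["New email from Alice"], False))
done = inject_notifications(env, history)
```

Calling the helper before every LLM call means messages that arrived during a wait are visible in the next prompt.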
resume_with_offset
resume_with_offset(offset: float) -> None
Resume the ARE simulation environment with a time offset.
Advances simulation time by the given offset and resumes the event loop. ARE simulation/environment.py:286-298
| PARAMETER | DESCRIPTION |
|---|---|
offset
|
Time in seconds to advance the simulation clock
TYPE:
|
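The pause() and resume_with_offset() pair can be wrapped in a context manager so that simulation time stands still while the model generates. The sketch below uses a stand-in environment; the name paused_generation and the choice between a measured and a fixed offset are assumptions, not the library's configuration surface.

```python
import time
from contextlib import contextmanager


class FakeEnvironment:
    """Stand-in exposing only pause() and resume_with_offset()."""

    def __init__(self):
        self.paused = False
        self.offsets = []

    def pause(self) -> None:
        self.paused = True

    def resume_with_offset(self, offset: float) -> None:
        self.offsets.append(offset)
        self.paused = False


@contextmanager
def paused_generation(env, fixed_offset=None):
    """Pause the simulation while the body runs, then resume with an offset.

    If fixed_offset is None, the measured wall-clock duration is used.
    """
    env.pause()
    start = time.monotonic()
    try:
        yield
    finally:
        elapsed = time.monotonic() - start
        env.resume_with_offset(elapsed if fixed_offset is None else fixed_offset)


env = FakeEnvironment()
with paused_generation(env, fixed_offset=1.5):
    reply = "Thought: ..."  # placeholder for a real LLM call
```

The finally clause ensures the simulation resumes even if the model call raises.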
setup_state
setup_state(task_data: Dict[str, Any]) -> Dict[str, Any]
Initialize ARE scenario and start simulation.
Delegates to ARE's preprocess_scenario() for faithful preprocessing:
- Ensure SystemApp is present.
- Set scenario duration from ARE defaults (1800s standard, 420s for Time).
- Initialize the scenario (populates apps, events).
- Run oracle mode to generate expected event log.
- Soft-reset so app state is clean for agent run.
- Create judge and initialize turns with trigger conditions.
- Start the agent-mode simulation.
| PARAMETER | DESCRIPTION |
|---|---|
task_data
|
Task data with scenario, capability, universe_id
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
Dict[str, Any]
|
State dictionary with scenario metadata |
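The duration rule in the list above (1800s standard, 420s for Time) amounts to a one-line lookup. The sketch below is illustrative only: the constant names, the function name, and the assumption that the Time capability is identified by the string "time" are not taken from ARE.

```python
# Durations quoted in the setup_state description; names are hypothetical.
STANDARD_DURATION_S = 1800
TIME_DURATION_S = 420


def scenario_duration(capability: str) -> int:
    """Return the scenario duration in seconds for a capability name."""
    return TIME_DURATION_S if capability.lower() == "time" else STANDARD_DURATION_S
```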
Gaia2Evaluator
Bases: Evaluator
Evaluates Gaia2 scenarios using ARE's judge system.
Uses ARE's GraphPerEventJudge which combines deterministic hard checks
(exact value matching) with LLM-based soft checks (semantic comparison of
content like email bodies and calendar descriptions).
The evaluator compares completed events in the simulation against oracle (expected) events to compute Goal Success Rate (GSR).
__call__
__call__(
traces: Dict[str, Any],
final_answer: Optional[str] = None,
) -> Dict[str, Any]
Evaluate using ARE's judge system.
Uses the judge created during preprocess_scenario() (attached to the
scenario object) rather than creating a new one. This ensures turn
initialization and judge state are consistent.
Exceptions return gsr=None (excluded from scoring), matching ARE's
behavior where exceptions/no_validation get score=None.
ARE benchmark/hf_upload_utils.py:33-52, benchmark/report_stats.py
| PARAMETER | DESCRIPTION |
|---|---|
traces
|
Filtered execution traces
TYPE:
|
final_answer
|
Final answer from agent (not used in Gaia2)
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
Dict[str, Any]
|
Dict with evaluation results. gsr is None (excluded from scoring) or a float for valid results. |
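Because a gsr of None means "excluded from scoring", any aggregate should skip those entries rather than count them as failures. A minimal sketch, assuming each result dict carries a gsr key; mean_gsr is a hypothetical helper and not the implementation of compute_gaia2_metrics.

```python
from typing import Any, Dict, List, Optional


def mean_gsr(results: List[Dict[str, Any]]) -> Optional[float]:
    """Average gsr over results, skipping entries where gsr is None."""
    scores = [r["gsr"] for r in results if r.get("gsr") is not None]
    if not scores:
        return None  # nothing was scored
    return sum(scores) / len(scores)


results = [{"gsr": 1.0}, {"gsr": 0.0}, {"gsr": None}]
average = mean_gsr(results)
```

Here the third result is ignored, so the average is taken over two scored runs instead of three.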
__init__
__init__(
task: Task,
environment: Gaia2Environment,
user: Optional[Any] = None,
use_llm_judge: bool = False,
model: Optional[Any] = None,
)
Initialize the evaluator.
| PARAMETER | DESCRIPTION |
|---|---|
task
|
Task being evaluated
TYPE:
|
environment
|
Gaia2Environment instance
TYPE:
|
user
|
Optional user simulator (not used in Gaia2)
TYPE:
|
use_llm_judge
|
Whether to use LLM-based judge
TYPE:
|
model
|
Optional ModelAdapter for LLM-based evaluation
TYPE:
|
filter_traces
filter_traces(traces: Dict[str, Any]) -> Dict[str, Any]
Extract tool invocations and environment state for evaluation.
| PARAMETER | DESCRIPTION |
|---|---|
traces
|
Full execution traces
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
Dict[str, Any]
|
Dict with: - tool_invocations: List of all tool calls with timing - simulation_time: Final simulation time - scenario_id: For correlation |
DefaultAgentGaia2Benchmark
Bases: Gaia2Benchmark
Gaia2 benchmark with default agent implementation.
Provides a ready-to-use benchmark matching ARE's reference agent behavior. Uses text-based ReAct format with JSON actions, matching ARE's implementation.
Default parameters (matching ARE):
- max_iterations: 80
- temperature: 0.5
- max_tokens: 16384
- invalid_format_retries: 10
Example
from maseval.benchmark.gaia2 import DefaultAgentGaia2Benchmark, load_tasks
tasks = load_tasks(capability="execution", limit=5)
benchmark = DefaultAgentGaia2Benchmark(
    agent_data={"model_id": "gpt-4o"},
)
results = benchmark.run(tasks)
seed_generator
property
seed_generator: SeedGenerator
The seed generator for this benchmark.
The seed generator is configured at benchmark initialization via the seed
or seed_generator parameters. When seed=None (the default), the generator's
derive_seed() method returns None, effectively disabling seeding while
maintaining a uniform interface.
| RETURNS | DESCRIPTION |
|---|---|
SeedGenerator
|
The root |
usage
property
usage: Usage
Running usage total across all task repetitions.
Queryable at any time, including while the benchmark is still running. Returns the grand total of all usage collected so far.
usage_by_component
property
usage_by_component: Dict[str, Usage]
Per-component running usage totals across all repetitions.
Keys are registry keys (e.g., "models:main_model").
__init__
__init__(
agent_data: Optional[Dict[str, Any]] = None,
**kwargs: Any,
)
Initialize benchmark.
| PARAMETER | DESCRIPTION |
|---|---|
agent_data
|
Agent configuration with:
- model_id: Required model identifier
- llm_args: Optional model call arguments (temperature, max_tokens, etc.)
- max_iterations: Max iterations per task (default: 80)
- invalid_format_retries: Max retries for invalid format (default: 10)
- simulated_generation_time_config: Optional
TYPE:
|
**kwargs
|
Additional Benchmark arguments
TYPE:
|
add_callback
add_callback(callback: BenchmarkCallback) -> None
Register a callback handler to monitor benchmark execution.
| PARAMETER | DESCRIPTION |
|---|---|
callback
|
A BenchmarkCallback instance that will receive execution events.
TYPE:
|
How to use
Callbacks receive notifications at key lifecycle points for tracing, progress tracking,
or custom metrics collection. See BenchmarkCallback
for available hooks and their signatures.
from maseval.core.callbacks import MessageTracingCallback
benchmark = MyBenchmark()
benchmark.add_callback(MessageTracingCallback(output_dir="logs"))
results = benchmark.run(tasks=tasks, agent_data=config)
clear_registry
clear_registry() -> None
Clear the component registry after a task repetition completes.
This method is called automatically by run() after each task repetition
to ensure components are not carried over between repetitions. The
reports list persists across all repetitions for aggregated analysis.
collect_all_configs
collect_all_configs() -> Dict[str, Any]
Collect configuration from all registered components for the current task repetition.
This method is called automatically by run() after each task repetition completes
and before evaluation begins. It gathers comprehensive configuration from all registered
components (agents, models, tools, simulators, callbacks, etc.) for that specific
repetition. After collection, the registry is cleared for the next repetition.
The collected configs are stored in benchmark.reports list along with traces
for persistent access across all task repetitions.
Output fields:
- metadata - Collection timestamp and thread info
- agents - Dict mapping agent names to their config (settings, parameters)
- models - Dict mapping model names to their config (model IDs, parameters)
- tools - Dict mapping tool names to their config (specifications, settings)
- simulators - Dict mapping simulator names to their config (parameters, templates)
- callbacks - Dict mapping callback names to their config (settings)
- environment - Direct config from the environment (not nested), or None if not present
- user - Direct config from the user simulator (not nested), or None if not present
- other - Dict for any other registered components
- benchmark - Benchmark-level configuration (git, system, packages)
| RETURNS | DESCRIPTION |
|---|---|
Dict[str, Any]
|
Structured dictionary containing configuration from all registered components. |
How to use
This method is called automatically by run() after each task repetition:
# Automatic collection (recommended)
results = benchmark.run()
# Access all collected reports (traces + configs) across repetitions
for report in benchmark.reports:
    print(f"Task {report['task_id']}, Repeat {report['repeat_idx']}")
    # Agents is a dict: agent_name -> config
    print(f"Agent config: {report['config']['agents']['my_agent']}")
    # Environment and user are direct (not nested)
    print(f"Environment config: {report['config']['environment']}")
    print(f"User config: {report['config']['user']}")
    # Benchmark-level config
    print(f"Git commit: {report['config']['benchmark']['git']['commit_hash']}")
The collected configs are available in the results for reproducibility analysis.
collect_all_traces
collect_all_traces() -> Dict[str, Any]
Collect execution traces from all registered components for the current task repetition.
This method is called automatically by run() after each task repetition completes
and before evaluation begins. It gathers comprehensive traces from all registered
components (agents, models, tools, simulators, callbacks, etc.) for that specific
repetition. After collection, the registry is cleared for the next repetition.
The collected traces are stored in benchmark.reports list along with configs
for persistent access across all task repetitions.
Output fields:
- metadata - Collection timestamp and thread info
- agents - Dict mapping agent names to their traces (messages, execution data)
- models - Dict mapping model names to their traces (API calls, timing, errors)
- tools - Dict mapping tool names to their traces (invocations, parameters)
- simulators - Dict mapping simulator names to their traces (attempts, outcomes)
- callbacks - Dict mapping callback names to their traces (custom data)
- environment - Direct traces from the environment (not nested), or None if not present
- user - Direct traces from the user simulator (not nested), or None if not present
- other - Dict for any other registered components
| RETURNS | DESCRIPTION |
|---|---|
Dict[str, Any]
|
Structured dictionary containing execution traces from all registered components. |
How to use
This method is called automatically by run() after each task repetition:
# Automatic collection (recommended)
results = benchmark.run()
# Access all collected reports (traces + configs) across repetitions
for report in benchmark.reports:
    print(f"Task {report['task_id']}, Repeat {report['repeat_idx']}")
    # Agents is a dict: agent_name -> traces
    print(f"Agent messages: {report['traces']['agents']['my_agent']}")
    # Environment and user are direct (not nested)
    print(f"Environment state: {report['traces']['environment']}")
    print(f"User interactions: {report['traces']['user']}")
The collected traces are passed to the evaluator's evaluate() method
and stored in benchmark.reports for later analysis.
collect_all_usage
collect_all_usage() -> Dict[str, Any]
Collect usage from all registered components for the current task repetition.
This method is called automatically by run() after each task repetition
completes. It gathers usage from all registered UsageTrackableMixin
components and also accumulates into persistent running totals accessible
via usage and usage_by_component.
| RETURNS | DESCRIPTION |
|---|---|
Dict[str, Any]
|
Structured dictionary containing usage from all registered components. |
evaluate
evaluate(
evaluators: Sequence[Evaluator],
agents: Dict[str, AgentAdapter],
final_answer: Any,
traces: Dict[str, Any],
) -> List[Dict[str, Any]]
Evaluate using Gaia2 evaluators.
Uses each evaluator's filter_traces() method to extract relevant data, then calls the evaluator with the filtered traces.
Each result dict uses the Gaia2 format:
- gsr: Goal Success Rate
- partial_gsr: Partial success rate
- passed: Boolean
- rationale: Judge rationale (if available)
| PARAMETER | DESCRIPTION |
|---|---|
evaluators
|
List of evaluators
TYPE:
|
agents
|
Dict of agents
TYPE:
|
final_answer
|
Final answer from agents
TYPE:
|
traces
|
Execution traces
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
List[Dict[str, Any]]
|
List of evaluation result dicts |
execution_loop
execution_loop(
agents: Sequence[AgentAdapter],
task: Task,
environment: Environment,
user: Optional[User],
) -> Any
Execute agents with optional user interaction loop.
This method orchestrates the agent-user interaction pattern. When a user is
present, the user initiates the conversation using user.get_initial_query().
If no user is present, task.query is used as the initial query.
Interaction Flow
By default, agents execute once (max_invocations=1). For multi-turn
interaction, set self.max_invocations > 1 in your benchmark's __init__.
The loop continues until max_invocations is reached or user.is_done()
returns True (e.g., max turns reached or stop token detected).
Note
Override this method in your benchmark subclass to implement custom interaction patterns (e.g., agent-initiated conversations, different termination conditions, or specialized query routing).
| PARAMETER | DESCRIPTION |
|---|---|
agents
|
Agents to execute (typically the orchestrator).
TYPE:
|
task
|
The task being solved.
TYPE:
|
environment
|
The environment providing tools and state.
TYPE:
|
user
|
Optional user simulator. If provided, the user initiates and drives
the conversation. If None, a single agent execution with
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
Any
|
Final answer from the last agent execution. |
Example
For interactive benchmarks, enable multi-turn interaction::
def __init__(self, ...):
    super().__init__(...)
    self.max_invocations = 5  # Up to 5 agent-user exchanges
get_failed_tasks
get_failed_tasks(
status_filter: Optional[
Union[
TaskExecutionStatus, List[TaskExecutionStatus]
]
] = None,
reports: Optional[List[Dict[str, Any]]] = None,
) -> SequentialTaskQueue
Get tasks that failed during benchmark execution.
This method retrieves failed tasks based on their execution status, useful for debugging, retry logic, or failure analysis.
| PARAMETER | DESCRIPTION |
|---|---|
status_filter
|
Filter by specific failure status(es). If None, returns all failed tasks (any status except SUCCESS). Can be a single TaskExecutionStatus or a list of them. Examples: - TaskExecutionStatus.TASK_EXECUTION_FAILED: Only tasks that failed during execution - TaskExecutionStatus.EVALUATION_FAILED: Only tasks where evaluation failed - [TaskExecutionStatus.TASK_EXECUTION_FAILED, TaskExecutionStatus.SETUP_FAILED]: Tasks that failed during execution or setup
TYPE:
|
reports
|
Optional list of reports to analyze. If None, uses the reports from the last run() call. This allows analyzing externally stored or modified reports.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
SequentialTaskQueue
|
SequentialTaskQueue containing the failed tasks. Empty if no failures match the filter. |
| RAISES | DESCRIPTION |
|---|---|
RuntimeError
|
If reports is None and run() has not been executed yet. |
How to use
# Run benchmark
benchmark = MyBenchmark()
reports = benchmark.run(tasks=tasks, agent_data=config)
# Get all failed tasks (from internal state)
failed = benchmark.get_failed_tasks()
print(f"Failed: {len(failed)}/{len(benchmark.tasks)} tasks")
# Or work with returned reports (safe from internal state changes)
failed = benchmark.get_failed_tasks(reports=reports)
# Get only tasks that failed during execution (not evaluation)
execution_failures = benchmark.get_failed_tasks(
    TaskExecutionStatus.TASK_EXECUTION_FAILED,
    reports=reports,
)
# Get setup and execution failures
critical_failures = benchmark.get_failed_tasks(
    status_filter=[
        TaskExecutionStatus.SETUP_FAILED,
        TaskExecutionStatus.TASK_EXECUTION_FAILED,
    ],
    reports=reports,
)
# Retry failed tasks elegantly - this is the key use case!
if len(failed) > 0:
    retry_reports = benchmark.run(tasks=failed)
# Or more concisely
reports = benchmark.run(tasks=tasks)
retry_reports = benchmark.run(tasks=benchmark.get_failed_tasks())
get_model_adapter
abstractmethod
get_model_adapter(
model_id: str, **kwargs: Any
) -> ModelAdapter
Get or create model adapter. Must be implemented by subclass.
| PARAMETER | DESCRIPTION |
|---|---|
model_id
|
Model identifier
TYPE:
|
**kwargs
|
Additional arguments (e.g., register_name)
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
ModelAdapter
|
ModelAdapter instance |
register
register(
category: str,
name: str,
component: RegisterableComponent,
) -> RegisterableComponent
Register a component for comprehensive trace and configuration collection.
All core MASEval components (AgentAdapter, ModelAdapter, Environment, User, LLMSimulator, BenchmarkCallback) inherit from TraceableMixin and/or ConfigurableMixin, and are automatically registered for both trace and configuration collection before evaluation.
Note: Most components are automatically registered when returned from
setup methods (setup_environment, setup_user, setup_agents). You only
need to manually register additional components like models, simulators, or
tools that aren't automatically captured.
| PARAMETER | DESCRIPTION |
|---|---|
category
|
Component category (e.g., "agents", "models", "tools", "simulators", "callbacks", "user", "environment", "seeding"). Use plural form to match the structure in collect_all_traces() and collect_all_configs().
TYPE:
|
name
|
Unique identifier for this component within its category
TYPE:
|
component
|
Any object inheriting from TraceableMixin and/or ConfigurableMixin
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
RegisterableComponent
|
The component (for chaining convenience) |
| RAISES | DESCRIPTION |
|---|---|
ValueError
|
If the component is already registered under a different name |
How to use
Most components are auto-registered. Manual registration is only needed for additional components:
def setup_agents(self, agent_data, environment, task, user, seed_generator):
    # Create model (needs manual registration)
    model = MyModelAdapter(...)
    self.register("models", "main_model", model)
    # Create agent (auto-registered when returned)
    agent = MyAgent(model=model)
    agent_adapter = AgentAdapter(agent, "agent1")
    # Environment and user are also auto-registered
    return [agent_adapter], {"agent1": agent_adapter}
Traces and configs are automatically collected before evaluation via
collect_all_traces() and collect_all_configs() which are called
internally by the run() method.
run
run(
tasks: Union[
Task, BaseTaskQueue, Iterable[Union[Task, dict]]
],
agent_data: Dict[str, Any] | Iterable[Dict[str, Any]],
) -> List[Dict[str, Any]]
Initialize and execute the complete benchmark loop across all tasks.
| PARAMETER | DESCRIPTION |
|---|---|
tasks
|
Task source for execution. Can be: - A single Task object - A BaseTaskQueue (SequentialTaskQueue, PriorityTaskQueue, or custom AdaptiveTaskQueue) - An iterable of Task objects or dicts that will be converted to Tasks When a BaseTaskQueue is provided, it controls the task ordering. AdaptiveTaskQueue subclasses are automatically registered as callbacks to receive task completion notifications.
TYPE:
|
agent_data
|
Configuration for agents. Either a single dict applied to all tasks, or an iterable of dicts with one configuration per task. Agent data typically includes model parameters, agent architecture details, and tool specifications.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
List[Dict[str, Any]]
|
List of report dictionaries, one per task repetition. Each report contains fields such as task_id, repeat_idx, traces, config, and eval. |
| RAISES | DESCRIPTION |
|---|---|
ValueError
|
If agent_data length doesn't match number of tasks (when agent_data is an iterable). |
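The agent_data rule (one dict for all tasks, or one dict per task with a matching count) can be sketched as a small normalisation step. The function name expand_agent_data and the error wording are hypothetical; this is not the framework's internal code.

```python
from typing import Any, Dict, Iterable, List, Union

AgentData = Union[Dict[str, Any], Iterable[Dict[str, Any]]]


def expand_agent_data(agent_data: AgentData, n_tasks: int) -> List[Dict[str, Any]]:
    """Return one agent config per task, or raise ValueError on a count mismatch."""
    if isinstance(agent_data, dict):
        # A single config is shared by every task.
        return [agent_data] * n_tasks
    configs = list(agent_data)
    if len(configs) != n_tasks:
        raise ValueError(
            f"agent_data has {len(configs)} entries but there are {n_tasks} tasks"
        )
    return configs
```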
How to use
This is the framework's main orchestration method that runs your entire benchmark. It iterates through all tasks, handles repetitions, and manages the three-stage lifecycle for each execution. You don't implement this method—instead, you call it to start the benchmark after implementing the setup and execution methods.
By default, the benchmark will continue executing remaining tasks even if some fail.
You can change this behavior by setting fail_on_task_error=True,
fail_on_evaluation_error=True, or fail_on_setup_error=True when instantiating
the benchmark. Each task execution returns a status indicating success or the specific
failure type (see TaskExecutionStatus).
For each task execution, the framework:
- Calls your setup methods to initialize components
- Calls your run_agents() method to execute the task
- Collects message histories and calls evaluators
- Stores results and triggers callbacks
Pseudocode structure:
for task in tasks:
    for repeat in range(n_task_repeats):
        # Setup stage
        environment = setup_environment(agent_data, task)
        user = setup_user(agent_data, environment, task)
        agents_to_run, agents_dict = setup_agents(agent_data, environment, task, user)
        evaluators = setup_evaluators(environment, task, agents_to_run, user)
        # Run stage (execution_loop handles multi-turn if user exists)
        agents_output = execution_loop(agents_to_run, task, environment, user)
        # Evaluate stage
        traces = collect_message_histories(agents_dict)
        eval_results = evaluate(evaluators, traces, agents_dict)
        # Store results
        store_result(task_id, traces, eval_results)
Callback hooks are triggered at these points:
- on_run_start: Before processing any tasks
- on_task_start: Before processing a task (once per task, not per repeat)
- on_task_repeat_start: Before each repetition of a task
- on_task_repeat_end: After each repetition completes
- on_task_end: After all repetitions of a task complete
- on_run_end: After all tasks complete
# Typical usage
benchmark = MyBenchmark()
reports = benchmark.run(tasks=tasks, agent_data=config)
# Analyze results
for report in reports:
    print(f"Task {report['task_id']}, Repeat {report['repeat_idx']}: {report['eval']}")
    print(f"Config: {report['config']}")
    print(f"Traces: {report['traces']}")
# Parallel execution with 4 workers
benchmark = MyBenchmark(num_workers=4)
reports = benchmark.run(tasks=tasks, agent_data=config)
# Single agent config for all tasks
reports = benchmark.run(tasks=tasks, agent_data={"model": "gpt-4"})
# Task-specific agent configs (must match task count)
reports = benchmark.run(
    tasks=tasks,
    agent_data=[
        {"model": "gpt-4", "difficulty": "easy"},
        {"model": "gpt-4", "difficulty": "hard"},
    ],
)
# Priority-based execution
from maseval.core.task import PriorityTaskQueue
for task in tasks:
    task.protocol.priority = compute_priority(task)
queue = PriorityTaskQueue(tasks)
reports = benchmark.run(tasks=queue, agent_data=config)
# Adaptive queue (auto-registered as callback)
queue = MyAdaptiveTaskQueue(tasks)
reports = benchmark.run(tasks=queue) # queue receives on_task_complete callbacks
run_agents
run_agents(
agents: Sequence[AgentAdapter],
task: Task,
environment: Gaia2Environment,
query: str = "",
) -> Any
Execute agents and ensure environment cleanup.
| PARAMETER | DESCRIPTION |
|---|---|
agents
|
Agent instances to run
TYPE:
|
task
|
Current task
TYPE:
|
environment
|
Gaia2Environment
TYPE:
|
query
|
Query/prompt for agents
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
Any
|
Final answer from agents |
setup_agents
setup_agents(
agent_data: Dict[str, Any],
environment: Gaia2Environment,
task: Task,
user: Optional[User],
seed_generator: SeedGenerator,
) -> Tuple[Sequence[AgentAdapter], Dict[str, AgentAdapter]]
Create default Gaia2 agent.
| PARAMETER | DESCRIPTION |
|---|---|
agent_data
|
Agent configuration
TYPE:
|
environment
|
Gaia2Environment with ARE tools
TYPE:
|
task
|
Current task
TYPE:
|
user
|
Optional user (always None)
TYPE:
|
seed_generator
|
Seed generator for reproducibility
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
Tuple[Sequence[AgentAdapter], Dict[str, AgentAdapter]]
|
Tuple of (agent list, agent dict) |
setup_environment
setup_environment(
agent_data: Dict[str, Any],
task: Task,
seed_generator: SeedGenerator,
) -> Gaia2Environment
Create Gaia2 environment wrapping ARE simulation.
| PARAMETER | DESCRIPTION |
|---|---|
agent_data
|
Agent configuration
TYPE:
|
task
|
Current task
TYPE:
|
seed_generator
|
Seed generator for reproducibility
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
Gaia2Environment
|
Gaia2Environment instance |
setup_evaluators
setup_evaluators(
environment: Gaia2Environment,
task: Task,
agents: Sequence[AgentAdapter],
user: Optional[User],
seed_generator: SeedGenerator,
) -> Sequence[Evaluator]
Create Gaia2 evaluator using ARE's judge.
| PARAMETER | DESCRIPTION |
|---|---|
environment
|
Gaia2Environment instance
TYPE:
|
task
|
Current task with evaluation data
TYPE:
|
agents
|
Agent instances
TYPE:
|
user
|
Optional user simulator (always None)
TYPE:
|
seed_generator
|
Seed generator for reproducibility
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
Sequence[Evaluator]
|
List with single Gaia2Evaluator instance |
setup_user
setup_user(
agent_data: Dict[str, Any],
environment: Gaia2Environment,
task: Task,
seed_generator: SeedGenerator,
) -> Optional[User]
Gaia2 uses event-based simulation, not turn-based user simulation.
User interactions in Gaia2 happen through scheduled events (e.g., "user sends message at t=30s") rather than synchronous turn-taking.
| PARAMETER | DESCRIPTION |
|---|---|
agent_data
|
Agent configuration
TYPE:
|
environment
|
Gaia2Environment instance
TYPE:
|
task
|
Current task
TYPE:
|
seed_generator
|
Seed generator for reproducibility
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
Optional[User]
|
None (no user simulator needed) |
DefaultGaia2Agent
Default agent implementation for Gaia2 benchmark.
ReAct-style agent matching ARE's reference implementation. Uses text-based action parsing (Thought/Action/Observation cycle) rather than native function calling.
Uses ARE's two-level loop architecture:
- Outer loop (_turn_loop): iterates over turns, matching are_simulation_main.py:agent_loop(). Between turns, drains the notification queue, formats user messages as [TASK], and re-queues environment notifications for the inner loop's pre-step.
- Inner loop (_step_loop): iterates over steps within a turn, matching base_agent.py:execute_agent_loop(). Terminates on both send_message_to_user (TERMINATED) and wait_for_notification (PAUSED).
Key characteristics matching ARE (base_agent.py, are_simulation.py):
- Text-based JSON action format with <end_action> token
- Stop sequences: ["<end_action>", "Observation:"]
- Default temperature: 0.5 (ARE llm_engine.py:17)
- Default max_tokens: 16384 (ARE llm_engine.py:16)
- Default max_iterations: 80 (ARE are_simulation_agent_config.py:36)
- Invalid format retry: up to 10 times (ARE base_agent.py:347)
- Iteration counter incremented EVERY loop (including errors) (ARE base_agent.py:849)
- Terminates inner loop on send_message_to_user (TERMINATED) or wait_for_notification (PAUSED)
- Max-iterations sends message to user via tool (ARE are_simulation.py:109-116)
- Pre-step notification polling (ARE steps/are_simulation.py:26-62)
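To make the text-based action format concrete, here is a rough sketch of client-side stop-sequence truncation followed by JSON action parsing. The exact reply layout ("Thought: ... Action: {json}<end_action>"), the JSON keys "action" and "action_input", the "timeout" argument, and both function names are assumptions for illustration and may differ from what ARE and DefaultGaia2Agent actually use.

```python
import json
from typing import Any, Dict, Tuple

STOP_SEQUENCES = ["<end_action>", "Observation:"]


def truncate_at_stop(text: str) -> str:
    """Cut the text at the earliest stop sequence, if any is present."""
    cut = len(text)
    for stop in STOP_SEQUENCES:
        idx = text.find(stop)
        if idx != -1:
            cut = min(cut, idx)
    return text[:cut]


def parse_action(text: str) -> Tuple[str, Dict[str, Any]]:
    """Extract (tool_name, arguments) from a 'Thought: ... Action: {json}' reply."""
    body = truncate_at_stop(text)
    marker = body.rfind("Action:")
    if marker == -1:
        raise ValueError("no 'Action:' section found")
    # json.loads raises a ValueError subclass on malformed JSON.
    payload = json.loads(body[marker + len("Action:"):].strip())
    return payload["action"], payload.get("action_input", {})


reply = (
    "Thought: nothing to do until a message arrives.\n"
    "Action:\n"
    '{"action": "SystemApp__wait_for_notification", "action_input": {"timeout": 30}}'
    "<end_action>\nObservation: should be ignored"
)
tool_name, tool_args = parse_action(reply)
```

A caller would treat the ValueError as an invalid-format attempt and retry, up to the configured retry limit.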
iteration_count
property
iteration_count: int
Get current iteration count.
terminated
property
terminated: bool
Get whether the agent has terminated.
__init__
__init__(
tools: Dict[str, Callable],
model: ModelAdapter,
environment: Optional[Any] = None,
llm_args: Optional[Dict[str, Any]] = None,
max_iterations: int = _DEFAULT_MAX_ITERATIONS,
invalid_format_retries: int = _DEFAULT_INVALID_FORMAT_RETRIES,
simulated_generation_time_config: Optional[
SimulatedGenerationTimeConfig
] = None,
verbose: int = 0,
)
Initialize the agent.
| PARAMETER | DESCRIPTION |
|---|---|
tools
|
Dict of tool name -> callable
TYPE:
|
model
|
ModelAdapter for LLM interactions
TYPE:
|
environment
|
Optional Gaia2Environment for notification polling
TYPE:
|
llm_args
|
Additional arguments for model calls, passed as kwargs
to
Stop-token handling: Client-side stop-token truncation
(ARE litellm_engine.py:126-127) is always applied to the
response, regardless of whether None filtering: Parameters set to
TYPE:
|
max_iterations
|
Maximum iterations before stopping. Default 80.
TYPE:
|
invalid_format_retries
|
Max retries for invalid format. Default 10.
TYPE:
|
simulated_generation_time_config
|
Optional config for simulated generation time. When set, the simulation is paused during LLM generation and resumed with a time offset. Default None (disabled). ARE agents/are_simulation_agent_config.py:28-30
TYPE:
|
verbose
|
Verbosity level (0=quiet, 1=basic, 2=detailed)
TYPE:
|
get_messages
get_messages() -> List[Dict[str, Any]]
Get message history.
| RETURNS | DESCRIPTION |
|---|---|
| List[Dict[str, Any]] | List of messages |
reset
reset() -> None
Reset agent state.
run
run(query: str) -> str
Execute task and return final response.
GAIA2 is event-driven: the real task instruction is delivered via the
notification system (first send_message_to_agent event). The outer
turn loop (_turn_loop) drains the notification queue and formats
user messages as [TASK], matching ARE's agent_loop().
When query is non-empty (e.g. standalone use), it is prepended as
a [TASK] message before entering the turn loop.
| PARAMETER | DESCRIPTION |
|---|---|
| query | Task query/instructions (may be empty for GAIA2). TYPE: str |

| RETURNS | DESCRIPTION |
|---|---|
| str | Final text response from agent |
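The event-driven delivery described for run() can be sketched as follows. This is an illustration under stated assumptions, not the actual _turn_loop: the queue type, the function names, the message dict shape, and the exact `[TASK]: ` prefix format are all hypothetical. It shows only the two behaviours described above: pending user messages are drained and wrapped as [TASK], and a non-empty query is prepended first.

```python
from collections import deque
from typing import Deque, Dict, List


def drain_user_messages(queue: Deque[str]) -> List[Dict[str, str]]:
    """Empty the queue, wrapping each pending user message as a [TASK] message."""
    messages: List[Dict[str, str]] = []
    while queue:
        messages.append({"role": "user", "content": f"[TASK]: {queue.popleft()}"})
    return messages


def initial_messages(query: str, queue: Deque[str]) -> List[Dict[str, str]]:
    """Build the first turn's messages: optional query first, then notifications."""
    messages: List[Dict[str, str]] = []
    if query:  # Standalone use: the query itself becomes a [TASK] message.
        messages.append({"role": "user", "content": f"[TASK]: {query}"})
    messages.extend(drain_user_messages(queue))
    return messages


# GAIA2-style use: empty query, task arrives through the notification queue.
pending = deque(["Book a cab to the airport"])
first_turn = initial_messages("", pending)
```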
Gaia2GenericTool
Bases: TraceableMixin, ConfigurableMixin
Framework-agnostic wrapper for ARE tools.
Provides clean API with built-in tracing for MASEval compatibility. Developers wrap this for their framework using composition.
Example for smolagents:
import smolagents

class MySmolagentsTool(smolagents.Tool):
    # forward() takes **kwargs, so skip smolagents' signature check.
    skip_forward_signature_validation = True

    def __init__(self, generic_tool: Gaia2GenericTool):
        # Copy the metadata smolagents expects before calling super().__init__().
        self.generic_tool = generic_tool
        self.name = generic_tool.name
        self.description = generic_tool.description
        self.inputs = generic_tool.inputs
        self.output_type = generic_tool.output_type
        super().__init__()

    def forward(self, **kwargs) -> str:
        # Delegate to the generic tool, which records the invocation.
        return self.generic_tool(**kwargs)

    def gather_traces(self):
        return self.generic_tool.gather_traces()
This wrapper preserves ARE's native return types while adding MASEval tracing capabilities and providing a framework-agnostic interface.
__call__
__call__(**kwargs: Any) -> Any
Execute tool and record invocation.
| PARAMETER | DESCRIPTION |
|---|---|
| **kwargs | Tool arguments. TYPE: Any |

| RETURNS | DESCRIPTION |
|---|---|
| Any | Tool execution result (preserves ARE's native return type) |
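To make "execute tool and record invocation" concrete, here is a minimal sketch of a call-recording wrapper. `TracedTool`, `add_event`, and the trace record fields (`inputs`, `status`, `output`, `error`, `timestamp`) are hypothetical stand-ins; the real Gaia2GenericTool wraps an ARE AppTool and its trace format may differ.

```python
import time
from typing import Any, Callable, Dict, List


class TracedTool:
    """Hypothetical wrapper that records every call and returns the raw result."""

    def __init__(self, name: str, func: Callable[..., Any]) -> None:
        self.name = name
        self._func = func
        self._invocations: List[Dict[str, Any]] = []

    def __call__(self, **kwargs: Any) -> Any:
        record: Dict[str, Any] = {"inputs": dict(kwargs), "timestamp": time.time()}
        try:
            result = self._func(**kwargs)
            record["status"] = "success"
            record["output"] = result
            return result  # Returned unchanged, preserving the native type.
        except Exception as exc:
            record["status"] = "error"
            record["error"] = str(exc)
            raise
        finally:
            # Runs on both success and failure, so every call is traced.
            self._invocations.append(record)

    def gather_traces(self) -> Dict[str, Any]:
        return {"name": self.name, "invocations": list(self._invocations)}


def add_event(title: str, hour: int) -> Dict[str, Any]:
    """Hypothetical tool function used only for this example."""
    return {"title": title, "hour": hour}


tool = TracedTool("add_event", add_event)
result = tool(title="Standup", hour=9)
traces = tool.gather_traces()
```

Keeping the recording inside the wrapper means a framework adapter only has to forward keyword arguments and expose gather_traces().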
__init__
__init__(are_tool: Any, environment: Gaia2Environment)
Initialize the tool wrapper.
| PARAMETER | DESCRIPTION |
|---|---|
| are_tool | ARE AppTool instance to wrap. TYPE: Any |
| environment | The Gaia2Environment this tool belongs to. TYPE: Gaia2Environment |
__repr__
__repr__() -> str
String representation of the tool.
gather_config
gather_config() -> Dict[str, Any]
Gather configuration from this tool.
| RETURNS | DESCRIPTION |
|---|---|
| Dict[str, Any] | Dictionary containing tool configuration |
gather_traces
gather_traces() -> Dict[str, Any]
Gather execution traces from this tool.
| RETURNS | DESCRIPTION |
|---|---|
| Dict[str, Any] | Dictionary containing tool traces with invocation history |
load_tasks
load_tasks(
capability: Optional[str] = None,
split: str = "validation",
limit: Optional[int] = None,
timeout_seconds: Optional[
float
] = DEFAULT_TIMEOUT_SECONDS,
max_retries: int = DEFAULT_MAX_RETRIES,
) -> TaskQueue
Load Gaia2 tasks from HuggingFace.
Each HuggingFace config corresponds to a capability (execution, search,
adaptability, time, ambiguity). When capability is None, all
capabilities are loaded and combined.
GAIA2 is event-driven: the task query is delivered to agents via the
notification system at runtime (first send_message_to_agent event),
not as a static field. task.query is left empty.
| PARAMETER | DESCRIPTION |
|---|---|
| capability | Filter by capability type. None loads all capabilities. TYPE: Optional[str] |
| split | Dataset split (currently only "validation" available). TYPE: str |
| limit | Maximum number of tasks to load (across all capabilities). TYPE: Optional[int] |
| timeout_seconds | Maximum execution time per task. Default 1860 (31 minutes, matching ARE's DEFAULT_SCENARIO_TIMEOUT). Set to None to disable timeout. TYPE: Optional[float] |
| max_retries | Maximum retry attempts. Default 1 (skip on failure). TYPE: int |

| RETURNS | DESCRIPTION |
|---|---|
| TaskQueue | TaskQueue with Task objects. |

| RAISES | DESCRIPTION |
|---|---|
| ValueError | If capability or split is invalid |
| ImportError | If required dependencies are not installed |
Example
>>> tasks = load_tasks(capability="execution", limit=5)
>>> len(tasks)
5
>>> # Load all capabilities
>>> tasks = load_tasks(limit=10)
configure_model_ids
configure_model_ids(
tasks: Union[TaskQueue, List[Task]],
*,
evaluator_model_id: Optional[str] = None,
judge_engine_config: Optional[
Gaia2JudgeEngineConfig
] = None,
) -> Union[TaskQueue, List[Task]]
Configure model IDs and judge engine for benchmark components.
Gaia2's GraphPerEventJudge uses an LLM for semantic comparison of tool
arguments (email content, calendar descriptions, etc.). By default it uses
ARE's built-in defaults (meta-llama/Meta-Llama-3.3-70B-Instruct via
HuggingFace). Pass judge_engine_config to override the model/provider.
Note: Gaia2 doesn't have a user simulator (interactions happen through scheduled events), so there's no user_model_id.
| PARAMETER | DESCRIPTION |
|---|---|
| tasks | TaskQueue or list of Tasks to configure. TYPE: Union[TaskQueue, List[Task]] |
| evaluator_model_id | Optional model ID for LLM-based evaluation. TYPE: Optional[str] |
| judge_engine_config | Optional judge engine configuration. Controls which LLM model and provider the ARE judge uses for semantic comparison. When [...]. TYPE: Optional[Gaia2JudgeEngineConfig] |

| RETURNS | DESCRIPTION |
|---|---|
| Union[TaskQueue, List[Task]] | The same collection (mutated in place for convenience). |
Example
>>> tasks = load_tasks(capability="execution", limit=5)
>>> configure_model_ids(
... tasks,
... judge_engine_config=Gaia2JudgeEngineConfig(
... provider="openrouter",
... ),
... )
compute_gaia2_metrics
compute_gaia2_metrics(
results: List[Dict[str, Any]],
) -> Dict[str, Any]
Compute summary metrics across all Gaia2 benchmark results.
Matches ARE's scoring logic:
- Only validated runs (non-null GSR) count toward success rate
- Exceptions and no_validation results are excluded from scoring
- ARE benchmark/report_stats.py: success_rate calculated only from validated runs
| PARAMETER | DESCRIPTION |
|---|---|
| results | List of result dicts from benchmark.run(). TYPE: List[Dict[str, Any]] |

| RETURNS | DESCRIPTION |
|---|---|
| Dict[str, Any] | Dict with metrics including total_tasks, scored_tasks, GSR, and per-capability breakdown. |
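The scoring rule (only validated runs count) can be illustrated with a small sketch. The result dict shape used here (`capability`, `gsr`, `status` keys) and the function name `summarize` are assumptions made for the example; the real result dicts from benchmark.run() and the exact output keys of compute_gaia2_metrics may differ.

```python
from typing import Any, Dict, List


def summarize(results: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Average GSR over validated runs only (those with a non-null `gsr`)."""
    # Exceptions and unvalidated runs carry gsr=None and are excluded.
    scored = [r for r in results if r.get("gsr") is not None]

    by_capability: Dict[str, List[float]] = {}
    for r in scored:
        cap = r.get("capability", "unknown")
        by_capability.setdefault(cap, []).append(float(r["gsr"]))

    overall = sum(float(r["gsr"]) for r in scored) / len(scored) if scored else None
    return {
        "total_tasks": len(results),
        "scored_tasks": len(scored),
        "gsr": overall,
        "by_capability": {cap: sum(v) / len(v) for cap, v in by_capability.items()},
    }


# Hypothetical results: three validated runs and one that raised an exception.
example_results = [
    {"capability": "execution", "gsr": 1.0},
    {"capability": "execution", "gsr": 0.0},
    {"capability": "search", "gsr": 1.0},
    {"capability": "search", "gsr": None, "status": "exception"},
]
summary = summarize(example_results)
```

Because the failed run is excluded rather than counted as zero, the overall rate in this example is computed over three tasks, not four, which is worth keeping in mind when comparing against numbers that treat errors as failures.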