Tau2: Tool-Agent-User Interaction Benchmark (Beta)
Beta
This benchmark has been implemented carefully, but it is highly complex and we have not yet validated the results against the original implementation. Use with caution when comparing with existing results or the original paper's numbers. Contributions and compute donations welcome!
The Tau2 Benchmark evaluates LLM-based agents on customer service tasks across multiple real-world domains, testing their ability to use tools, follow policies, and interact with users.
Overview
Tau2-bench (Tool-Agent-User) is designed to evaluate single-agent customer service systems. The benchmark features:
- Real tool implementations that modify actual database state
- Deterministic evaluation via database state comparison
- Three domains: airline (50 tasks), retail (114 tasks), telecom (114 tasks)
- Pass@k metrics for robust evaluation with multiple runs
Reference Paper: Tau-Bench: A Benchmark for Tool-Agent-User Interaction in Real-World Domains
Check out the BENCHMARKS.md file for more information including licenses.
Quick Start
from maseval.benchmark.tau2 import (
    Tau2Benchmark, Tau2Environment, Tau2Evaluator, Tau2User,
    load_tasks, configure_model_ids, ensure_data_exists,
    compute_benchmark_metrics, compute_pass_at_k,
)

# Ensure domain data is downloaded
ensure_data_exists(domain="retail")

# Load tasks and configure model IDs
tasks = load_tasks("retail", split="base", limit=5)
configure_model_ids(
    tasks,
    user_model_id="gpt-4o",
    evaluator_model_id="gpt-4o",
)

# Create your framework-specific benchmark subclass
class MyTau2Benchmark(Tau2Benchmark):
    def setup_agents(self, agent_data, environment, task, user, seed_generator):
        tools = environment.tools
        # Create your agent with these tools
        ...

    def get_model_adapter(self, model_id, **kwargs):
        adapter = MyModelAdapter(model_id)
        if "register_name" in kwargs:
            self.register("models", kwargs["register_name"], adapter)
        return adapter

# Run benchmark (agent_data is an argument of run(), not of the constructor)
benchmark = MyTau2Benchmark(n_task_repeats=4)
results = benchmark.run(tasks, agent_data={})

# Compute metrics
metrics = compute_benchmark_metrics(results)
pass_k = compute_pass_at_k(results, k_values=[1, 2, 3, 4])
For baseline comparisons, use DefaultAgentTau2Benchmark which mirrors the original tau2-bench implementation:
from maseval.benchmark.tau2 import DefaultAgentTau2Benchmark

benchmark = DefaultAgentTau2Benchmark(
    agent_data={"model_id": "gpt-4o"},
    n_task_repeats=4,
)
results = benchmark.run(tasks)
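The Quick Start calls compute_pass_at_k without saying how the metric is estimated. As a rough, self-contained sketch (not the library's implementation), the two estimators most often associated with this name can be computed from n runs of a task with c successes. Which of the two compute_pass_at_k reports is not stated here, so both function names below are hypothetical.

```python
from math import comb


def pass_at_k(n: int, c: int, k: int) -> float:
    """Estimate P(at least one of k runs sampled from n passes), given c passes."""
    if k > n:
        raise ValueError("k cannot exceed the number of runs n")
    if n - c < k:
        # Fewer than k failures exist, so every sample of k contains a pass.
        return 1.0
    return 1.0 - comb(n - c, k) / comb(n, k)


def pass_hat_k(n: int, c: int, k: int) -> float:
    """Estimate P(all k runs sampled from n pass), given c passes."""
    if k > n:
        raise ValueError("k cannot exceed the number of runs n")
    # comb(c, k) is 0 when c < k, so the estimate is 0.0 in that case.
    return comb(c, k) / comb(n, k)
```

With n_task_repeats=4 and two passing runs, pass_at_k(4, 2, 2) is 5/6 while pass_hat_k(4, 2, 2) is 1/6, so it matters which definition a reported number uses.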
Tau2Benchmark
Bases: Benchmark
Tau2 Benchmark - Framework-agnostic base class.
This base class handles:
- Environment setup with Tau2Environment (real tools)
- Deterministic evaluation via database state comparison
- Optional user simulation for multi-turn tasks
Users must subclass and implement:
- setup_agents() for their agent framework
- get_model_adapter() to provide model adapters
Model IDs for components are read from task data:
- task.user_data["model_id"] for user simulator
- task.evaluation_data["model_id"] for NL assertion evaluator (optional)
Use configure_model_ids() to set these values after loading tasks.
Example
class MyTau2Benchmark(Tau2Benchmark):
    def setup_agents(self, agent_data, environment, task, user, seed_generator):
        # Setup your agents here
        ...

    def get_model_adapter(self, model_id, **kwargs):
        seed = kwargs.get("seed")  # Extract seed for reproducibility
        return MyModelAdapter(model_id, seed=seed)

tasks = load_tasks("retail")
configure_model_ids(tasks, user_model_id="gpt-4o")

benchmark = MyTau2Benchmark()
benchmark.run(tasks, agent_data={})
seed_generator
property
seed_generator: SeedGenerator
The seed generator for this benchmark.
The seed generator is configured at benchmark initialization via the seed
or seed_generator parameters. When seed=None (the default), the generator's
derive_seed() method returns None, effectively disabling seeding while
maintaining a uniform interface.
| RETURNS | DESCRIPTION |
|---|---|
| SeedGenerator | The root |
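To illustrate the "uniform interface" point, here is a minimal sketch of a seed generator whose derive_seed() yields None when no root seed was given. It is an illustration only: the class name, the name argument, and the hash-based derivation are assumptions, not the SeedGenerator API.

```python
import hashlib
from typing import Optional


class SketchSeedGenerator:
    """Hypothetical stand-in for a seed generator with an optional root seed."""

    def __init__(self, seed: Optional[int] = None) -> None:
        self.seed = seed

    def derive_seed(self, name: str) -> Optional[int]:
        # With no root seed, seeding is disabled but callers need no special case.
        if self.seed is None:
            return None
        # Derive a stable 32-bit child seed from the root seed and a component name.
        digest = hashlib.sha256(f"{self.seed}:{name}".encode("utf-8")).digest()
        return int.from_bytes(digest[:4], "big")
```

Callers can then pass derive_seed(...) straight through to a model adapter, which treats None as "unseeded".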
usage
property
usage: Usage
Running usage total across all task repetitions.
Queryable at any time, including while the benchmark is still running. Returns the grand total of all usage collected so far.
usage_by_component
property
usage_by_component: Dict[str, Usage]
Per-component running usage totals across all repetitions.
Keys are registry keys (e.g., "models:main_model").
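The relationship between usage and usage_by_component can be pictured as a running per-key total plus a grand total over all keys. The sketch below is an assumption-laden illustration: the fields of Usage are not listed here, so UsageSketch and its token counters are hypothetical.

```python
from dataclasses import dataclass
from typing import Dict


@dataclass
class UsageSketch:
    """Hypothetical usage record; the real Usage fields may differ."""

    input_tokens: int = 0
    output_tokens: int = 0

    def __add__(self, other: "UsageSketch") -> "UsageSketch":
        return UsageSketch(
            self.input_tokens + other.input_tokens,
            self.output_tokens + other.output_tokens,
        )


class UsageTotals:
    """Accumulates usage per registry key, e.g. "models:main_model"."""

    def __init__(self) -> None:
        self.by_component: Dict[str, UsageSketch] = {}

    def add(self, key: str, usage: UsageSketch) -> None:
        # Called once per component after each task repetition.
        self.by_component[key] = self.by_component.get(key, UsageSketch()) + usage

    @property
    def total(self) -> UsageSketch:
        # Grand total of everything collected so far.
        result = UsageSketch()
        for usage in self.by_component.values():
            result = result + usage
        return result
```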
__init__
__init__(
callbacks: Optional[List[BenchmarkCallback]] = None,
n_task_repeats: int = 1,
max_invocations: int = MAX_INVOCATIONS,
num_workers: int = 1,
fail_on_setup_error: bool = False,
fail_on_task_error: bool = False,
fail_on_evaluation_error: bool = False,
progress_bar: bool | str = True,
seed: Optional[int] = None,
seed_generator: Optional[SeedGenerator] = None,
)
Initialize benchmark with tau2-specific defaults.
| PARAMETER | DESCRIPTION |
|---|---|
| callbacks | Optional list of callback handlers for monitoring execution. |
| n_task_repeats | Number of times to repeat each task. Default 1. |
| max_invocations | Maximum steps (default: 200, matching original DEFAULT_MAX_STEPS). |
| num_workers | Number of parallel task executions. Default 1 (sequential). |
| fail_on_setup_error | If True, raise on setup errors. Default False. |
| fail_on_task_error | If True, raise on task execution errors. Default False. |
| fail_on_evaluation_error | If True, raise on evaluation errors. Default False. |
| progress_bar | Progress display. True (default) for tqdm, "rich" for Rich, or False to disable. |
| seed | Global seed for reproducible benchmark runs. |
| seed_generator | Custom seed generator (takes precedence over seed). |
add_callback
add_callback(callback: BenchmarkCallback) -> None
Register a callback handler to monitor benchmark execution.
| PARAMETER | DESCRIPTION |
|---|---|
| callback | A BenchmarkCallback instance that will receive execution events. |
How to use
Callbacks receive notifications at key lifecycle points for tracing, progress tracking,
or custom metrics collection. See BenchmarkCallback
for available hooks and their signatures.
from maseval.core.callbacks import MessageTracingCallback

benchmark = MyBenchmark()
benchmark.add_callback(MessageTracingCallback(output_dir="logs"))
results = benchmark.run(tasks=tasks, agent_data=config)
clear_registry
clear_registry() -> None
Clear the component registry after a task repetition completes.
This method is called automatically by run() after each task repetition
to ensure components are not carried over between repetitions. The
reports list persists across all repetitions for aggregated analysis.
collect_all_configs
collect_all_configs() -> Dict[str, Any]
Collect configuration from all registered components for the current task repetition.
This method is called automatically by run() after each task repetition completes
and before evaluation begins. It gathers comprehensive configuration from all registered
components (agents, models, tools, simulators, callbacks, etc.) for that specific
repetition. After collection, the registry is cleared for the next repetition.
The collected configs are stored in benchmark.reports list along with traces
for persistent access across all task repetitions.
Output fields:
- metadata - Collection timestamp and thread info
- agents - Dict mapping agent names to their config (settings, parameters)
- models - Dict mapping model names to their config (model IDs, parameters)
- tools - Dict mapping tool names to their config (specifications, settings)
- simulators - Dict mapping simulator names to their config (parameters, templates)
- callbacks - Dict mapping callback names to their config (settings)
- environment - Direct config from the environment (not nested), or None if not present
- user - Direct config from the user simulator (not nested), or None if not present
- other - Dict for any other registered components
- benchmark - Benchmark-level configuration (git, system, packages)
| RETURNS | DESCRIPTION |
|---|---|
| Dict[str, Any] | Structured dictionary containing configuration from all registered components. |
How to use
This method is called automatically by run() after each task repetition:
# Automatic collection (recommended)
results = benchmark.run(tasks=tasks, agent_data=config)

# Access all collected reports (traces + configs) across repetitions
for report in benchmark.reports:
    print(f"Task {report['task_id']}, Repeat {report['repeat_idx']}")
    # Agents is a dict: agent_name -> config
    print(f"Agent config: {report['config']['agents']['my_agent']}")
    # Environment and user are direct (not nested)
    print(f"Environment config: {report['config']['environment']}")
    print(f"User config: {report['config']['user']}")
    # Benchmark-level config
    print(f"Git commit: {report['config']['benchmark']['git']['commit_hash']}")
The collected configs are available in the results for reproducibility analysis.
collect_all_traces
collect_all_traces() -> Dict[str, Any]
Collect execution traces from all registered components for the current task repetition.
This method is called automatically by run() after each task repetition completes
and before evaluation begins. It gathers comprehensive traces from all registered
components (agents, models, tools, simulators, callbacks, etc.) for that specific
repetition. After collection, the registry is cleared for the next repetition.
The collected traces are stored in benchmark.reports list along with configs
for persistent access across all task repetitions.
Output fields:
- metadata - Collection timestamp and thread info
- agents - Dict mapping agent names to their traces (messages, execution data)
- models - Dict mapping model names to their traces (API calls, timing, errors)
- tools - Dict mapping tool names to their traces (invocations, parameters)
- simulators - Dict mapping simulator names to their traces (attempts, outcomes)
- callbacks - Dict mapping callback names to their traces (custom data)
- environment - Direct traces from the environment (not nested), or None if not present
- user - Direct traces from the user simulator (not nested), or None if not present
- other - Dict for any other registered components
| RETURNS | DESCRIPTION |
|---|---|
| Dict[str, Any] | Structured dictionary containing execution traces from all registered components. |
How to use
This method is called automatically by run() after each task repetition:
# Automatic collection (recommended)
results = benchmark.run(tasks=tasks, agent_data=config)

# Access all collected reports (traces + configs) across repetitions
for report in benchmark.reports:
    print(f"Task {report['task_id']}, Repeat {report['repeat_idx']}")
    # Agents is a dict: agent_name -> traces
    print(f"Agent messages: {report['traces']['agents']['my_agent']}")
    # Environment and user are direct (not nested)
    print(f"Environment state: {report['traces']['environment']}")
    print(f"User interactions: {report['traces']['user']}")
The collected traces are passed to the evaluator's evaluate() method
and stored in benchmark.reports for later analysis.
collect_all_usage
collect_all_usage() -> Dict[str, Any]
Collect usage from all registered components for the current task repetition.
This method is called automatically by run() after each task repetition
completes. It gathers usage from all registered UsageTrackableMixin
components and also accumulates into persistent running totals accessible
via usage and usage_by_component.
| RETURNS | DESCRIPTION |
|---|---|
| Dict[str, Any] | Structured dictionary containing usage from all registered components. |
evaluate
evaluate(
evaluators: Sequence[Evaluator],
agents: Dict[str, AgentAdapter],
final_answer: Any,
traces: Dict[str, Any],
) -> List[Dict[str, Any]]
Evaluate using Tau2 evaluators.
Uses each evaluator's filter_traces() method to extract relevant data, then calls the evaluator with the filtered traces.
Returns tau2 format:
- reward: Float [0.0, 1.0]
- passed: Boolean
- reward_breakdown: Per-evaluator scores
- env_check, action_check, communicate_check: Detailed results
| PARAMETER | DESCRIPTION |
|---|---|
| evaluators | List of evaluators |
| agents | Dict of agents |
| final_answer | Final answer from agents |
| traces | Execution traces |

| RETURNS | DESCRIPTION |
|---|---|
| List[Dict[str, Any]] | List of evaluation result dicts |
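How the per-check scores are combined into a single reward is not spelled out here. The sketch below assumes a product rule (any failed check zeroes the reward) and treats a task as passed only when the reward is 1.0. Both rules, and the helper name build_result, are assumptions made for illustration rather than the evaluator's documented behaviour.

```python
from math import prod
from typing import Any, Dict


def build_result(reward_breakdown: Dict[str, float]) -> Dict[str, Any]:
    """Assemble a result dict with the reward, passed and reward_breakdown keys."""
    if reward_breakdown:
        # Assumed rule: multiply the per-check scores together.
        reward = prod(reward_breakdown.values())
    else:
        # Choice made for this sketch: no checks means nothing was verified.
        reward = 0.0
    reward = min(1.0, max(0.0, reward))  # keep the value inside [0.0, 1.0]
    return {
        "reward": reward,
        "passed": reward >= 1.0 - 1e-6,
        "reward_breakdown": dict(reward_breakdown),
    }
```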
execution_loop
execution_loop(
agents: Sequence[AgentAdapter],
task: Task,
environment: Tau2Environment,
user: Optional[Tau2User],
) -> Any
Execute agents with user-generated initial query.
C7: Matches original tau2-bench orchestrator.initialize(): The orchestrator sends the greeting to the user simulator, and the user LLM-generates the initial query (not pre-set from task.query). The agent never sees the greeting — only the user's first message.
Source: tau2-bench orchestrator.py:L34-36, L223-229
| PARAMETER | DESCRIPTION |
|---|---|
| agents | Agents to execute. |
| task | The task being solved. |
| environment | The Tau2Environment providing tools and state. |
| user | Optional Tau2 user simulator. |

| RETURNS | DESCRIPTION |
|---|---|
| Any | Final answer from the last agent execution. |
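The opening handshake described for execution_loop can be sketched with plain callables standing in for the user simulator and the agent. The function and stub names are hypothetical; only the ordering (greeting goes to the user, the user's first message goes to the agent) is taken from the description above.

```python
from typing import Callable, List, Tuple

GREETING = "Hi! How can I help you today?"


def start_conversation(
    user_respond: Callable[[str], str],
    agent_run: Callable[[str], str],
) -> Tuple[str, str]:
    """Open a conversation: greeting to the user, the user's first message to the agent."""
    first_query = user_respond(GREETING)  # the user simulator sees the greeting
    answer = agent_run(first_query)  # the agent sees only the user's message
    return first_query, answer


# Stubs that record what each side was shown.
user_seen: List[str] = []
agent_seen: List[str] = []


def stub_user(message: str) -> str:
    user_seen.append(message)
    return "I need to return an order."


def stub_agent(message: str) -> str:
    agent_seen.append(message)
    return "Sure, what is the order ID?"


first_query, answer = start_conversation(stub_user, stub_agent)
```

After this runs, user_seen holds the greeting while agent_seen holds only the user's first message.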
get_failed_tasks
get_failed_tasks(
status_filter: Optional[
Union[
TaskExecutionStatus, List[TaskExecutionStatus]
]
] = None,
reports: Optional[List[Dict[str, Any]]] = None,
) -> SequentialTaskQueue
Get tasks that failed during benchmark execution.
This method retrieves failed tasks based on their execution status, useful for debugging, retry logic, or failure analysis.
| PARAMETER | DESCRIPTION |
|---|---|
| status_filter | Filter by specific failure status(es). If None, returns all failed tasks (any status except SUCCESS). Can be a single TaskExecutionStatus or a list of them. Examples: TaskExecutionStatus.TASK_EXECUTION_FAILED (only tasks that failed during execution); TaskExecutionStatus.EVALUATION_FAILED (only tasks where evaluation failed); [TaskExecutionStatus.TASK_EXECUTION_FAILED, TaskExecutionStatus.SETUP_FAILED] (tasks that failed during execution or setup). |
| reports | Optional list of reports to analyze. If None, uses the reports from the last run() call. This allows analyzing externally stored or modified reports. |

| RETURNS | DESCRIPTION |
|---|---|
| SequentialTaskQueue | SequentialTaskQueue containing the failed tasks. Empty if no failures match the filter. |

| RAISES | DESCRIPTION |
|---|---|
| RuntimeError | If reports is None and run() has not been executed yet. |
How to use
# Run benchmark
benchmark = MyBenchmark()
reports = benchmark.run(tasks=tasks, agent_data=config)

# Get all failed tasks (from internal state)
failed = benchmark.get_failed_tasks()
print(f"Failed: {len(failed)}/{len(benchmark.tasks)} tasks")

# Or work with returned reports (safe from internal state changes)
failed = benchmark.get_failed_tasks(reports=reports)

# Get only tasks that failed during execution (not evaluation)
execution_failures = benchmark.get_failed_tasks(
    TaskExecutionStatus.TASK_EXECUTION_FAILED,
    reports=reports,
)

# Get setup and execution failures
critical_failures = benchmark.get_failed_tasks(
    status_filter=[
        TaskExecutionStatus.SETUP_FAILED,
        TaskExecutionStatus.TASK_EXECUTION_FAILED,
    ],
    reports=reports,
)

# Retry failed tasks elegantly - this is the key use case!
if len(failed) > 0:
    retry_reports = benchmark.run(tasks=failed, agent_data=config)

# Or more concisely
reports = benchmark.run(tasks=tasks, agent_data=config)
retry_reports = benchmark.run(tasks=benchmark.get_failed_tasks(), agent_data=config)
get_model_adapter
abstractmethod
get_model_adapter(
model_id: str, **kwargs: Any
) -> ModelAdapter
Provide a ModelAdapter for benchmark components that require LLM access.
Many benchmark components beyond the agents themselves require access to language models. Common examples include:
- Tool simulators: Simulating tool responses when real APIs aren't available
- User simulators: Generating realistic user responses in multi-turn dialogues
- Judges/Evaluators: Using LLMs to assess agent performance against criteria
- Reward models: Computing scores for reinforcement learning
This method centralizes model provisioning, giving you control over which models are used throughout the benchmark. Implement this to return a configured ModelAdapter for the requested model.
| PARAMETER | DESCRIPTION |
|---|---|
| model_id | The model identifier to use (e.g., "gemini-2.5-flash", "openrouter/google/gemini-2.5-flash", "gpt-4o"). This is passed by the benchmark when setting up components that need model access. |
| **kwargs | Additional arguments for adapter creation or registration. Common kwargs: register_category (category for trace registration, e.g., "models"); register_name (name for trace registration, e.g., "evaluator_user_gsr"). |

| RETURNS | DESCRIPTION |
|---|---|
| ModelAdapter | A ModelAdapter instance configured for the specified model. For proper tracing, return a fresh adapter for each call rather than reusing instances. You can still share the underlying API client for efficiency. |
How to use
For proper tracing, register the adapter after creation using the kwargs:
def get_model_adapter(self, model_id: str, **kwargs: Any) -> ModelAdapter:
    adapter = GoogleGenAIModelAdapter(self.client, model_id=model_id)
    # Register for tracing, falling back to defaults when no registration info is provided
    category = kwargs.get("register_category", "models")
    name = kwargs.get("register_name", model_id)
    self.register(category, name, adapter)
    return adapter
The benchmark calls this method when setting up tools, user simulators, and evaluators. Each call creates a fresh adapter with its own trace log.
register
register(
category: str,
name: str,
component: RegisterableComponent,
) -> RegisterableComponent
Register a component for comprehensive trace and configuration collection.
All core MASEval components (AgentAdapter, ModelAdapter, Environment, User, LLMSimulator, BenchmarkCallback) inherit from TraceableMixin and/or ConfigurableMixin, and are automatically registered for both trace and configuration collection before evaluation.
Note: Most components are automatically registered when returned from
setup methods (setup_environment, setup_user, setup_agents). You only
need to manually register additional components like models, simulators, or
tools that aren't automatically captured.
| PARAMETER | DESCRIPTION |
|---|---|
| category | Component category (e.g., "agents", "models", "tools", "simulators", "callbacks", "user", "environment", "seeding"). Use plural form to match the structure in collect_all_traces() and collect_all_configs(). |
| name | Unique identifier for this component within its category |
| component | Any object inheriting from TraceableMixin and/or ConfigurableMixin |

| RETURNS | DESCRIPTION |
|---|---|
| RegisterableComponent | The component (for chaining convenience) |

| RAISES | DESCRIPTION |
|---|---|
| ValueError | If the component is already registered under a different name |
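The return value and the ValueError condition can be illustrated with a toy registry. This is a sketch under assumptions, not the Benchmark internals: the class name, the "category:name" key format (borrowed from the usage_by_component keys), and identity-based duplicate detection are all hypothetical.

```python
from typing import Any, Dict


class RegistrySketch:
    """Toy registry: one key per component, keyed as "category:name"."""

    def __init__(self) -> None:
        self._by_key: Dict[str, Any] = {}
        self._key_by_id: Dict[int, str] = {}

    def register(self, category: str, name: str, component: Any) -> Any:
        key = f"{category}:{name}"
        existing = self._key_by_id.get(id(component))
        if existing is not None and existing != key:
            # The same object must not appear under two different names.
            raise ValueError(f"component already registered as {existing!r}")
        self._by_key[key] = component
        self._key_by_id[id(component)] = key
        return component  # returned so calls can be chained

    def clear(self) -> None:
        # Mirrors the idea of clearing the registry between repetitions.
        self._by_key.clear()
        self._key_by_id.clear()
```

Returning the component lets a caller write model = registry.register("models", "main_model", make_model()) in one line.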
How to use
Most components are auto-registered. Manual registration is only needed for additional components:
def setup_agents(self, agent_data, environment, task, user, seed_generator):
    # Create model (needs manual registration)
    model = MyModelAdapter(...)
    self.register("models", "main_model", model)

    # Create agent (auto-registered when returned)
    agent = MyAgent(model=model)
    agent_adapter = AgentAdapter(agent, "agent1")

    # Environment and user are also auto-registered
    return [agent_adapter], {"agent1": agent_adapter}
Traces and configs are automatically collected before evaluation via
collect_all_traces() and collect_all_configs() which are called
internally by the run() method.
run
run(
tasks: Union[
Task, BaseTaskQueue, Iterable[Union[Task, dict]]
],
agent_data: Dict[str, Any] | Iterable[Dict[str, Any]],
) -> List[Dict[str, Any]]
Initialize and execute the complete benchmark loop across all tasks.
| PARAMETER | DESCRIPTION |
|---|---|
| tasks | Task source for execution. Can be: a single Task object; a BaseTaskQueue (SequentialTaskQueue, PriorityTaskQueue, or custom AdaptiveTaskQueue); or an iterable of Task objects or dicts that will be converted to Tasks. When a BaseTaskQueue is provided, it controls the task ordering. AdaptiveTaskQueue subclasses are automatically registered as callbacks to receive task completion notifications. |
| agent_data | Configuration for agents. Either a single dict applied to all tasks, or an iterable of dicts with one configuration per task. Agent data typically includes model parameters, agent architecture details, and tool specifications. |

| RETURNS | DESCRIPTION |
|---|---|
| List[Dict[str, Any]] | List of report dictionaries, one per task repetition. The usage examples for this method read the task_id, repeat_idx, eval, config, and traces keys of each report. |

| RAISES | DESCRIPTION |
|---|---|
| ValueError | If agent_data length doesn't match number of tasks (when agent_data is an iterable). |
How to use
This is the framework's main orchestration method that runs your entire benchmark. It iterates through all tasks, handles repetitions, and manages the three-stage lifecycle for each execution. You don't implement this method—instead, you call it to start the benchmark after implementing the setup and execution methods.
By default, the benchmark will continue executing remaining tasks even if some fail.
You can change this behavior by setting fail_on_task_error=True,
fail_on_evaluation_error=True, or fail_on_setup_error=True when instantiating
the benchmark. Each task execution returns a status indicating success or the specific
failure type (see TaskExecutionStatus).
For each task execution, the framework:
- Calls your setup methods to initialize components
- Calls your run_agents() method to execute the task
- Collects message histories and calls evaluators
- Stores results and triggers callbacks
Pseudocode structure:
for task in tasks:
    for repeat in range(n_task_repeats):
        # Setup stage
        environment = setup_environment(agent_data, task)
        user = setup_user(agent_data, environment, task)
        agents_to_run, agents_dict = setup_agents(agent_data, environment, task, user)
        evaluators = setup_evaluators(environment, task, agents_to_run, user)

        # Run stage (execution_loop handles multi-turn if user exists)
        agents_output = execution_loop(agents_to_run, task, environment, user)

        # Evaluate stage (argument order follows the evaluate() signature)
        traces = collect_message_histories(agents_dict)
        eval_results = evaluate(evaluators, agents_dict, agents_output, traces)

        # Store results
        store_result(task_id, traces, eval_results)
Callback hooks are triggered at these points:
- on_run_start: Before processing any tasks
- on_task_start: Before processing a task (once per task, not per repeat)
- on_task_repeat_start: Before each repetition of a task
- on_task_repeat_end: After each repetition completes
- on_task_end: After all repetitions of a task complete
- on_run_end: After all tasks complete
# Typical usage
benchmark = MyBenchmark()
reports = benchmark.run(tasks=tasks, agent_data=config)

# Analyze results
for report in reports:
    print(f"Task {report['task_id']}, Repeat {report['repeat_idx']}: {report['eval']}")
    print(f"Config: {report['config']}")
    print(f"Traces: {report['traces']}")

# Parallel execution with 4 workers
benchmark = MyBenchmark(num_workers=4)
reports = benchmark.run(tasks=tasks, agent_data=config)

# Single agent config for all tasks
reports = benchmark.run(tasks=tasks, agent_data={"model": "gpt-4"})

# Task-specific agent configs (must match task count)
reports = benchmark.run(
    tasks=tasks,
    agent_data=[
        {"model": "gpt-4", "difficulty": "easy"},
        {"model": "gpt-4", "difficulty": "hard"},
    ],
)

# Priority-based execution
from maseval.core.task import PriorityTaskQueue

for task in tasks:
    task.protocol.priority = compute_priority(task)
queue = PriorityTaskQueue(tasks)
reports = benchmark.run(tasks=queue, agent_data=config)

# Adaptive queue (auto-registered as callback)
queue = MyAdaptiveTaskQueue(tasks)
reports = benchmark.run(tasks=queue, agent_data=config)  # queue receives on_task_complete callbacks
run_agents
run_agents(
agents: Sequence[AgentAdapter],
task: Task,
environment: Tau2Environment,
query: str = "",
) -> Any
Execute agents and return final answer.
| PARAMETER | DESCRIPTION |
|---|---|
| agents | Agent instances to run |
| task | Current task |
| environment | Tau2Environment |
| query | Query/prompt for agents |

| RETURNS | DESCRIPTION |
|---|---|
| Any | Final answer from agents |
setup_agents
abstractmethod
setup_agents(
agent_data: Dict[str, Any],
environment: Tau2Environment,
task: Task,
user: Optional[User],
seed_generator,
) -> Tuple[Sequence[AgentAdapter], Dict[str, AgentAdapter]]
Create agents for this task. Must be implemented by subclass.
| PARAMETER | DESCRIPTION |
|---|---|
| agent_data | Agent configuration |
| environment | Tau2Environment with real tools |
| task | Current task |
| user | Optional user simulator |

| RETURNS | DESCRIPTION |
|---|---|
| Tuple[Sequence[AgentAdapter], Dict[str, AgentAdapter]] | Tuple of (ordered agent list, agent dict keyed by ID) |
setup_environment
setup_environment(
agent_data: Dict[str, Any], task: Task, seed_generator
) -> Tau2Environment
Create environment for a task.
Creates a Tau2Environment with real tool implementations for the task's domain.
| PARAMETER | DESCRIPTION |
|---|---|
| agent_data | Agent configuration |
| task | Current task |

| RETURNS | DESCRIPTION |
|---|---|
| Tau2Environment | Tau2Environment instance |
setup_evaluators
setup_evaluators(
environment: Tau2Environment,
task: Task,
agents: Sequence[AgentAdapter],
user: Optional[User],
seed_generator,
) -> Sequence[Evaluator]
Create evaluator for the task.
Creates a Tau2Evaluator with optional NL assertion model. NL model ID is read from task.evaluation_data["model_id"].
| PARAMETER | DESCRIPTION |
|---|---|
| environment | Tau2Environment instance |
| task | Current task with evaluation criteria |
| agents | Agent instances |
| user | Optional user simulator |

| RETURNS | DESCRIPTION |
|---|---|
| Sequence[Evaluator] | List with single Tau2Evaluator instance |
setup_user
setup_user(
agent_data: Dict[str, Any],
environment: Tau2Environment,
task: Task,
seed_generator: DefaultSeedGenerator,
) -> Optional[User]
Create Tau2 user simulator.
Creates a Tau2User with scenario from the task. Model ID is read from task.user_data["model_id"].
Scenario text is formatted to match original tau2-bench's
str(task.user_scenario) chain:
- StructuredUserInstructions.__str__() for dict instructions
- UserScenario.__str__() wrapping persona + instructions
| PARAMETER | DESCRIPTION |
|---|---|
| agent_data | Agent configuration |
| environment | The task environment |
| task | Current task with user scenario |

| RETURNS | DESCRIPTION |
|---|---|
| Optional[User] | Tau2User instance |
Tau2User
Bases: User
Tau2-specific user simulator matching original tau2-bench UserSimulator.
Uses chat API with role-flipped messages, matching the original's architecture:
- System message: simulation_guidelines + scenario
- Messages: role-flipped (user->assistant, assistant->user) matching original's UserState.flip_roles()
- Tools: native OpenAI function calling for user tools
- Stop: exact case match for ###STOP###, ###TRANSFER###, ###OUT-OF-SCOPE### (tokens kept in content, skipped if message has tool_calls)
Adapted from: tau2-bench src/tau2/user/user_simulator.py
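Role flipping and stop detection are easy to misread, so here is a small standalone sketch of both on plain message dicts. It is not the Tau2User code: the helper names are hypothetical, messages with other roles are passed through unchanged as a simplification, and the stop check is written as a case-sensitive substring test.

```python
from typing import Any, Dict, List

STOP_TOKENS = ("###STOP###", "###TRANSFER###", "###OUT-OF-SCOPE###")
_SWAP = {"user": "assistant", "assistant": "user"}


def flip_roles(messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Return copies of the messages with user and assistant roles exchanged."""
    return [{**m, "role": _SWAP.get(m["role"], m["role"])} for m in messages]


def is_stop(message: Dict[str, Any]) -> bool:
    """True if the message carries a stop token and is not a tool-call message."""
    if message.get("tool_calls"):
        return False  # stop tokens are ignored on tool-call messages
    content = message.get("content") or ""
    return any(token in content for token in STOP_TOKENS)
```

Flipping lets the simulator's model speak as the "assistant" in its own view while its output is stored as a user turn in the conversation.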
__init__
__init__(
model: ModelAdapter,
scenario: str,
initial_query: str,
tools: Optional[Dict[str, Callable]] = None,
tool_definitions: Optional[List[Dict[str, Any]]] = None,
llm_args: Optional[Dict[str, Any]] = None,
max_turns: int = 50,
exhausted_response: Optional[str] = None,
)
Initialize Tau2 user simulator.
| PARAMETER | DESCRIPTION |
|---|---|
| model | ModelAdapter for LLM-based response generation |
| scenario | Full scenario text containing user instructions |
| initial_query | The initial query to the agent |
| tools | Optional dictionary of user tools (name -> callable) |
| tool_definitions | Optional OpenAI-format tool definitions for LLM |
| llm_args | Optional additional args for model.chat() (e.g. temperature) |
| max_turns | Maximum conversation turns |
| exhausted_response | Message to return when |
gather_config
gather_config() -> Dict[str, Any]
Gather configuration from this component.
Provides a default implementation that returns basic metadata about the component (type and collection timestamp). Subclasses should extend this method to include their own configuration data.
This method is called by the Benchmark before evaluation to collect all configuration information. The returned dictionary must be JSON-serializable.
Output fields:
- type - Component class name
- gathered_at - ISO timestamp of when config was collected
Subclasses typically add additional component-specific configuration.
| RETURNS | DESCRIPTION |
|---|---|
| Dict[str, Any] | Dictionary containing configuration with standardized structure. |
How to use
Override this method and call super().gather_config() to extend
the base implementation with your own data:
def gather_config(self) -> Dict[str, Any]:
    return {
        **super().gather_config(),
        "model_name": self.model_name,
        "temperature": self.temperature,
        "max_tokens": self.max_tokens,
    }
If you don't need custom configuration tracking, you can use the default implementation without overriding (it will still return basic metadata about your component).
gather_traces
gather_traces() -> Dict[str, Any]
Gather traces with Tau2-specific information.
get_initial_query
get_initial_query() -> str
Return the initial query to start the conversation.
get_tool
get_tool() -> Any
Return a framework-compatible tool for agent interaction.
Some frameworks (smolagents, CAMEL) use a tool-based pattern where agents invoke an AskUser tool to interact with the user. Override this in subclasses for frameworks that need it.
| RETURNS | DESCRIPTION |
|---|---|
| Any | Framework-specific tool, or |
inject_greeting
inject_greeting(greeting: str) -> None
Inject the agent's initial greeting into message history.
Must be called AFTER get_initial_query() returns. In the original tau2-bench, the orchestrator adds "Hi! How can I help you today?" as the first AssistantMessage before the user's initial query.
| PARAMETER | DESCRIPTION |
|---|---|
| greeting | The greeting message to inject |
is_done
is_done() -> bool
Check if the user interaction should terminate.
respond
respond(message: str) -> str
Respond to an agent message.
Matches original tau2-bench UserSimulator._generate_next_message:
1. Add agent message to history (as AssistantMessage)
2. Flip roles and generate via model.chat()
3. If tool_calls: execute, add results, generate again
4. Return final text response (with stop tokens kept in content)
| PARAMETER | DESCRIPTION |
|---|---|
| message | The agent's message |

| RETURNS | DESCRIPTION |
|---|---|
| str | The user's response text |
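The four steps of respond() can be sketched as a loop over a scripted stand-in for the model. Everything here is illustrative: the message and tool-call dict shapes are simplified, the chat callable replaces model.chat(), and the max_tool_rounds guard is an addition of this sketch rather than a documented parameter.

```python
import json
from typing import Any, Callable, Dict, List

_SWAP = {"user": "assistant", "assistant": "user"}


def flip_roles(messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    return [{**m, "role": _SWAP.get(m["role"], m["role"])} for m in messages]


def respond_sketch(
    history: List[Dict[str, Any]],
    agent_message: str,
    chat: Callable[[List[Dict[str, Any]]], Dict[str, Any]],
    tools: Dict[str, Callable[..., Any]],
    max_tool_rounds: int = 5,
) -> str:
    # 1. Record the agent's message as an assistant turn.
    history.append({"role": "assistant", "content": agent_message})
    for _ in range(max_tool_rounds):
        # 2. Flip roles and ask the model for the user's next message.
        reply = chat(flip_roles(history))
        tool_calls = reply.get("tool_calls") or []
        history.append(
            {"role": "user", "content": reply.get("content") or "", "tool_calls": tool_calls}
        )
        if not tool_calls:
            # 4. Plain text reply: return it, stop tokens included.
            return history[-1]["content"]
        # 3. Execute each requested tool, store the result, then generate again.
        for call in tool_calls:
            result = tools[call["name"]](**call["arguments"])
            history.append(
                {"role": "tool", "tool_call_id": call["id"], "content": json.dumps(result)}
            )
    raise RuntimeError("tool-call loop did not finish within max_tool_rounds")


# Scripted model: first asks for a tool, then answers in text.
scripted = iter(
    [
        {"content": None, "tool_calls": [{"id": "c1", "name": "check_status", "arguments": {"line": "A"}}]},
        {"content": "My line shows as active. ###STOP###"},
    ]
)
history: List[Dict[str, Any]] = []
answer = respond_sketch(
    history,
    "Can you check your line status?",
    lambda messages: next(scripted),
    {"check_status": lambda line: {"line": line, "status": "active"}},
)
```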
Tau2Environment
Bases: Environment
Environment for tau2 domains (airline, retail, telecom).
This environment manages REAL database state that tools actually modify. Provides methods for state verification.
Key Features:
- Real tool implementations that modify database state
- Deterministic state hashing for evaluation
- Support for initial state setup from task data
Adapted from: tau2-bench src/tau2/environment/environment.py
db
property
db: DB
Get the domain database.
domain
property
domain: str
Get the domain name.
policy
property
policy: str
Get the domain policy text.
toolkit
property
toolkit: ToolKitBase
Get the domain toolkit.
user_toolkit
property
user_toolkit: Optional[ToolKitBase]
Get the domain user toolkit (if available).
__init__
__init__(
task_data: Dict[str, Any],
callbacks: Optional[List[Any]] = None,
)
Initialize environment for a domain.
| PARAMETER | DESCRIPTION |
|---|---|
| task_data | Task data containing: domain (domain name: "retail", "airline", "telecom"); initial_state (optional initial state setup); policy (domain policy text, embedded during task loading); db_path (path to database file, embedded during task loading). |
| callbacks | Optional callbacks |
create_tools
create_tools() -> Dict[str, Callable]
Create tools from the domain toolkit, wrapped with post-invocation sync.
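One way to picture "wrapped with post-invocation sync" is a decorator-style wrapper that runs a sync hook after each successful tool call. What the real sync step does is not described here, so the hook below is a hypothetical callback and with_post_sync is an invented name.

```python
import functools
from typing import Any, Callable, Dict, List

def with_post_sync(tool: Callable[..., Any], sync: Callable[[], None]) -> Callable[..., Any]:
    """Wrap a tool so that sync() runs after every call that returns normally."""

    @functools.wraps(tool)
    def wrapped(*args: Any, **kwargs: Any) -> Any:
        result = tool(*args, **kwargs)  # an exception here skips the sync
        sync()
        return result

    return wrapped


# Toy database and tool to exercise the wrapper.
db: Dict[str, Dict[str, str]] = {"orders": {"o1": "open"}}
sync_log: List[Any] = []


def cancel_order(order_id: str) -> str:
    db["orders"][order_id] = "cancelled"
    return db["orders"][order_id]


tools = {
    "cancel_order": with_post_sync(cancel_order, lambda: sync_log.append(dict(db["orders"]))),
}
status = tools["cancel_order"]("o1")
```

functools.wraps keeps the tool's name and docstring, which matters when tool schemas are generated from the callable.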
create_user_tools
create_user_tools() -> Dict[str, Callable]
Create user tools from the domain user toolkit, wrapped with post-invocation sync.
gather_config
gather_config() -> Dict[str, Any]
Gather environment configuration.
gather_traces
gather_traces() -> Dict[str, Any]
Gather execution traces including database state changes.
get_db_hash
get_db_hash() -> str
Get hash of current agent database state.
For telecom domain, excludes the embedded user_db field so the
agent-side hash only reflects agent DB state. This matches the
original tau2-bench where TelecomDB and TelecomUserDB are
separate objects with independent hashes.
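The agent-side versus user-side hashing described above can be illustrated with a minimal sketch. It assumes the database can be dumped to a JSON-serializable dict and that the user state lives under a `user_db` key; the function names and the SHA-256 choice are illustrative assumptions, not the library's actual implementation.

```python
import hashlib
import json
from typing import Any, Dict, Optional


def hash_state(state: Dict[str, Any]) -> str:
    """Hash a JSON-serializable dict deterministically (keys are sorted)."""
    payload = json.dumps(state, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


def agent_db_hash(db: Dict[str, Any]) -> str:
    """Hash the agent-side state only, ignoring an embedded `user_db` field."""
    agent_view = {key: value for key, value in db.items() if key != "user_db"}
    return hash_state(agent_view)


def user_db_hash(db: Dict[str, Any]) -> Optional[str]:
    """Hash only the embedded user state, or return None when there is none."""
    user_db = db.get("user_db")
    return hash_state(user_db) if user_db is not None else None


# Hypothetical telecom-like state: a user-side change must not move the agent hash.
db = {"lines": {"L1": "active"}, "user_db": {"airplane_mode": False}}
agent_before = agent_db_hash(db)
db["user_db"]["airplane_mode"] = True
assert agent_db_hash(db) == agent_before
```

Sorting keys before hashing is what makes the digest independent of dict insertion order, which is the property a state-comparison evaluator needs.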
get_initial_db_hash
get_initial_db_hash() -> str
Get hash of initial database state.
get_response
get_response(
tool_name: str,
requestor: Literal["user", "assistant"] = "assistant",
tool_call_id: str = "",
**kwargs: Any,
) -> Dict[str, Any]
Execute a tool call with error handling and sync.
Matches original Environment.get_response() (environment.py:390-415). Catches exceptions, calls sync_tools on success, serializes result.
| PARAMETER | DESCRIPTION |
|---|---|
| tool_name | Name of the tool to call |
| requestor | Who is making the call |
| tool_call_id | ID of the tool call (for matching) |
| **kwargs | Tool arguments |

| RETURNS | DESCRIPTION |
|---|---|
| Dict[str, Any] | Dict with content (serialized result), error flag, requestor, tool_call_id |
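As a rough illustration of the contract described for get_response (catch exceptions, sync on success, serialize the result), here is a standalone sketch. The helper name `call_tool_safely`, the `sync` parameter, the `"Error: ..."` message format, and the use of `json.dumps` are assumptions made for the example; the real method may differ.

```python
import json
from typing import Any, Callable, Dict, Optional


def call_tool_safely(
    tools: Dict[str, Callable[..., Any]],
    tool_name: str,
    requestor: str = "assistant",
    tool_call_id: str = "",
    sync: Optional[Callable[[], None]] = None,  # hypothetical sync hook
    **kwargs: Any,
) -> Dict[str, Any]:
    """Run a tool and report success or failure in a uniform dict."""
    error = False
    try:
        result = tools[tool_name](**kwargs)
        if sync is not None:
            sync()  # only reached when the tool call succeeded
        content = json.dumps(result, default=str)
    except Exception as exc:  # unknown tool, bad arguments, or tool failure
        content = f"Error: {exc}"
        error = True
    return {
        "content": content,
        "error": error,
        "requestor": requestor,
        "tool_call_id": tool_call_id,
    }


tools = {"add": lambda a, b: a + b}
print(call_tool_safely(tools, "add", tool_call_id="call_1", a=1, b=2))
print(call_tool_safely(tools, "add", a=1)["error"])
```

Returning the error as data rather than raising lets the conversation continue, so the agent can see the failure message and retry.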
get_tool
get_tool(name: str) -> Optional[Any]
Get a tool by name.
| PARAMETER | DESCRIPTION |
|---|---|
| name | Tool name |

| RETURNS | DESCRIPTION |
|---|---|
| Optional[Any] | The tool, or None if not found |
get_tools
get_tools() -> Dict[str, Any]
Get all tools as a dict.
get_user_db_hash
get_user_db_hash() -> Optional[str]
Get hash of current user database state.
For telecom domain, hashes just the user_db (TelecomUserDB), matching original tau2-bench's get_user_db_hash() which calls user_tools.get_db_hash() on a separate user DB.
make_tool_call
make_tool_call(
tool_name: str,
requestor: Literal["user", "assistant"] = "assistant",
**kwargs: Any,
) -> Any
Execute a tool call, routing based on requestor.
Matches original Environment.make_tool_call() (environment.py:128-155). Does NOT call sync_tools — caller is responsible.
| PARAMETER | DESCRIPTION |
|---|---|
| tool_name | Name of the tool |
| requestor | Who is making the call ("user" or "assistant") |
| **kwargs | Tool arguments |
make_user_tool_call
make_user_tool_call(tool_name: str, **kwargs: Any) -> Any
Execute a user tool call.
run_env_assertion
run_env_assertion(
assertion: Dict[str, Any],
raise_assertion_error: bool = True,
) -> bool
Run an environment assertion.
Matches original Environment.run_env_assertion() (environment.py:183-201). Uses run_env_function_call (getattr), NOT use_tool.
| PARAMETER | DESCRIPTION |
|---|---|
| assertion | Dict with env_type, func_name, arguments, assert_value, message |
| raise_assertion_error | If True, raise AssertionError on failure |
run_env_function_call
run_env_function_call(
env_function_call: Dict[str, Any],
) -> Any
Execute an environment function call using getattr.
Matches original Environment.run_env_function_call() (environment.py:164-181). Uses getattr() on toolkit, NOT use_tool(). This is critical because assertion functions are NOT registered as @is_tool.
| PARAMETER | DESCRIPTION |
|---|---|
| env_function_call | Dict with env_type, func_name, arguments |
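The getattr-based dispatch, and how an assertion builds on it, can be sketched as follows. `DemoToolkit`, `run_function_call`, and `run_assertion` are hypothetical stand-ins written for this example; only the dict keys (env_type, func_name, arguments, assert_value, message) come from the descriptions above.

```python
from typing import Any, Dict


class DemoToolkit:
    """Hypothetical toolkit: get_balance would be a tool, the assert helper would not."""

    def __init__(self) -> None:
        self.balance = 100

    def get_balance(self) -> int:
        return self.balance

    def assert_balance_at_least(self, minimum: int) -> bool:
        # Helper meant for assertions only; not exposed to the agent as a tool.
        return self.balance >= minimum


def run_function_call(toolkits: Dict[str, Any], call: Dict[str, Any]) -> Any:
    """Look up the method by name with getattr, so unregistered helpers work too."""
    toolkit = toolkits[call["env_type"]]
    func = getattr(toolkit, call["func_name"], None)
    if func is None:
        raise ValueError(f"Function {call['func_name']!r} not found")
    return func(**call.get("arguments", {}))


def run_assertion(
    toolkits: Dict[str, Any],
    assertion: Dict[str, Any],
    raise_assertion_error: bool = True,
) -> bool:
    """Compare the function result with assert_value; optionally raise on mismatch."""
    passed = run_function_call(toolkits, assertion) == assertion["assert_value"]
    if not passed and raise_assertion_error:
        raise AssertionError(assertion.get("message") or "Environment assertion failed")
    return passed


toolkits = {"assistant": DemoToolkit()}
check = {
    "env_type": "assistant",
    "func_name": "assert_balance_at_least",
    "arguments": {"minimum": 50},
    "assert_value": True,
    "message": "Balance too low",
}
print(run_assertion(toolkits, check))
```

A tool registry lookup would miss `assert_balance_at_least` here, which is why attribute lookup on the toolkit object is the relevant technique.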
set_state
set_state(
initialization_data: Optional[Dict[str, Any]],
initialization_actions: Optional[List[Dict[str, Any]]],
message_history: List[Dict[str, Any]],
) -> None
Set environment state by replaying initialization data, actions, and message history.
Matches original Environment.set_state() (environment.py:263-335). Used by the evaluator to reconstruct predicted/gold environments.
| PARAMETER | DESCRIPTION |
|---|---|
| initialization_data | Dict with agent_data, user_data for DB updates |
| initialization_actions | List of env function calls to execute |
| message_history | List of message dicts to replay tool calls from |
setup_state
setup_state(task_data: Dict[str, Any]) -> Dict[str, Any]
Initialize environment state from task data.
Sets up:
- db: Domain database loaded from data files
- toolkit: Domain toolkit with tools
- policy: Domain policy text
- initial_db_hash: Hash of initial state

| PARAMETER | DESCRIPTION |
|---|---|
| task_data | Task data with domain, initial_state, policy, db_path |

| RETURNS | DESCRIPTION |
|---|---|
| Dict[str, Any] | State dictionary |
sync_tools
sync_tools() -> None
Synchronize agent-side and user-side state.
Called automatically after every tool invocation via wrapped callables. Currently only applies to telecom domain (no-op for retail/airline).
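A minimal sketch of the "wrapped callables" idea: each tool is wrapped so that a sync function runs right after the tool returns. The helper name `wrap_with_sync` is hypothetical, and in this sketch the sync step is skipped when the tool raises; whether the real wrapper behaves the same way on errors is not stated above.

```python
import functools
from typing import Any, Callable, Dict


def wrap_with_sync(
    tools: Dict[str, Callable[..., Any]],
    sync: Callable[[], None],
) -> Dict[str, Callable[..., Any]]:
    """Return a new tool dict whose callables run `sync` after each call."""

    def wrap(fn: Callable[..., Any]) -> Callable[..., Any]:
        @functools.wraps(fn)  # keep the tool's name and docstring for schema generation
        def wrapped(*args: Any, **kwargs: Any) -> Any:
            result = fn(*args, **kwargs)
            sync()  # propagate state between agent-side and user-side views
            return result

        return wrapped

    return {name: wrap(fn) for name, fn in tools.items()}


sync_count = []


def toggle_data(enabled: bool) -> str:
    """Hypothetical tool that flips a setting."""
    return f"data enabled: {enabled}"


wrapped_tools = wrap_with_sync({"toggle_data": toggle_data}, lambda: sync_count.append(1))
print(wrapped_tools["toggle_data"](enabled=True), len(sync_count))
```

Using `functools.wraps` matters when tool schemas are derived from the function's name, signature, or docstring, since the wrapper would otherwise hide them.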
to_json_str
classmethod
to_json_str(resp: Any) -> str
Convert a response to a JSON string.
Matches original Environment.to_json_str() (environment.py:337-366).
Tau2Evaluator
Bases: Evaluator
Evaluator for tau2 benchmark tasks.
Combines multiple evaluation strategies:
- Environment assertions (database state checks)
- Action assertions (correct tool usage)
- Communication assertions (appropriate responses)
Uses DETERMINISTIC evaluation based on actual database state comparison.
Adapted from: tau2-bench src/tau2/evaluator/
__call__
__call__(
traces: Dict[str, Any],
final_answer: Optional[str] = None,
) -> Dict[str, Any]
Evaluate task completion.
Matches original tau2-bench evaluate_simulation():
- Premature termination → reward=0.0
- Always runs ALL evaluators (M7: not gated by reward_basis)
- Only uses reward_basis when COMBINING scores

| PARAMETER | DESCRIPTION |
|---|---|
| traces | Filtered execution traces (from filter_traces) |
| final_answer | Final answer from agent |

| RETURNS | DESCRIPTION |
|---|---|
| Dict[str, Any] | Dict with reward, passed, reward_breakdown, and per-evaluator results |
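The distinction between running every evaluator and combining only those named in reward_basis can be sketched like this. The sketch assumes the combined reward is the product of the selected component rewards and that a task passes only at a reward of 1.0; the component names ("env", "action", "communicate") and the function itself are illustrative assumptions rather than the evaluator's actual code.

```python
from typing import Dict, Iterable


def combine_rewards(
    component_rewards: Dict[str, float],
    reward_basis: Iterable[str],
    terminated_prematurely: bool = False,
) -> Dict[str, object]:
    """Combine per-evaluator rewards, using only the components in reward_basis."""
    if terminated_prematurely:
        # Premature termination short-circuits to a zero reward.
        return {"reward": 0.0, "passed": False, "reward_breakdown": {}}

    reward = 1.0
    breakdown = {}
    for name in reward_basis:
        breakdown[name] = component_rewards[name]
        reward *= component_rewards[name]  # assumption: product of selected components

    return {"reward": reward, "passed": reward == 1.0, "reward_breakdown": breakdown}


# All three evaluators ran, but only "env" and "communicate" count for this task.
scores = {"env": 1.0, "action": 0.0, "communicate": 1.0}
print(combine_rewards(scores, ["env", "communicate"]))
```

Under this reading, a failing action check is still reported in the per-evaluator results but does not lower the reward unless the task lists it in its reward basis.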
__init__
__init__(
task: Task,
environment: Tau2Environment,
nl_model: Optional[ModelAdapter] = None,
)
Initialize the evaluator.
| PARAMETER | DESCRIPTION |
|---|---|
| task | Task being evaluated |
| environment | Tau2Environment instance |
| nl_model | Optional model for NL assertion evaluation |
filter_traces
filter_traces(traces: Dict[str, Any]) -> Dict[str, Any]
Build full message trajectory from agent and user traces.
Matches original tau2-bench where evaluate_simulation receives simulation.messages — a flat ordered list of ALL messages.
| PARAMETER | DESCRIPTION |
|---|---|
| traces | Full execution traces |

| RETURNS | DESCRIPTION |
|---|---|
| Dict[str, Any] | Dict with full_trajectory, environment traces, termination_reason |
DefaultAgentTau2Benchmark
Bases: Tau2Benchmark
Tau2 benchmark with default agent implementation.
This benchmark uses the DefaultTau2Agent which mirrors the original tau2-bench LLMAgent implementation for direct comparison.
Configuration via agent_data
- model_id: LLM model identifier (required)
- llm_args: Optional dict of additional LLM arguments
- max_tool_calls: Maximum tool calls per turn (default: 50)
- verbose: Verbosity level for debugging (0=silent, 1=basic, 2=detailed)
Example
from maseval.benchmark.tau2 import DefaultAgentTau2Benchmark, load_tasks, configure_model_ids
tasks = load_tasks("retail", split="base", limit=5)
configure_model_ids(tasks, user_model_id="gpt-4o")
benchmark = DefaultAgentTau2Benchmark(
    agent_data={"model_id": "gpt-4o", "verbose": 1},
)
results = benchmark.run(tasks)
seed_generator
property
seed_generator: SeedGenerator
The seed generator for this benchmark.
The seed generator is configured at benchmark initialization via the seed
or seed_generator parameters. When seed=None (the default), the generator's
derive_seed() method returns None, effectively disabling seeding while
maintaining a uniform interface.
| RETURNS | DESCRIPTION |
|---|---|
SeedGenerator
|
The root |
usage
property
usage: Usage
Running usage total across all task repetitions.
Queryable at any time, including while the benchmark is still running. Returns the grand total of all usage collected so far.
usage_by_component
property
usage_by_component: Dict[str, Usage]
Per-component running usage totals across all repetitions.
Keys are registry keys (e.g., "models:main_model").
__init__
__init__(
callbacks: Optional[List[BenchmarkCallback]] = None,
n_task_repeats: int = 1,
max_invocations: int = MAX_INVOCATIONS,
num_workers: int = 1,
fail_on_setup_error: bool = False,
fail_on_task_error: bool = False,
fail_on_evaluation_error: bool = False,
progress_bar: bool | str = True,
seed: Optional[int] = None,
seed_generator: Optional[SeedGenerator] = None,
)
Initialize benchmark with tau2-specific defaults.
| PARAMETER | DESCRIPTION |
|---|---|
| callbacks | Optional list of callback handlers for monitoring execution. |
| n_task_repeats | Number of times to repeat each task. Default 1. |
| max_invocations | Maximum steps (default: 200, matching original DEFAULT_MAX_STEPS). |
| num_workers | Number of parallel task executions. Default 1 (sequential). |
| fail_on_setup_error | If True, raise on setup errors. Default False. |
| fail_on_task_error | If True, raise on task execution errors. Default False. |
| fail_on_evaluation_error | If True, raise on evaluation errors. Default False. |
| progress_bar | Progress display. True (default) for tqdm, "rich" for Rich, or False to disable. |
| seed | Global seed for reproducible benchmark runs. |
| seed_generator | Custom seed generator (takes precedence over seed). |
add_callback
add_callback(callback: BenchmarkCallback) -> None
Register a callback handler to monitor benchmark execution.
| PARAMETER | DESCRIPTION |
|---|---|
| callback | A BenchmarkCallback instance that will receive execution events. |
How to use
Callbacks receive notifications at key lifecycle points for tracing, progress tracking,
or custom metrics collection. See BenchmarkCallback
for available hooks and their signatures.
from maseval.core.callbacks import MessageTracingCallback
benchmark = MyBenchmark(tasks=tasks, agent_data=config)
benchmark.add_callback(MessageTracingCallback(output_dir="logs"))
results = benchmark.run()
clear_registry
clear_registry() -> None
Clear the component registry after a task repetition completes.
This method is called automatically by run() after each task repetition
to ensure components are not carried over between repetitions. The
reports list persists across all repetitions for aggregated analysis.
collect_all_configs
collect_all_configs() -> Dict[str, Any]
Collect configuration from all registered components for the current task repetition.
This method is called automatically by run() after each task repetition completes
and before evaluation begins. It gathers comprehensive configuration from all registered
components (agents, models, tools, simulators, callbacks, etc.) for that specific
repetition. After collection, the registry is cleared for the next repetition.
The collected configs are stored in benchmark.reports list along with traces
for persistent access across all task repetitions.
Output fields:
- metadata: Collection timestamp and thread info
- agents: Dict mapping agent names to their config (settings, parameters)
- models: Dict mapping model names to their config (model IDs, parameters)
- tools: Dict mapping tool names to their config (specifications, settings)
- simulators: Dict mapping simulator names to their config (parameters, templates)
- callbacks: Dict mapping callback names to their config (settings)
- environment: Direct config from the environment (not nested), or None if not present
- user: Direct config from the user simulator (not nested), or None if not present
- other: Dict for any other registered components
- benchmark: Benchmark-level configuration (git, system, packages)

| RETURNS | DESCRIPTION |
|---|---|
| Dict[str, Any] | Structured dictionary containing configuration from all registered components. |
How to use
This method is called automatically by run() after each task repetition:
# Automatic collection (recommended)
results = benchmark.run()
# Access all collected reports (traces + configs) across repetitions
for report in benchmark.reports:
    print(f"Task {report['task_id']}, Repeat {report['repeat_idx']}")
    # Agents is a dict: agent_name -> config
    print(f"Agent config: {report['config']['agents']['my_agent']}")
    # Environment and user are direct (not nested)
    print(f"Environment config: {report['config']['environment']}")
    print(f"User config: {report['config']['user']}")
    # Benchmark-level config
    print(f"Git commit: {report['config']['benchmark']['git']['commit_hash']}")
The collected configs are available in the results for reproducibility analysis.
collect_all_traces
collect_all_traces() -> Dict[str, Any]
Collect execution traces from all registered components for the current task repetition.
This method is called automatically by run() after each task repetition completes
and before evaluation begins. It gathers comprehensive traces from all registered
components (agents, models, tools, simulators, callbacks, etc.) for that specific
repetition. After collection, the registry is cleared for the next repetition.
The collected traces are stored in benchmark.reports list along with configs
for persistent access across all task repetitions.
Output fields:
- metadata: Collection timestamp and thread info
- agents: Dict mapping agent names to their traces (messages, execution data)
- models: Dict mapping model names to their traces (API calls, timing, errors)
- tools: Dict mapping tool names to their traces (invocations, parameters)
- simulators: Dict mapping simulator names to their traces (attempts, outcomes)
- callbacks: Dict mapping callback names to their traces (custom data)
- environment: Direct traces from the environment (not nested), or None if not present
- user: Direct traces from the user simulator (not nested), or None if not present
- other: Dict for any other registered components

| RETURNS | DESCRIPTION |
|---|---|
| Dict[str, Any] | Structured dictionary containing execution traces from all registered components. |
How to use
This method is called automatically by run() after each task repetition:
# Automatic collection (recommended)
results = benchmark.run()
# Access all collected reports (traces + configs) across repetitions
for report in benchmark.reports:
    print(f"Task {report['task_id']}, Repeat {report['repeat_idx']}")
    # Agents is a dict: agent_name -> traces
    print(f"Agent messages: {report['traces']['agents']['my_agent']}")
    # Environment and user are direct (not nested)
    print(f"Environment state: {report['traces']['environment']}")
    print(f"User interactions: {report['traces']['user']}")
The collected traces are passed to the evaluator's evaluate() method
and stored in benchmark.reports for later analysis.
collect_all_usage
collect_all_usage() -> Dict[str, Any]
Collect usage from all registered components for the current task repetition.
This method is called automatically by run() after each task repetition
completes. It gathers usage from all registered UsageTrackableMixin
components and also accumulates into persistent running totals accessible
via usage and usage_by_component.
| RETURNS | DESCRIPTION |
|---|---|
| Dict[str, Any] | Structured dictionary containing usage from all registered components. |
evaluate
evaluate(
evaluators: Sequence[Evaluator],
agents: Dict[str, AgentAdapter],
final_answer: Any,
traces: Dict[str, Any],
) -> List[Dict[str, Any]]
Evaluate using Tau2 evaluators.
Uses each evaluator's filter_traces() method to extract relevant data, then calls the evaluator with the filtered traces.
Returns tau2 format:
- reward: Float [0.0, 1.0]
- passed: Boolean
- reward_breakdown: Per-evaluator scores
- env_check, action_check, communicate_check: Detailed results

| PARAMETER | DESCRIPTION |
|---|---|
| evaluators | List of evaluators |
| agents | Dict of agents |
| final_answer | Final answer from agents |
| traces | Execution traces |

| RETURNS | DESCRIPTION |
|---|---|
| List[Dict[str, Any]] | List of evaluation result dicts |
execution_loop
execution_loop(
agents: Sequence[AgentAdapter],
task: Task,
environment: Tau2Environment,
user: Optional[Tau2User],
) -> Any
Execute with step counting matching original orchestrator.
C8: The original counts steps per-message-appended:
- Each agent LLM generation = 1 step
- Each tool result = 1 step
- Each user LLM generation = 1 step

Steps during initialization (greeting + initial query) don't count.

| PARAMETER | DESCRIPTION |
|---|---|
| agents | Agents to execute. |
| task | The task being solved. |
| environment | The Tau2Environment providing tools and state. |
| user | Optional Tau2 user simulator. |

| RETURNS | DESCRIPTION |
|---|---|
| Any | Final answer from the last agent execution. |
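The per-message step counting rule can be made concrete with a small sketch. The event-log format (`kind` and `during_init` keys) and the function name are hypothetical and exist only for this illustration; the real loop counts steps as messages are appended rather than from a log.

```python
from typing import Dict, List

# Message kinds that each cost one step, per the rule above.
COUNTED_KINDS = {"agent_generation", "tool_result", "user_generation"}


def count_steps(events: List[Dict[str, object]]) -> int:
    """Count one step per counted message, skipping initialization messages."""
    steps = 0
    for event in events:
        if event.get("during_init", False):
            continue  # greeting and initial query are free
        if event["kind"] in COUNTED_KINDS:
            steps += 1
    return steps


events = [
    {"kind": "agent_generation", "during_init": True},  # greeting
    {"kind": "user_generation", "during_init": True},   # initial query
    {"kind": "agent_generation"},                       # agent requests a tool
    {"kind": "tool_result"},                            # tool output appended
    {"kind": "agent_generation"},                       # agent answers in text
    {"kind": "user_generation"},                        # user replies
]
print(count_steps(events))  # 4
```

With a limit such as max_invocations, a loop following this rule would stop once the running count reaches the limit, so tool-heavy turns consume the budget faster than plain text exchanges.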
get_failed_tasks
get_failed_tasks(
status_filter: Optional[
Union[
TaskExecutionStatus, List[TaskExecutionStatus]
]
] = None,
reports: Optional[List[Dict[str, Any]]] = None,
) -> SequentialTaskQueue
Get tasks that failed during benchmark execution.
This method retrieves failed tasks based on their execution status, useful for debugging, retry logic, or failure analysis.
| PARAMETER | DESCRIPTION |
|---|---|
| status_filter | Filter by specific failure status(es). If None, returns all failed tasks (any status except SUCCESS). Can be a single TaskExecutionStatus or a list of them. Examples: TaskExecutionStatus.TASK_EXECUTION_FAILED (only tasks that failed during execution); TaskExecutionStatus.EVALUATION_FAILED (only tasks where evaluation failed); [TaskExecutionStatus.TASK_EXECUTION_FAILED, TaskExecutionStatus.SETUP_FAILED] (tasks that failed during execution or setup) |
| reports | Optional list of reports to analyze. If None, uses the reports from the last run() call. This allows analyzing externally stored or modified reports. |

| RETURNS | DESCRIPTION |
|---|---|
| SequentialTaskQueue | SequentialTaskQueue containing the failed tasks. Empty if no failures match the filter. |

| RAISES | DESCRIPTION |
|---|---|
| RuntimeError | If reports is None and run() has not been executed yet. |
How to use
# Run benchmark
benchmark = MyBenchmark()
reports = benchmark.run(tasks=tasks, agent_data=config)
# Get all failed tasks (from internal state)
failed = benchmark.get_failed_tasks()
print(f"Failed: {len(failed)}/{len(benchmark.tasks)} tasks")
# Or work with returned reports (safe from internal state changes)
failed = benchmark.get_failed_tasks(reports=reports)
# Get only tasks that failed during execution (not evaluation)
execution_failures = benchmark.get_failed_tasks(
    TaskExecutionStatus.TASK_EXECUTION_FAILED,
    reports=reports,
)
# Get setup and execution failures
critical_failures = benchmark.get_failed_tasks(
    status_filter=[
        TaskExecutionStatus.SETUP_FAILED,
        TaskExecutionStatus.TASK_EXECUTION_FAILED,
    ],
    reports=reports,
)
# Retry failed tasks elegantly - this is the key use case!
if len(failed) > 0:
    retry_reports = benchmark.run(tasks=failed)
# Or more concisely
reports = benchmark.run(tasks=tasks)
retry_reports = benchmark.run(tasks=benchmark.get_failed_tasks())
get_model_adapter
abstractmethod
get_model_adapter(
model_id: str, **kwargs: Any
) -> ModelAdapter
Get or create a model adapter.
Must be implemented by subclass to provide the actual ModelAdapter implementation for the desired LLM provider.
| PARAMETER | DESCRIPTION |
|---|---|
| model_id | Model identifier |
| **kwargs | Additional arguments (e.g., register_name for tracing) |

| RETURNS | DESCRIPTION |
|---|---|
| ModelAdapter | ModelAdapter instance |
Note
DefaultAgentTau2Benchmark uses lazy initialization for model caching.
Access via getattr(self, '_model_cache', {}) in subclass implementations.
register
register(
category: str,
name: str,
component: RegisterableComponent,
) -> RegisterableComponent
Register a component for comprehensive trace and configuration collection.
All core MASEval components (AgentAdapter, ModelAdapter, Environment, User, LLMSimulator, BenchmarkCallback) inherit from TraceableMixin and/or ConfigurableMixin, and are automatically registered for both trace and configuration collection before evaluation.
Note: Most components are automatically registered when returned from
setup methods (setup_environment, setup_user, setup_agents). You only
need to manually register additional components like models, simulators, or
tools that aren't automatically captured.
| PARAMETER | DESCRIPTION |
|---|---|
| category | Component category (e.g., "agents", "models", "tools", "simulators", "callbacks", "user", "environment", "seeding"). Use plural form to match the structure in collect_all_traces() and collect_all_configs(). |
| name | Unique identifier for this component within its category |
| component | Any object inheriting from TraceableMixin and/or ConfigurableMixin |

| RETURNS | DESCRIPTION |
|---|---|
| RegisterableComponent | The component (for chaining convenience) |

| RAISES | DESCRIPTION |
|---|---|
| ValueError | If the component is already registered under a different name |
How to use
Most components are auto-registered. Manual registration is only needed for additional components:
def setup_agents(self, agent_data, environment, task, user):
    # Create model (needs manual registration)
    model = MyModelAdapter(...)
    self.register("models", "main_model", model)
    # Create agent (auto-registered when returned)
    agent = MyAgent(model=model)
    agent_adapter = AgentAdapter(agent, "agent1")
    # Environment and user are also auto-registered
    return [agent_adapter], {"agent1": agent_adapter}
Traces and configs are automatically collected before evaluation via
collect_all_traces() and collect_all_configs() which are called
internally by the run() method.
run
run(
tasks: Union[
Task, BaseTaskQueue, Iterable[Union[Task, dict]]
],
agent_data: Dict[str, Any] | Iterable[Dict[str, Any]],
) -> List[Dict[str, Any]]
Initialize and execute the complete benchmark loop across all tasks.
| PARAMETER | DESCRIPTION |
|---|---|
| tasks | Task source for execution. Can be a single Task object; a BaseTaskQueue (SequentialTaskQueue, PriorityTaskQueue, or custom AdaptiveTaskQueue); or an iterable of Task objects or dicts that will be converted to Tasks. When a BaseTaskQueue is provided, it controls the task ordering. AdaptiveTaskQueue subclasses are automatically registered as callbacks to receive task completion notifications. |
| agent_data | Configuration for agents. Either a single dict applied to all tasks, or an iterable of dicts with one configuration per task. Agent data typically includes model parameters, agent architecture details, and tool specifications. |

| RETURNS | DESCRIPTION |
|---|---|
| List[Dict[str, Any]] | List of report dictionaries, one per task repetition. |

| RAISES | DESCRIPTION |
|---|---|
| ValueError | If agent_data length doesn't match number of tasks (when agent_data is an iterable). |
How to use
This is the framework's main orchestration method that runs your entire benchmark. It iterates through all tasks, handles repetitions, and manages the three-stage lifecycle for each execution. You don't implement this method—instead, you call it to start the benchmark after implementing the setup and execution methods.
By default, the benchmark will continue executing remaining tasks even if some fail.
You can change this behavior by setting fail_on_task_error=True,
fail_on_evaluation_error=True, or fail_on_setup_error=True when instantiating
the benchmark. Each task execution returns a status indicating success or the specific
failure type (see TaskExecutionStatus).
For each task execution, the framework:
- Calls your setup methods to initialize components
- Calls your run_agents() method to execute the task
- Collects message histories and calls evaluators
- Stores results and triggers callbacks
Pseudocode structure:
for task in tasks:
    for repeat in range(n_task_repeats):
        # Setup stage
        environment = setup_environment(agent_data, task)
        user = setup_user(agent_data, environment, task)
        agents_to_run, agents_dict = setup_agents(agent_data, environment, task, user)
        evaluators = setup_evaluators(environment, task, agents_to_run, user)
        # Run stage (execution_loop handles multi-turn if user exists)
        agents_output = execution_loop(agents_to_run, task, environment, user)
        # Evaluate stage
        traces = collect_message_histories(agents_dict)
        eval_results = evaluate(evaluators, traces, agents_dict)
        # Store results
        store_result(task_id, traces, eval_results)
Callback hooks are triggered at these points:
- on_run_start: Before processing any tasks
- on_task_start: Before processing a task (once per task, not per repeat)
- on_task_repeat_start: Before each repetition of a task
- on_task_repeat_end: After each repetition completes
- on_task_end: After all repetitions of a task complete
- on_run_end: After all tasks complete
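To make the hook ordering concrete, the following sketch generates the sequence of hook names for a sequential run. It is a standalone illustration that does not use the framework's callback classes; the function name and the `hook:task:repeat` string format are made up for the example, and with num_workers greater than 1 the hooks of different tasks may presumably interleave.

```python
from typing import List


def hook_sequence(task_ids: List[str], n_task_repeats: int) -> List[str]:
    """List hook invocations in the order a sequential run would trigger them."""
    events = ["on_run_start"]
    for task_id in task_ids:
        events.append(f"on_task_start:{task_id}")  # once per task, not per repeat
        for repeat in range(n_task_repeats):
            events.append(f"on_task_repeat_start:{task_id}:{repeat}")
            events.append(f"on_task_repeat_end:{task_id}:{repeat}")
        events.append(f"on_task_end:{task_id}")  # after all repeats of this task
    events.append("on_run_end")
    return events


for name in hook_sequence(["task_0"], n_task_repeats=2):
    print(name)
```

For one task with two repeats this prints eight lines: the run and task hooks fire once each, and the repeat hooks fire once per repetition.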
# Typical usage
benchmark = MyBenchmark()
reports = benchmark.run(tasks=tasks, agent_data=config)
# Analyze results
for report in reports:
    print(f"Task {report['task_id']}, Repeat {report['repeat_idx']}: {report['eval']}")
    print(f"Config: {report['config']}")
    print(f"Traces: {report['traces']}")
# Parallel execution with 4 workers
benchmark = MyBenchmark(num_workers=4)
reports = benchmark.run(tasks=tasks, agent_data=config)
# Single agent config for all tasks
reports = benchmark.run(tasks=tasks, agent_data={"model": "gpt-4"})
# Task-specific agent configs (must match task count)
reports = benchmark.run(
    tasks=tasks,
    agent_data=[
        {"model": "gpt-4", "difficulty": "easy"},
        {"model": "gpt-4", "difficulty": "hard"},
    ],
)
# Priority-based execution
from maseval.core.task import PriorityTaskQueue
for task in tasks:
    task.protocol.priority = compute_priority(task)
queue = PriorityTaskQueue(tasks)
reports = benchmark.run(tasks=queue, agent_data=config)
# Adaptive queue (auto-registered as callback)
queue = MyAdaptiveTaskQueue(tasks)
reports = benchmark.run(tasks=queue) # queue receives on_task_complete callbacks
run_agents
run_agents(
agents: Sequence[AgentAdapter],
task: Task,
environment: Tau2Environment,
query: str = "",
) -> Any
Execute agents and return final answer.
| PARAMETER | DESCRIPTION |
|---|---|
| agents | Agent instances to run |
| task | Current task |
| environment | Tau2Environment |
| query | Query/prompt for agents |

| RETURNS | DESCRIPTION |
|---|---|
| Any | Final answer from agents |
setup_agents
setup_agents(
agent_data: Dict[str, Any],
environment: Tau2Environment,
task: Task,
user: Optional[User],
seed_generator: DefaultSeedGenerator,
) -> Tuple[Sequence[AgentAdapter], Dict[str, AgentAdapter]]
Create the default tau2 agent.
| PARAMETER | DESCRIPTION |
|---|---|
| agent_data | Agent configuration with model_id |
| environment | Tau2Environment with real tools |
| task | Current task |
| user | Optional user simulator |
| seed_generator | Seed generator for deriving agent seeds |

| RETURNS | DESCRIPTION |
|---|---|
| Tuple[Sequence[AgentAdapter], Dict[str, AgentAdapter]] | Tuple of (agent list, agent dict) |
setup_environment
setup_environment(
agent_data: Dict[str, Any], task: Task, seed_generator
) -> Tau2Environment
Create environment for a task.
Creates a Tau2Environment with real tool implementations for the task's domain.
| PARAMETER | DESCRIPTION |
|---|---|
| agent_data | Agent configuration |
| task | Current task |

| RETURNS | DESCRIPTION |
|---|---|
| Tau2Environment | Tau2Environment instance |
setup_evaluators
setup_evaluators(
environment: Tau2Environment,
task: Task,
agents: Sequence[AgentAdapter],
user: Optional[User],
seed_generator,
) -> Sequence[Evaluator]
Create evaluator for the task.
Creates a Tau2Evaluator with optional NL assertion model. NL model ID is read from task.evaluation_data["model_id"].
| PARAMETER | DESCRIPTION |
|---|---|
| environment | Tau2Environment instance |
| task | Current task with evaluation criteria |
| agents | Agent instances |
| user | Optional user simulator |

| RETURNS | DESCRIPTION |
|---|---|
| Sequence[Evaluator] | List with single Tau2Evaluator instance |
setup_user
setup_user(
agent_data: Dict[str, Any],
environment: Tau2Environment,
task: Task,
seed_generator: DefaultSeedGenerator,
) -> Optional[User]
Create Tau2 user simulator.
Creates a Tau2User with scenario from the task. Model ID is read from task.user_data["model_id"].
Scenario text is formatted to match original tau2-bench's
str(task.user_scenario) chain:
- StructuredUserInstructions.__str__() for dict instructions
- UserScenario.__str__() wrapping persona + instructions
| PARAMETER | DESCRIPTION |
|---|---|
| agent_data | Agent configuration |
| environment | The task environment |
| task | Current task with user scenario |

| RETURNS | DESCRIPTION |
|---|---|
| Optional[User] | Tau2User instance |
DefaultTau2Agent
Default agent implementation matching original tau2-bench LLMAgent.
This agent mirrors the behavior of the original tau2-bench LLMAgent class, enabling direct comparison with the original benchmark results.
The agent uses a simple ReAct-style loop:
1. Receives user message
2. Generates response (text or tool call)
3. If tool call: executes tool and loops back to step 2
4. If text: returns text as response
Original implementation: tau2-bench/src/tau2/agent/llm_agent.py
| ATTRIBUTE | DESCRIPTION |
|---|---|
| tools | Dictionary mapping tool names to callables |
| policy | Domain policy text (markdown) |
| model | ModelAdapter for LLM calls |
| llm_args | Additional arguments for LLM calls |
| max_tool_calls | Maximum tool calls per turn (prevents infinite loops) |
| verbose | Verbosity level (0=silent, 1=basic, 2=detailed) |
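The ReAct-style loop described for this agent can be sketched in a few lines. This is not the DefaultTau2Agent source: the message dict shape, the `generate` callable standing in for the model, the scripted fake model, and the behavior when max_tool_calls is exhausted (stop and return whatever text is available) are all assumptions made for illustration.

```python
from typing import Any, Callable, Dict, List

Message = Dict[str, Any]


def run_turn(
    generate: Callable[[List[Message]], Message],  # stand-in for an LLM call
    tools: Dict[str, Callable[..., Any]],
    history: List[Message],
    query: str,
    max_tool_calls: int = 50,
) -> str:
    """Handle one agent turn: generate, execute tool calls, repeat until text."""
    history.append({"role": "user", "content": query})
    calls_made = 0
    while True:
        reply = generate(history)
        history.append(reply)
        tool_calls = reply.get("tool_calls") or []
        if not tool_calls:
            return reply.get("content") or ""  # plain text ends the turn
        for call in tool_calls:
            if calls_made >= max_tool_calls:
                # Assumption: give up once the per-turn budget is exhausted.
                return reply.get("content") or ""
            calls_made += 1
            result = tools[call["name"]](**call["arguments"])
            history.append({"role": "tool", "name": call["name"], "content": str(result)})


def scripted_model(history: List[Message]) -> Message:
    """Fake model: call a tool first, then answer using the tool result."""
    if history[-1]["role"] == "tool":
        return {"role": "assistant", "content": f"Your order is {history[-1]['content']}."}
    call = {"name": "get_status", "arguments": {"order_id": "A1"}}
    return {"role": "assistant", "content": None, "tool_calls": [call]}


history: List[Message] = []
answer = run_turn(scripted_model, {"get_status": lambda order_id: "shipped"}, history, "Where is order A1?")
print(answer)
```

The tool budget is what keeps a model that always asks for another tool from looping forever within a single turn.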
__init__
__init__(
tools: Dict[str, Callable],
policy: str,
model: ModelAdapter,
llm_args: Optional[Dict[str, Any]] = None,
max_tool_calls: int = 50,
verbose: int = 0,
)
Initialize the default tau2 agent.
| PARAMETER | DESCRIPTION |
|---|---|
| tools | Dictionary mapping tool names to callable implementations |
| policy | Domain policy text (markdown format) |
| model | ModelAdapter for making LLM calls |
| llm_args | Optional additional arguments passed to model.generate() |
| max_tool_calls | Maximum number of tool calls per agent turn |
| verbose | Verbosity level for debugging output: 0 = silent (no output), 1 = basic (tool calls and responses), 2 = detailed (full message contents, tool arguments and results) |
get_messages
get_messages() -> List[Dict[str, Any]]
Get the current message history.
| RETURNS | DESCRIPTION |
|---|---|
| List[Dict[str, Any]] | List of message dictionaries |
reset
reset() -> None
Reset the agent state for a new conversation.
run
run(query: str) -> str
Process a user query and return the agent's response.
This method handles the full agent turn:
1. Adds user message to history
2. Generates LLM response with tool access
3. If tool call: executes tools and continues generating
4. Returns final text response to user

| PARAMETER | DESCRIPTION |
|---|---|
| query | The user's message/query |

| RETURNS | DESCRIPTION |
|---|---|
| str | Agent's text response to the user |
load_tasks
load_tasks(
domain: str,
split: str = "base",
data_dir: Optional[Path] = None,
limit: Optional[int] = None,
timeout_seconds: Optional[
float
] = DEFAULT_TIMEOUT_SECONDS,
max_retries: int = DEFAULT_MAX_RETRIES,
) -> TaskQueue
Load tasks for a tau2 domain.
| PARAMETER | DESCRIPTION |
|---|---|
| domain | One of "airline", "retail", "telecom" |
| split | One of "base", "hard", "all" (base recommended for reproducibility) |
| data_dir | Base data directory (default: module's data/) |
| limit | Maximum number of tasks to load |
| timeout_seconds | Maximum execution time per task in seconds. Default 600 (10 minutes). Set to None to disable timeout. |
| max_retries | Maximum retry attempts for transient failures. Default 1 (skip on failure). |

| RETURNS | DESCRIPTION |
|---|---|
| TaskQueue | TaskQueue containing Task objects with: id (task identifier from tau2 data), query (initial user message, from user_scenario), environment_data (domain tools, database state, policies), evaluation_data (assertions, expected outcomes), user_data (user profile, instructions), metadata (domain, split, description), protocol (execution settings: timeout, retries, tags) |

| RAISES | DESCRIPTION |
|---|---|
| ValueError | If domain or split is invalid |
| FileNotFoundError | If tasks.json doesn't exist |
Example
>>> tasks = load_tasks("retail", split="base", limit=5)
>>> len(tasks)
5

Custom timeout and retries
>>> tasks = load_tasks("retail", timeout_seconds=300, max_retries=2)
configure_model_ids
configure_model_ids(
tasks: Union[TaskQueue, List[Task]],
*,
user_model_id: Optional[str] = None,
evaluator_model_id: Optional[str] = None,
) -> Union[TaskQueue, List[Task]]
Configure model IDs for benchmark components in task data.
Tau2 tools execute real business logic and don't need a tool_model_id. Only user simulation and evaluation use LLMs.
| PARAMETER | DESCRIPTION |
|---|---|
| `tasks` | TaskQueue or list of Tasks to configure. TYPE: `Union[TaskQueue, List[Task]]` |
| `user_model_id` | Model ID for user simulator (stored in user_data). TYPE: `Optional[str]` |
| `evaluator_model_id` | Model ID for evaluators (stored in evaluation_data). TYPE: `Optional[str]` |

| RETURNS | DESCRIPTION |
|---|---|
| `Union[TaskQueue, List[Task]]` | The same collection (mutated in place for convenience) |

Example
>>> tasks = load_tasks("retail", limit=5)
>>> configure_model_ids(
...     tasks,
...     user_model_id="gpt-4o",
...     evaluator_model_id="gpt-4o",
... )
ensure_data_exists
ensure_data_exists(
data_dir: Optional[Path] = None,
domain: Optional[str] = None,
force_download: bool = False,
verbose: int = 1,
) -> Path
Ensure domain data exists, downloading if needed.
| PARAMETER | DESCRIPTION |
|---|---|
| `data_dir` | Base data directory (default: module's data/). TYPE: `Optional[Path]` |
| `domain` | Specific domain to check/download, or None for all. TYPE: `Optional[str]` |
| `force_download` | If True, re-download even if data exists. TYPE: `bool` |
| `verbose` | 0=silent, 1=summary, 2=detailed. TYPE: `int` |

| RETURNS | DESCRIPTION |
|---|---|
| `Path` | Path to the data directory |

Example
>>> ensure_data_exists(domain="retail")
PosixPath('.../maseval/benchmark/tau2/data')
compute_benchmark_metrics
compute_benchmark_metrics(
results: List[Dict[str, Any]],
) -> Dict[str, Any]
Compute summary metrics across all benchmark results.
H9: ALL simulations count in the denominator (matching original). Terminated simulations get reward=0.0 (handled by evaluator).
| PARAMETER | DESCRIPTION |
|---|---|
| `results` | List of result dicts from `benchmark.run()`. TYPE: `List[Dict[str, Any]]` |

| RETURNS | DESCRIPTION |
|---|---|
| `Dict[str, Any]` | Dict with success_rate, mean_reward, status_counts |

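The denominator rule above (every simulation counts, terminated ones with reward 0.0) can be illustrated with a small sketch. The layout of a result dict is not documented in this section, so the `status` and `reward` keys, the status names, and the "reward of 1.0 means success" threshold below are assumptions for illustration only; `summarize` is a hypothetical helper, not the library function.

```python
from collections import Counter
from typing import Any, Dict, List


def summarize(results: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Hypothetical summary over results with assumed 'status' and 'reward' keys."""
    n = len(results)
    # A missing reward (e.g. a terminated run) counts as 0.0,
    # so the run stays in the denominator instead of being dropped.
    rewards = [float(r.get("reward") or 0.0) for r in results]
    # Assumed success criterion: full reward.
    successes = sum(1 for x in rewards if x >= 1.0)
    return {
        "success_rate": successes / n if n else 0.0,
        "mean_reward": sum(rewards) / n if n else 0.0,
        "status_counts": dict(Counter(r.get("status", "unknown") for r in results)),
    }


demo = [
    {"status": "success", "reward": 1.0},
    {"status": "success", "reward": 0.0},
    {"status": "terminated", "reward": None},
    {"status": "success", "reward": 1.0},
]
summary = summarize(demo)
print(summary)
```

Dropping the terminated run instead would raise the success rate from 2/4 to 2/3, which is why the denominator rule matters when comparing numbers across implementations.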
compute_pass_at_k
compute_pass_at_k(
results: List[Dict[str, Any]],
k_values: List[int] = [1, 2, 3, 4],
) -> Dict[str, float]
Compute Pass@k metrics from benchmark results.
Pass@k is the probability that at least one of k attempts succeeds. H9: ALL simulations count; terminated simulations are treated as failures.
| PARAMETER | DESCRIPTION |
|---|---|
| `results` | List of result dicts from `benchmark.run()`. TYPE: `List[Dict[str, Any]]` |
| `k_values` | k values to compute (default: 1, 2, 3, 4 per tau2 paper). TYPE: `List[int]` |

| RETURNS | DESCRIPTION |
|---|---|
| `Dict[str, float]` | Dict with pass@1, pass@2, etc. scores |

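One common way to estimate "at least one success in k attempts" from n recorded trials with c successes is the combinatorial estimator 1 - C(n-c, k) / C(n, k). Whether `compute_pass_at_k` uses exactly this estimator is not stated here, so treat the sketch below as an illustration of the metric's definition; `pass_at_k`, `mean_pass_at_k`, and the per-task outcome dictionary are hypothetical names.

```python
from math import comb
from typing import Dict, List


def pass_at_k(n: int, c: int, k: int) -> float:
    """Chance that a random size-k subset of n attempts (c successful) has a success."""
    if not 0 < k <= n:
        raise ValueError("k must satisfy 1 <= k <= n")
    # comb(n - c, k) counts the all-failure subsets; it is 0 when n - c < k.
    return 1.0 - comb(n - c, k) / comb(n, k)


def mean_pass_at_k(outcomes: Dict[str, List[bool]], k: int) -> float:
    """Average the per-task estimate over all tasks."""
    scores = [pass_at_k(len(runs), sum(runs), k) for runs in outcomes.values()]
    return sum(scores) / len(scores)


# Hypothetical outcomes: one success flag per repeat of each task.
outcomes = {
    "task_a": [True, False, False, False],
    "task_b": [True, True, True, True],
}
scores = {f"pass@{k}": mean_pass_at_k(outcomes, k) for k in (1, 2, 3, 4)}
print(scores)
```

Terminated runs would simply appear as `False` entries, which keeps them in n as the H9 note requires.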
compute_pass_hat_k
compute_pass_hat_k(
results: List[Dict[str, Any]],
k_values: Optional[List[int]] = None,
) -> Dict[str, float]
Compute Pass^k metrics from benchmark results.
Pass^k is the combinatorial metric from the tau2 paper that estimates the probability of k successes in k draws without replacement.
This differs from Pass@k which only checks if at least 1 of k attempts succeeds.
Requires running benchmark with n_task_repeats >= max(k_values).
| PARAMETER | DESCRIPTION |
|---|---|
results
|
List of result dicts from benchmark.run()
TYPE:
|
k_values
|
k values to compute. If None, uses 1 to max trials.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
Dict[str, float]
|
Dict with pass^1, pass^2, etc. scores (averaged across all tasks) |
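Read literally, "k successes in k draws without replacement" from n trials with c successes has probability C(c, k) / C(n, k). The sketch below computes that quantity per task and averages it. It illustrates the definition and is not the library's code; `pass_hat_k`, `mean_pass_hat_k`, and the outcome dictionary are hypothetical names.

```python
from math import comb
from typing import Dict, List


def pass_hat_k(n: int, c: int, k: int) -> float:
    """Chance that all k attempts drawn without replacement from n (c successful) succeed."""
    if not 0 < k <= n:
        raise ValueError("k must satisfy 1 <= k <= n")
    # comb(c, k) is 0 when fewer than k attempts succeeded.
    return comb(c, k) / comb(n, k)


def mean_pass_hat_k(outcomes: Dict[str, List[bool]], k: int) -> float:
    """Average the per-task value over all tasks."""
    scores = [pass_hat_k(len(runs), sum(runs), k) for runs in outcomes.values()]
    return sum(scores) / len(scores)


# Hypothetical outcomes from a run with n_task_repeats=4.
outcomes = {
    "task_a": [True, True, True, False],
    "task_b": [True, True, True, True],
}
scores = {f"pass^{k}": mean_pass_hat_k(outcomes, k) for k in (1, 2, 4)}
print(scores)
```

The single failure in `task_a` drives its pass^4 to zero, so Pass^k rewards consistency across repeats, whereas Pass@k would still score that task highly.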