MMLU: Massive Multitask Language Understanding (Beta)

Beta

This benchmark has been implemented carefully, but we have not yet validated the results against the original implementation. Use with caution when comparing with existing results or the original paper's numbers. Contributions and compute donations welcome!

The MMLU Benchmark evaluates language models on multiple-choice questions spanning 57 academic subjects. The MASEval integration supports anchor-point-based evaluation for DISCO prediction, enabling efficient estimation of full benchmark performance from a subset of tasks.

Overview

MMLU (Hendrycks et al., 2021) is a widely used benchmark for measuring knowledge and reasoning across diverse domains. The MASEval implementation features:

  • Log-likelihood MCQ evaluation matching lm-evaluation-harness methodology
  • Anchor-point task selection via DISCOQueue for DISCO-style subset evaluation
  • HuggingFace integration with batched log-probability computation
  • lm-eval compatibility mode for exact numerical reproduction

Check out the BENCHMARKS.md file for more information including licenses.

Installation

Install MMLU with all dependencies needed to run the HuggingFace benchmark and example script:

pip install maseval[mmlu]

Or with uv:

uv sync --extra mmlu

This installs transformers, torch, numpy, and huggingface_hub (the latter two via transformers). You can then run the example:

python examples/mmlu_benchmark/mmlu_benchmark.py --model_id alignment-handbook/zephyr-7b-sft-full

For DISCO prediction support:

pip install maseval[disco]

For exact lm-evaluation-harness reproduction:

pip install maseval[lm-eval]

Quick Start

from maseval.benchmark.mmlu import (
    DefaultMMLUBenchmark,
    load_tasks,
    compute_benchmark_metrics,
)

# Load tasks (downloads from HuggingFace automatically)
tasks = load_tasks(data_path="/path/to/mmlu_prompts_examples.json")

# Create benchmark with HuggingFace model
benchmark = DefaultMMLUBenchmark(
    model_id="meta-llama/Llama-2-7b-hf",
    device="cuda:0",
)

# Run evaluation
results = benchmark.run(
    tasks=tasks,
    agent_data={"model_id": "meta-llama/Llama-2-7b-hf"},
)

# Compute metrics
metrics = compute_benchmark_metrics(results)
print(f"Accuracy: {metrics['acc']:.4f}")

With Anchor Points (DISCO)

from maseval.benchmark.mmlu import load_tasks

# Load tasks filtered to anchor points
tasks = load_tasks(
    data_path="/path/to/mmlu_prompts_examples.json",
    anchor_points_path="/path/to/anchor_points.json",
)

# tasks is a DISCOQueue — only anchor tasks are evaluated
print(f"Evaluating {len(tasks)} anchor tasks")
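Conceptually, filtering to anchor points keeps only the tasks whose positions appear in the anchor list. The sketch below illustrates that idea in plain Python. The `filter_to_anchors` helper, and the assumption that the anchor file holds integer indices into the task list, are illustrative and not taken from the MASEval source.

```python
from typing import List, Sequence


def filter_to_anchors(tasks: Sequence[dict], anchor_indices: Sequence[int]) -> List[dict]:
    """Keep only the tasks at the given positions, in anchor order."""
    n = len(tasks)
    bad = [i for i in anchor_indices if not 0 <= i < n]
    if bad:
        raise IndexError(f"anchor indices out of range: {bad}")
    return [tasks[i] for i in anchor_indices]


# Toy data: five questions, three of which are anchors (hypothetical values).
all_tasks = [{"id": f"q{i}"} for i in range(5)]
anchor_tasks = filter_to_anchors(all_tasks, [0, 2, 4])
print([t["id"] for t in anchor_tasks])
```

Only the selected subset is evaluated; estimating full-benchmark performance from it is the job of the DISCO predictor and is not shown here.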

Custom Benchmark Subclass

MMLUBenchmark is a framework-agnostic base class. To use a different model backend, subclass it and implement setup_agents() and get_model_adapter():

from maseval import AgentAdapter
from maseval.core.history import MessageHistory
from maseval.benchmark.mmlu import MMLUBenchmark

class MyAgentAdapter(AgentAdapter):
    def __init__(self, model, name):
        super().__init__(model, name)
        self._messages = []

    def _run_agent(self, query):
        self._messages.append({"role": "user", "content": query})
        response = self.agent.generate(query)
        self._messages.append({"role": "assistant", "content": response})
        return response

    def get_messages(self):
        return MessageHistory(self._messages)

class MyMMLUBenchmark(MMLUBenchmark):
    def setup_agents(self, agent_data, environment, task, user, seed_generator):
        model = self.get_model_adapter(agent_data["model_id"])
        adapter = MyAgentAdapter(model, name="mmlu_agent")
        return [adapter], {"mmlu_agent": adapter}

    def get_model_adapter(self, model_id, **kwargs):
        # MyModelAdapter is a placeholder for your own ModelAdapter subclass
        adapter = MyModelAdapter(model_id)
        register_name = kwargs.get("register_name")
        if register_name:
            self.register("models", register_name, adapter)
        return adapter

API Reference

MMLUBenchmark

Bases: Benchmark

MMLU Benchmark - Framework-agnostic base class.

Evaluates language models on MMLU multiple choice questions. Supports anchor point-based evaluation for DISCO prediction.

Subclasses must implement:

  • setup_agents() - create agents for MCQ evaluation
  • get_model_adapter() - provide model adapters

For a ready-to-use implementation, see DefaultMMLUBenchmark.

seed_generator property

seed_generator: SeedGenerator

The seed generator for this benchmark.

The seed generator is configured at benchmark initialization via the seed or seed_generator parameters. When seed=None (the default), the generator's derive_seed() method returns None, effectively disabling seeding while maintaining a uniform interface.

RETURNS DESCRIPTION
SeedGenerator

The root SeedGenerator instance.
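The behaviour described above (a uniform interface that yields None when seeding is disabled) can be illustrated with a toy generator. This is a minimal sketch, not the MASEval implementation: the `ToySeedGenerator` class, its hashing scheme, and the default value of `per_repetition` are all assumptions made for illustration.

```python
import hashlib
from typing import Optional


class ToySeedGenerator:
    """Illustrative stand-in; not the MASEval SeedGenerator."""

    def __init__(self, seed: Optional[int], path: str = "", repetition: int = 0):
        self.seed = seed
        self.path = path
        self.repetition = repetition

    def child(self, name: str) -> "ToySeedGenerator":
        # Extend the logical path, e.g. "agents" -> "agents/workers".
        new_path = f"{self.path}/{name}" if self.path else name
        return ToySeedGenerator(self.seed, new_path, self.repetition)

    def derive_seed(self, name: str, per_repetition: bool = True) -> Optional[int]:
        if self.seed is None:
            return None  # Seeding disabled: callers still use the same interface.
        key = f"{self.seed}:{self.path}/{name}"
        if per_repetition:
            key += f":rep{self.repetition}"
        digest = hashlib.sha256(key.encode()).digest()
        return int.from_bytes(digest[:4], "big")


disabled = ToySeedGenerator(None)
print(disabled.child("agents").derive_seed("baseline"))  # None

enabled = ToySeedGenerator(42)
first = enabled.child("agents").derive_seed("baseline")
second = ToySeedGenerator(42).child("agents").derive_seed("baseline")
print(first == second)  # True: same root seed and path give the same seed
```

Hashing the root seed together with the path keeps derived seeds stable when unrelated components are added or removed, which is one plausible reason to derive seeds by name rather than by call order.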

usage property

usage: Usage

Running usage total across all task repetitions.

Queryable at any time, including while the benchmark is still running. Returns the grand total of all usage collected so far.

usage_by_component property

usage_by_component: Dict[str, Usage]

Per-component running usage totals across all repetitions.

Keys are registry keys (e.g., "models:main_model").
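To make the relationship between the per-component totals and the grand total concrete, here is a small sketch that accumulates usage under "category:name" keys. The `ToyUsageTracker` class and the token field names are hypothetical; the real Usage type may carry different fields.

```python
from collections import Counter
from typing import Dict


class ToyUsageTracker:
    """Illustrative accumulator keyed by 'category:name' registry keys."""

    def __init__(self) -> None:
        self.by_component: Dict[str, Counter] = {}

    def add(self, category: str, name: str, usage: Dict[str, int]) -> None:
        key = f"{category}:{name}"
        # Counter.update adds counts instead of overwriting them.
        self.by_component.setdefault(key, Counter()).update(usage)

    @property
    def total(self) -> Counter:
        grand = Counter()
        for usage in self.by_component.values():
            grand.update(usage)
        return grand


tracker = ToyUsageTracker()
tracker.add("models", "main_model", {"input_tokens": 10, "output_tokens": 5})
tracker.add("models", "main_model", {"input_tokens": 7, "output_tokens": 3})
tracker.add("models", "judge", {"input_tokens": 4, "output_tokens": 1})
print(dict(tracker.by_component["models:main_model"]))
print(dict(tracker.total))
```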

__init__

__init__(
    use_full_prompt: bool = False,
    callbacks: Optional[List[Any]] = None,
    n_task_repeats: int = 1,
    **kwargs: Any,
)

Initialize benchmark.

PARAMETER DESCRIPTION
use_full_prompt

If True, use full_prompt (with few-shot examples) instead of just the query.

TYPE: bool DEFAULT: False

callbacks

Benchmark callbacks.

TYPE: Optional[List[Any]] DEFAULT: None

n_task_repeats

Repetitions per task.

TYPE: int DEFAULT: 1

add_callback

add_callback(callback: BenchmarkCallback) -> None

Register a callback handler to monitor benchmark execution.

PARAMETER DESCRIPTION
callback

A BenchmarkCallback instance that will receive execution events.

TYPE: BenchmarkCallback

How to use

Callbacks receive notifications at key lifecycle points for tracing, progress tracking, or custom metrics collection. See BenchmarkCallback for available hooks and their signatures.

from maseval.core.callbacks import MessageTracingCallback

benchmark = MyBenchmark()
benchmark.add_callback(MessageTracingCallback(output_dir="logs"))
results = benchmark.run(tasks=tasks, agent_data=config)

clear_registry

clear_registry() -> None

Clear the component registry after a task repetition completes.

This method is called automatically by run() after each task repetition to ensure components are not carried over between repetitions. The reports list persists across all repetitions for aggregated analysis.

collect_all_configs

collect_all_configs() -> Dict[str, Any]

Collect configuration from all registered components for the current task repetition.

This method is called automatically by run() after each task repetition completes and before evaluation begins. It gathers comprehensive configuration from all registered components (agents, models, tools, simulators, callbacks, etc.) for that specific repetition. After collection, the registry is cleared for the next repetition.

The collected configs are stored in benchmark.reports list along with traces for persistent access across all task repetitions.

Output fields:

  • metadata - Collection timestamp and thread info
  • agents - Dict mapping agent names to their config (settings, parameters)
  • models - Dict mapping model names to their config (model IDs, parameters)
  • tools - Dict mapping tool names to their config (specifications, settings)
  • simulators - Dict mapping simulator names to their config (parameters, templates)
  • callbacks - Dict mapping callback names to their config (settings)
  • environment - Direct config from the environment (not nested), or None if not present
  • user - Direct config from the user simulator (not nested), or None if not present
  • other - Dict for any other registered components
  • benchmark - Benchmark-level configuration (git, system, packages)

RETURNS DESCRIPTION
Dict[str, Any]

Structured dictionary containing configuration from all registered components.

How to use

This method is called automatically by run() after each task repetition:

# Automatic collection (recommended)
results = benchmark.run(tasks=tasks, agent_data=config)

# Access all collected reports (traces + configs) across repetitions
for report in benchmark.reports:
    print(f"Task {report['task_id']}, Repeat {report['repeat_idx']}")
    # Agents is a dict: agent_name -> config
    print(f"Agent config: {report['config']['agents']['my_agent']}")
    # Environment and user are direct (not nested)
    print(f"Environment config: {report['config']['environment']}")
    print(f"User config: {report['config']['user']}")
    # Benchmark-level config
    print(f"Git commit: {report['config']['benchmark']['git']['commit_hash']}")

The collected configs are available in the results for reproducibility analysis.

collect_all_traces

collect_all_traces() -> Dict[str, Any]

Collect execution traces from all registered components for the current task repetition.

This method is called automatically by run() after each task repetition completes and before evaluation begins. It gathers comprehensive traces from all registered components (agents, models, tools, simulators, callbacks, etc.) for that specific repetition. After collection, the registry is cleared for the next repetition.

The collected traces are stored in benchmark.reports list along with configs for persistent access across all task repetitions.

Output fields:

  • metadata - Collection timestamp and thread info
  • agents - Dict mapping agent names to their traces (messages, execution data)
  • models - Dict mapping model names to their traces (API calls, timing, errors)
  • tools - Dict mapping tool names to their traces (invocations, parameters)
  • simulators - Dict mapping simulator names to their traces (attempts, outcomes)
  • callbacks - Dict mapping callback names to their traces (custom data)
  • environment - Direct traces from the environment (not nested), or None if not present
  • user - Direct traces from the user simulator (not nested), or None if not present
  • other - Dict for any other registered components

RETURNS DESCRIPTION
Dict[str, Any]

Structured dictionary containing execution traces from all registered components.

How to use

This method is called automatically by run() after each task repetition:

# Automatic collection (recommended)
results = benchmark.run(tasks=tasks, agent_data=config)

# Access all collected reports (traces + configs) across repetitions
for report in benchmark.reports:
    print(f"Task {report['task_id']}, Repeat {report['repeat_idx']}")
    # Agents is a dict: agent_name -> traces
    print(f"Agent messages: {report['traces']['agents']['my_agent']}")
    # Environment and user are direct (not nested)
    print(f"Environment state: {report['traces']['environment']}")
    print(f"User interactions: {report['traces']['user']}")

The collected traces are passed to the evaluator's evaluate() method and stored in benchmark.reports for later analysis.

collect_all_usage

collect_all_usage() -> Dict[str, Any]

Collect usage from all registered components for the current task repetition.

This method is called automatically by run() after each task repetition completes. It gathers usage from all registered UsageTrackableMixin components and also accumulates into persistent running totals accessible via usage and usage_by_component.

RETURNS DESCRIPTION
Dict[str, Any]

Structured dictionary containing usage from all registered components.

evaluate

evaluate(
    evaluators: Sequence[Evaluator],
    agents: Dict[str, AgentAdapter],
    final_answer: Any,
    traces: Dict[str, Any],
) -> List[Dict[str, Any]]

Evaluate model response.

execution_loop

execution_loop(
    agents: Sequence[AgentAdapter],
    task: Task,
    environment: Environment,
    user: Optional[User],
) -> Any

Execute agents with optional user interaction loop.

This method orchestrates the agent-user interaction pattern. When a user is present, the user initiates the conversation using user.get_initial_query(). If no user is present, task.query is used as the initial query.

Interaction Flow

By default, agents execute once (max_invocations=1). For multi-turn interaction, set self.max_invocations > 1 in your benchmark's __init__. The loop continues until max_invocations is reached or user.is_done() returns True (e.g., max turns reached or stop token detected).

Note

Override this method in your benchmark subclass to implement custom interaction patterns (e.g., agent-initiated conversations, different termination conditions, or specialized query routing).
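The termination rule described under Interaction Flow can be sketched as a small loop. This is an illustration only: `ToyUser`, its `respond` method, and `toy_execution_loop` are hypothetical names, and the real method works with AgentAdapter, Task, and Environment objects rather than plain callables.

```python
from typing import Callable, List, Optional


class ToyUser:
    """Stand-in user that is done after a fixed number of turns."""

    def __init__(self, max_turns: int) -> None:
        self.max_turns = max_turns
        self.turns = 0

    def get_initial_query(self) -> str:
        return "initial question"

    def respond(self, agent_output: str) -> str:  # hypothetical method name
        self.turns += 1
        return f"follow-up {self.turns}"

    def is_done(self) -> bool:
        return self.turns >= self.max_turns


def toy_execution_loop(
    agent: Callable[[str], str],
    task_query: str,
    user: Optional[ToyUser],
    max_invocations: int = 1,
) -> Optional[str]:
    # With a user, the user opens the conversation; otherwise use the task query.
    query = user.get_initial_query() if user is not None else task_query
    final_answer = None
    for _ in range(max_invocations):
        final_answer = agent(query)
        if user is None:
            break  # No user: a single agent execution.
        query = user.respond(final_answer)
        if user.is_done():
            break
    return final_answer


calls: List[str] = []


def echo_agent(query: str) -> str:
    calls.append(query)
    return f"answer to {query}"


result = toy_execution_loop(echo_agent, "task query", ToyUser(max_turns=2), max_invocations=5)
print(calls)
print(result)
```

In this run the user finishes after two turns, so the loop stops before reaching the five allowed invocations.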

PARAMETER DESCRIPTION
agents

Agents to execute (typically the orchestrator).

TYPE: Sequence[AgentAdapter]

task

The task being solved.

TYPE: Task

environment

The environment providing tools and state.

TYPE: Environment

user

Optional user simulator. If provided, the user initiates and drives the conversation. If None, a single agent execution with task.query.

TYPE: Optional[User]

RETURNS DESCRIPTION
Any

Final answer from the last agent execution.

Example

For interactive benchmarks, enable multi-turn interaction:

def __init__(self, **kwargs):
    super().__init__(**kwargs)
    self.max_invocations = 5  # Up to 5 agent-user exchanges

get_failed_tasks

get_failed_tasks(
    status_filter: Optional[
        Union[
            TaskExecutionStatus, List[TaskExecutionStatus]
        ]
    ] = None,
    reports: Optional[List[Dict[str, Any]]] = None,
) -> SequentialTaskQueue

Get tasks that failed during benchmark execution.

This method retrieves failed tasks based on their execution status, useful for debugging, retry logic, or failure analysis.

PARAMETER DESCRIPTION
status_filter

Filter by specific failure status(es). If None, returns all failed tasks (any status except SUCCESS). Can be a single TaskExecutionStatus or a list of them. Examples:

  • TaskExecutionStatus.TASK_EXECUTION_FAILED: Only tasks that failed during execution
  • TaskExecutionStatus.EVALUATION_FAILED: Only tasks where evaluation failed
  • [TaskExecutionStatus.TASK_EXECUTION_FAILED, TaskExecutionStatus.SETUP_FAILED]: Tasks that failed during execution or setup

TYPE: Optional[Union[TaskExecutionStatus, List[TaskExecutionStatus]]] DEFAULT: None

reports

Optional list of reports to analyze. If None, uses the reports from the last run() call. This allows analyzing externally stored or modified reports.

TYPE: Optional[List[Dict[str, Any]]] DEFAULT: None

RETURNS DESCRIPTION
SequentialTaskQueue

SequentialTaskQueue containing the failed tasks. Empty if no failures match the filter.

RAISES DESCRIPTION
RuntimeError

If reports is None and run() has not been executed yet.
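The filtering semantics can be sketched over plain report dicts. This is a simplified illustration: `ToyStatus` mirrors only the status names mentioned on this page, its string values are made up, and the helper returns task IDs rather than a SequentialTaskQueue.

```python
from enum import Enum
from typing import Any, Dict, Iterable, List, Optional, Union


class ToyStatus(Enum):
    """Stand-in for TaskExecutionStatus; the values are illustrative."""

    SUCCESS = "success"
    SETUP_FAILED = "setup_failed"
    TASK_EXECUTION_FAILED = "task_execution_failed"
    EVALUATION_FAILED = "evaluation_failed"


def failed_task_ids(
    reports: Iterable[Dict[str, Any]],
    status_filter: Optional[Union[ToyStatus, List[ToyStatus]]] = None,
) -> List[str]:
    if status_filter is None:
        # No filter: every status except SUCCESS counts as a failure.
        wanted = {s for s in ToyStatus if s is not ToyStatus.SUCCESS}
    elif isinstance(status_filter, ToyStatus):
        wanted = {status_filter}
    else:
        wanted = set(status_filter)

    ids: List[str] = []
    for report in reports:
        # De-duplicate so a task that failed in several repeats appears once.
        if report["status"] in wanted and report["task_id"] not in ids:
            ids.append(report["task_id"])
    return ids


toy_reports = [
    {"task_id": "t1", "status": ToyStatus.SUCCESS},
    {"task_id": "t2", "status": ToyStatus.SETUP_FAILED},
    {"task_id": "t3", "status": ToyStatus.EVALUATION_FAILED},
    {"task_id": "t3", "status": ToyStatus.EVALUATION_FAILED},
]
print(failed_task_ids(toy_reports))
print(failed_task_ids(toy_reports, ToyStatus.EVALUATION_FAILED))
```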

How to use

# Run benchmark
benchmark = MyBenchmark()
reports = benchmark.run(tasks=tasks, agent_data=config)

# Get all failed tasks (from internal state)
failed = benchmark.get_failed_tasks()
print(f"Failed: {len(failed)}/{len(benchmark.tasks)} tasks")

# Or work with returned reports (safe from internal state changes)
failed = benchmark.get_failed_tasks(reports=reports)

# Get only tasks that failed during execution (not evaluation)
execution_failures = benchmark.get_failed_tasks(
    TaskExecutionStatus.TASK_EXECUTION_FAILED,
    reports=reports
)

# Get setup and execution failures
critical_failures = benchmark.get_failed_tasks(
    status_filter=[
        TaskExecutionStatus.SETUP_FAILED,
        TaskExecutionStatus.TASK_EXECUTION_FAILED
    ],
    reports=reports
)

# Retry only the failed tasks (the key use case)
if len(failed) > 0:
    retry_reports = benchmark.run(tasks=failed, agent_data=config)

# Or more concisely
reports = benchmark.run(tasks=tasks, agent_data=config)
retry_reports = benchmark.run(tasks=benchmark.get_failed_tasks(), agent_data=config)

get_model_adapter abstractmethod

get_model_adapter(
    model_id: str, **kwargs: Any
) -> ModelAdapter

Provide a ModelAdapter for benchmark components that require LLM access.

Many benchmark components beyond the agents themselves require access to language models. Common examples include:

  • Tool simulators: Simulating tool responses when real APIs aren't available
  • User simulators: Generating realistic user responses in multi-turn dialogues
  • Judges/Evaluators: Using LLMs to assess agent performance against criteria
  • Reward models: Computing scores for reinforcement learning

This method centralizes model provisioning, giving you control over which models are used throughout the benchmark. Implement this to return a configured ModelAdapter for the requested model.

PARAMETER DESCRIPTION
model_id

The model identifier to use (e.g., "gemini-2.5-flash", "openrouter/google/gemini-2.5-flash", "gpt-4o"). This is passed by the benchmark when setting up components that need model access.

TYPE: str

**kwargs

Additional arguments for adapter creation or registration. Common kwargs:

  • register_category: Category for trace registration (e.g., "models")
  • register_name: Name for trace registration (e.g., "evaluator_user_gsr")

TYPE: Any DEFAULT: {}

RETURNS DESCRIPTION
ModelAdapter

A ModelAdapter instance configured for the specified model. For proper tracing, return a fresh adapter for each call rather than reusing instances. You can still share the underlying API client for efficiency.

How to use

For proper tracing, register the adapter after creation using the kwargs:

def get_model_adapter(self, model_id: str, **kwargs: Any) -> ModelAdapter:
    adapter = GoogleGenAIModelAdapter(self.client, model_id=model_id)

    # Register for tracing if registration info provided
    category = kwargs.get("register_category", "models")
    name = kwargs.get("register_name", model_id)
    self.register(category, name, adapter)

    return adapter

The benchmark calls this method when setting up tools, user simulators, and evaluators. Each call creates a fresh adapter with its own trace log.
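The "fresh adapter, shared client" pattern can be shown without any real API. In this sketch every class (`ToyClient`, `ToyModelAdapter`, `ToyBenchmark`) is a hypothetical stand-in; the point is only that each adapter keeps its own trace while all of them reuse one client.

```python
from typing import Any, Dict, List, Tuple


class ToyClient:
    """Pretend API client that is expensive to create, so it is shared."""

    def __init__(self) -> None:
        self.calls = 0

    def complete(self, model_id: str, prompt: str) -> str:
        self.calls += 1
        return f"[{model_id}] {prompt}"


class ToyModelAdapter:
    """Each adapter owns its trace log but borrows the shared client."""

    def __init__(self, client: ToyClient, model_id: str) -> None:
        self.client = client
        self.model_id = model_id
        self.trace: List[Dict[str, str]] = []

    def generate(self, prompt: str) -> str:
        response = self.client.complete(self.model_id, prompt)
        self.trace.append({"prompt": prompt, "response": response})
        return response


class ToyBenchmark:
    def __init__(self) -> None:
        self.client = ToyClient()  # created once, reused by every adapter
        self.registry: Dict[Tuple[str, str], ToyModelAdapter] = {}

    def get_model_adapter(self, model_id: str, **kwargs: Any) -> ToyModelAdapter:
        adapter = ToyModelAdapter(self.client, model_id)  # fresh per call
        name = kwargs.get("register_name")
        if name:
            category = kwargs.get("register_category", "models")
            self.registry[(category, name)] = adapter
        return adapter


bench = ToyBenchmark()
judge = bench.get_model_adapter("toy-model", register_name="judge")
user_sim = bench.get_model_adapter("toy-model", register_name="user_simulator")
judge.generate("Is the answer correct?")
print(len(judge.trace), len(user_sim.trace))
```

Because the traces are separate, the judge's calls do not appear in the user simulator's log even though both adapters target the same model.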

register

register(
    category: str,
    name: str,
    component: RegisterableComponent,
) -> RegisterableComponent

Register a component for comprehensive trace and configuration collection.

All core MASEval components (AgentAdapter, ModelAdapter, Environment, User, LLMSimulator, BenchmarkCallback) inherit from TraceableMixin and/or ConfigurableMixin, and are automatically registered for both trace and configuration collection before evaluation.

Note: Most components are automatically registered when returned from setup methods (setup_environment, setup_user, setup_agents). You only need to manually register additional components like models, simulators, or tools that aren't automatically captured.

PARAMETER DESCRIPTION
category

Component category (e.g., "agents", "models", "tools", "simulators", "callbacks", "user", "environment", "seeding"). Use plural form to match the structure in collect_all_traces() and collect_all_configs().

TYPE: str

name

Unique identifier for this component within its category

TYPE: str

component

Any object inheriting from TraceableMixin and/or ConfigurableMixin

TYPE: RegisterableComponent

RETURNS DESCRIPTION
RegisterableComponent

The component (for chaining convenience)

RAISES DESCRIPTION
ValueError

If the component is already registered under a different name
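The registration contract (return the component for chaining, reject the same object under a second name) can be sketched with a toy registry. `ToyRegistry` is hypothetical and tracks components by object identity, which is an assumption about how duplicates are detected.

```python
from typing import Any, Dict


class ToyRegistry:
    """Illustrative registry keyed by 'category:name'."""

    def __init__(self) -> None:
        self._components: Dict[str, Any] = {}
        self._key_by_object: Dict[int, str] = {}

    def register(self, category: str, name: str, component: Any) -> Any:
        key = f"{category}:{name}"
        existing = self._key_by_object.get(id(component))
        if existing is not None and existing != key:
            raise ValueError(f"component already registered as {existing!r}, cannot add as {key!r}")
        self._components[key] = component
        self._key_by_object[id(component)] = key
        return component  # returned so calls can be chained

    def clear(self) -> None:
        # Mirrors the idea of clearing between task repetitions.
        self._components.clear()
        self._key_by_object.clear()

    def keys(self):
        return list(self._components)


registry = ToyRegistry()
model = object()
same = registry.register("models", "main_model", model)
print(same is model)
print(registry.keys())
```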

How to use

Most components are auto-registered. Manual registration is only needed for additional components:

def setup_agents(self, agent_data, environment, task, user, seed_generator):
    # Create model (needs manual registration)
    model = MyModelAdapter(...)
    self.register("models", "main_model", model)

    # Create agent (auto-registered when returned)
    agent = MyAgent(model=model)
    agent_adapter = AgentAdapter(agent, "agent1")

    # Environment and user are also auto-registered
    return [agent_adapter], {"agent1": agent_adapter}

Traces and configs are automatically collected before evaluation via collect_all_traces() and collect_all_configs() which are called internally by the run() method.

run

run(
    tasks: Union[
        Task, BaseTaskQueue, Iterable[Union[Task, dict]]
    ],
    agent_data: Dict[str, Any] | Iterable[Dict[str, Any]],
) -> List[Dict[str, Any]]

Initialize and execute the complete benchmark loop across all tasks.

PARAMETER DESCRIPTION
tasks

Task source for execution. Can be:

  • A single Task object
  • A BaseTaskQueue (SequentialTaskQueue, PriorityTaskQueue, or custom AdaptiveTaskQueue)
  • An iterable of Task objects or dicts that will be converted to Tasks

When a BaseTaskQueue is provided, it controls the task ordering. AdaptiveTaskQueue subclasses are automatically registered as callbacks to receive task completion notifications.

TYPE: Union[Task, BaseTaskQueue, Iterable[Union[Task, dict]]]

agent_data

Configuration for agents. Either a single dict applied to all tasks, or an iterable of dicts with one configuration per task. Agent data typically includes model parameters, agent architecture details, and tool specifications.

TYPE: Dict[str, Any] | Iterable[Dict[str, Any]]

RETURNS DESCRIPTION
List[Dict[str, Any]]

List of report dictionaries, one per task repetition. Each report contains:

  • task_id: Task identifier (UUID)
  • repeat_idx: Repetition index (0 to n_task_repeats-1)
  • status: Execution status (one of TaskExecutionStatus enum values)
  • traces: Execution traces from all registered components
  • config: Configuration from all registered components and benchmark level
  • eval: Evaluation results (None if task or evaluation failed)
  • error: Error details dict (only present if status is not SUCCESS), containing:
      • error_type: Exception class name
      • error_message: Exception message
      • traceback: Full traceback string

RAISES DESCRIPTION
ValueError

If agent_data length doesn't match number of tasks (when agent_data is an iterable).
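The two accepted shapes of agent_data, and the length check behind the ValueError, can be sketched as a small normalisation step. `expand_agent_data` is a hypothetical helper written for illustration and is not part of the MASEval API.

```python
from typing import Any, Dict, Iterable, List, Union


def expand_agent_data(
    agent_data: Union[Dict[str, Any], Iterable[Dict[str, Any]]],
    n_tasks: int,
) -> List[Dict[str, Any]]:
    """Return one config per task, broadcasting a single dict if needed."""
    if isinstance(agent_data, dict):
        return [agent_data] * n_tasks
    configs = list(agent_data)
    if len(configs) != n_tasks:
        raise ValueError(f"got {len(configs)} agent configs for {n_tasks} tasks")
    return configs


shared = expand_agent_data({"model": "gpt-4"}, n_tasks=3)
per_task = expand_agent_data(
    [{"model": "gpt-4", "difficulty": "easy"}, {"model": "gpt-4", "difficulty": "hard"}],
    n_tasks=2,
)
print(len(shared), len(per_task))
```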

How to use

This is the framework's main orchestration method that runs your entire benchmark. It iterates through all tasks, handles repetitions, and manages the three-stage lifecycle (setup, run, evaluate) for each execution. You don't implement this method. Instead, you call it to start the benchmark after implementing the setup and execution methods.

By default, the benchmark will continue executing remaining tasks even if some fail. You can change this behavior by setting fail_on_task_error=True, fail_on_evaluation_error=True, or fail_on_setup_error=True when instantiating the benchmark. Each task execution returns a status indicating success or the specific failure type (see TaskExecutionStatus).

For each task execution, the framework:

  1. Calls your setup methods to initialize components
  2. Calls your run_agents() method to execute the task
  3. Collects message histories and calls evaluators
  4. Stores results and triggers callbacks

Pseudocode structure:

for task in tasks:
    for repeat in range(n_task_repeats):
        # Setup stage
        environment = setup_environment(agent_data, task, seed_generator)
        user = setup_user(agent_data, environment, task, seed_generator)
        agents_to_run, agents_dict = setup_agents(agent_data, environment, task, user, seed_generator)
        evaluators = setup_evaluators(environment, task, agents_to_run, user, seed_generator)

        # Run stage (execution_loop handles multi-turn if user exists)
        final_answer = execution_loop(agents_to_run, task, environment, user)

        # Evaluate stage
        traces = collect_all_traces()
        eval_results = evaluate(evaluators, agents_dict, final_answer, traces)

        # Store results
        store_result(task_id, traces, eval_results)

Callback hooks are triggered at these points:

  • on_run_start: Before processing any tasks
  • on_task_start: Before processing a task (once per task, not per repeat)
  • on_task_repeat_start: Before each repetition of a task
  • on_task_repeat_end: After each repetition completes
  • on_task_end: After all repetitions of a task complete
  • on_run_end: After all tasks complete
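The ordering of these hooks is easiest to see by recording them. The sketch below replays the loop structure with a toy callback; the hook names come from the list above, while the hook arguments, `RecordingCallback`, and `toy_run` are assumptions made for illustration.

```python
from typing import List


class RecordingCallback:
    """Records hook names in the order they fire (signatures are illustrative)."""

    def __init__(self) -> None:
        self.events: List[str] = []

    def on_run_start(self) -> None:
        self.events.append("on_run_start")

    def on_task_start(self, task_id: str) -> None:
        self.events.append(f"on_task_start:{task_id}")

    def on_task_repeat_start(self, task_id: str, repeat_idx: int) -> None:
        self.events.append(f"on_task_repeat_start:{task_id}:{repeat_idx}")

    def on_task_repeat_end(self, task_id: str, repeat_idx: int) -> None:
        self.events.append(f"on_task_repeat_end:{task_id}:{repeat_idx}")

    def on_task_end(self, task_id: str) -> None:
        self.events.append(f"on_task_end:{task_id}")

    def on_run_end(self) -> None:
        self.events.append("on_run_end")


def toy_run(task_ids: List[str], n_task_repeats: int, callback: RecordingCallback) -> None:
    callback.on_run_start()
    for task_id in task_ids:
        callback.on_task_start(task_id)  # once per task, not per repeat
        for repeat_idx in range(n_task_repeats):
            callback.on_task_repeat_start(task_id, repeat_idx)
            # ... setup, execution, and evaluation would happen here ...
            callback.on_task_repeat_end(task_id, repeat_idx)
        callback.on_task_end(task_id)
    callback.on_run_end()


recorder = RecordingCallback()
toy_run(["t1"], n_task_repeats=2, callback=recorder)
for event in recorder.events:
    print(event)
```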

# Typical usage
benchmark = MyBenchmark()
reports = benchmark.run(tasks=tasks, agent_data=config)

# Analyze results
for report in reports:
    print(f"Task {report['task_id']}, Repeat {report['repeat_idx']}: {report['eval']}")
    print(f"Config: {report['config']}")
    print(f"Traces: {report['traces']}")

# Parallel execution with 4 workers
benchmark = MyBenchmark(num_workers=4)
reports = benchmark.run(tasks=tasks, agent_data=config)

# Single agent config for all tasks
reports = benchmark.run(tasks=tasks, agent_data={"model": "gpt-4"})

# Task-specific agent configs (must match task count)
reports = benchmark.run(
    tasks=tasks,
    agent_data=[
        {"model": "gpt-4", "difficulty": "easy"},
        {"model": "gpt-4", "difficulty": "hard"},
    ]
)

# Priority-based execution
from maseval.core.task import PriorityTaskQueue
for task in tasks:
    task.protocol.priority = compute_priority(task)
queue = PriorityTaskQueue(tasks)
reports = benchmark.run(tasks=queue, agent_data=config)

# Adaptive queue (auto-registered as callback)
queue = MyAdaptiveTaskQueue(tasks)
reports = benchmark.run(tasks=queue, agent_data=config)  # queue receives task completion notifications

run_agents

run_agents(
    agents: Sequence[AgentAdapter],
    task: Task,
    environment: Environment,
    query: str,
) -> Any

Execute agent on the MMLU prompt.

setup_agents abstractmethod

setup_agents(
    agent_data: Dict[str, Any],
    environment: Environment,
    task: Task,
    user: Optional[User],
    seed_generator: SeedGenerator,
) -> Tuple[Sequence[AgentAdapter], Dict[str, AgentAdapter]]

Instantiate and configure the agent system for a task.

Note: All agents in the returned agents_dict are automatically registered for tracing. You don't need to manually call register() for them. However, you should manually register models, simulators, or other components used by agents.

PARAMETER DESCRIPTION
agent_data

Configuration dict containing agent specifications, model parameters, and tool assignments for this task.

TYPE: Dict[str, Any]

environment

The initialized environment providing tools to the agents.

TYPE: Environment

task

The Task object with query and metadata.

TYPE: Task

user

Optional user simulator for agent-user interactions.

TYPE: Optional[User]

seed_generator

Seed generator for deriving deterministic seeds for agents and their models. Use per_repetition=True for agents that should vary across repetitions, or per_repetition=False for baseline agents that should remain constant. derive_seed() returns None if seeding is disabled (global_seed=None).

TYPE: SeedGenerator

RETURNS DESCRIPTION
Tuple[Sequence[AgentAdapter], Dict[str, AgentAdapter]]

A tuple of (agents_to_run, agents_dict) where:

  • agents_to_run: Sequence of agents to invoke in run_agents() (typically 1 orchestrator)
  • agents_dict: Dictionary mapping agent names/IDs to all agent instances for monitoring

How to use

This method constructs your agent architecture—single agent, multiple collaborative agents, or an orchestrator managing workers. Each agent is wrapped in AgentAdapter for uniform message history tracking.

The dual return structure serves different purposes:

  • agents_to_run: Only agents directly invoked in run_agents() (typically the orchestrator)
  • agents_dict: All agents in the system for message history collection from workers called indirectly through the orchestrator

def setup_agents(self, agent_data, environment, task, user, seed_generator):
    # Use child() for logical paths like "agents/experimental"
    # derive_seed() returns None if seeding is disabled
    agent_gen = seed_generator.child("agents")

    # Vary experimental agent per rep, keep baseline constant
    experimental_seed = agent_gen.derive_seed("experimental", per_repetition=True)
    baseline_seed = agent_gen.derive_seed("baseline", per_repetition=False)

    # For worker agents, nest further: "agents/workers/analyst"
    worker_gen = agent_gen.child("workers")
    analyst_seed = worker_gen.derive_seed("analyst")

    # Create agents with seeds (model adapters accept Optional[int])
    model = self.get_model_adapter(agent_data["model_id"], seed=experimental_seed)
    # ... create agents ...

    return [orchestrator_adapter], all_agents

setup_environment

setup_environment(
    agent_data: Dict[str, Any],
    task: Task,
    seed_generator: SeedGenerator,
) -> MMLUEnvironment

Create environment for a task.

setup_evaluators

setup_evaluators(
    environment: Environment,
    task: Task,
    agents: Sequence[AgentAdapter],
    user: Optional[User],
    seed_generator: SeedGenerator,
) -> Sequence[Evaluator]

Create MMLU evaluator.

setup_user

setup_user(
    agent_data: Dict[str, Any],
    environment: Environment,
    task: Task,
    seed_generator: SeedGenerator,
) -> Optional[User]

Create an optional user simulator for interactive tasks.

This method is optional. Return None if your benchmark does not require user simulation.

Note: The returned user is automatically registered for tracing. You don't need to manually call register() for it.

PARAMETER DESCRIPTION
agent_data

Configuration dict containing agent specifications and settings that may influence user simulator setup (e.g., framework type for creating compatible tools).

TYPE: Dict[str, Any]

environment

The environment instance created for this task.

TYPE: Environment

task

The Task object with user profile data or scenario information.

TYPE: Task

seed_generator

Seed generator for deriving deterministic seeds for the user simulator. derive_seed() returns None if seeding is disabled (global_seed=None).

TYPE: SeedGenerator

RETURNS DESCRIPTION
Optional[User]

A User instance that can respond to agent queries, or None if not needed.

How to use

User simulators enable agent-user interactions by responding to queries with preferences, clarifications, or feedback. Useful for benchmarks testing conversational agents or systems requiring user input during execution.

def setup_user(self, agent_data, environment, task, seed_generator):
    # Use child() to create logical namespace - results in "simulators/user"
    # derive_seed() returns None if seeding is disabled
    sim_gen = seed_generator.child("simulators")
    user_seed = sim_gen.derive_seed("user")  # Optional[int]

    user_model = self.get_model_adapter(agent_data["model_id"], seed=user_seed)
    return LLMUser(model=user_model, ...)

# Or skip user simulation entirely
def setup_user(self, agent_data, environment, task, seed_generator):
    return None

The user is automatically registered for tracing when returned.

DefaultMMLUBenchmark

Bases: MMLUBenchmark

MMLU Benchmark using HuggingFace transformers models.

This concrete implementation uses log-likelihood based MCQ evaluation via HuggingFaceModelScorer, with the same optimisations as lm-evaluation-harness:

  1. Single forward pass per question (one-token continuation optimisation)
  2. Efficient log-softmax computation
  3. Proper left-padding for batch processing
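The three points above can be illustrated without a real model. In this minimal sketch the logits are made-up numbers standing in for the model's next-token output, and `log_softmax`, `left_pad`, and `pick_choice` are illustrative helpers, not functions from MASEval or lm-evaluation-harness.

```python
import math
from typing import List, Sequence


def log_softmax(logits: Sequence[float]) -> List[float]:
    """Numerically stable log-softmax: subtract the max before exponentiating."""
    m = max(logits)
    log_z = m + math.log(sum(math.exp(x - m) for x in logits))
    return [x - log_z for x in logits]


def left_pad(batch: Sequence[Sequence[int]], pad_id: int) -> List[List[int]]:
    """Pad on the left so the last position of every row is a real prompt token."""
    width = max(len(seq) for seq in batch)
    return [[pad_id] * (width - len(seq)) + list(seq) for seq in batch]


def pick_choice(next_token_logits: Sequence[float], choice_token_ids: Sequence[int]) -> int:
    """Score each one-token answer from a single set of next-token logits."""
    log_probs = log_softmax(next_token_logits)
    scores = [log_probs[token_id] for token_id in choice_token_ids]
    return max(range(len(scores)), key=scores.__getitem__)


# Toy vocabulary of 6 tokens; ids 2..5 stand for the answer letters A..D.
toy_logits = [0.1, 0.0, 1.0, 3.0, 0.5, -1.0]
choice_ids = [2, 3, 4, 5]
best = pick_choice(toy_logits, choice_ids)
print("ABCD"[best])

padded = left_pad([[11, 12, 13], [21]], pad_id=0)
print(padded)
```

Because each answer is a single token, one forward pass yields the log-probabilities of all four choices at once, and with left-padding the logits for that next token sit at the final position of every row in the batch.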

Agents are created using a scorer-backed adapter (see _ScorerBackedAdapter).

seed_generator property

seed_generator: SeedGenerator

The seed generator for this benchmark.

The seed generator is configured at benchmark initialization via the seed or seed_generator parameters. When seed=None (the default), the generator's derive_seed() method returns None, effectively disabling seeding while maintaining a uniform interface.

RETURNS DESCRIPTION
SeedGenerator

The root SeedGenerator instance.

usage property

usage: Usage

Running usage total across all task repetitions.

Queryable at any time, including while the benchmark is still running. Returns the grand total of all usage collected so far.

usage_by_component property

usage_by_component: Dict[str, Usage]

Per-component running usage totals across all repetitions.

Keys are registry keys (e.g., "models:main_model").

__init__

__init__(
    model_id: str,
    device: str = DEFAULT_DEVICE,
    trust_remote_code: bool = True,
    use_full_prompt: bool = True,
    batch_size: int = DEFAULT_BATCH_SIZE,
    **kwargs: Any,
)

Initialize HuggingFace MMLU benchmark.

PARAMETER DESCRIPTION
model_id

HuggingFace model identifier.

TYPE: str

device

Device to run model on.

TYPE: str DEFAULT: DEFAULT_DEVICE

trust_remote_code

Trust remote code when loading model (default True).

TYPE: bool DEFAULT: True

use_full_prompt

Use full prompt with few-shot examples (default True).

TYPE: bool DEFAULT: True

batch_size

Batch size for lm-eval batching (number of questions per batch).

TYPE: int DEFAULT: DEFAULT_BATCH_SIZE

**kwargs

Additional arguments passed to MMLUBenchmark.

TYPE: Any DEFAULT: {}

add_callback

add_callback(callback: BenchmarkCallback) -> None

Register a callback handler to monitor benchmark execution.

PARAMETER DESCRIPTION
callback

A BenchmarkCallback instance that will receive execution events.

TYPE: BenchmarkCallback

How to use

Callbacks receive notifications at key lifecycle points for tracing, progress tracking, or custom metrics collection. See BenchmarkCallback for available hooks and their signatures.

from maseval.core.callbacks import MessageTracingCallback

benchmark = MyBenchmark()
benchmark.add_callback(MessageTracingCallback(output_dir="logs"))
results = benchmark.run(tasks=tasks, agent_data=config)

clear_registry

clear_registry() -> None

Clear the component registry after a task repetition completes.

This method is called automatically by run() after each task repetition to ensure components are not carried over between repetitions. The reports list persists across all repetitions for aggregated analysis.

collect_all_configs

collect_all_configs() -> Dict[str, Any]

Collect configuration from all registered components for the current task repetition.

This method is called automatically by run() after each task repetition completes and before evaluation begins. It gathers comprehensive configuration from all registered components (agents, models, tools, simulators, callbacks, etc.) for that specific repetition. After collection, the registry is cleared for the next repetition.

The collected configs are stored in benchmark.reports list along with traces for persistent access across all task repetitions.

Output fields:

  • metadata - Collection timestamp and thread info
  • agents - Dict mapping agent names to their config (settings, parameters)
  • models - Dict mapping model names to their config (model IDs, parameters)
  • tools - Dict mapping tool names to their config (specifications, settings)
  • simulators - Dict mapping simulator names to their config (parameters, templates)
  • callbacks - Dict mapping callback names to their config (settings)
  • environment - Direct config from the environment (not nested), or None if not present
  • user - Direct config from the user simulator (not nested), or None if not present
  • other - Dict for any other registered components
  • benchmark - Benchmark-level configuration (git, system, packages)

RETURNS DESCRIPTION
Dict[str, Any]

Structured dictionary containing configuration from all registered components.

How to use

This method is called automatically by run() after each task repetition:

# Automatic collection (recommended)
results = benchmark.run(tasks=tasks, agent_data=config)

# Access all collected reports (traces + configs) across repetitions
for report in benchmark.reports:
    print(f"Task {report['task_id']}, Repeat {report['repeat_idx']}")
    # Agents is a dict: agent_name -> config
    print(f"Agent config: {report['config']['agents']['my_agent']}")
    # Environment and user are direct (not nested)
    print(f"Environment config: {report['config']['environment']}")
    print(f"User config: {report['config']['user']}")
    # Benchmark-level config
    print(f"Git commit: {report['config']['benchmark']['git']['commit_hash']}")

The collected configs are available in the results for reproducibility analysis.

collect_all_traces

collect_all_traces() -> Dict[str, Any]

Collect execution traces from all registered components for the current task repetition.

This method is called automatically by run() after each task repetition completes and before evaluation begins. It gathers comprehensive traces from all registered components (agents, models, tools, simulators, callbacks, etc.) for that specific repetition. After collection, the registry is cleared for the next repetition.

The collected traces are stored in benchmark.reports list along with configs for persistent access across all task repetitions.

Output fields:

  • metadata - Collection timestamp and thread info
  • agents - Dict mapping agent names to their traces (messages, execution data)
  • models - Dict mapping model names to their traces (API calls, timing, errors)
  • tools - Dict mapping tool names to their traces (invocations, parameters)
  • simulators - Dict mapping simulator names to their traces (attempts, outcomes)
  • callbacks - Dict mapping callback names to their traces (custom data)
  • environment - Direct traces from the environment (not nested), or None if not present
  • user - Direct traces from the user simulator (not nested), or None if not present
  • other - Dict for any other registered components

RETURNS DESCRIPTION
Dict[str, Any]

Structured dictionary containing execution traces from all registered components.

How to use

This method is called automatically by run() after each task repetition:

# Automatic collection (recommended)
results = benchmark.run(tasks=tasks, agent_data=config)

# Access all collected reports (traces + configs) across repetitions
for report in benchmark.reports:
    print(f"Task {report['task_id']}, Repeat {report['repeat_idx']}")
    # Agents is a dict: agent_name -> traces
    print(f"Agent messages: {report['traces']['agents']['my_agent']}")
    # Environment and user are direct (not nested)
    print(f"Environment state: {report['traces']['environment']}")
    print(f"User interactions: {report['traces']['user']}")

The collected traces are passed to the evaluator's evaluate() method and stored in benchmark.reports for later analysis.

collect_all_usage

collect_all_usage() -> Dict[str, Any]

Collect usage from all registered components for the current task repetition.

This method is called automatically by run() after each task repetition completes. It gathers usage from all registered UsageTrackableMixin components and also accumulates into persistent running totals accessible via usage and usage_by_component.

RETURNS DESCRIPTION
Dict[str, Any]

Structured dictionary containing usage from all registered components.
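The relationship between per-repetition usage and the running totals exposed by usage and usage_by_component can be sketched as follows. This is an illustration only: the UsageTotals class and the input_tokens and output_tokens field names are hypothetical, and the real Usage type may carry different fields. The "models:main_model" key follows the registry key format shown for usage_by_component.

```python
from collections import Counter
from typing import Dict


class UsageTotals:
    """Toy accumulator for per-component usage across repetitions."""

    def __init__(self) -> None:
        self.by_component: Dict[str, Counter] = {}

    def add_repetition(self, usage: Dict[str, Dict[str, int]]) -> None:
        # Merge one repetition's usage into the persistent per-component totals.
        for key, counts in usage.items():
            self.by_component.setdefault(key, Counter()).update(counts)

    @property
    def total(self) -> Counter:
        # Grand total over all components collected so far.
        grand: Counter = Counter()
        for counts in self.by_component.values():
            grand.update(counts)
        return grand


totals = UsageTotals()
totals.add_repetition({"models:main_model": {"input_tokens": 100, "output_tokens": 5}})
totals.add_repetition({"models:main_model": {"input_tokens": 80, "output_tokens": 4}})
```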

evaluate

evaluate(
    evaluators: Sequence[Evaluator],
    agents: Dict[str, AgentAdapter],
    final_answer: Any,
    traces: Dict[str, Any],
) -> List[Dict[str, Any]]

Evaluate model response.

execution_loop

execution_loop(
    agents: Sequence[AgentAdapter],
    task: Task,
    environment: Environment,
    user: Optional[User],
) -> Any

Execute agents with optional user interaction loop.

This method orchestrates the agent-user interaction pattern. When a user is present, the user initiates the conversation using user.get_initial_query(). If no user is present, task.query is used as the initial query.

Interaction Flow

By default, agents execute once (max_invocations=1). For multi-turn interaction, set self.max_invocations > 1 in your benchmark's __init__. The loop continues until max_invocations is reached or user.is_done() returns True (e.g., max turns reached or stop token detected).
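The flow described above can be sketched with toy objects. This is a simplified illustration, not the framework's implementation: ScriptedUser, its respond() method, and execution_loop_sketch are hypothetical names, while get_initial_query(), is_done(), and max_invocations come from the description.

```python
from typing import Callable, List, Optional


class ScriptedUser:
    """Toy user that ends the conversation after a fixed number of turns."""

    def __init__(self, max_turns: int) -> None:
        self.max_turns = max_turns
        self.turns = 0

    def get_initial_query(self) -> str:
        return "initial query from user"

    def is_done(self) -> bool:
        return self.turns >= self.max_turns

    def respond(self, answer: str) -> str:  # hypothetical method name
        self.turns += 1
        return f"follow-up {self.turns}"


def execution_loop_sketch(
    agent: Callable[[str], str],
    task_query: str,
    user: Optional[ScriptedUser],
    max_invocations: int = 1,
) -> List[str]:
    # The user opens the conversation when present; otherwise use the task query.
    query = user.get_initial_query() if user is not None else task_query
    answers: List[str] = []
    for _ in range(max_invocations):
        answers.append(agent(query))
        if user is None:
            break  # no user: a single agent execution
        query = user.respond(answers[-1])
        if user.is_done():
            break
    return answers


def echo(query: str) -> str:
    return f"answer to: {query}"


no_user = execution_loop_sketch(echo, "task query", None, max_invocations=5)
early_stop = execution_loop_sketch(echo, "task query", ScriptedUser(2), max_invocations=5)
capped = execution_loop_sketch(echo, "task query", ScriptedUser(10), max_invocations=3)
```

The three calls show the three exits: no user (one execution), the user finishing first, and the max_invocations cap being reached first.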

Note

Override this method in your benchmark subclass to implement custom interaction patterns (e.g., agent-initiated conversations, different termination conditions, or specialized query routing).

PARAMETER DESCRIPTION
agents

Agents to execute (typically the orchestrator).

TYPE: Sequence[AgentAdapter]

task

The task being solved.

TYPE: Task

environment

The environment providing tools and state.

TYPE: Environment

user

Optional user simulator. If provided, the user initiates and drives the conversation. If None, the agents run once with task.query as the query.

TYPE: Optional[User]

RETURNS DESCRIPTION
Any

Final answer from the last agent execution.

Example

For interactive benchmarks, enable multi-turn interaction:

def __init__(self, *args, **kwargs):
    super().__init__(*args, **kwargs)
    self.max_invocations = 5  # Up to 5 agent-user exchanges

get_failed_tasks

get_failed_tasks(
    status_filter: Optional[
        Union[
            TaskExecutionStatus, List[TaskExecutionStatus]
        ]
    ] = None,
    reports: Optional[List[Dict[str, Any]]] = None,
) -> SequentialTaskQueue

Get tasks that failed during benchmark execution.

This method retrieves failed tasks based on their execution status, useful for debugging, retry logic, or failure analysis.

PARAMETER DESCRIPTION
status_filter

Filter by specific failure status(es). If None, returns all failed tasks (any status except SUCCESS). Can be a single TaskExecutionStatus or a list of them. Examples:

  • TaskExecutionStatus.TASK_EXECUTION_FAILED: Only tasks that failed during execution
  • TaskExecutionStatus.EVALUATION_FAILED: Only tasks where evaluation failed
  • [TaskExecutionStatus.TASK_EXECUTION_FAILED, TaskExecutionStatus.SETUP_FAILED]: Tasks that failed during execution or setup

TYPE: Optional[Union[TaskExecutionStatus, List[TaskExecutionStatus]]] DEFAULT: None

reports

Optional list of reports to analyze. If None, uses the reports from the last run() call. This allows analyzing externally stored or modified reports.

TYPE: Optional[List[Dict[str, Any]]] DEFAULT: None

RETURNS DESCRIPTION
SequentialTaskQueue

SequentialTaskQueue containing the failed tasks. Empty if no failures match the filter.

RAISES DESCRIPTION
RuntimeError

If reports is None and run() has not been executed yet.

How to use
# Run benchmark
benchmark = MyBenchmark()
reports = benchmark.run(tasks=tasks, agent_data=config)

# Get all failed tasks (from internal state)
failed = benchmark.get_failed_tasks()
print(f"Failed: {len(failed)}/{len(benchmark.tasks)} tasks")

# Or work with returned reports (safe from internal state changes)
failed = benchmark.get_failed_tasks(reports=reports)

# Get only tasks that failed during execution (not evaluation)
execution_failures = benchmark.get_failed_tasks(
    TaskExecutionStatus.TASK_EXECUTION_FAILED,
    reports=reports
)

# Get setup and execution failures
critical_failures = benchmark.get_failed_tasks(
    status_filter=[
        TaskExecutionStatus.SETUP_FAILED,
        TaskExecutionStatus.TASK_EXECUTION_FAILED
    ],
    reports=reports
)

# Retry failed tasks (the main use case)
if len(failed) > 0:
    retry_reports = benchmark.run(tasks=failed, agent_data=config)

# Or more concisely
reports = benchmark.run(tasks=tasks, agent_data=config)
retry_reports = benchmark.run(tasks=benchmark.get_failed_tasks(), agent_data=config)
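The filtering rule behind status_filter can be sketched on plain report dicts. This is an assumption-laden illustration: the Status enum is a stand-in for TaskExecutionStatus (member names are taken from this page, the string values are made up), and the real method returns a SequentialTaskQueue rather than a list of IDs.

```python
from enum import Enum
from typing import Any, Dict, List, Optional, Sequence, Union


class Status(Enum):
    """Stand-in for TaskExecutionStatus; values are hypothetical."""

    SUCCESS = "success"
    SETUP_FAILED = "setup_failed"
    TASK_EXECUTION_FAILED = "task_execution_failed"
    EVALUATION_FAILED = "evaluation_failed"


def failed_task_ids(
    reports: Sequence[Dict[str, Any]],
    status_filter: Optional[Union[Status, List[Status]]] = None,
) -> List[str]:
    if status_filter is None:
        wanted = {s for s in Status if s is not Status.SUCCESS}
    elif isinstance(status_filter, Status):
        wanted = {status_filter}
    else:
        wanted = set(status_filter)

    seen, ids = set(), []
    for report in reports:
        # A task with several failed repetitions is listed once.
        if report["status"] in wanted and report["task_id"] not in seen:
            seen.add(report["task_id"])
            ids.append(report["task_id"])
    return ids


reports = [
    {"task_id": "t1", "status": Status.SUCCESS},
    {"task_id": "t2", "status": Status.SETUP_FAILED},
    {"task_id": "t3", "status": Status.TASK_EXECUTION_FAILED},
    {"task_id": "t3", "status": Status.TASK_EXECUTION_FAILED},  # second repetition
    {"task_id": "t4", "status": Status.EVALUATION_FAILED},
]
all_failed = failed_task_ids(reports)
eval_only = failed_task_ids(reports, Status.EVALUATION_FAILED)
critical = failed_task_ids(reports, [Status.SETUP_FAILED, Status.TASK_EXECUTION_FAILED])
```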

get_model_adapter

get_model_adapter(
    model_id: str, **kwargs: Any
) -> ModelAdapter

Not used — DefaultMMLUBenchmark uses HuggingFaceModelScorer.

RAISES DESCRIPTION
NotImplementedError

Always. Use HuggingFaceModelScorer via self._scorer for log-likelihood evaluation.

precompute_all_logprobs_lmeval

precompute_all_logprobs_lmeval(
    tasks: Sequence[Task],
) -> Dict[Any, List[float]]

Precompute log-likelihoods for ALL tasks using lm-eval's batching.

CRITICAL: lm-evaluation-harness batches ALL requests together and uses its Collator class to reorder/group them. This affects floating-point precision for some edge cases. To get EXACT matches, we must process ALL requests together in a single batch.

This method:

  1. Creates Instance objects for all task/choice combinations
  2. Calls lm-eval's HFLM.loglikelihood() with ALL instances
  3. Returns a mapping from doc_id to logprobs

PARAMETER DESCRIPTION
tasks

Iterable of Task objects with prompt and choices.

TYPE: Sequence[Task]

RETURNS DESCRIPTION
Dict[Any, List[float]]

Dict mapping doc_id -> list of log-likelihoods for each choice.
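The last step, turning a flat list of per-request results into the doc_id mapping, can be sketched as follows. The sketch assumes the batched call yields one log-likelihood per (doc_id, choice index) request in the order the requests were submitted; group_logprobs and the request tuples are hypothetical names used only for illustration.

```python
from collections import defaultdict
from typing import Any, Dict, List, Sequence, Tuple


def group_logprobs(
    requests: Sequence[Tuple[Any, int]], flat_logprobs: Sequence[float]
) -> Dict[Any, List[float]]:
    """Group flat per-request log-likelihoods into doc_id -> per-choice list."""
    if len(requests) != len(flat_logprobs):
        raise ValueError("Expected one log-likelihood per request")

    by_doc: Dict[Any, Dict[int, float]] = defaultdict(dict)
    for (doc_id, choice_idx), logprob in zip(requests, flat_logprobs):
        by_doc[doc_id][choice_idx] = logprob

    # Order each document's values by choice index, whatever the request order was.
    return {
        doc_id: [choices[i] for i in sorted(choices)]
        for doc_id, choices in by_doc.items()
    }


requests = [(1, 1), (0, 0), (1, 0), (0, 1)]
flat = [-0.4, -1.2, -2.0, -0.3]
grouped = group_logprobs(requests, flat)
```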

register

register(
    category: str,
    name: str,
    component: RegisterableComponent,
) -> RegisterableComponent

Register a component for comprehensive trace and configuration collection.

All core MASEval components (AgentAdapter, ModelAdapter, Environment, User, LLMSimulator, BenchmarkCallback) inherit from TraceableMixin and/or ConfigurableMixin, and are automatically registered for both trace and configuration collection before evaluation.

Note: Most components are automatically registered when returned from setup methods (setup_environment, setup_user, setup_agents). You only need to manually register additional components like models, simulators, or tools that aren't automatically captured.

PARAMETER DESCRIPTION
category

Component category (e.g., "agents", "models", "tools", "simulators", "callbacks", "user", "environment", "seeding"). Use plural form to match the structure in collect_all_traces() and collect_all_configs().

TYPE: str

name

Unique identifier for this component within its category

TYPE: str

component

Any object inheriting from TraceableMixin and/or ConfigurableMixin

TYPE: RegisterableComponent

RETURNS DESCRIPTION
RegisterableComponent

The component (for chaining convenience)

RAISES DESCRIPTION
ValueError

If the component is already registered under a different name

How to use

Most components are auto-registered. Manual registration is only needed for additional components:

def setup_agents(self, agent_data, environment, task, user, seed_generator):
    # Create model (needs manual registration)
    model = MyModelAdapter(...)
    self.register("models", "main_model", model)

    # Create agent (auto-registered when returned)
    agent = MyAgent(model=model)
    agent_adapter = AgentAdapter(agent, "agent1")

    # Environment and user are also auto-registered
    return [agent_adapter], {"agent1": agent_adapter}

Traces and configs are automatically collected before evaluation via collect_all_traces() and collect_all_configs() which are called internally by the run() method.
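The registration contract (return the component for chaining, reject the same object under a second name) can be sketched with a toy registry. RegistrySketch is a hypothetical class and tracking components by id() is an assumption; only the category/name arguments, the "category:name" key format, and the ValueError behaviour come from this page.

```python
from typing import Any, Dict


class RegistrySketch:
    """Toy component registry; not MASEval's implementation."""

    def __init__(self) -> None:
        self.components: Dict[str, Dict[str, Any]] = {}
        self._key_by_object: Dict[int, str] = {}

    def register(self, category: str, name: str, component: Any) -> Any:
        key = f"{category}:{name}"
        existing = self._key_by_object.get(id(component))
        if existing is not None and existing != key:
            raise ValueError(f"Component already registered as {existing!r}")
        self.components.setdefault(category, {})[name] = component
        self._key_by_object[id(component)] = key
        return component  # returned so the call can be chained

    def clear(self) -> None:
        # Mirrors clearing the registry between task repetitions.
        self.components.clear()
        self._key_by_object.clear()


registry = RegistrySketch()
model = registry.register("models", "main_model", object())
```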

run

run(
    tasks: Union[
        Task, BaseTaskQueue, Iterable[Union[Task, dict]]
    ],
    agent_data: Dict[str, Any] | Iterable[Dict[str, Any]],
) -> List[Dict[str, Any]]

Initialize and execute the complete benchmark loop across all tasks.

PARAMETER DESCRIPTION
tasks

Task source for execution. Can be:

  • A single Task object
  • A BaseTaskQueue (SequentialTaskQueue, PriorityTaskQueue, or custom AdaptiveTaskQueue)
  • An iterable of Task objects or dicts that will be converted to Tasks

When a BaseTaskQueue is provided, it controls the task ordering. AdaptiveTaskQueue subclasses are automatically registered as callbacks to receive task completion notifications.

TYPE: Union[Task, BaseTaskQueue, Iterable[Union[Task, dict]]]

agent_data

Configuration for agents. Either a single dict applied to all tasks, or an iterable of dicts with one configuration per task. Agent data typically includes model parameters, agent architecture details, and tool specifications.

TYPE: Dict[str, Any] | Iterable[Dict[str, Any]]

RETURNS DESCRIPTION
List[Dict[str, Any]]

List of report dictionaries, one per task repetition. Each report contains:

  • task_id: Task identifier (UUID)
  • repeat_idx: Repetition index (0 to n_task_repeats-1)
  • status: Execution status (one of TaskExecutionStatus enum values)
  • traces: Execution traces from all registered components
  • config: Configuration from all registered components and benchmark level
  • eval: Evaluation results (None if task or evaluation failed)
  • error: Error details dict (only present if status is not SUCCESS), containing:
      • error_type: Exception class name
      • error_message: Exception message
      • traceback: Full traceback string

RAISES DESCRIPTION
ValueError

If agent_data length doesn't match number of tasks (when agent_data is an iterable).
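The two accepted shapes of agent_data and the length check can be sketched as a small normalisation step. expand_agent_data is a hypothetical helper written for illustration; it is not a function exposed by the library.

```python
from typing import Any, Dict, Iterable, List, Union


def expand_agent_data(
    agent_data: Union[Dict[str, Any], Iterable[Dict[str, Any]]], n_tasks: int
) -> List[Dict[str, Any]]:
    """Return one agent config per task, or raise if the counts disagree."""
    if isinstance(agent_data, dict):
        # A single dict is shared by every task.
        return [agent_data] * n_tasks
    configs = list(agent_data)
    if len(configs) != n_tasks:
        raise ValueError(
            f"Got {len(configs)} agent configs for {n_tasks} tasks"
        )
    return configs


shared = expand_agent_data({"model": "gpt-4"}, n_tasks=3)
per_task = expand_agent_data(
    [{"difficulty": "easy"}, {"difficulty": "hard"}], n_tasks=2
)
```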

How to use

This is the framework's main orchestration method that runs your entire benchmark. It iterates through all tasks, handles repetitions, and manages the three-stage lifecycle for each execution. You don't implement this method—instead, you call it to start the benchmark after implementing the setup and execution methods.

By default, the benchmark will continue executing remaining tasks even if some fail. You can change this behavior by setting fail_on_task_error=True, fail_on_evaluation_error=True, or fail_on_setup_error=True when instantiating the benchmark. Each task execution returns a status indicating success or the specific failure type (see TaskExecutionStatus).

For each task execution, the framework:

  1. Calls your setup methods to initialize components
  2. Calls your run_agents() method to execute the task
  3. Collects message histories and calls evaluators
  4. Stores results and triggers callbacks

Pseudocode structure:

for task in tasks:
    for repeat in range(n_task_repeats):
        # Setup stage
        environment = setup_environment(agent_data, task, seed_generator)
        user = setup_user(agent_data, environment, task, seed_generator)
        agents_to_run, agents_dict = setup_agents(agent_data, environment, task, user, seed_generator)
        evaluators = setup_evaluators(environment, task, agents_to_run, user, seed_generator)

        # Run stage (execution_loop handles multi-turn if user exists)
        final_answer = execution_loop(agents_to_run, task, environment, user)

        # Evaluate stage
        traces = collect_all_traces()
        eval_results = evaluate(evaluators, agents_dict, final_answer, traces)

        # Store results
        store_result(task_id, traces, eval_results)

Callback hooks are triggered at these points:

  • on_run_start: Before processing any tasks
  • on_task_start: Before processing a task (once per task, not per repeat)
  • on_task_repeat_start: Before each repetition of a task
  • on_task_repeat_end: After each repetition completes
  • on_task_end: After all repetitions of a task complete
  • on_run_end: After all tasks complete

# Typical usage
benchmark = MyBenchmark()
reports = benchmark.run(tasks=tasks, agent_data=config)

# Analyze results
for report in reports:
    print(f"Task {report['task_id']}, Repeat {report['repeat_idx']}: {report['eval']}")
    print(f"Config: {report['config']}")
    print(f"Traces: {report['traces']}")

# Parallel execution with 4 workers
benchmark = MyBenchmark(num_workers=4)
reports = benchmark.run(tasks=tasks, agent_data=config)

# Single agent config for all tasks
reports = benchmark.run(tasks=tasks, agent_data={"model": "gpt-4"})

# Task-specific agent configs (must match task count)
reports = benchmark.run(
    tasks=tasks,
    agent_data=[
        {"model": "gpt-4", "difficulty": "easy"},
        {"model": "gpt-4", "difficulty": "hard"},
    ]
)

# Priority-based execution
from maseval.core.task import PriorityTaskQueue
for task in tasks:
    task.protocol.priority = compute_priority(task)
queue = PriorityTaskQueue(tasks)
reports = benchmark.run(tasks=queue, agent_data=config)

# Adaptive queue (auto-registered as callback)
queue = MyAdaptiveTaskQueue(tasks)
reports = benchmark.run(tasks=queue, agent_data=config)  # queue receives on_task_complete callbacks
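The order in which the callback hooks listed above fire can be sketched with a toy loop. This assumes sequential execution (a single worker); with parallel workers, events from different tasks may interleave. hook_order is a hypothetical helper that only records hook names and does not call any real callback.

```python
from typing import List, Sequence


def hook_order(task_ids: Sequence[str], n_task_repeats: int) -> List[str]:
    """Return the hook names in the order a sequential run would trigger them."""
    events = ["on_run_start"]
    for task_id in task_ids:
        events.append(f"on_task_start:{task_id}")  # once per task
        for repeat_idx in range(n_task_repeats):
            events.append(f"on_task_repeat_start:{task_id}:{repeat_idx}")
            events.append(f"on_task_repeat_end:{task_id}:{repeat_idx}")
        events.append(f"on_task_end:{task_id}")  # after all repetitions
    events.append("on_run_end")
    return events


events = hook_order(["t1"], n_task_repeats=2)
```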

run_agents

run_agents(
    agents: Sequence[AgentAdapter],
    task: Task,
    environment: Environment,
    query: str = "",
) -> Any

Execute log-likelihood based MCQ evaluation.

Uses precomputed logprobs if available (for exact lm-eval match), otherwise delegates to HuggingFaceModelScorer.loglikelihood_choices() which automatically picks single-token or multi-token scoring.

setup_agents

setup_agents(
    agent_data: Dict[str, Any],
    environment: Environment,
    task: Task,
    user: Optional[User],
    seed_generator: SeedGenerator,
) -> Tuple[Sequence[AgentAdapter], Dict[str, AgentAdapter]]

Create scorer-backed agent for MCQ evaluation.

The returned adapter is a tracing container — actual evaluation is driven by self._scorer in run_agents().

PARAMETER DESCRIPTION
agent_data

Agent config (unused; model is set at __init__).

TYPE: Dict[str, Any]

environment

MMLU environment.

TYPE: Environment

task

Current task.

TYPE: Task

user

Unused.

TYPE: Optional[User]

seed_generator

Seed generator (unused for MMLU).

TYPE: SeedGenerator

RETURNS DESCRIPTION
Tuple[Sequence[AgentAdapter], Dict[str, AgentAdapter]]

Tuple of (agents_to_run, agents_dict).

setup_environment

setup_environment(
    agent_data: Dict[str, Any],
    task: Task,
    seed_generator: SeedGenerator,
) -> MMLUEnvironment

Create environment for a task.

setup_evaluators

setup_evaluators(
    environment: Environment,
    task: Task,
    agents: Sequence[AgentAdapter],
    user: Optional[User],
    seed_generator: SeedGenerator,
) -> Sequence[Evaluator]

Create MMLU evaluator.

setup_user

setup_user(
    agent_data: Dict[str, Any],
    environment: Environment,
    task: Task,
    seed_generator: SeedGenerator,
) -> Optional[User]

Create an optional user simulator for interactive tasks.

This method is optional. Return None if your benchmark does not require user simulation.

Note: The returned user is automatically registered for tracing. You don't need to manually call register() for it.

PARAMETER DESCRIPTION
agent_data

Configuration dict containing agent specifications and settings that may influence user simulator setup (e.g., framework type for creating compatible tools).

TYPE: Dict[str, Any]

environment

The environment instance created for this task.

TYPE: Environment

task

The Task object with user profile data or scenario information.

TYPE: Task

seed_generator

Seed generator for deriving deterministic seeds for the user simulator. derive_seed() returns None if seeding is disabled (global_seed=None).

TYPE: SeedGenerator

RETURNS DESCRIPTION
Optional[User]

A User instance that can respond to agent queries, or None if not needed.

How to use

User simulators enable agent-user interactions by responding to queries with preferences, clarifications, or feedback. Useful for benchmarks testing conversational agents or systems requiring user input during execution.

def setup_user(self, agent_data, environment, task, seed_generator):
    # Use child() to create logical namespace - results in "simulators/user"
    # derive_seed() returns None if seeding is disabled
    sim_gen = seed_generator.child("simulators")
    user_seed = sim_gen.derive_seed("user")  # Optional[int]

    user_model = self.get_model_adapter(model_id, seed=user_seed)
    return LLMUser(model=user_model, ...)

# Or skip user simulation entirely
def setup_user(self, agent_data, environment, task, seed_generator):
    return None

The user is automatically registered for tracing when returned.

MMLUEnvironment

Bases: Environment

Simple environment for MMLU multiple choice evaluation.

MMLU tasks don't require tools - the environment just holds the task context (question, choices, etc.).

create_tools

create_tools() -> Dict[str, Any]

MMLU doesn't use tools.

gather_config

gather_config() -> dict[str, Any]

Gather configuration from this environment.

Output fields:

  • type - Component class name
  • gathered_at - ISO timestamp
  • tool_count - Number of tools
  • tool_names - List of tool names

RETURNS DESCRIPTION
dict[str, Any]

Dictionary containing environment configuration.

gather_traces

gather_traces() -> dict[str, Any]

Gather execution traces from this environment and its tools.

Output fields:

  • type - Component class name
  • gathered_at - ISO timestamp
  • tool_count - Number of tools in environment
  • tools - Dictionary of tool traces keyed by tool name

RETURNS DESCRIPTION
dict[str, Any]

Dictionary containing environment execution traces.

get_prompt

get_prompt() -> str

Get the prompt to send to the model.

Returns full_prompt if use_full_prompt is True, otherwise query.

get_tool

get_tool(name: str) -> Optional[Any]

Get a tool by name.

PARAMETER DESCRIPTION
name

Tool name

TYPE: str

RETURNS DESCRIPTION
Optional[Any]

The tool, or None if not found

get_tools

get_tools() -> Dict[str, Any]

Get all tools as a dict.

setup_state

setup_state(task_data: Dict[str, Any]) -> Dict[str, Any]

Initialize state from task data.

PARAMETER DESCRIPTION
task_data

Must contain "query" (str) and "environment_data" (dict with "choices", "full_prompt", "use_full_prompt").

TYPE: Dict[str, Any]

MMLUEvaluator

Bases: Evaluator

Evaluator for MMLU multiple choice questions.

Computes accuracy metrics (acc and acc_norm) by comparing model predictions with gold answers.

__call__

__call__(
    traces: Dict[str, Any],
    final_answer: Optional[str] = None,
) -> Dict[str, Any]

Evaluate the model's response.

PARAMETER DESCRIPTION
traces

Filtered traces with messages.

TYPE: Dict[str, Any]

final_answer

The model's final answer.

TYPE: Optional[str] DEFAULT: None

RETURNS DESCRIPTION
Dict[str, Any]

Dict with acc, acc_norm, predicted, gold, correct, parse_failed, and optionally logprobs fields.
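How acc and acc_norm can differ is easiest to see on a small example. The sketch below assumes the lm-evaluation-harness convention, where acc takes the argmax of the raw log-likelihoods and acc_norm takes the argmax after dividing each log-likelihood by the character length of its choice. Whether MMLUEvaluator normalises in exactly this way is an assumption, and score_question returns only a subset of the fields listed above.

```python
from typing import Any, Dict, Sequence


def score_question(
    logprobs: Sequence[float], choices: Sequence[str], gold: int
) -> Dict[str, Any]:
    """Compute acc and length-normalised acc_norm for one question."""
    indices = range(len(logprobs))
    predicted = max(indices, key=lambda i: logprobs[i])
    predicted_norm = max(indices, key=lambda i: logprobs[i] / len(choices[i]))
    return {
        "acc": float(predicted == gold),
        "acc_norm": float(predicted_norm == gold),
        "predicted": predicted,
        "gold": gold,
        "correct": predicted == gold,
    }


# The long choice has a lower total log-likelihood but a better per-character one.
result = score_question(
    logprobs=[-5.0, -6.0],
    choices=["No", "Yes, because the premise holds"],
    gold=1,
)
letters = score_question([-1.0, -0.5, -2.0, -3.0], ["A", "B", "C", "D"], gold=1)
```

When every choice has the same length, as with single-letter answers, the normalisation divides all scores by the same constant and the two metrics coincide.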

__init__

__init__(
    task: Task,
    environment: Environment,
    user: Optional[Any] = None,
)

Initialize MMLU evaluator.

PARAMETER DESCRIPTION
task

Task being evaluated. Must have evaluation_data["gold"] (int) with the correct answer index.

TYPE: Task

environment

Environment (provides choices).

TYPE: Environment

user

Unused for MMLU.

TYPE: Optional[Any] DEFAULT: None

filter_traces

filter_traces(traces: Dict[str, Any]) -> Dict[str, Any]

Extract relevant traces for evaluation.

For MMLU, we need the model's response from agent traces.

load_tasks

load_tasks(
    data_path: Union[str, Path],
    anchor_points_path: Optional[Union[str, Path]] = None,
    limit: Optional[int] = None,
) -> Union[DISCOQueue, SequentialTaskQueue]

Load MMLU tasks from JSON file.

PARAMETER DESCRIPTION
data_path

Path to MMLU prompts JSON file (mmlu_prompts_examples.json format).

TYPE: Union[str, Path]

anchor_points_path

Optional path to anchor points pickle file. If provided, returns a DISCOQueue that evaluates only the anchor tasks in order.

TYPE: Optional[Union[str, Path]] DEFAULT: None

limit

Optional limit on number of tasks to load.

TYPE: Optional[int] DEFAULT: None

RETURNS DESCRIPTION
Union[DISCOQueue, SequentialTaskQueue]

Task queue containing the MMLU tasks: a DISCOQueue if anchor_points_path is provided, otherwise a SequentialTaskQueue.

compute_benchmark_metrics

compute_benchmark_metrics(
    results: List[Dict[str, Any]],
) -> Dict[str, Any]

Compute summary metrics across all benchmark results.

PARAMETER DESCRIPTION
results

List of result dicts from benchmark.run().

TYPE: List[Dict[str, Any]]

RETURNS DESCRIPTION
Dict[str, Any]

Dict with accuracy metrics and task counts.
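A minimal sketch of this kind of aggregation, assuming each report's eval entry is either None (failed task or evaluation) or a list of evaluator result dicts with acc and acc_norm. The summarize function and its output keys are hypothetical; the keys actually returned by compute_benchmark_metrics may differ.

```python
from typing import Any, Dict, List


def summarize(reports: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Average acc and acc_norm over reports that have evaluation results."""
    accs: List[float] = []
    accs_norm: List[float] = []
    for report in reports:
        for result in report.get("eval") or []:  # skip reports without results
            accs.append(result["acc"])
            accs_norm.append(result["acc_norm"])
    evaluated = len(accs)
    return {
        "total_reports": len(reports),
        "evaluated": evaluated,
        "acc": sum(accs) / evaluated if evaluated else 0.0,
        "acc_norm": sum(accs_norm) / evaluated if evaluated else 0.0,
    }


summary = summarize(
    [
        {"eval": [{"acc": 1.0, "acc_norm": 1.0}]},
        {"eval": [{"acc": 0.0, "acc_norm": 1.0}]},
        {"eval": None},  # failed repetition, excluded from the averages
    ]
)
```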