MMLU: Massive Multitask Language Understanding (Beta)
Beta
This benchmark has been implemented carefully, but we have not yet validated the results against the original implementation. Use with caution when comparing with existing results or the original paper's numbers. Contributions and compute donations welcome!
The MMLU Benchmark evaluates language models on multiple-choice questions spanning 57 academic subjects. The MASEval integration supports anchor-point-based evaluation for DISCO prediction, enabling efficient estimation of full benchmark performance from a subset of tasks.
Overview
MMLU (Hendrycks et al., 2021) is a widely used benchmark for measuring knowledge and reasoning across diverse domains. The MASEval implementation features:
- Log-likelihood MCQ evaluation matching lm-evaluation-harness methodology
- Anchor-point task selection via DISCOQueue for DISCO-style subset evaluation
- HuggingFace integration with batched log-probability computation
- lm-eval compatibility mode for exact numerical reproduction
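To make the log-likelihood approach concrete, here is a minimal sketch of how an answer can be picked from next-token logits. It is not the MASEval or lm-evaluation-harness code; the toy vocabulary, the token ids for the answer letters, and the helper names `log_softmax` and `pick_answer` are all assumptions made for illustration.

```python
import math


def log_softmax(logits):
    """Numerically stable log-softmax over a list of floats."""
    m = max(logits)
    log_z = m + math.log(sum(math.exp(x - m) for x in logits))
    return [x - log_z for x in logits]


def pick_answer(vocab_logits, choice_token_ids):
    """Return (index of the best choice, per-choice log-probabilities).

    vocab_logits: next-token logits over the whole vocabulary.
    choice_token_ids: token id of each answer letter (hypothetical ids).
    """
    log_probs = log_softmax(vocab_logits)
    choice_log_probs = [log_probs[t] for t in choice_token_ids]
    best = max(range(len(choice_log_probs)), key=choice_log_probs.__getitem__)
    return best, choice_log_probs


# Toy 6-token vocabulary; ids 2..5 stand in for the tokens " A", " B", " C", " D".
logits = [0.1, -1.0, 0.3, 2.5, 0.7, -0.2]
best, scores = pick_answer(logits, [2, 3, 4, 5])
print("ABCD"[best])
```

Because each answer is a single token, one forward pass over the prompt is enough to score all four choices.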
Check out the BENCHMARKS.md file for more information including licenses.
Installation
Install MMLU with all dependencies needed to run the HuggingFace benchmark and example script:
pip install maseval[mmlu]
Or with uv:
uv sync --extra mmlu
This installs transformers, torch, numpy, and huggingface_hub (the latter two via transformers). You can then run the example:
python examples/mmlu_benchmark/mmlu_benchmark.py --model_id alignment-handbook/zephyr-7b-sft-full
For DISCO prediction support:
pip install maseval[disco]
For exact lm-evaluation-harness reproduction:
pip install maseval[lm-eval]
Quick Start
from maseval.benchmark.mmlu import (
DefaultMMLUBenchmark,
load_tasks,
compute_benchmark_metrics,
)
# Load tasks (downloads from HuggingFace automatically)
tasks = load_tasks(data_path="/path/to/mmlu_prompts_examples.json")
# Create benchmark with HuggingFace model
benchmark = DefaultMMLUBenchmark(
model_id="meta-llama/Llama-2-7b-hf",
device="cuda:0",
)
# Run evaluation
results = benchmark.run(
tasks=tasks,
agent_data={"model_id": "meta-llama/Llama-2-7b-hf"},
)
# Compute metrics
metrics = compute_benchmark_metrics(results)
print(f"Accuracy: {metrics['acc']:.4f}")
With Anchor Points (DISCO)
from maseval.benchmark.mmlu import load_tasks
# Load tasks filtered to anchor points
tasks = load_tasks(
data_path="/path/to/mmlu_prompts_examples.json",
anchor_points_path="/path/to/anchor_points.json",
)
# tasks is a DISCOQueue — only anchor tasks are evaluated
print(f"Evaluating {len(tasks)} anchor tasks")
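As a rough picture of what anchor-point filtering does, the sketch below keeps only the tasks at a given set of positions. The anchor file format (a JSON list of integer indices) and the helper `select_anchor_tasks` are assumptions for illustration; the real DISCOQueue and anchor file may be organised differently.

```python
import json


def select_anchor_tasks(tasks, anchor_indices):
    """Keep only the tasks at the given positions, in anchor order."""
    return [tasks[i] for i in anchor_indices]


# Ten toy tasks standing in for the full benchmark.
tasks = [{"id": f"q{i}", "query": f"Question {i}"} for i in range(10)]

# Stands in for the contents of anchor_points.json (assumed format).
anchor_indices = json.loads("[7, 2, 5]")

anchor_tasks = select_anchor_tasks(tasks, anchor_indices)
print([t["id"] for t in anchor_tasks])
```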
Custom Benchmark Subclass
MMLUBenchmark is a framework-agnostic base class. To use a different model backend, subclass it and implement setup_agents() and get_model_adapter():
from maseval import AgentAdapter
from maseval.core.history import MessageHistory
from maseval.benchmark.mmlu import MMLUBenchmark

class MyAgentAdapter(AgentAdapter):
    def __init__(self, model, name):
        super().__init__(model, name)
        self._messages = []

    def _run_agent(self, query):
        # Record the exchange so get_messages() can return the full history
        self._messages.append({"role": "user", "content": query})
        response = self.agent.generate(query)
        self._messages.append({"role": "assistant", "content": response})
        return response

    def get_messages(self):
        return MessageHistory(self._messages)

class MyMMLUBenchmark(MMLUBenchmark):
    def setup_agents(self, agent_data, environment, task, user, seed_generator):
        model = self.get_model_adapter(agent_data["model_id"])
        adapter = MyAgentAdapter(model, name="mmlu_agent")
        return [adapter], {"mmlu_agent": adapter}

    def get_model_adapter(self, model_id, **kwargs):
        # MyModelAdapter is your own ModelAdapter implementation (not shown here)
        adapter = MyModelAdapter(model_id)
        register_name = kwargs.get("register_name")
        if register_name:
            self.register("models", register_name, adapter)
        return adapter
API Reference
MMLUBenchmark
Bases: Benchmark
MMLU Benchmark - Framework-agnostic base class.
Evaluates language models on MMLU multiple choice questions. Supports anchor point-based evaluation for DISCO prediction.
Subclasses must implement:
- setup_agents(): create agents for MCQ evaluation
- get_model_adapter(): provide model adapters
For a ready-to-use implementation, see DefaultMMLUBenchmark.
seed_generator
property
seed_generator: SeedGenerator
The seed generator for this benchmark.
The seed generator is configured at benchmark initialization via the seed
or seed_generator parameters. When seed=None (the default), the generator's
derive_seed() method returns None, effectively disabling seeding while
maintaining a uniform interface.
| RETURNS | DESCRIPTION |
|---|---|
| SeedGenerator | The root |
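The sketch below illustrates the behaviour described above: derived seeds are deterministic for a given root seed and logical path, and every derived seed is None when the root seed is None. It is not the MASEval SeedGenerator; the class name `SketchSeedGenerator`, the hashing scheme, the `repetition` argument, and the default of `per_repetition` are assumptions.

```python
import hashlib
from typing import Optional


class SketchSeedGenerator:
    """Illustrative only; not the MASEval SeedGenerator."""

    def __init__(self, seed: Optional[int], path: str = "", repetition: int = 0):
        self.seed = seed
        self.path = path
        self.repetition = repetition

    def child(self, name: str) -> "SketchSeedGenerator":
        # Extend the logical path, e.g. "agents" -> "agents/workers".
        new_path = f"{self.path}/{name}" if self.path else name
        return SketchSeedGenerator(self.seed, new_path, self.repetition)

    def derive_seed(self, name: str, per_repetition: bool = True) -> Optional[int]:
        # With no root seed, seeding is disabled and every derived seed is None.
        if self.seed is None:
            return None
        rep = self.repetition if per_repetition else 0
        key = f"{self.seed}|{self.path}/{name}|{rep}".encode()
        return int.from_bytes(hashlib.sha256(key).digest()[:4], "big")


root = SketchSeedGenerator(seed=42, repetition=3)
analyst_seed = root.child("agents").child("workers").derive_seed("analyst")
disabled_seed = SketchSeedGenerator(seed=None).derive_seed("analyst")
print(analyst_seed, disabled_seed)
```

Returning None rather than raising lets calling code pass the derived value straight to any component that accepts an optional seed.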
usage
property
usage: Usage
Running usage total across all task repetitions.
Queryable at any time, including while the benchmark is still running. Returns the grand total of all usage collected so far.
usage_by_component
property
usage_by_component: Dict[str, Usage]
Per-component running usage totals across all repetitions.
Keys are registry keys (e.g., "models:main_model").
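One way to picture the relationship between the per-component totals and the grand total is the sketch below. The `SketchUsage` fields (`input_tokens`, `output_tokens`) and the `UsageTotals` helper are hypothetical; only the "category:name" key format comes from the description above.

```python
from collections import defaultdict
from dataclasses import dataclass


@dataclass
class SketchUsage:
    """Hypothetical usage record; the real Usage fields may differ."""
    input_tokens: int = 0
    output_tokens: int = 0

    def __add__(self, other):
        return SketchUsage(
            self.input_tokens + other.input_tokens,
            self.output_tokens + other.output_tokens,
        )


class UsageTotals:
    def __init__(self):
        # Registry key ("category:name") -> running total for that component.
        self.by_component = defaultdict(SketchUsage)

    def collect(self, per_repetition):
        """Fold one repetition's usage into the running totals."""
        for key, usage in per_repetition.items():
            self.by_component[key] = self.by_component[key] + usage

    @property
    def total(self):
        # Grand total over every component seen so far.
        return sum(self.by_component.values(), SketchUsage())


totals = UsageTotals()
totals.collect({"models:main_model": SketchUsage(100, 20)})
totals.collect({"models:main_model": SketchUsage(50, 10), "models:judge": SketchUsage(5, 1)})
print(totals.total)
```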
__init__
__init__(
use_full_prompt: bool = False,
callbacks: Optional[List[Any]] = None,
n_task_repeats: int = 1,
**kwargs: Any,
)
Initialize benchmark.
| PARAMETER | TYPE | DESCRIPTION |
|---|---|---|
| use_full_prompt | bool | If True, use full_prompt (with few-shot examples) instead of just the query. |
| callbacks | Optional[List[Any]] | Benchmark callbacks. |
| n_task_repeats | int | Repetitions per task. |
add_callback
add_callback(callback: BenchmarkCallback) -> None
Register a callback handler to monitor benchmark execution.
| PARAMETER | TYPE | DESCRIPTION |
|---|---|---|
| callback | BenchmarkCallback | A BenchmarkCallback instance that will receive execution events. |
How to use
Callbacks receive notifications at key lifecycle points for tracing, progress tracking,
or custom metrics collection. See BenchmarkCallback
for available hooks and their signatures.
from maseval.core.callbacks import MessageTracingCallback
benchmark = MyBenchmark()
benchmark.add_callback(MessageTracingCallback(output_dir="logs"))
results = benchmark.run(tasks=tasks, agent_data=config)
clear_registry
clear_registry() -> None
Clear the component registry after a task repetition completes.
This method is called automatically by run() after each task repetition
to ensure components are not carried over between repetitions. The
reports list persists across all repetitions for aggregated analysis.
collect_all_configs
collect_all_configs() -> Dict[str, Any]
Collect configuration from all registered components for the current task repetition.
This method is called automatically by run() after each task repetition completes
and before evaluation begins. It gathers comprehensive configuration from all registered
components (agents, models, tools, simulators, callbacks, etc.) for that specific
repetition. After collection, the registry is cleared for the next repetition.
The collected configs are stored in benchmark.reports list along with traces
for persistent access across all task repetitions.
Output fields:
- metadata: Collection timestamp and thread info
- agents: Dict mapping agent names to their config (settings, parameters)
- models: Dict mapping model names to their config (model IDs, parameters)
- tools: Dict mapping tool names to their config (specifications, settings)
- simulators: Dict mapping simulator names to their config (parameters, templates)
- callbacks: Dict mapping callback names to their config (settings)
- environment: Direct config from the environment (not nested), or None if not present
- user: Direct config from the user simulator (not nested), or None if not present
- other: Dict for any other registered components
- benchmark: Benchmark-level configuration (git, system, packages)

| RETURNS | DESCRIPTION |
|---|---|
| Dict[str, Any] | Structured dictionary containing configuration from all registered components. |
How to use
This method is called automatically by run() after each task repetition:
# Automatic collection (recommended)
results = benchmark.run(tasks=tasks, agent_data=config)
# Access all collected reports (traces + configs) across repetitions
for report in benchmark.reports:
    print(f"Task {report['task_id']}, Repeat {report['repeat_idx']}")
    # Agents is a dict: agent_name -> config
    print(f"Agent config: {report['config']['agents']['my_agent']}")
    # Environment and user are direct (not nested)
    print(f"Environment config: {report['config']['environment']}")
    print(f"User config: {report['config']['user']}")
    # Benchmark-level config
    print(f"Git commit: {report['config']['benchmark']['git']['commit_hash']}")
The collected configs are available in the results for reproducibility analysis.
collect_all_traces
collect_all_traces() -> Dict[str, Any]
Collect execution traces from all registered components for the current task repetition.
This method is called automatically by run() after each task repetition completes
and before evaluation begins. It gathers comprehensive traces from all registered
components (agents, models, tools, simulators, callbacks, etc.) for that specific
repetition. After collection, the registry is cleared for the next repetition.
The collected traces are stored in benchmark.reports list along with configs
for persistent access across all task repetitions.
Output fields:
- metadata: Collection timestamp and thread info
- agents: Dict mapping agent names to their traces (messages, execution data)
- models: Dict mapping model names to their traces (API calls, timing, errors)
- tools: Dict mapping tool names to their traces (invocations, parameters)
- simulators: Dict mapping simulator names to their traces (attempts, outcomes)
- callbacks: Dict mapping callback names to their traces (custom data)
- environment: Direct traces from the environment (not nested), or None if not present
- user: Direct traces from the user simulator (not nested), or None if not present
- other: Dict for any other registered components

| RETURNS | DESCRIPTION |
|---|---|
| Dict[str, Any] | Structured dictionary containing execution traces from all registered components. |
How to use
This method is called automatically by run() after each task repetition:
# Automatic collection (recommended)
results = benchmark.run(tasks=tasks, agent_data=config)
# Access all collected reports (traces + configs) across repetitions
for report in benchmark.reports:
    print(f"Task {report['task_id']}, Repeat {report['repeat_idx']}")
    # Agents is a dict: agent_name -> traces
    print(f"Agent messages: {report['traces']['agents']['my_agent']}")
    # Environment and user are direct (not nested)
    print(f"Environment state: {report['traces']['environment']}")
    print(f"User interactions: {report['traces']['user']}")
The collected traces are passed to the evaluator's evaluate() method
and stored in benchmark.reports for later analysis.
collect_all_usage
collect_all_usage() -> Dict[str, Any]
Collect usage from all registered components for the current task repetition.
This method is called automatically by run() after each task repetition
completes. It gathers usage from all registered UsageTrackableMixin
components and also accumulates into persistent running totals accessible
via usage and usage_by_component.
| RETURNS | DESCRIPTION |
|---|---|
| Dict[str, Any] | Structured dictionary containing usage from all registered components. |
evaluate
evaluate(
evaluators: Sequence[Evaluator],
agents: Dict[str, AgentAdapter],
final_answer: Any,
traces: Dict[str, Any],
) -> List[Dict[str, Any]]
Evaluate model response.
execution_loop
execution_loop(
agents: Sequence[AgentAdapter],
task: Task,
environment: Environment,
user: Optional[User],
) -> Any
Execute agents with optional user interaction loop.
This method orchestrates the agent-user interaction pattern. When a user is
present, the user initiates the conversation using user.get_initial_query().
If no user is present, task.query is used as the initial query.
Interaction Flow
By default, agents execute once (max_invocations=1). For multi-turn
interaction, set self.max_invocations > 1 in your benchmark's __init__.
The loop continues until max_invocations is reached or user.is_done()
returns True (e.g., max turns reached or stop token detected).
Note
Override this method in your benchmark subclass to implement custom interaction patterns (e.g., agent-initiated conversations, different termination conditions, or specialized query routing).
| PARAMETER | TYPE | DESCRIPTION |
|---|---|---|
| agents | Sequence[AgentAdapter] | Agents to execute (typically the orchestrator). |
| task | Task | The task being solved. |
| environment | Environment | The environment providing tools and state. |
| user | Optional[User] | Optional user simulator. If provided, the user initiates and drives the conversation. If None, a single agent execution with |

| RETURNS | DESCRIPTION |
|---|---|
| Any | Final answer from the last agent execution. |
Example
For interactive benchmarks, enable multi-turn interaction:
def __init__(self, *args, **kwargs):
    super().__init__(*args, **kwargs)
    self.max_invocations = 5  # Up to 5 agent-user exchanges
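The interaction flow described for execution_loop can be sketched as a small standalone loop. This is an illustration under assumptions, not the framework's implementation: `ScriptedUser`, its `respond()` method, and the `sketch_execution_loop` signature are hypothetical, while `get_initial_query()`, `is_done()`, and `max_invocations` follow the description above.

```python
class ScriptedUser:
    """Toy user that replays a fixed list of turns (hypothetical helper)."""

    def __init__(self, turns):
        self._turns = list(turns)

    def get_initial_query(self):
        return self._turns.pop(0)

    def is_done(self):
        return not self._turns

    def respond(self, answer):
        # Hypothetical method: produce the next user message.
        return self._turns.pop(0)


def sketch_execution_loop(run_agents, task_query, user=None, max_invocations=1):
    # The user opens the conversation when present; otherwise use the task query.
    query = user.get_initial_query() if user is not None else task_query
    final_answer = None
    for _ in range(max_invocations):
        final_answer = run_agents(query)
        if user is None or user.is_done():
            break
        query = user.respond(final_answer)
    return final_answer


calls = []

def echo_agent(query):
    calls.append(query)
    return query.upper()


single = sketch_execution_loop(echo_agent, "What is 2+2?")
multi = sketch_execution_loop(
    echo_agent, "ignored", user=ScriptedUser(["hi", "more", "last"]), max_invocations=5
)
print(single, multi, calls)
```

With no user the agent runs exactly once; with a user the loop stops at whichever comes first, the user being done or the invocation limit.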
get_failed_tasks
get_failed_tasks(
status_filter: Optional[
Union[
TaskExecutionStatus, List[TaskExecutionStatus]
]
] = None,
reports: Optional[List[Dict[str, Any]]] = None,
) -> SequentialTaskQueue
Get tasks that failed during benchmark execution.
This method retrieves failed tasks based on their execution status, useful for debugging, retry logic, or failure analysis.
| PARAMETER | TYPE | DESCRIPTION |
|---|---|---|
| status_filter | Optional[Union[TaskExecutionStatus, List[TaskExecutionStatus]]] | Filter by specific failure status(es). If None, returns all failed tasks (any status except SUCCESS). Can be a single TaskExecutionStatus or a list of them. Examples: TaskExecutionStatus.TASK_EXECUTION_FAILED selects only tasks that failed during execution; TaskExecutionStatus.EVALUATION_FAILED selects only tasks where evaluation failed; [TaskExecutionStatus.TASK_EXECUTION_FAILED, TaskExecutionStatus.SETUP_FAILED] selects tasks that failed during execution or setup. |
| reports | Optional[List[Dict[str, Any]]] | Optional list of reports to analyze. If None, uses the reports from the last run() call. This allows analyzing externally stored or modified reports. |

| RETURNS | DESCRIPTION |
|---|---|
| SequentialTaskQueue | SequentialTaskQueue containing the failed tasks. Empty if no failures match the filter. |

| RAISES | DESCRIPTION |
|---|---|
| RuntimeError | If reports is None and run() has not been executed yet. |
How to use
# Run benchmark
benchmark = MyBenchmark()
reports = benchmark.run(tasks=tasks, agent_data=config)
# Get all failed tasks (from internal state)
failed = benchmark.get_failed_tasks()
print(f"Failed: {len(failed)}/{len(benchmark.tasks)} tasks")
# Or work with returned reports (safe from internal state changes)
failed = benchmark.get_failed_tasks(reports=reports)
# Get only tasks that failed during execution (not evaluation)
execution_failures = benchmark.get_failed_tasks(
    TaskExecutionStatus.TASK_EXECUTION_FAILED,
    reports=reports,
)
# Get setup and execution failures
critical_failures = benchmark.get_failed_tasks(
    status_filter=[
        TaskExecutionStatus.SETUP_FAILED,
        TaskExecutionStatus.TASK_EXECUTION_FAILED,
    ],
    reports=reports,
)
# Retry failed tasks: this is the key use case
if len(failed) > 0:
    retry_reports = benchmark.run(tasks=failed, agent_data=config)
# Or more concisely
reports = benchmark.run(tasks=tasks, agent_data=config)
retry_reports = benchmark.run(tasks=benchmark.get_failed_tasks(), agent_data=config)
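The filtering rule behind status_filter can be sketched without the framework. The `Status` enum below only mirrors the TaskExecutionStatus member names mentioned above, and the `"status"` report key and the `failed_task_ids` helper are assumptions; the real method returns a SequentialTaskQueue rather than a list of ids.

```python
from enum import Enum


class Status(Enum):
    # Names mirror the TaskExecutionStatus members mentioned above.
    SUCCESS = "success"
    SETUP_FAILED = "setup_failed"
    TASK_EXECUTION_FAILED = "task_execution_failed"
    EVALUATION_FAILED = "evaluation_failed"


def failed_task_ids(reports, status_filter=None):
    """Return ids of tasks with at least one matching failure, in first-seen order."""
    if status_filter is None:
        wanted = None                      # any status except SUCCESS
    elif isinstance(status_filter, Status):
        wanted = {status_filter}           # single status
    else:
        wanted = set(status_filter)        # list of statuses

    ids = []
    for report in reports:
        status = report["status"]          # assumed report key
        if status is Status.SUCCESS:
            continue
        if wanted is not None and status not in wanted:
            continue
        if report["task_id"] not in ids:
            ids.append(report["task_id"])
    return ids


reports = [
    {"task_id": "t1", "status": Status.SUCCESS},
    {"task_id": "t2", "status": Status.SETUP_FAILED},
    {"task_id": "t3", "status": Status.EVALUATION_FAILED},
    {"task_id": "t3", "status": Status.TASK_EXECUTION_FAILED},
]
print(failed_task_ids(reports))
```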
get_model_adapter
abstractmethod
get_model_adapter(
model_id: str, **kwargs: Any
) -> ModelAdapter
Provide a ModelAdapter for benchmark components that require LLM access.
Many benchmark components beyond the agents themselves require access to language models. Common examples include:
- Tool simulators: Simulating tool responses when real APIs aren't available
- User simulators: Generating realistic user responses in multi-turn dialogues
- Judges/Evaluators: Using LLMs to assess agent performance against criteria
- Reward models: Computing scores for reinforcement learning
This method centralizes model provisioning, giving you control over which models are used throughout the benchmark. Implement this to return a configured ModelAdapter for the requested model.
| PARAMETER | TYPE | DESCRIPTION |
|---|---|---|
| model_id | str | The model identifier to use (e.g., "gemini-2.5-flash", "openrouter/google/gemini-2.5-flash", "gpt-4o"). This is passed by the benchmark when setting up components that need model access. |
| **kwargs | Any | Additional arguments for adapter creation or registration. Common kwargs: register_category, the category for trace registration (e.g., "models"); register_name, the name for trace registration (e.g., "evaluator_user_gsr"). |

| RETURNS | DESCRIPTION |
|---|---|
| ModelAdapter | A ModelAdapter instance configured for the specified model. For proper tracing, return a fresh adapter for each call rather than reusing instances. You can still share the underlying API client for efficiency. |
How to use
For proper tracing, register the adapter after creation using the kwargs:
def get_model_adapter(self, model_id: str, **kwargs: Any) -> ModelAdapter:
    adapter = GoogleGenAIModelAdapter(self.client, model_id=model_id)
    # Register for tracing if registration info provided
    category = kwargs.get("register_category", "models")
    name = kwargs.get("register_name", model_id)
    self.register(category, name, adapter)
    return adapter
The benchmark calls this method when setting up tools, user simulators, and evaluators. Each call creates a fresh adapter with its own trace log.
register
register(
category: str,
name: str,
component: RegisterableComponent,
) -> RegisterableComponent
Register a component for comprehensive trace and configuration collection.
All core MASEval components (AgentAdapter, ModelAdapter, Environment, User, LLMSimulator, BenchmarkCallback) inherit from TraceableMixin and/or ConfigurableMixin, and are automatically registered for both trace and configuration collection before evaluation.
Note: Most components are automatically registered when returned from
setup methods (setup_environment, setup_user, setup_agents). You only
need to manually register additional components like models, simulators, or
tools that aren't automatically captured.
| PARAMETER | TYPE | DESCRIPTION |
|---|---|---|
| category | str | Component category (e.g., "agents", "models", "tools", "simulators", "callbacks", "user", "environment", "seeding"). Use plural form to match the structure in collect_all_traces() and collect_all_configs(). |
| name | str | Unique identifier for this component within its category. |
| component | RegisterableComponent | Any object inheriting from TraceableMixin and/or ConfigurableMixin. |

| RETURNS | DESCRIPTION |
|---|---|
| RegisterableComponent | The component (for chaining convenience). |

| RAISES | DESCRIPTION |
|---|---|
| ValueError | If the component is already registered under a different name. |
How to use
Most components are auto-registered. Manual registration is only needed for additional components:
def setup_agents(self, agent_data, environment, task, user, seed_generator):
    # Create model (needs manual registration)
    model = MyModelAdapter(...)
    self.register("models", "main_model", model)
    # Create agent (auto-registered when returned)
    agent = MyAgent(model=model)
    agent_adapter = AgentAdapter(agent, "agent1")
    # Environment and user are also auto-registered
    return [agent_adapter], {"agent1": agent_adapter}
Traces and configs are automatically collected before evaluation via
collect_all_traces() and collect_all_configs() which are called
internally by the run() method.
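The registration contract (return the component for chaining, reject the same object under a second name) can be sketched as follows. `SketchRegistry` is a hypothetical stand-in rather than the benchmark's registry; the "category:name" key format is borrowed from the usage_by_component description.

```python
class SketchRegistry:
    """Illustrative registry; not the MASEval implementation."""

    def __init__(self):
        self._by_key = {}      # "category:name" -> component
        self._key_by_id = {}   # id(component) -> "category:name"

    def register(self, category, name, component):
        key = f"{category}:{name}"
        existing = self._key_by_id.get(id(component))
        if existing is not None and existing != key:
            raise ValueError(
                f"component already registered as {existing!r}, cannot re-register as {key!r}"
            )
        self._by_key[key] = component
        self._key_by_id[id(component)] = key
        return component  # returned for chaining convenience

    def clear(self):
        # Called between repetitions so components are not carried over.
        self._by_key.clear()
        self._key_by_id.clear()

    def keys(self):
        return sorted(self._by_key)


registry = SketchRegistry()
model = registry.register("models", "main_model", object())
try:
    registry.register("models", "judge", model)
    duplicate_rejected = False
except ValueError:
    duplicate_rejected = True
print(registry.keys(), duplicate_rejected)
```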
run
run(
tasks: Union[
Task, BaseTaskQueue, Iterable[Union[Task, dict]]
],
agent_data: Dict[str, Any] | Iterable[Dict[str, Any]],
) -> List[Dict[str, Any]]
Initialize and execute the complete benchmark loop across all tasks.
| PARAMETER | TYPE | DESCRIPTION |
|---|---|---|
| tasks | Union[Task, BaseTaskQueue, Iterable[Union[Task, dict]]] | Task source for execution. Can be a single Task object; a BaseTaskQueue (SequentialTaskQueue, PriorityTaskQueue, or custom AdaptiveTaskQueue); or an iterable of Task objects or dicts that will be converted to Tasks. When a BaseTaskQueue is provided, it controls the task ordering. AdaptiveTaskQueue subclasses are automatically registered as callbacks to receive task completion notifications. |
| agent_data | Union[Dict[str, Any], Iterable[Dict[str, Any]]] | Configuration for agents. Either a single dict applied to all tasks, or an iterable of dicts with one configuration per task. Agent data typically includes model parameters, agent architecture details, and tool specifications. |

| RETURNS | DESCRIPTION |
|---|---|
| List[Dict[str, Any]] | List of report dictionaries, one per task repetition. Each report contains: |

| RAISES | DESCRIPTION |
|---|---|
| ValueError | If agent_data length doesn't match number of tasks (when agent_data is an iterable). |
How to use
This is the framework's main orchestration method that runs your entire benchmark. It iterates through all tasks, handles repetitions, and manages the three-stage lifecycle for each execution. You don't implement this method—instead, you call it to start the benchmark after implementing the setup and execution methods.
By default, the benchmark will continue executing remaining tasks even if some fail.
You can change this behavior by setting fail_on_task_error=True,
fail_on_evaluation_error=True, or fail_on_setup_error=True when instantiating
the benchmark. Each task execution returns a status indicating success or the specific
failure type (see TaskExecutionStatus).
For each task execution, the framework:
- Calls your setup methods to initialize components
- Calls your run_agents() method to execute the task
- Collects message histories and calls evaluators
- Stores results and triggers callbacks
Pseudocode structure:
for task in tasks:
    for repeat in range(n_task_repeats):
        # Setup stage
        environment = setup_environment(agent_data, task)
        user = setup_user(agent_data, environment, task)
        agents_to_run, agents_dict = setup_agents(agent_data, environment, task, user)
        evaluators = setup_evaluators(environment, task, agents_to_run, user)
        # Run stage (execution_loop handles multi-turn if user exists)
        agents_output = execution_loop(agents_to_run, task, environment, user)
        # Evaluate stage
        traces = collect_message_histories(agents_dict)
        eval_results = evaluate(evaluators, traces, agents_dict)
        # Store results
        store_result(task_id, traces, eval_results)
Callback hooks are triggered at these points:
- on_run_start: Before processing any tasks
- on_task_start: Before processing a task (once per task, not per repeat)
- on_task_repeat_start: Before each repetition of a task
- on_task_repeat_end: After each repetition completes
- on_task_end: After all repetitions of a task complete
- on_run_end: After all tasks complete
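The ordering of these hooks can be seen with a small recording callback driven by a stand-in loop. The hook names are the ones listed above; the argument lists, the `RecordingCallback` class, and the `drive` function are simplified assumptions and do not reflect the real BenchmarkCallback signatures.

```python
class RecordingCallback:
    """Records hook names in call order (argument lists are simplified assumptions)."""

    def __init__(self):
        self.events = []

    def on_run_start(self):
        self.events.append("on_run_start")

    def on_task_start(self, task_id):
        self.events.append(f"on_task_start:{task_id}")

    def on_task_repeat_start(self, task_id, repeat_idx):
        self.events.append(f"on_task_repeat_start:{task_id}:{repeat_idx}")

    def on_task_repeat_end(self, task_id, repeat_idx):
        self.events.append(f"on_task_repeat_end:{task_id}:{repeat_idx}")

    def on_task_end(self, task_id):
        self.events.append(f"on_task_end:{task_id}")

    def on_run_end(self):
        self.events.append("on_run_end")


def drive(callback, task_ids, n_task_repeats):
    """Stand-in for the run loop: fires hooks in the documented order."""
    callback.on_run_start()
    for task_id in task_ids:
        callback.on_task_start(task_id)          # once per task, not per repeat
        for repeat_idx in range(n_task_repeats):
            callback.on_task_repeat_start(task_id, repeat_idx)
            callback.on_task_repeat_end(task_id, repeat_idx)
        callback.on_task_end(task_id)
    callback.on_run_end()


cb = RecordingCallback()
drive(cb, ["t1"], n_task_repeats=2)
print(cb.events)
```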
# Typical usage
benchmark = MyBenchmark()
reports = benchmark.run(tasks=tasks, agent_data=config)
# Analyze results
for report in reports:
    print(f"Task {report['task_id']}, Repeat {report['repeat_idx']}: {report['eval']}")
    print(f"Config: {report['config']}")
    print(f"Traces: {report['traces']}")
# Parallel execution with 4 workers
benchmark = MyBenchmark(num_workers=4)
reports = benchmark.run(tasks=tasks, agent_data=config)
# Single agent config for all tasks
reports = benchmark.run(tasks=tasks, agent_data={"model": "gpt-4"})
# Task-specific agent configs (must match task count)
reports = benchmark.run(
    tasks=tasks,
    agent_data=[
        {"model": "gpt-4", "difficulty": "easy"},
        {"model": "gpt-4", "difficulty": "hard"},
    ],
)
# Priority-based execution
from maseval.core.task import PriorityTaskQueue
for task in tasks:
    task.protocol.priority = compute_priority(task)
queue = PriorityTaskQueue(tasks)
reports = benchmark.run(tasks=queue, agent_data=config)
# Adaptive queue (auto-registered as callback)
queue = MyAdaptiveTaskQueue(tasks)
reports = benchmark.run(tasks=queue, agent_data=config)  # queue receives on_task_complete callbacks
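The two accepted shapes of agent_data, and the ValueError raised on a length mismatch, can be sketched with a small helper. `normalize_agent_data` is a hypothetical function written for illustration; it is not part of the framework.

```python
def normalize_agent_data(agent_data, n_tasks):
    """Return one config per task, or raise ValueError on a length mismatch."""
    if isinstance(agent_data, dict):
        # A single dict applies to every task.
        return [agent_data] * n_tasks
    configs = list(agent_data)
    if len(configs) != n_tasks:
        raise ValueError(
            f"agent_data has {len(configs)} entries but there are {n_tasks} tasks"
        )
    return configs


shared = normalize_agent_data({"model": "gpt-4"}, n_tasks=3)
per_task = normalize_agent_data(
    [{"model": "gpt-4", "difficulty": "easy"}, {"model": "gpt-4", "difficulty": "hard"}],
    n_tasks=2,
)
print(len(shared), len(per_task))
```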
run_agents
run_agents(
agents: Sequence[AgentAdapter],
task: Task,
environment: Environment,
query: str,
) -> Any
Execute agent on the MMLU prompt.
setup_agents
abstractmethod
setup_agents(
agent_data: Dict[str, Any],
environment: Environment,
task: Task,
user: Optional[User],
seed_generator: SeedGenerator,
) -> Tuple[Sequence[AgentAdapter], Dict[str, AgentAdapter]]
Instantiate and configure the agent system for a task.
Note: All agents in the returned agents_dict are automatically registered
for tracing. You don't need to manually call register() for them. However, you
should manually register models, simulators, or other components used by agents.
| PARAMETER | TYPE | DESCRIPTION |
|---|---|---|
| agent_data | Dict[str, Any] | Configuration dict containing agent specifications, model parameters, and tool assignments for this task. |
| environment | Environment | The initialized environment providing tools to the agents. |
| task | Task | The Task object with query and metadata. |
| user | Optional[User] | Optional user simulator for agent-user interactions. |
| seed_generator | SeedGenerator | Seed generator for deriving deterministic seeds for agents and their models. Use |

| RETURNS | DESCRIPTION |
|---|---|
| Tuple[Sequence[AgentAdapter], Dict[str, AgentAdapter]] | A tuple of (agents_to_run, agents_dict) where: |
How to use
This method constructs your agent architecture—single agent, multiple collaborative agents, or an orchestrator managing workers. Each agent is wrapped in AgentAdapter for uniform message history tracking.
The dual return structure serves different purposes:
- agents_to_run: Only agents directly invoked in run_agents() (typically the orchestrator)
- agents_dict: All agents in the system for message history collection from workers called indirectly through the orchestrator
def setup_agents(self, agent_data, environment, task, user, seed_generator):
    # Use child() for logical paths like "agents/experimental"
    # derive_seed() returns None if seeding is disabled
    agent_gen = seed_generator.child("agents")
    # Vary experimental agent per rep, keep baseline constant
    experimental_seed = agent_gen.derive_seed("experimental", per_repetition=True)
    baseline_seed = agent_gen.derive_seed("baseline", per_repetition=False)
    # For worker agents, nest further: "agents/workers/analyst"
    worker_gen = agent_gen.child("workers")
    analyst_seed = worker_gen.derive_seed("analyst")
    # Create agents with seeds (model adapters accept Optional[int])
    model = self.get_model_adapter(model_id, seed=experimental_seed)
    # ... create agents ...
    return [orchestrator_adapter], all_agents
setup_environment
setup_environment(
agent_data: Dict[str, Any],
task: Task,
seed_generator: SeedGenerator,
) -> MMLUEnvironment
Create environment for a task.
setup_evaluators
setup_evaluators(
environment: Environment,
task: Task,
agents: Sequence[AgentAdapter],
user: Optional[User],
seed_generator: SeedGenerator,
) -> Sequence[Evaluator]
Create MMLU evaluator.
setup_user
setup_user(
agent_data: Dict[str, Any],
environment: Environment,
task: Task,
seed_generator: SeedGenerator,
) -> Optional[User]
Create an optional user simulator for interactive tasks.
This method is optional. Return None if your benchmark does not require user simulation.
Note: The returned user is automatically registered for tracing.
You don't need to manually call register() for it.
| PARAMETER | TYPE | DESCRIPTION |
|---|---|---|
| agent_data | Dict[str, Any] | Configuration dict containing agent specifications and settings that may influence user simulator setup (e.g., framework type for creating compatible tools). |
| environment | Environment | The environment instance created for this task. |
| task | Task | The Task object with user profile data or scenario information. |
| seed_generator | SeedGenerator | Seed generator for deriving deterministic seeds for the user simulator. |

| RETURNS | DESCRIPTION |
|---|---|
| Optional[User] | A User instance that can respond to agent queries, or None if not needed. |
How to use
User simulators enable agent-user interactions by responding to queries with preferences, clarifications, or feedback. Useful for benchmarks testing conversational agents or systems requiring user input during execution.
def setup_user(self, agent_data, environment, task, seed_generator):
    # Use child() to create logical namespace - results in "simulators/user"
    # derive_seed() returns None if seeding is disabled
    sim_gen = seed_generator.child("simulators")
    user_seed = sim_gen.derive_seed("user")  # Optional[int]
    user_model = self.get_model_adapter(model_id, seed=user_seed)
    return LLMUser(model=user_model, ...)

# Or skip user simulation entirely
def setup_user(self, agent_data, environment, task, seed_generator):
    return None
The user is automatically registered for tracing when returned.
DefaultMMLUBenchmark
Bases: MMLUBenchmark
MMLU Benchmark using HuggingFace transformers models.
This concrete implementation uses log-likelihood based MCQ evaluation
via HuggingFaceModelScorer, with the same optimisations as
lm-evaluation-harness:
- Single forward pass per question (one-token continuation optimisation)
- Efficient log-softmax computation
- Proper left-padding for batch processing
Agents are created using a scorer-backed adapter (see _ScorerBackedAdapter).
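To show why left-padding matters for batched scoring, here is a minimal sketch in plain Python. The `left_pad` helper and the toy token ids are assumptions for illustration and are not taken from HuggingFaceModelScorer.

```python
def left_pad(batch, pad_id):
    """Left-pad token id sequences to a common width and build attention masks."""
    width = max(len(seq) for seq in batch)
    padded, mask = [], []
    for seq in batch:
        n_pad = width - len(seq)
        padded.append([pad_id] * n_pad + list(seq))   # padding goes on the left
        mask.append([0] * n_pad + [1] * len(seq))     # 0 = padding, 1 = real token
    return padded, mask


# Three prompts of different lengths (toy token ids).
batch = [[11, 12, 13, 14], [21, 22], [31]]
padded, mask = left_pad(batch, pad_id=0)

# The last real token of every prompt now sits in the final column.
last_tokens = [row[-1] for row in padded]
print(padded, mask, last_tokens)
```

With this layout the next-token logits for every prompt in the batch can be read from the same final position, which is what makes a single forward pass per batch of questions straightforward.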
seed_generator
property
seed_generator: SeedGenerator
The seed generator for this benchmark.
The seed generator is configured at benchmark initialization via the seed
or seed_generator parameters. When seed=None (the default), the generator's
derive_seed() method returns None, effectively disabling seeding while
maintaining a uniform interface.
| RETURNS | DESCRIPTION |
|---|---|
| SeedGenerator | The root |
usage
property
usage: Usage
Running usage total across all task repetitions.
Queryable at any time, including while the benchmark is still running. Returns the grand total of all usage collected so far.
usage_by_component
property
usage_by_component: Dict[str, Usage]
Per-component running usage totals across all repetitions.
Keys are registry keys (e.g., "models:main_model").
__init__
__init__(
model_id: str,
device: str = DEFAULT_DEVICE,
trust_remote_code: bool = True,
use_full_prompt: bool = True,
batch_size: int = DEFAULT_BATCH_SIZE,
**kwargs: Any,
)
Initialize HuggingFace MMLU benchmark.
| PARAMETER | TYPE | DESCRIPTION |
|---|---|---|
| model_id | str | HuggingFace model identifier. |
| device | str | Device to run model on. |
| trust_remote_code | bool | Trust remote code when loading model (default True). |
| use_full_prompt | bool | Use full prompt with few-shot examples (default True). |
| batch_size | int | Batch size for lm-eval batching (number of questions per batch). |
| **kwargs | Any | Additional arguments passed to |
add_callback
add_callback(callback: BenchmarkCallback) -> None
Register a callback handler to monitor benchmark execution.
| PARAMETER | TYPE | DESCRIPTION |
|---|---|---|
| callback | BenchmarkCallback | A BenchmarkCallback instance that will receive execution events. |
How to use
Callbacks receive notifications at key lifecycle points for tracing, progress tracking,
or custom metrics collection. See BenchmarkCallback
for available hooks and their signatures.
from maseval.core.callbacks import MessageTracingCallback
benchmark = MyBenchmark()
benchmark.add_callback(MessageTracingCallback(output_dir="logs"))
results = benchmark.run(tasks=tasks, agent_data=config)
clear_registry
clear_registry() -> None
Clear the component registry after a task repetition completes.
This method is called automatically by run() after each task repetition
to ensure components are not carried over between repetitions. The
reports list persists across all repetitions for aggregated analysis.
collect_all_configs
collect_all_configs() -> Dict[str, Any]
Collect configuration from all registered components for the current task repetition.
This method is called automatically by run() after each task repetition completes
and before evaluation begins. It gathers comprehensive configuration from all registered
components (agents, models, tools, simulators, callbacks, etc.) for that specific
repetition. After collection, the registry is cleared for the next repetition.
The collected configs are stored in benchmark.reports list along with traces
for persistent access across all task repetitions.
Output fields:
- metadata: Collection timestamp and thread info
- agents: Dict mapping agent names to their config (settings, parameters)
- models: Dict mapping model names to their config (model IDs, parameters)
- tools: Dict mapping tool names to their config (specifications, settings)
- simulators: Dict mapping simulator names to their config (parameters, templates)
- callbacks: Dict mapping callback names to their config (settings)
- environment: Direct config from the environment (not nested), or None if not present
- user: Direct config from the user simulator (not nested), or None if not present
- other: Dict for any other registered components
- benchmark: Benchmark-level configuration (git, system, packages)

| RETURNS | DESCRIPTION |
|---|---|
| Dict[str, Any] | Structured dictionary containing configuration from all registered components. |
How to use
This method is called automatically by run() after each task repetition:
# Automatic collection (recommended)
results = benchmark.run()
# Access all collected reports (traces + configs) across repetitions
for report in benchmark.reports:
    print(f"Task {report['task_id']}, Repeat {report['repeat_idx']}")
    # Agents is a dict: agent_name -> config
    print(f"Agent config: {report['config']['agents']['my_agent']}")
    # Environment and user are direct (not nested)
    print(f"Environment config: {report['config']['environment']}")
    print(f"User config: {report['config']['user']}")
    # Benchmark-level config
    print(f"Git commit: {report['config']['benchmark']['git']['commit_hash']}")
The collected configs are available in the results for reproducibility analysis.
collect_all_traces
collect_all_traces() -> Dict[str, Any]
Collect execution traces from all registered components for the current task repetition.
This method is called automatically by run() after each task repetition completes
and before evaluation begins. It gathers comprehensive traces from all registered
components (agents, models, tools, simulators, callbacks, etc.) for that specific
repetition. After collection, the registry is cleared for the next repetition.
The collected traces are stored in benchmark.reports list along with configs
for persistent access across all task repetitions.
Output fields:
- metadata - Collection timestamp and thread info
- agents - Dict mapping agent names to their traces (messages, execution data)
- models - Dict mapping model names to their traces (API calls, timing, errors)
- tools - Dict mapping tool names to their traces (invocations, parameters)
- simulators - Dict mapping simulator names to their traces (attempts, outcomes)
- callbacks - Dict mapping callback names to their traces (custom data)
- environment - Direct traces from the environment (not nested), or None if not present
- user - Direct traces from the user simulator (not nested), or None if not present
- other - Dict for any other registered components
| RETURNS | DESCRIPTION |
|---|---|
| Dict[str, Any] | Structured dictionary containing execution traces from all registered components. |
How to use
This method is called automatically by run() after each task repetition:
# Automatic collection (recommended)
results = benchmark.run()
# Access all collected reports (traces + configs) across repetitions
for report in benchmark.reports:
    print(f"Task {report['task_id']}, Repeat {report['repeat_idx']}")
    # Agents is a dict: agent_name -> traces
    print(f"Agent messages: {report['traces']['agents']['my_agent']}")
    # Environment and user are direct (not nested)
    print(f"Environment state: {report['traces']['environment']}")
    print(f"User interactions: {report['traces']['user']}")
The collected traces are passed to the evaluator's evaluate() method
and stored in benchmark.reports for later analysis.
collect_all_usage
collect_all_usage() -> Dict[str, Any]
Collect usage from all registered components for the current task repetition.
This method is called automatically by run() after each task repetition
completes. It gathers usage from all registered UsageTrackableMixin
components and also accumulates into persistent running totals accessible
via usage and usage_by_component.
| RETURNS | DESCRIPTION |
|---|---|
| Dict[str, Any] | Structured dictionary containing usage from all registered components. |
evaluate
evaluate(
evaluators: Sequence[Evaluator],
agents: Dict[str, AgentAdapter],
final_answer: Any,
traces: Dict[str, Any],
) -> List[Dict[str, Any]]
Evaluate model response.
execution_loop
execution_loop(
agents: Sequence[AgentAdapter],
task: Task,
environment: Environment,
user: Optional[User],
) -> Any
Execute agents with optional user interaction loop.
This method orchestrates the agent-user interaction pattern. When a user is
present, the user initiates the conversation using user.get_initial_query().
If no user is present, task.query is used as the initial query.
Interaction Flow
By default, agents execute once (max_invocations=1). For multi-turn
interaction, set self.max_invocations > 1 in your benchmark's __init__.
The loop continues until max_invocations is reached or user.is_done()
returns True (e.g., max turns reached or stop token detected).
Note
Override this method in your benchmark subclass to implement custom interaction patterns (e.g., agent-initiated conversations, different termination conditions, or specialized query routing).
| PARAMETER | DESCRIPTION |
|---|---|
agents
|
Agents to execute (typically the orchestrator).
TYPE:
|
task
|
The task being solved.
TYPE:
|
environment
|
The environment providing tools and state.
TYPE:
|
user
|
Optional user simulator. If provided, the user initiates and drives
the conversation. If None, a single agent execution with
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
Any
|
Final answer from the last agent execution. |
Example
For interactive benchmarks, enable multi-turn interaction:
def __init__(self, ...):
    super().__init__(...)
    self.max_invocations = 5  # Up to 5 agent-user exchanges
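The interaction flow described above can be illustrated with a minimal, self-contained sketch. It is not the MASEval implementation: `StubAgent`, `StubUser`, and the `respond` method are hypothetical stand-ins, and only `get_initial_query()`, `is_done()`, `max_invocations`, and the fallback to the task query are taken from the description.

```python
from typing import List, Optional


class StubAgent:
    """Hypothetical agent that echoes the query it receives."""

    def run(self, query: str) -> str:
        return f"answer to: {query}"


class StubUser:
    """Hypothetical user simulator that is done after `max_turns` replies."""

    def __init__(self, max_turns: int) -> None:
        self.max_turns = max_turns
        self.turns = 0

    def get_initial_query(self) -> str:
        return "initial question"

    def respond(self, answer: str) -> str:  # hypothetical method name
        self.turns += 1
        return f"follow-up {self.turns}"

    def is_done(self) -> bool:
        return self.turns >= self.max_turns


def execution_loop_sketch(
    agents: List[StubAgent],
    task_query: str,
    user: Optional[StubUser],
    max_invocations: int = 1,
) -> Optional[str]:
    # The user opens the conversation when present; otherwise use the task query.
    query = user.get_initial_query() if user is not None else task_query
    final_answer = None
    for _ in range(max_invocations):
        for agent in agents:
            final_answer = agent.run(query)
        if user is None:
            break  # no user: a single agent execution
        query = user.respond(final_answer)
        if user.is_done():
            break  # user signalled completion before max_invocations
    return final_answer
```

With no user, the sketch runs the agents once regardless of `max_invocations`; with a user, it stops at whichever comes first, `max_invocations` or `is_done()`.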
get_failed_tasks
get_failed_tasks(
status_filter: Optional[
Union[
TaskExecutionStatus, List[TaskExecutionStatus]
]
] = None,
reports: Optional[List[Dict[str, Any]]] = None,
) -> SequentialTaskQueue
Get tasks that failed during benchmark execution.
This method retrieves failed tasks based on their execution status, useful for debugging, retry logic, or failure analysis.
| PARAMETER | DESCRIPTION |
|---|---|
| status_filter | Filter by specific failure status(es). If None, returns all failed tasks (any status except SUCCESS). Can be a single TaskExecutionStatus or a list of them. Examples: TaskExecutionStatus.TASK_EXECUTION_FAILED selects only tasks that failed during execution; TaskExecutionStatus.EVALUATION_FAILED selects only tasks where evaluation failed; [TaskExecutionStatus.TASK_EXECUTION_FAILED, TaskExecutionStatus.SETUP_FAILED] selects tasks that failed during execution or setup. TYPE: Optional[Union[TaskExecutionStatus, List[TaskExecutionStatus]]] DEFAULT: None |
| reports | Optional list of reports to analyze. If None, uses the reports from the last run() call. This allows analyzing externally stored or modified reports. TYPE: Optional[List[Dict[str, Any]]] DEFAULT: None |
| RETURNS | DESCRIPTION |
|---|---|
| SequentialTaskQueue | SequentialTaskQueue containing the failed tasks. Empty if no failures match the filter. |
| RAISES | DESCRIPTION |
|---|---|
| RuntimeError | If reports is None and run() has not been executed yet. |
How to use
# Run benchmark
benchmark = MyBenchmark()
reports = benchmark.run(tasks=tasks, agent_data=config)
# Get all failed tasks (from internal state)
failed = benchmark.get_failed_tasks()
print(f"Failed: {len(failed)}/{len(benchmark.tasks)} tasks")
# Or work with returned reports (safe from internal state changes)
failed = benchmark.get_failed_tasks(reports=reports)
# Get only tasks that failed during execution (not evaluation)
execution_failures = benchmark.get_failed_tasks(
    TaskExecutionStatus.TASK_EXECUTION_FAILED,
    reports=reports,
)
# Get setup and execution failures
critical_failures = benchmark.get_failed_tasks(
    status_filter=[
        TaskExecutionStatus.SETUP_FAILED,
        TaskExecutionStatus.TASK_EXECUTION_FAILED,
    ],
    reports=reports,
)
# Retry failed tasks elegantly - this is the key use case!
if len(failed) > 0:
    retry_reports = benchmark.run(tasks=failed, agent_data=config)
# Or more concisely
reports = benchmark.run(tasks=tasks, agent_data=config)
retry_reports = benchmark.run(tasks=benchmark.get_failed_tasks(), agent_data=config)
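The filtering rule for `status_filter` can be sketched independently of MASEval. The example below is an illustration under stated assumptions: `Status` is a local stand-in for `TaskExecutionStatus` (its string values are invented), and the `"status"` key on each report is hypothetical; only the member names and the "None means anything except SUCCESS" rule come from the description above.

```python
from enum import Enum
from typing import Any, Dict, Iterable, List, Optional, Union


class Status(Enum):
    """Local stand-in for TaskExecutionStatus; the string values are assumptions."""

    SUCCESS = "success"
    SETUP_FAILED = "setup_failed"
    TASK_EXECUTION_FAILED = "task_execution_failed"
    EVALUATION_FAILED = "evaluation_failed"


def failed_task_ids(
    reports: List[Dict[str, Any]],
    status_filter: Optional[Union[Status, Iterable[Status]]] = None,
) -> List[Any]:
    # Normalise the filter: None -> every non-success status,
    # a single status -> a one-element set, a list -> a set.
    if status_filter is None:
        wanted = {s for s in Status if s is not Status.SUCCESS}
    elif isinstance(status_filter, Status):
        wanted = {status_filter}
    else:
        wanted = set(status_filter)

    # Keep each failing task once, in first-seen order.
    task_ids: List[Any] = []
    for report in reports:
        if report["status"] in wanted and report["task_id"] not in task_ids:
            task_ids.append(report["task_id"])
    return task_ids
```

Deduplicating by task ID is a design choice of this sketch, made so that a task failing in several repetitions is retried once.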
get_model_adapter
get_model_adapter(
model_id: str, **kwargs: Any
) -> ModelAdapter
Not used — DefaultMMLUBenchmark uses HuggingFaceModelScorer.
| RAISES | DESCRIPTION |
|---|---|
NotImplementedError
|
Always. Use |
precompute_all_logprobs_lmeval
precompute_all_logprobs_lmeval(
tasks: Sequence[Task],
) -> Dict[Any, List[float]]
Precompute log-likelihoods for ALL tasks using lm-eval's batching.
CRITICAL: lm-evaluation-harness batches ALL requests together and uses its Collator class to reorder/group them. This affects floating-point precision for some edge cases. To get EXACT matches, we must process ALL requests together in a single batch.
This method:
1. Creates Instance objects for all task/choice combinations
2. Calls lm-eval's HFLM.loglikelihood() with ALL instances
3. Returns a mapping from doc_id to logprobs
| PARAMETER | DESCRIPTION |
|---|---|
| tasks | Iterable of Task objects with prompt and choices. TYPE: Sequence[Task] |
| RETURNS | DESCRIPTION |
|---|---|
| Dict[Any, List[float]] | Dict mapping doc_id -> list of log-likelihoods for each choice. |
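The flatten, score once, regroup pattern described above can be sketched without lm-evaluation-harness installed. This is an illustration only: tasks are represented as plain dicts with hypothetical `doc_id`, `prompt`, and `choices` keys, and `fake_score_all` is an invented stand-in for the single batched scoring call.

```python
from typing import Any, Callable, Dict, List, Sequence, Tuple

Request = Tuple[str, str]  # (context, continuation)


def precompute_logprobs_sketch(
    tasks: Sequence[Dict[str, Any]],
    score_all: Callable[[List[Request]], List[float]],
) -> Dict[Any, List[float]]:
    requests: List[Request] = []
    owners: List[Any] = []
    # Step 1: one request per task/choice combination.
    for task in tasks:
        for choice in task["choices"]:
            requests.append((task["prompt"], choice))
            owners.append(task["doc_id"])

    # Step 2: a single scoring call over ALL requests.
    scores = score_all(requests)

    # Step 3: regroup the flat scores by doc_id, preserving choice order.
    by_doc: Dict[Any, List[float]] = {}
    for doc_id, score in zip(owners, scores):
        by_doc.setdefault(doc_id, []).append(score)
    return by_doc


def fake_score_all(requests: List[Request]) -> List[float]:
    """Invented scorer: longer continuations get lower scores."""
    return [-float(len(continuation)) for _, continuation in requests]
```

Keeping a parallel `owners` list means the scorer is free to reorder work internally, as long as it returns results in request order.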
register
register(
category: str,
name: str,
component: RegisterableComponent,
) -> RegisterableComponent
Register a component for comprehensive trace and configuration collection.
All core MASEval components (AgentAdapter, ModelAdapter, Environment, User, LLMSimulator, BenchmarkCallback) inherit from TraceableMixin and/or ConfigurableMixin, and are automatically registered for both trace and configuration collection before evaluation.
Note: Most components are automatically registered when returned from
setup methods (setup_environment, setup_user, setup_agents). You only
need to manually register additional components like models, simulators, or
tools that aren't automatically captured.
| PARAMETER | DESCRIPTION |
|---|---|
| category | Component category (e.g., "agents", "models", "tools", "simulators", "callbacks", "user", "environment", "seeding"). Use plural form to match the structure in collect_all_traces() and collect_all_configs(). TYPE: str |
| name | Unique identifier for this component within its category. TYPE: str |
| component | Any object inheriting from TraceableMixin and/or ConfigurableMixin. TYPE: RegisterableComponent |
| RETURNS | DESCRIPTION |
|---|---|
| RegisterableComponent | The component (for chaining convenience). |
| RAISES | DESCRIPTION |
|---|---|
| ValueError | If the component is already registered under a different name. |
How to use
Most components are auto-registered. Manual registration is only needed for additional components:
def setup_agents(self, agent_data, environment, task, user):
    # Create model (needs manual registration)
    model = MyModelAdapter(...)
    self.register("models", "main_model", model)
    # Create agent (auto-registered when returned)
    agent = MyAgent(model=model)
    agent_adapter = AgentAdapter(agent, "agent1")
    # Environment and user are also auto-registered
    return [agent_adapter], {"agent1": agent_adapter}
Traces and configs are automatically collected before evaluation via
collect_all_traces() and collect_all_configs() which are called
internally by the run() method.
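The registration contract (return the component for chaining, raise `ValueError` when the same object is registered under a different name) can be sketched as follows. `RegistrySketch` and its internals are hypothetical and only illustrate the documented behaviour; the real registry may track components differently.

```python
from typing import Any, Dict


class RegistrySketch:
    """Hypothetical component registry keyed by category and name."""

    def __init__(self) -> None:
        self.components: Dict[str, Dict[str, Any]] = {}
        self._key_by_object: Dict[int, str] = {}

    def register(self, category: str, name: str, component: Any) -> Any:
        key = f"{category}/{name}"
        existing = self._key_by_object.get(id(component))
        if existing is not None and existing != key:
            # Same object, different name: reject to avoid duplicate traces.
            raise ValueError(f"component already registered as {existing!r}")
        self._key_by_object[id(component)] = key
        self.components.setdefault(category, {})[name] = component
        return component  # returned for chaining convenience

    def clear(self) -> None:
        # Mirrors the idea of clearing the registry between repetitions.
        self.components.clear()
        self._key_by_object.clear()
```

Returning the component lets a caller write `model = registry.register("models", "main_model", build_model())` in one line, where `build_model` is whatever factory the benchmark uses.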
run
run(
tasks: Union[
Task, BaseTaskQueue, Iterable[Union[Task, dict]]
],
agent_data: Dict[str, Any] | Iterable[Dict[str, Any]],
) -> List[Dict[str, Any]]
Initialize and execute the complete benchmark loop across all tasks.
| PARAMETER | DESCRIPTION |
|---|---|
| tasks | Task source for execution. Can be a single Task object; a BaseTaskQueue (SequentialTaskQueue, PriorityTaskQueue, or custom AdaptiveTaskQueue); or an iterable of Task objects or dicts that will be converted to Tasks. When a BaseTaskQueue is provided, it controls the task ordering. AdaptiveTaskQueue subclasses are automatically registered as callbacks to receive task completion notifications. TYPE: Union[Task, BaseTaskQueue, Iterable[Union[Task, dict]]] |
| agent_data | Configuration for agents. Either a single dict applied to all tasks, or an iterable of dicts with one configuration per task. Agent data typically includes model parameters, agent architecture details, and tool specifications. TYPE: Union[Dict[str, Any], Iterable[Dict[str, Any]]] |
| RETURNS | DESCRIPTION |
|---|---|
| List[Dict[str, Any]] | List of report dictionaries, one per task repetition. |
| RAISES | DESCRIPTION |
|---|---|
| ValueError | If agent_data length doesn't match number of tasks (when agent_data is an iterable). |
How to use
This is the framework's main orchestration method that runs your entire benchmark. It iterates through all tasks, handles repetitions, and manages the three-stage lifecycle for each execution. You don't implement this method—instead, you call it to start the benchmark after implementing the setup and execution methods.
By default, the benchmark will continue executing remaining tasks even if some fail.
You can change this behavior by setting fail_on_task_error=True,
fail_on_evaluation_error=True, or fail_on_setup_error=True when instantiating
the benchmark. Each task execution returns a status indicating success or the specific
failure type (see TaskExecutionStatus).
For each task execution, the framework:
- Calls your setup methods to initialize components
- Calls your run_agents() method to execute the task
- Collects message histories and calls evaluators
- Stores results and triggers callbacks
Pseudocode structure:
for task in tasks:
    for repeat in range(n_task_repeats):
        # Setup stage
        environment = setup_environment(agent_data, task)
        user = setup_user(agent_data, environment, task)
        agents_to_run, agents_dict = setup_agents(agent_data, environment, task, user)
        evaluators = setup_evaluators(environment, task, agents_to_run, user)
        # Run stage (execution_loop handles multi-turn if user exists)
        agents_output = execution_loop(agents_to_run, task, environment, user)
        # Evaluate stage
        traces = collect_message_histories(agents_dict)
        eval_results = evaluate(evaluators, agents_dict, agents_output, traces)
        # Store results
        store_result(task_id, traces, eval_results)
Callback hooks are triggered at these points:
- on_run_start: Before processing any tasks
- on_task_start: Before processing a task (once per task, not per repeat)
- on_task_repeat_start: Before each repetition of a task
- on_task_repeat_end: After each repetition completes
- on_task_end: After all repetitions of a task complete
- on_run_end: After all tasks complete
# Typical usage
benchmark = MyBenchmark()
reports = benchmark.run(tasks=tasks, agent_data=config)
# Analyze results
for report in reports:
    print(f"Task {report['task_id']}, Repeat {report['repeat_idx']}: {report['eval']}")
    print(f"Config: {report['config']}")
    print(f"Traces: {report['traces']}")
# Parallel execution with 4 workers
benchmark = MyBenchmark(num_workers=4)
reports = benchmark.run(tasks=tasks, agent_data=config)
# Single agent config for all tasks
reports = benchmark.run(tasks=tasks, agent_data={"model": "gpt-4"})
# Task-specific agent configs (must match task count)
reports = benchmark.run(
    tasks=tasks,
    agent_data=[
        {"model": "gpt-4", "difficulty": "easy"},
        {"model": "gpt-4", "difficulty": "hard"},
    ],
)
# Priority-based execution
from maseval.core.task import PriorityTaskQueue
for task in tasks:
    task.protocol.priority = compute_priority(task)
queue = PriorityTaskQueue(tasks)
reports = benchmark.run(tasks=queue, agent_data=config)
# Adaptive queue (auto-registered as callback)
queue = MyAdaptiveTaskQueue(tasks)
reports = benchmark.run(tasks=queue, agent_data=config)  # queue receives task completion notifications
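The order in which the callback hooks listed above fire can be made concrete with a small recorder. This is a sketch, not the framework's loop: the hook names come from the list above, while the hook arguments, `RecordingCallback`, and `run_sketch` are assumptions made for illustration.

```python
from typing import Any, List, Sequence


class RecordingCallback:
    """Hypothetical callback that records each hook invocation as a string."""

    def __init__(self) -> None:
        self.events: List[str] = []

    def on_run_start(self) -> None:
        self.events.append("run_start")

    def on_task_start(self, task_id: Any) -> None:
        self.events.append(f"task_start:{task_id}")

    def on_task_repeat_start(self, task_id: Any, repeat_idx: int) -> None:
        self.events.append(f"repeat_start:{task_id}:{repeat_idx}")

    def on_task_repeat_end(self, task_id: Any, repeat_idx: int) -> None:
        self.events.append(f"repeat_end:{task_id}:{repeat_idx}")

    def on_task_end(self, task_id: Any) -> None:
        self.events.append(f"task_end:{task_id}")

    def on_run_end(self) -> None:
        self.events.append("run_end")


def run_sketch(task_ids: Sequence[Any], n_task_repeats: int, callback: RecordingCallback) -> None:
    callback.on_run_start()
    for task_id in task_ids:
        callback.on_task_start(task_id)  # once per task, not per repeat
        for repeat_idx in range(n_task_repeats):
            callback.on_task_repeat_start(task_id, repeat_idx)
            # setup, execution, and evaluation would happen here
            callback.on_task_repeat_end(task_id, repeat_idx)
        callback.on_task_end(task_id)
    callback.on_run_end()
```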
run_agents
run_agents(
agents: Sequence[AgentAdapter],
task: Task,
environment: Environment,
query: str = "",
) -> Any
Execute log-likelihood based MCQ evaluation.
Uses precomputed logprobs if available (for exact lm-eval match),
otherwise delegates to HuggingFaceModelScorer.loglikelihood_choices()
which automatically picks single-token or multi-token scoring.
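The "precomputed if available, otherwise score on demand" behaviour can be sketched as a small lookup with a fallback. The function names, the `precomputed` mapping, and the `score_choices` callable are hypothetical; the callable stands in for whatever scorer is configured and is not the `HuggingFaceModelScorer` API.

```python
from typing import Any, Callable, Dict, List, Optional


def choice_logprobs(
    doc_id: Any,
    prompt: str,
    choices: List[str],
    precomputed: Optional[Dict[Any, List[float]]],
    score_choices: Callable[[str, List[str]], List[float]],
) -> List[float]:
    # Prefer precomputed values so results match the batched run exactly.
    if precomputed is not None and doc_id in precomputed:
        return precomputed[doc_id]
    # Otherwise score this task's choices directly.
    return score_choices(prompt, choices)


def pick_choice(logprobs: List[float]) -> int:
    # The predicted answer is the index of the highest log-likelihood.
    return max(range(len(logprobs)), key=lambda i: logprobs[i])
```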
setup_agents
setup_agents(
agent_data: Dict[str, Any],
environment: Environment,
task: Task,
user: Optional[User],
seed_generator: SeedGenerator,
) -> Tuple[Sequence[AgentAdapter], Dict[str, AgentAdapter]]
Create scorer-backed agent for MCQ evaluation.
The returned adapter is a tracing container — actual evaluation is
driven by self._scorer in run_agents().
| PARAMETER | DESCRIPTION |
|---|---|
agent_data
|
Agent config (unused; model is set at
TYPE:
|
environment
|
MMLU environment.
TYPE:
|
task
|
Current task.
TYPE:
|
user
|
Unused.
TYPE:
|
seed_generator
|
Seed generator (unused for MMLU).
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
Tuple[Sequence[AgentAdapter], Dict[str, AgentAdapter]]
|
Tuple of (agents_to_run, agents_dict). |
setup_environment
setup_environment(
agent_data: Dict[str, Any],
task: Task,
seed_generator: SeedGenerator,
) -> MMLUEnvironment
Create environment for a task.
setup_evaluators
setup_evaluators(
environment: Environment,
task: Task,
agents: Sequence[AgentAdapter],
user: Optional[User],
seed_generator: SeedGenerator,
) -> Sequence[Evaluator]
Create MMLU evaluator.
setup_user
setup_user(
agent_data: Dict[str, Any],
environment: Environment,
task: Task,
seed_generator: SeedGenerator,
) -> Optional[User]
Create an optional user simulator for interactive tasks.
This method is optional. Return None if your benchmark does not require user simulation.
Note: The returned user is automatically registered for tracing.
You don't need to manually call register() for it.
| PARAMETER | DESCRIPTION |
|---|---|
| agent_data | Configuration dict containing agent specifications and settings that may influence user simulator setup (e.g., framework type for creating compatible tools). TYPE: Dict[str, Any] |
| environment | The environment instance created for this task. TYPE: Environment |
| task | The Task object with user profile data or scenario information. TYPE: Task |
| seed_generator | Seed generator for deriving deterministic seeds for the user simulator. TYPE: SeedGenerator |
| RETURNS | DESCRIPTION |
|---|---|
| Optional[User] | A User instance that can respond to agent queries, or None if not needed. |
How to use
User simulators enable agent-user interactions by responding to queries with preferences, clarifications, or feedback. Useful for benchmarks testing conversational agents or systems requiring user input during execution.
def setup_user(self, agent_data, environment, task, seed_generator):
    # Use child() to create logical namespace - results in "simulators/user"
    # derive_seed() returns None if seeding is disabled
    sim_gen = seed_generator.child("simulators")
    user_seed = sim_gen.derive_seed("user")  # Optional[int]
    user_model = self.get_model_adapter(model_id, seed=user_seed)
    return LLMUser(model=user_model, ...)
# Or skip user simulation entirely
def setup_user(self, agent_data, environment, task, seed_generator):
    return None
The user is automatically registered for tracing when returned.
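To make the `child()` and `derive_seed()` pattern concrete, here is one way such a namespaced seed generator could work. This is purely a sketch: `SeedGeneratorSketch` and its hashing scheme are assumptions, not the MASEval `SeedGenerator`. Only the `"simulators/user"` path and the "None when seeding is disabled" behaviour come from the comments in the example above.

```python
import hashlib
from typing import Optional


class SeedGeneratorSketch:
    """Hypothetical namespaced seed generator based on hashing a path."""

    def __init__(self, global_seed: Optional[int], path: str = "") -> None:
        self.global_seed = global_seed
        self.path = path

    def _join(self, name: str) -> str:
        return f"{self.path}/{name}" if self.path else name

    def child(self, name: str) -> "SeedGeneratorSketch":
        # A child shares the global seed but extends the namespace path.
        return SeedGeneratorSketch(self.global_seed, self._join(name))

    def derive_seed(self, name: str) -> Optional[int]:
        if self.global_seed is None:
            return None  # seeding disabled
        material = f"{self.global_seed}:{self._join(name)}".encode("utf-8")
        digest = hashlib.sha256(material).digest()
        return int.from_bytes(digest[:4], "big")  # 32-bit seed
```

Hashing the full path means a component's seed depends only on the global seed and its own name, not on the order in which other seeds were derived.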
MMLUEnvironment
Bases: Environment
Simple environment for MMLU multiple choice evaluation.
MMLU tasks don't require tools - the environment just holds the task context (question, choices, etc.).
create_tools
create_tools() -> Dict[str, Any]
MMLU doesn't use tools.
gather_config
gather_config() -> dict[str, Any]
Gather configuration from this environment.
Output fields:
- type - Component class name
- gathered_at - ISO timestamp
- tool_count - Number of tools
- tool_names - List of tool names
| RETURNS | DESCRIPTION |
|---|---|
| dict[str, Any] | Dictionary containing environment configuration. |
gather_traces
gather_traces() -> dict[str, Any]
Gather execution traces from this environment and its tools.
Output fields:
- type - Component class name
- gathered_at - ISO timestamp
- tool_count - Number of tools in environment
- tools - Dictionary of tool traces keyed by tool name
| RETURNS | DESCRIPTION |
|---|---|
| dict[str, Any] | Dictionary containing environment execution traces. |
get_prompt
get_prompt() -> str
Get the prompt to send to the model.
Returns full_prompt if use_full_prompt is True, otherwise query.
get_tool
get_tool(name: str) -> Optional[Any]
Get a tool by name.
| PARAMETER | DESCRIPTION |
|---|---|
| name | Tool name. TYPE: str |
| RETURNS | DESCRIPTION |
|---|---|
| Optional[Any] | The tool, or None if not found. |
get_tools
get_tools() -> Dict[str, Any]
Get all tools as a dict.
setup_state
setup_state(task_data: Dict[str, Any]) -> Dict[str, Any]
Initialize state from task data.
| PARAMETER | DESCRIPTION |
|---|---|
task_data
|
Must contain
TYPE:
|
MMLUEvaluator
Bases: Evaluator
Evaluator for MMLU multiple choice questions.
Computes accuracy metrics (acc and acc_norm) by comparing model predictions with gold answers.
__call__
__call__(
traces: Dict[str, Any],
final_answer: Optional[str] = None,
) -> Dict[str, Any]
Evaluate the model's response.
| PARAMETER | DESCRIPTION |
|---|---|
| traces | Filtered traces with messages. TYPE: Dict[str, Any] |
| final_answer | The model's final answer. TYPE: Optional[str] DEFAULT: None |
| RETURNS | DESCRIPTION |
|---|---|
| Dict[str, Any] | Dict with acc, acc_norm, predicted, gold, correct, parse_failed, and optionally logprobs fields. |
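As an illustration of how `acc` and `acc_norm` can differ, the sketch below follows the lm-evaluation-harness convention, where `acc` takes the argmax of the raw log-likelihoods and `acc_norm` takes the argmax after dividing each log-likelihood by the character length of its choice. Whether `MMLUEvaluator` normalizes in exactly this way is an assumption here, and `score_mcq` is a hypothetical helper rather than part of the MASEval API.

```python
from typing import Any, Dict, List


def score_mcq(logprobs: List[float], choices: List[str], gold: int) -> Dict[str, Any]:
    indices = range(len(logprobs))
    # acc: highest raw log-likelihood wins.
    predicted = max(indices, key=lambda i: logprobs[i])
    # acc_norm: highest length-normalized log-likelihood wins.
    predicted_norm = max(indices, key=lambda i: logprobs[i] / len(choices[i]))
    return {
        "acc": float(predicted == gold),
        "acc_norm": float(predicted_norm == gold),
        "predicted": predicted,
        "gold": gold,
    }
```

For single-letter answer choices, every choice has the same length, so the two metrics coincide; they diverge only when choices differ in length.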
__init__
__init__(
task: Task,
environment: Environment,
user: Optional[Any] = None,
)
Initialize MMLU evaluator.
| PARAMETER | DESCRIPTION |
|---|---|
task
|
Task being evaluated. Must have
TYPE:
|
environment
|
Environment (provides choices).
TYPE:
|
user
|
Unused for MMLU.
TYPE:
|
filter_traces
filter_traces(traces: Dict[str, Any]) -> Dict[str, Any]
Extract relevant traces for evaluation.
For MMLU, we need the model's response from agent traces.
load_tasks
load_tasks(
data_path: Union[str, Path],
anchor_points_path: Optional[Union[str, Path]] = None,
limit: Optional[int] = None,
) -> Union[DISCOQueue, SequentialTaskQueue]
Load MMLU tasks from JSON file.
| PARAMETER | DESCRIPTION |
|---|---|
| data_path | Path to MMLU prompts JSON file (mmlu_prompts_examples.json format). TYPE: Union[str, Path] |
| anchor_points_path | Optional path to anchor points pickle file. If provided, returns a DISCOQueue that evaluates only the anchor tasks in order. TYPE: Optional[Union[str, Path]] DEFAULT: None |
| limit | Optional limit on number of tasks to load. TYPE: Optional[int] DEFAULT: None |
| RETURNS | DESCRIPTION |
|---|---|
| Union[DISCOQueue, SequentialTaskQueue] | TaskQueue containing MMLU tasks. |
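Anchor-point selection can be pictured as indexing the full task list with a stored list of positions. The sketch below assumes the pickle holds a plain list of integer indices and that tasks are evaluated in the order the indices are listed; both are assumptions, and `select_anchor_tasks` is a hypothetical helper, not the `DISCOQueue` implementation.

```python
import pickle
from typing import Any, List, Sequence


def select_anchor_tasks(tasks: Sequence[Any], anchor_points: Sequence[int]) -> List[Any]:
    # Keep only the anchor tasks, in the order the anchor indices are given.
    return [tasks[i] for i in anchor_points]


# Stand-in data: six placeholder tasks and an in-memory "pickle file".
all_tasks = [f"task-{i}" for i in range(6)]
anchor_blob = pickle.dumps([4, 0, 2])

anchor_points = pickle.loads(anchor_blob)
anchor_tasks = select_anchor_tasks(all_tasks, anchor_points)
```

Only unpickle anchor files from sources you trust, since loading a pickle can execute arbitrary code.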
compute_benchmark_metrics
compute_benchmark_metrics(
results: List[Dict[str, Any]],
) -> Dict[str, Any]
Compute summary metrics across all benchmark results.
| PARAMETER | DESCRIPTION |
|---|---|
| results | List of result dicts from benchmark.run(). TYPE: List[Dict[str, Any]] |
| RETURNS | DESCRIPTION |
|---|---|
| Dict[str, Any] | Dict with accuracy metrics and task counts. |
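A rough picture of this kind of aggregation is shown below. It assumes each result carries an `"eval"` list of evaluator dicts with `acc` and `acc_norm` fields; the output keys (`total_tasks`, `acc`, `acc_norm`) and the `summarize_results` helper are hypothetical and may not match what `compute_benchmark_metrics` actually returns.

```python
from typing import Any, Dict, List


def summarize_results(results: List[Dict[str, Any]]) -> Dict[str, Any]:
    # Flatten evaluator outputs; skip results with no evaluation (e.g. failed tasks).
    evals = [entry for result in results for entry in (result.get("eval") or [])]
    total = len(evals)
    if total == 0:
        return {"total_tasks": 0, "acc": 0.0, "acc_norm": 0.0}
    return {
        "total_tasks": total,
        "acc": sum(e["acc"] for e in evals) / total,
        "acc_norm": sum(e["acc_norm"] for e in evals) / total,
    }
```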