Tasks
Tasks define individual benchmark scenarios including inputs, expected outputs, and metadata for evaluation. Task queues control execution order and scheduling strategy.
Task
dataclass
A data container for a single benchmark task.
Task data can optionally be frozen after loading to prevent accidental
mutation during a benchmark run. Use freeze() to make all dictionary
fields read-only, and unfreeze() to restore mutability.
Note
It is strongly recommended to call freeze() once all task data has
been assembled (e.g. after load_tasks() or inside setup_environment).
This guards against subtle bugs where benchmark components accidentally
overwrite task data during execution.
| ATTRIBUTE | DESCRIPTION |
|---|---|
| query | The main input query or prompt for the task. |
| id | A unique identifier for the task. Benchmarks can provide human-readable IDs (e.g., "task-000001", "retail_001"). A UUID string is generated automatically if none is provided. |
| environment_data | A dictionary of data needed to set up the environment for the task. |
| evaluation_data | A dictionary of data needed to evaluate the agent's performance on the task. |
| metadata | A dictionary for any additional metadata about the task. |
| protocol | Execution protocol controlling timeout, retries, priority, and other runtime parameters for how MASEval runs the task. It is purely a communication channel between the task instance and MASEval's execution engine and imposes no semantics on the task content itself. |
is_frozen
property
is_frozen: bool
Whether this task's data is currently frozen (read-only).
| RETURNS | DESCRIPTION |
|---|---|
| bool | |
__setattr__
__setattr__(name: str, value: Any) -> None
Block attribute assignment when the task is frozen.
The _frozen field itself can always be set (used internally by
freeze / unfreeze). All other fields raise
TaskFrozenError while the task is frozen.
freeze
freeze() -> Task
Make all dictionary fields read-only.
Converts environment_data, user_data, evaluation_data, and
metadata (including nested dicts) to read-only wrappers and prevents
attribute reassignment on the task. Subsequent attempts to mutate any of
these fields raise TaskFrozenError.
Call unfreeze() to restore mutability.
| RETURNS | DESCRIPTION |
|---|---|
| Task | |

| RAISES | DESCRIPTION |
|---|---|
| TaskFrozenError | If the task is already frozen. |
Example
task = Task(query="test", environment_data={"key": "value"})
task.freeze()
task.environment_data["key"] = "new" # raises TaskFrozenError
task.query = "changed" # raises TaskFrozenError
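The read-only wrappers described above can be pictured with the standard library alone. The following is a minimal sketch, not MASEval's implementation: `deep_freeze` and `deep_thaw` are hypothetical helper names, and `types.MappingProxyType` raises `TypeError` on mutation where the real task raises TaskFrozenError.

```python
from types import MappingProxyType
from typing import Any, Mapping


def deep_freeze(value: Any) -> Any:
    """Return a read-only view of value, recursing into nested dicts."""
    if isinstance(value, dict):
        return MappingProxyType({k: deep_freeze(v) for k, v in value.items()})
    return value


def deep_thaw(value: Any) -> Any:
    """Convert read-only views back into plain, mutable dicts."""
    if isinstance(value, Mapping):
        return {k: deep_thaw(v) for k, v in value.items()}
    return value


env = {"db": {"host": "localhost"}, "seed": 7}
frozen = deep_freeze(env)

blocked = ""
try:
    frozen["db"]["host"] = "example.org"  # nested dicts are wrapped too
except TypeError as exc:  # the stdlib proxy raises TypeError
    blocked = str(exc)

thawed = deep_thaw(frozen)
thawed["db"]["host"] = "example.org"  # plain dicts again, so this succeeds
```

Because the sketch copies the data while wrapping it, the frozen view is unaffected by later changes to the thawed copy.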
unfreeze
unfreeze() -> Task
Restore mutability to all dictionary fields.
Converts read-only wrappers back to regular dicts and re-enables attribute assignment on the task.
| RETURNS | DESCRIPTION |
|---|---|
| Task | |

| RAISES | DESCRIPTION |
|---|---|
| TaskFrozenError | If the task is not currently frozen. |
Example
task.freeze()
# ... benchmark run ...
task.unfreeze()
task.environment_data["key"] = "updated" # works again
TaskProtocol
dataclass
Configuration for how MASEval executes a task.
This is a data container for execution parameters, separate from task content (query, environment_data, etc.). It controls the interface between the task and MASEval's execution engine.
Note
Timeout checking is cooperative and currently only occurs at execution phase boundaries (after setup, before execution, before evaluation). Timeout detection during agent execution is not yet supported.
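To illustrate what "cooperative" means here, the sketch below checks a deadline only between phases and never interrupts a phase that is already running. It is an assumption-laden illustration: `run_phases` and `PhaseTimeout` are hypothetical names, and the exact boundaries MASEval checks are those listed in the note above, not necessarily the ones in this loop.

```python
import time
from typing import Callable, List, Optional, Tuple


class PhaseTimeout(Exception):
    """Hypothetical error raised when the budget is spent at a boundary."""


def run_phases(
    phases: List[Tuple[str, Callable[[], None]]],
    timeout_seconds: Optional[float],
) -> List[str]:
    """Run phases in order, checking the deadline only between phases."""
    start = time.monotonic()
    finished: List[str] = []
    for name, phase in phases:
        elapsed = time.monotonic() - start
        if timeout_seconds is not None and elapsed > timeout_seconds:
            raise PhaseTimeout(f"timed out before phase {name!r}")
        phase()  # a phase that overruns is not interrupted
        finished.append(name)
    return finished


def slow() -> None:
    time.sleep(0.05)


message = ""
try:
    run_phases(
        [("setup", slow), ("execution", slow), ("evaluation", slow)],
        timeout_seconds=0.01,
    )
except PhaseTimeout as exc:
    message = str(exc)
```

The setup phase runs to completion even though it exceeds the budget; the overrun is only detected at the next boundary.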
| ATTRIBUTE | DESCRIPTION |
|---|---|
| timeout_seconds | Maximum execution time for this task. None means no timeout. |
| timeout_action | Action to take when timeout occurs. |
| max_retries | Maximum retry attempts for transient failures (not timeouts). |
| priority | Execution priority (higher = sooner). Used by adaptive task queues. |
| tags | Arbitrary tags for filtering or grouping tasks. |
to_dict
to_dict() -> Dict[str, Any]
Convert to a JSON-serializable dictionary.
| RETURNS | DESCRIPTION |
|---|---|
| Dict[str, Any] | Dictionary with all fields. Enum values are converted to strings. |
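As an illustration of the enum-to-string conversion, here is a small stand-in dataclass. Everything in it is assumed for the example: `SketchProtocol` and `SketchTimeoutAction` are hypothetical, the enum members are invented (the real TimeoutAction values are not listed on this page), and the field types and defaults are guesses.

```python
import json
from dataclasses import asdict, dataclass, field
from enum import Enum
from typing import Any, Dict, List, Optional


class SketchTimeoutAction(Enum):
    """Hypothetical members, not the real TimeoutAction values."""

    SKIP = "skip"
    FAIL = "fail"


@dataclass
class SketchProtocol:
    # Field types and defaults are assumptions made for this sketch.
    timeout_seconds: Optional[float] = None
    timeout_action: SketchTimeoutAction = SketchTimeoutAction.SKIP
    max_retries: int = 0
    priority: int = 0
    tags: List[str] = field(default_factory=list)

    def to_dict(self) -> Dict[str, Any]:
        data = asdict(self)
        # Enum members are not JSON-serializable, so store their string value.
        data["timeout_action"] = self.timeout_action.value
        return data


encoded = json.dumps(SketchProtocol(timeout_seconds=30.0, tags=["retail"]).to_dict())
```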
TimeoutAction
Bases: Enum
Action to take when a task timeout occurs.
Task Queues
Task queues determine the order in which tasks are executed. Pass a queue to Benchmark.run(queue=...) to customize scheduling.
BaseTaskQueue
Bases: ABC, Sequence
Abstract base class for task scheduling strategies.
BaseTaskQueue provides a sequence-like interface for task execution. Concrete implementations can reorder tasks, skip tasks, or terminate early based on execution outcomes.
Subclasses must implement __iter__ to define the iteration order.
For adaptive behavior based on task results, use AdaptiveTaskQueue
which integrates with the benchmark callback system.
| ATTRIBUTE | DESCRIPTION |
|---|---|
| _tasks | Internal list of tasks. |
Example
queue = SequentialTaskQueue(tasks)
for task in queue:
    report = execute_task(task)
    # Iterator handles termination automatically
__getitem__
__getitem__(idx: int) -> Task
__getitem__(idx: slice) -> BaseTaskQueue
__getitem__(
idx: Union[int, slice],
) -> Union[Task, BaseTaskQueue]
Get a task by index or a slice of tasks.
| PARAMETER | DESCRIPTION |
|---|---|
| idx | Integer index or slice object. |

| RETURNS | DESCRIPTION |
|---|---|
| Union[Task, BaseTaskQueue] | A single Task for integer index, or a new queue instance for slices. |
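The int-versus-slice behaviour can be sketched with a small sequence class. This is an illustration only: `SketchQueue` is a hypothetical stand-in that stores strings instead of Task objects, and wrapping slices with `type(self)` is one possible choice, not a statement about how MASEval builds the new queue.

```python
from collections.abc import Sequence
from typing import Iterable, List, Union, overload


class SketchQueue(Sequence):
    """Hypothetical stand-in for a queue; items are plain strings here."""

    def __init__(self, items: Iterable[str]) -> None:
        self._tasks: List[str] = list(items)

    @overload
    def __getitem__(self, idx: int) -> str: ...

    @overload
    def __getitem__(self, idx: slice) -> "SketchQueue": ...

    def __getitem__(self, idx: Union[int, slice]) -> Union[str, "SketchQueue"]:
        if isinstance(idx, slice):
            # Wrap the sliced list in the same class as the original queue.
            return type(self)(self._tasks[idx])
        return self._tasks[idx]

    def __len__(self) -> int:
        return len(self._tasks)


queue = SketchQueue(["a", "b", "c"])
first = queue[0]   # a single item
tail = queue[1:]   # a new SketchQueue holding "b" and "c"
```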
__init__
__init__(tasks: Iterable[Task]) -> None
Initialize the task queue.
| PARAMETER | DESCRIPTION |
|---|---|
| tasks | An iterable of Task objects to schedule. |
__iter__
abstractmethod
__iter__() -> Iterator[Task]
Yield tasks in the scheduled execution order.
| RETURNS | DESCRIPTION |
|---|---|
| Iterator[Task] | Iterator yielding Task objects. |
__len__
__len__() -> int
Return the total number of tasks in the queue.
append
append(task: Task) -> None
Add a task to the end of the queue.
| PARAMETER | DESCRIPTION |
|---|---|
| task | The task to append. |
extend
extend(tasks: Iterable[Task]) -> None
Add multiple tasks to the end of the queue.
| PARAMETER | DESCRIPTION |
|---|---|
| tasks | An iterable of tasks to append. |
from_json_file
classmethod
from_json_file(
path: Union[str, Path], *, limit: Optional[int] = None
) -> BaseTaskQueue
Load tasks from a JSON file.
This helper understands the example file format used in examples/data.json
where the top-level object has a data list and optional metadata.
| PARAMETER | DESCRIPTION |
|---|---|
| path | Path to the JSON file. |
| limit | Optional limit to the number of tasks to load. |

| RETURNS | DESCRIPTION |
|---|---|
| BaseTaskQueue | A new queue instance containing the loaded tasks. |
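The file layout described above (a top-level object with a data list and optional metadata) could be read roughly as follows. This is a sketch under assumptions: `load_task_dicts` is a hypothetical helper that returns raw dicts rather than a queue, and the per-record `query` key is assumed for the demo.

```python
import json
import tempfile
from pathlib import Path
from typing import Any, Dict, List, Optional, Union


def load_task_dicts(
    path: Union[str, Path], *, limit: Optional[int] = None
) -> List[Dict[str, Any]]:
    """Read the records under the top-level "data" key, optionally truncated."""
    payload = json.loads(Path(path).read_text(encoding="utf-8"))
    records = payload["data"]
    return records if limit is None else records[:limit]


with tempfile.TemporaryDirectory() as tmp:
    path = Path(tmp) / "data.json"
    document = {
        "metadata": {"name": "demo"},  # optional block, ignored by the loader
        "data": [{"query": "a"}, {"query": "b"}, {"query": "c"}],
    }
    path.write_text(json.dumps(document), encoding="utf-8")
    first_two = load_task_dicts(path, limit=2)
    everything = load_task_dicts(path)
```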
from_list
classmethod
from_list(
data: Iterable[Union[Task, dict]],
) -> BaseTaskQueue
Create a queue from an iterable of Tasks or dicts.
| PARAMETER | DESCRIPTION |
|---|---|
| data | An iterable of Task objects or dicts that can be converted to Tasks. |

| RETURNS | DESCRIPTION |
|---|---|
| BaseTaskQueue | A new queue instance containing the tasks. |

| RAISES | DESCRIPTION |
|---|---|
| TypeError | If an item is neither a Task nor a dict. |
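The accept-Task-or-dict rule, including the TypeError for anything else, might look like the sketch below. `SketchTask` and `coerce_tasks` are hypothetical, and building a task by keyword-expanding the dict is an assumption about how dicts are converted.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, Iterable, List, Union


@dataclass
class SketchTask:
    """Hypothetical two-field stand-in for Task."""

    query: str
    metadata: Dict[str, Any] = field(default_factory=dict)


def coerce_tasks(data: Iterable[Union[SketchTask, dict]]) -> List[SketchTask]:
    """Pass tasks through, build tasks from dicts, reject everything else."""
    tasks: List[SketchTask] = []
    for item in data:
        if isinstance(item, SketchTask):
            tasks.append(item)
        elif isinstance(item, dict):
            tasks.append(SketchTask(**item))  # assumed: dict keys match fields
        else:
            raise TypeError(f"expected a task or dict, got {type(item).__name__}")
    return tasks


tasks = coerce_tasks([SketchTask("a"), {"query": "b", "metadata": {"k": 1}}])

error = ""
try:
    coerce_tasks(["not a task"])
except TypeError as exc:
    error = str(exc)
```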
SequentialTaskQueue
Bases: BaseTaskQueue
Execute tasks in their original order.
This queue maintains the current sequential execution model, processing tasks in the order they appear in the input iterable. It's the default queue used when no explicit queue is provided.
Example
queue = SequentialTaskQueue(tasks)
for task in queue:
    result = execute(task)
__getitem__
__getitem__(idx: int) -> Task
__getitem__(idx: slice) -> BaseTaskQueue
__getitem__(
idx: Union[int, slice],
) -> Union[Task, BaseTaskQueue]
Get a task by index or a slice of tasks.
| PARAMETER | DESCRIPTION |
|---|---|
| idx | Integer index or slice object. |

| RETURNS | DESCRIPTION |
|---|---|
| Union[Task, BaseTaskQueue] | A single Task for integer index, or a new queue instance for slices. |
__init__
__init__(tasks: Iterable[Task]) -> None
Initialize the task queue.
| PARAMETER | DESCRIPTION |
|---|---|
| tasks | An iterable of Task objects to schedule. |
__len__
__len__() -> int
Return the total number of tasks in the queue.
append
append(task: Task) -> None
Add a task to the end of the queue.
| PARAMETER | DESCRIPTION |
|---|---|
| task | The task to append. |
extend
extend(tasks: Iterable[Task]) -> None
Add multiple tasks to the end of the queue.
| PARAMETER | DESCRIPTION |
|---|---|
| tasks | An iterable of tasks to append. |
from_json_file
classmethod
from_json_file(
path: Union[str, Path], *, limit: Optional[int] = None
) -> BaseTaskQueue
Load tasks from a JSON file.
This helper understands the example file format used in examples/data.json
where the top-level object has a data list and optional metadata.
| PARAMETER | DESCRIPTION |
|---|---|
| path | Path to the JSON file. |
| limit | Optional limit to the number of tasks to load. |

| RETURNS | DESCRIPTION |
|---|---|
| BaseTaskQueue | A new queue instance containing the loaded tasks. |
from_list
classmethod
from_list(
data: Iterable[Union[Task, dict]],
) -> BaseTaskQueue
Create a queue from an iterable of Tasks or dicts.
| PARAMETER | DESCRIPTION |
|---|---|
| data | An iterable of Task objects or dicts that can be converted to Tasks. |

| RETURNS | DESCRIPTION |
|---|---|
| BaseTaskQueue | A new queue instance containing the tasks. |

| RAISES | DESCRIPTION |
|---|---|
| TypeError | If an item is neither a Task nor a dict. |
InformativeSubsetQueue
Bases: SequentialTaskQueue
Evaluates an informative subset of tasks in a specified order.
Used for efficient evaluation where a carefully selected subset of tasks
can predict performance on the full dataset. The subset is defined by
indices — integer positions into the original task list. Only tasks
at those positions are yielded, in the order given by indices.
The informativeness criterion (how the indices were chosen) is determined by the caller or by a subclass. This base class is criterion-agnostic.
When indices is None, all tasks are yielded in their original
order (equivalent to SequentialTaskQueue).
| ATTRIBUTE | DESCRIPTION |
|---|---|
| _all_tasks | The complete, unfiltered task list. |
| _indices | The subset indices, or None when all tasks are yielded. |
Example
# Evaluate only tasks at indices 0, 5, 12
queue = InformativeSubsetQueue(tasks, indices=[0, 5, 12])
for task in queue:
    result = execute(task)  # Only 3 tasks
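The selection rule itself is small enough to sketch in a few lines. The helper below is hypothetical (`select_subset` is not part of the documented API) and uses strings in place of Task objects; it shows that the output order follows indices rather than the original task order.

```python
from typing import List, Optional, Sequence, TypeVar

T = TypeVar("T")


def select_subset(all_tasks: Sequence[T], indices: Optional[List[int]] = None) -> List[T]:
    """Return the items at the given positions, in the order of indices."""
    if indices is None:
        return list(all_tasks)  # no subset given: keep everything, original order
    return [all_tasks[i] for i in indices]


tasks = [f"task-{i}" for i in range(15)]
subset = select_subset(tasks, indices=[12, 0, 5])
full = select_subset(tasks)
```

How the real queue treats out-of-range or duplicate indices is not stated on this page; the sketch simply lets Python's indexing rules apply.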
__getitem__
__getitem__(idx: int) -> Task
__getitem__(idx: slice) -> BaseTaskQueue
__getitem__(
idx: Union[int, slice],
) -> Union[Task, BaseTaskQueue]
Get a task by index or a slice of tasks.
| PARAMETER | DESCRIPTION |
|---|---|
| idx | Integer index or slice object. |

| RETURNS | DESCRIPTION |
|---|---|
| Union[Task, BaseTaskQueue] | A single Task for integer index, or a new queue instance for slices. |
__init__
__init__(
tasks: Iterable[Task],
indices: Optional[List[int]] = None,
) -> None
Initialize informative-subset task queue.
| PARAMETER | DESCRIPTION |
|---|---|
| tasks | Full list of tasks (ordered by index). |
| indices | Positions into tasks that select the subset, in evaluation order. If None, all tasks are yielded in their original order. |
__len__
__len__() -> int
Return the total number of tasks in the queue.
append
append(task: Task) -> None
Add a task to the end of the queue.
| PARAMETER | DESCRIPTION |
|---|---|
| task | The task to append. |
extend
extend(tasks: Iterable[Task]) -> None
Add multiple tasks to the end of the queue.
| PARAMETER | DESCRIPTION |
|---|---|
| tasks | An iterable of tasks to append. |
from_json_file
classmethod
from_json_file(
path: Union[str, Path], *, limit: Optional[int] = None
) -> BaseTaskQueue
Load tasks from a JSON file.
This helper understands the example file format used in examples/data.json
where the top-level object has a data list and optional metadata.
| PARAMETER | DESCRIPTION |
|---|---|
| path | Path to the JSON file. |
| limit | Optional limit to the number of tasks to load. |

| RETURNS | DESCRIPTION |
|---|---|
| BaseTaskQueue | A new queue instance containing the loaded tasks. |
from_list
classmethod
from_list(
data: Iterable[Union[Task, dict]],
) -> BaseTaskQueue
Create a queue from an iterable of Tasks or dicts.
| PARAMETER | DESCRIPTION |
|---|---|
| data | An iterable of Task objects or dicts that can be converted to Tasks. |

| RETURNS | DESCRIPTION |
|---|---|
| BaseTaskQueue | A new queue instance containing the tasks. |

| RAISES | DESCRIPTION |
|---|---|
| TypeError | If an item is neither a Task nor a dict. |
DISCOQueue
Bases: InformativeSubsetQueue
Diversity-based informative subset using DISCO anchor points.
Selects a diverse subset of tasks (anchor points) for evaluation. Full benchmark performance is then predicted from results on this subset using DISCO (Diversifying Sample Condensation for Efficient Model Evaluation).
The informativeness criterion is diversity: anchor points are chosen to maximise disagreement across models, so that a small evaluation set captures the discriminative structure of the full benchmark.
Reference: DISCO: Diversifying Sample Condensation for Efficient Model Evaluation (https://arxiv.org/abs/2510.07959)
Example
queue = DISCOQueue(tasks, anchor_points=[0, 5, 12])
# or load from file:
queue = DISCOQueue(tasks, anchor_points_path="anchor_points.pkl")
for task in queue:
    result = execute(task)  # Only anchor-point tasks
__getitem__
__getitem__(idx: int) -> Task
__getitem__(idx: slice) -> BaseTaskQueue
__getitem__(
idx: Union[int, slice],
) -> Union[Task, BaseTaskQueue]
Get a task by index or a slice of tasks.
| PARAMETER | DESCRIPTION |
|---|---|
| idx | Integer index or slice object. |

| RETURNS | DESCRIPTION |
|---|---|
| Union[Task, BaseTaskQueue] | A single Task for integer index, or a new queue instance for slices. |
__init__
__init__(
tasks: Iterable[Task],
anchor_points: Optional[List[int]] = None,
anchor_points_path: Optional[Union[str, Path]] = None,
) -> None
Initialize DISCO task queue.
Anchor points can be supplied directly via anchor_points or loaded
from a file via anchor_points_path. Providing both is an error.
| PARAMETER | DESCRIPTION |
|---|---|
| tasks | Full list of tasks (ordered by index). |
| anchor_points | Diversity-selected indices into tasks. |
| anchor_points_path | Path to a .json or .pkl file containing anchor points (see load_anchor_points). |
__len__
__len__() -> int
Return the total number of tasks in the queue.
append
append(task: Task) -> None
Add a task to the end of the queue.
| PARAMETER | DESCRIPTION |
|---|---|
| task | The task to append. |
extend
extend(tasks: Iterable[Task]) -> None
Add multiple tasks to the end of the queue.
| PARAMETER | DESCRIPTION |
|---|---|
| tasks | An iterable of tasks to append. |
from_json_file
classmethod
from_json_file(
path: Union[str, Path], *, limit: Optional[int] = None
) -> BaseTaskQueue
Load tasks from a JSON file.
This helper understands the example file format used in examples/data.json
where the top-level object has a data list and optional metadata.
| PARAMETER | DESCRIPTION |
|---|---|
| path | Path to the JSON file. |
| limit | Optional limit to the number of tasks to load. |

| RETURNS | DESCRIPTION |
|---|---|
| BaseTaskQueue | A new queue instance containing the loaded tasks. |
from_list
classmethod
from_list(
data: Iterable[Union[Task, dict]],
) -> BaseTaskQueue
Create a queue from an iterable of Tasks or dicts.
| PARAMETER | DESCRIPTION |
|---|---|
| data | An iterable of Task objects or dicts that can be converted to Tasks. |

| RETURNS | DESCRIPTION |
|---|---|
| BaseTaskQueue | A new queue instance containing the tasks. |

| RAISES | DESCRIPTION |
|---|---|
| TypeError | If an item is neither a Task nor a dict. |
load_anchor_points
staticmethod
load_anchor_points(path: Union[str, Path]) -> List[int]
Load anchor points from a .json or .pkl file.
| PARAMETER | DESCRIPTION |
|---|---|
| path | Path to anchor points file. JSON files should contain a list of integer indices. Pickle files may contain a list or a numpy array. |

| RETURNS | DESCRIPTION |
|---|---|
| List[int] | List of integer anchor-point indices. |

| RAISES | DESCRIPTION |
|---|---|
| FileNotFoundError | If the file does not exist. |
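A loader with the behaviour described above could be sketched as follows. This is not the library's code: `load_indices` is a hypothetical name, raising ValueError for other suffixes is an assumption, and array-like objects are detected through a `tolist()` method so the sketch does not need numpy installed.

```python
import json
import pickle
import tempfile
from pathlib import Path
from typing import List, Union


def load_indices(path: Union[str, Path]) -> List[int]:
    """Load integer indices from a .json or .pkl file."""
    path = Path(path)
    if not path.exists():
        raise FileNotFoundError(f"anchor points file not found: {path}")
    if path.suffix == ".json":
        raw = json.loads(path.read_text(encoding="utf-8"))
    elif path.suffix == ".pkl":
        with path.open("rb") as handle:
            raw = pickle.load(handle)  # only unpickle files you trust
    else:
        raise ValueError(f"unsupported suffix: {path.suffix}")  # assumed behaviour
    if hasattr(raw, "tolist"):
        raw = raw.tolist()  # e.g. a numpy array
    return [int(i) for i in raw]


with tempfile.TemporaryDirectory() as tmp:
    json_path = Path(tmp) / "anchors.json"
    json_path.write_text(json.dumps([0, 5, 12]), encoding="utf-8")
    pkl_path = Path(tmp) / "anchors.pkl"
    pkl_path.write_bytes(pickle.dumps([3, 1, 4]))
    from_json = load_indices(json_path)
    from_pickle = load_indices(pkl_path)
```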
PriorityTaskQueue
Bases: BaseTaskQueue
Execute tasks ordered by priority.
Tasks are sorted by task.protocol.priority at construction time.
Higher priority values are executed first by default. Tasks with equal
priority maintain their relative order from the original input (stable sort).
This queue uses task.protocol.priority as the sole source of priority.
Pre-compute priority values and assign them to tasks before creating the queue.
| PARAMETER | DESCRIPTION |
|---|---|
| tasks | An iterable of Task objects to schedule. |
| reverse | If True (default), higher priority values execute first. If False, lower priority values execute first. |
Example
# Assign priorities based on your criteria
for task in tasks:
    task.protocol.priority = compute_priority(task)
# Create queue (higher priority first)
queue = PriorityTaskQueue(tasks)
# Or lower priority first
queue = PriorityTaskQueue(tasks, reverse=False)
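The stable-sort guarantee can be checked with Python's built-in `sorted`, which keeps equal-key items in input order even when `reverse=True`. The sketch below uses a hypothetical `Item` dataclass in place of Task and assumes nothing about how the queue performs its sort internally.

```python
from dataclasses import dataclass


@dataclass
class Item:
    """Hypothetical stand-in for a task with a priority."""

    name: str
    priority: int


items = [Item("a", 1), Item("b", 3), Item("c", 1), Item("d", 3)]

# Higher priority first; ties ("b" and "d", "a" and "c") keep their input order.
high_first = sorted(items, key=lambda item: item.priority, reverse=True)

# Lower priority first, again with ties in input order.
low_first = sorted(items, key=lambda item: item.priority)

high_names = [item.name for item in high_first]
low_names = [item.name for item in low_first]
```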
__getitem__
__getitem__(idx: int) -> Task
__getitem__(idx: slice) -> BaseTaskQueue
__getitem__(
idx: Union[int, slice],
) -> Union[Task, BaseTaskQueue]
Get a task by index or a slice of tasks.
| PARAMETER | DESCRIPTION |
|---|---|
| idx | Integer index or slice object. |

| RETURNS | DESCRIPTION |
|---|---|
| Union[Task, BaseTaskQueue] | A single Task for integer index, or a new queue instance for slices. |
__init__
__init__(
tasks: Iterable[Task], reverse: bool = True
) -> None
Initialize priority queue with sorted tasks.
| PARAMETER | DESCRIPTION |
|---|---|
| tasks | An iterable of Task objects to schedule. |
| reverse | If True (default), higher priority values execute first. |
__len__
__len__() -> int
Return the total number of tasks in the queue.
append
append(task: Task) -> None
Add a task to the end of the queue.
| PARAMETER | DESCRIPTION |
|---|---|
| task | The task to append. |
extend
extend(tasks: Iterable[Task]) -> None
Add multiple tasks to the end of the queue.
| PARAMETER | DESCRIPTION |
|---|---|
| tasks | An iterable of tasks to append. |
from_json_file
classmethod
from_json_file(
path: Union[str, Path], *, limit: Optional[int] = None
) -> BaseTaskQueue
Load tasks from a JSON file.
This helper understands the example file format used in examples/data.json
where the top-level object has a data list and optional metadata.
| PARAMETER | DESCRIPTION |
|---|---|
| path | Path to the JSON file. |
| limit | Optional limit to the number of tasks to load. |

| RETURNS | DESCRIPTION |
|---|---|
| BaseTaskQueue | A new queue instance containing the loaded tasks. |
from_list
classmethod
from_list(
data: Iterable[Union[Task, dict]],
) -> BaseTaskQueue
Create a queue from an iterable of Tasks or dicts.
| PARAMETER | DESCRIPTION |
|---|---|
| data | An iterable of Task objects or dicts that can be converted to Tasks. |

| RETURNS | DESCRIPTION |
|---|---|
| BaseTaskQueue | A new queue instance containing the tasks. |

| RAISES | DESCRIPTION |
|---|---|
| TypeError | If an item is neither a Task nor a dict. |
AdaptiveTaskQueue
Bases: BaseTaskQueue, BenchmarkCallback, ABC
Abstract base class for adaptive task scheduling.
AdaptiveTaskQueue enables dynamic task ordering based on execution results. It inherits from BenchmarkCallback to integrate with the benchmark's callback system, creating a clean bidirectional communication model:
- Benchmark → Queue: Via iterator protocol (for task in queue)
- Queue → Benchmark: Via callback (on_task_repeat_end())
The queue automatically moves completed tasks from _remaining to
_completed and calls update_state() to let subclasses adapt their
scheduling strategy based on task results.
Subclasses must implement:
- initial_state(): Return initial state dict for adaptive algorithm
- select_next_task(remaining, state): Choose the next task to execute
- update_state(task, report, state): Update and return new state
The state dict is managed by the base class: it is initialized via initial_state()
at iteration start, passed to select_next_task() and update_state(), and replaced by
the return value of update_state(). This functional approach keeps state flow explicit
while allowing subclasses to store any data they need.
Internal state (managed by base class, do not modify directly):
- _remaining: Tasks not yet executed
- _completed: Completed tasks paired with their reports
- _state: Current adaptive state dict
- _stop_flag: Flag to signal early termination
When used with Benchmark.run(), the queue is automatically registered
as a callback and receives on_task_repeat_end() notifications.
Example
from typing import Any, Dict, Optional, Sequence

class IRTTaskQueue(AdaptiveTaskQueue):
    '''Item Response Theory-based adaptive testing.'''

    def initial_state(self) -> Dict[str, Any]:
        return {"ability": 0.0}

    def select_next_task(
        self, remaining: Sequence[Task], state: Dict[str, Any]
    ) -> Optional[Task]:
        # Select task with difficulty closest to current ability estimate
        return min(
            remaining,
            key=lambda t: abs(t.metadata.get("difficulty", 0) - state["ability"]),
        )

    def update_state(
        self, task: Task, report: Dict[str, Any], state: Dict[str, Any]
    ) -> Dict[str, Any]:
        # Update ability estimate based on task result
        correct = report.get("eval", [{}])[0].get("correct", False)
        return {"ability": state["ability"] + (0.5 if correct else -0.5)}

queue = IRTTaskQueue(tasks)
results = benchmark.run(queue)  # Auto-registered as callback
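To make the two-way flow concrete without depending on MASEval, the sketch below reimplements the documented handshake in a hypothetical `SketchAdaptiveQueue`. Tasks are reduced to bare difficulty numbers, the callback takes the task directly instead of extracting it from a report, and the stop-after-three rule is invented for the demo.

```python
from typing import Any, Dict, Iterator, List, Optional, Tuple


class SketchAdaptiveQueue:
    """Stand-in that mirrors the documented flow; not MASEval's class."""

    def __init__(self, difficulties: List[float]) -> None:
        self._remaining: List[float] = list(difficulties)
        self._completed: List[Tuple[float, Dict[str, Any]]] = []
        self._state: Dict[str, Any] = {}
        self._stop_flag = False

    def initial_state(self) -> Dict[str, Any]:
        return {"ability": 0.0, "seen": 0}

    def select_next_task(
        self, remaining: List[float], state: Dict[str, Any]
    ) -> Optional[float]:
        # Pick the difficulty closest to the current ability estimate.
        return min(remaining, key=lambda d: abs(d - state["ability"]))

    def update_state(
        self, task: float, report: Dict[str, Any], state: Dict[str, Any]
    ) -> Dict[str, Any]:
        step = 0.5 if report["correct"] else -0.5
        new_state = {"ability": state["ability"] + step, "seen": state["seen"] + 1}
        if new_state["seen"] >= 3:  # invented stopping rule for the demo
            self.stop()
        return new_state

    def stop(self) -> None:
        self._stop_flag = True

    def on_task_repeat_end(self, task: float, report: Dict[str, Any]) -> None:
        # Queue <- Benchmark: move the task to completed, then adapt.
        self._remaining.remove(task)
        self._completed.append((task, report))
        self._state = self.update_state(task, report, self._state)

    def __iter__(self) -> Iterator[float]:
        # Benchmark <- Queue: yield tasks until exhausted, stopped, or None.
        self._state = self.initial_state()
        while self._remaining and not self._stop_flag:
            task = self.select_next_task(self._remaining, self._state)
            if task is None:
                return
            yield task


queue = SketchAdaptiveQueue([-1.0, 0.0, 0.4, 1.0, 2.0])
order: List[float] = []
for difficulty in queue:
    order.append(difficulty)
    report = {"correct": difficulty < 0.5}  # pretend agent solves easy tasks only
    queue.on_task_repeat_end(difficulty, report)
```

The loop body plays the role of the benchmark: it consumes one task from the iterator, then reports back through the callback before the iterator selects the next task.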
__getitem__
__getitem__(idx: int) -> Task
__getitem__(idx: slice) -> BaseTaskQueue
__getitem__(
idx: Union[int, slice],
) -> Union[Task, BaseTaskQueue]
Get a task by index or a slice of tasks.
| PARAMETER | DESCRIPTION |
|---|---|
| idx | Integer index or slice object. |

| RETURNS | DESCRIPTION |
|---|---|
| Union[Task, BaseTaskQueue] | A single Task for integer index, or a new queue instance for slices. |
__init__
__init__(tasks: Iterable[Task]) -> None
Initialize adaptive queue.
| PARAMETER | DESCRIPTION |
|---|---|
| tasks | An iterable of Task objects to schedule. |
__iter__
__iter__() -> Iterator[Task]
Yield tasks selected by the adaptive algorithm.
Initializes state via initial_state() at iteration start, then
continues until select_next_task() returns None, _remaining
is empty, or stop() is called.
Note: select_next_task() is only called when _remaining is
non-empty, so implementers don't need to check for an empty list.
__len__
__len__() -> int
Return the total number of tasks in the queue.
append
append(task: Task) -> None
Add a task to the end of the queue.
| PARAMETER | DESCRIPTION |
|---|---|
| task | The task to append. |
extend
extend(tasks: Iterable[Task]) -> None
Add multiple tasks to the end of the queue.
| PARAMETER | DESCRIPTION |
|---|---|
| tasks | An iterable of tasks to append. |
from_json_file
classmethod
from_json_file(
path: Union[str, Path], *, limit: Optional[int] = None
) -> BaseTaskQueue
Load tasks from a JSON file.
This helper understands the example file format used in examples/data.json
where the top-level object has a data list and optional metadata.
| PARAMETER | DESCRIPTION |
|---|---|
| path | Path to the JSON file. |
| limit | Optional limit to the number of tasks to load. |

| RETURNS | DESCRIPTION |
|---|---|
| BaseTaskQueue | A new queue instance containing the loaded tasks. |
from_list
classmethod
from_list(
data: Iterable[Union[Task, dict]],
) -> BaseTaskQueue
Create a queue from an iterable of Tasks or dicts.
| PARAMETER | DESCRIPTION |
|---|---|
| data | An iterable of Task objects or dicts that can be converted to Tasks. |

| RETURNS | DESCRIPTION |
|---|---|
| BaseTaskQueue | A new queue instance containing the tasks. |

| RAISES | DESCRIPTION |
|---|---|
| TypeError | If an item is neither a Task nor a dict. |
gather_traces
gather_traces() -> dict[str, Any]
Gather execution traces from this callback.
By default, callbacks don't store traces, but subclasses can override this to provide custom tracing data.
| RETURNS | DESCRIPTION |
|---|---|
| dict[str, Any] | Dictionary with basic callback information. Subclasses should extend this with their own data. |
initial_state
abstractmethod
initial_state() -> Dict[str, Any]
Return the initial state for adaptive selection.
This state dict will be passed to select_next_task() and
update_state() throughout the benchmark run. Store any data
your adaptive algorithm needs (ability estimates, history, etc.).
| RETURNS | DESCRIPTION |
|---|---|
| Dict[str, Any] | Initial state dict. Can contain any keys/values you need. |
on_event
on_event(event_name: str, **data) -> None
Handle a generic event.
on_task_repeat_end
on_task_repeat_end(
benchmark: Benchmark, report: Dict[str, Any]
) -> None
BenchmarkCallback hook called after each task repetition completes.
This method extracts the task from the report, moves it from
_remaining to _completed, and calls update_state()
to let the subclass update its adaptive model.
| PARAMETER | DESCRIPTION |
|---|---|
| benchmark | The benchmark instance (unused in this implementation). |
| report | The execution report containing task_id and results. |
select_next_task
abstractmethod
select_next_task(
remaining: Sequence[Task], state: Dict[str, Any]
) -> Optional[Task]
Select the next task to execute.
Implement this method to define your adaptive selection algorithm (e.g., IRT-based selection, uncertainty sampling, bandit algorithms).
| PARAMETER | DESCRIPTION |
|---|---|
| remaining | Read-only sequence of tasks not yet executed. Do not modify this sequence; the queue manages task lifecycle. |
| state | Current adaptive state, as returned by initial_state() or the most recent update_state() call. |

| RETURNS | DESCRIPTION |
|---|---|
| Optional[Task] | The next Task to execute from remaining, or None to signal early termination. |
Note
This method is only called when remaining is non-empty,
so you don't need to check for an empty sequence.
stop
stop() -> None
Signal that no more tasks should be processed.
Call this from update_state() to trigger early termination
(e.g., when confidence threshold is reached).
The _stop_flag is checked in __iter__, which will stop yielding
tasks and naturally terminate the benchmark's iteration loop via Python's
iterator protocol.
to_list
to_list() -> List[Task]
Return a copy of the internal task list.
| RETURNS | DESCRIPTION |
|---|---|
| List[Task] | List of all tasks in the queue. |
update_state
abstractmethod
update_state(
task: Task,
report: Dict[str, Any],
state: Dict[str, Any],
) -> Dict[str, Any]
Update state after task completion.
Implement this method to update ability estimates, difficulty models, or other adaptive state based on task results.
| PARAMETER | DESCRIPTION |
|---|---|
| task | The task that just completed. |
| report | The execution report containing status and eval results. |
| state | Current state dict. |

| RETURNS | DESCRIPTION |
|---|---|
| Dict[str, Any] | Updated state dict (can be the same dict mutated, or a new dict). |
Note
Call self.stop() here to halt iteration before the next
task selection.