Tasks

Tasks define individual benchmark scenarios including inputs, expected outputs, and metadata for evaluation. Task queues control execution order and scheduling strategy.

Task dataclass

A data container for a single benchmark task.

Task data can optionally be frozen after loading to prevent accidental mutation during a benchmark run. Use freeze() to make all dictionary fields read-only, and unfreeze() to restore mutability.

Note

It is strongly recommended to call freeze() once all task data has been assembled (e.g. after load_tasks() or inside setup_environment). This guards against subtle bugs where benchmark components accidentally overwrite task data during execution.

ATTRIBUTE DESCRIPTION
query

The main input query or prompt for the task.

TYPE: str

id

A unique identifier for the task. Benchmarks can provide human-readable IDs (e.g., "task-000001", "retail_001"). If no ID is provided, a UUID string is generated automatically.

TYPE: str

environment_data

A dictionary of data needed to set up the environment for the task.

TYPE: Dict[str, Any]

evaluation_data

A dictionary of data needed to evaluate the agent's performance on the task.

TYPE: Dict[str, Any]

metadata

A dictionary for any additional metadata about the task.

TYPE: Dict[str, Any]

protocol

Execution protocol controlling timeout, retries, priority, and other runtime parameters. It provides fine-grained control over how MASEval runs the task. The protocol serves purely as a communication channel between the task instance and MASEval's execution engine; it does not impose any intrinsic semantics on the task content itself.

TYPE: TaskProtocol

is_frozen property

is_frozen: bool

Whether this task's data is currently frozen (read-only).

RETURNS DESCRIPTION
bool

True if freeze() has been called and unfreeze() has not.

__setattr__

__setattr__(name: str, value: Any) -> None

Block attribute assignment when the task is frozen.

The _frozen field itself can always be set (used internally by freeze / unfreeze). All other fields raise TaskFrozenError while the task is frozen.

freeze

freeze() -> Task

Make all dictionary fields read-only.

Converts environment_data, user_data, evaluation_data, and metadata (including nested dicts) to read-only wrappers and prevents attribute reassignment on the task. Subsequent attempts to mutate any of these fields raise TaskFrozenError.

Call unfreeze() to restore mutability.

RETURNS DESCRIPTION
Task

self, for chaining (e.g. task.freeze().query).

RAISES DESCRIPTION
TaskFrozenError

If the task is already frozen.

Example
task = Task(query="test", environment_data={"key": "value"})
task.freeze()

task.environment_data["key"] = "new"  # raises TaskFrozenError
task.query = "changed"                 # raises TaskFrozenError

unfreeze

unfreeze() -> Task

Restore mutability to all dictionary fields.

Converts read-only wrappers back to regular dicts and re-enables attribute assignment on the task.

RETURNS DESCRIPTION
Task

self, for chaining.

RAISES DESCRIPTION
TaskFrozenError

If the task is not currently frozen.

Example
task.freeze()
# ... benchmark run ...
task.unfreeze()
task.environment_data["key"] = "updated"  # works again
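
The freeze/unfreeze pattern described above can be sketched with the standard library. This is a minimal illustration, not MASEval's implementation: MiniTask, FrozenError, deep_freeze, and deep_thaw are hypothetical names, and the read-only wrapper used here (types.MappingProxyType) raises TypeError on item assignment, whereas the documented Task raises TaskFrozenError.

```python
from types import MappingProxyType
from typing import Any, Dict, Mapping


class FrozenError(Exception):
    """Stand-in for TaskFrozenError (hypothetical, not MASEval's class)."""


def deep_freeze(value: Any) -> Any:
    """Recursively wrap dicts (including nested dicts) in read-only views."""
    if isinstance(value, dict):
        return MappingProxyType({k: deep_freeze(v) for k, v in value.items()})
    return value


def deep_thaw(value: Any) -> Any:
    """Recursively convert read-only views back to plain dicts."""
    if isinstance(value, Mapping):
        return {k: deep_thaw(v) for k, v in value.items()}
    return value


class MiniTask:
    """Toy task with a single dict field, used only to show the pattern."""

    def __init__(self, query: str, environment_data: Dict[str, Any]) -> None:
        self._frozen = False
        self.query = query
        self.environment_data = environment_data

    def __setattr__(self, name: str, value: Any) -> None:
        # _frozen itself must stay assignable so that unfreeze() can work.
        if name != "_frozen" and getattr(self, "_frozen", False):
            raise FrozenError(f"cannot set {name!r} on a frozen task")
        super().__setattr__(name, value)

    def freeze(self) -> "MiniTask":
        if self._frozen:
            raise FrozenError("task is already frozen")
        self.environment_data = deep_freeze(self.environment_data)
        self._frozen = True
        return self

    def unfreeze(self) -> "MiniTask":
        if not self._frozen:
            raise FrozenError("task is not frozen")
        self._frozen = False
        self.environment_data = deep_thaw(self.environment_data)
        return self
```

Note that the order of operations matters: freeze() wraps the dicts before setting the flag, and unfreeze() clears the flag before reassigning the fields, so the __setattr__ guard never blocks the class's own bookkeeping.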

TaskProtocol dataclass

Configuration for how MASEval executes a task.

This is a data container for execution parameters, separate from task content (query, environment_data, etc.). It controls the interface between the task and MASEval's execution engine.

Note

Timeout checking is cooperative and currently only occurs at execution phase boundaries (after setup, before execution, before evaluation). Timeout detection during agent execution is not yet supported.
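
To make "cooperative" concrete, here is a simplified sketch of deadline checks that happen only between phases. The names run_phases and PhaseTimeout are hypothetical, the clock parameter exists only to make the sketch testable, and the real engine's phase boundaries and timeout handling may differ.

```python
import time
from typing import Callable, List, Optional, Tuple


class PhaseTimeout(Exception):
    """Hypothetical exception used only by this sketch."""


def run_phases(
    phases: List[Tuple[str, Callable[[], None]]],
    timeout_seconds: Optional[float],
    clock: Callable[[], float] = time.monotonic,
) -> List[str]:
    """Run phases in order, checking the deadline only between phases."""
    start = clock()
    completed: List[str] = []
    for name, phase in phases:
        # Cooperative check: a phase that has already started is never interrupted.
        if timeout_seconds is not None and clock() - start > timeout_seconds:
            raise PhaseTimeout(f"timed out before phase {name!r}")
        phase()
        completed.append(name)
    return completed
```

A consequence of this design is that a single long-running phase can overrun the deadline; the overrun is only noticed at the next boundary.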

ATTRIBUTE DESCRIPTION
timeout_seconds

Maximum execution time for this task. None means no timeout.

TYPE: Optional[float]

timeout_action

Action to take when timeout occurs.

TYPE: TimeoutAction

max_retries

Maximum retry attempts for transient failures (not timeouts).

TYPE: int

priority

Execution priority (higher = sooner). Used by adaptive task queues.

TYPE: int

tags

Arbitrary tags for filtering or grouping tasks.

TYPE: Dict[str, Any]

to_dict

to_dict() -> Dict[str, Any]

Convert to a JSON-serializable dictionary.

RETURNS DESCRIPTION
Dict[str, Any]

Dictionary with all fields. Enum values are converted to strings.
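
As a rough illustration of the enum-to-string conversion, the sketch below mirrors the documented fields in a stand-in dataclass. MiniProtocol and Action are hypothetical, the enum members and all default values are assumptions (the real TimeoutAction members and TaskProtocol defaults are not listed here), and the actual to_dict() may be implemented differently.

```python
from dataclasses import asdict, dataclass, field
from enum import Enum
from typing import Any, Dict, Optional


class Action(Enum):
    # Hypothetical members; not the real TimeoutAction values.
    SKIP = "skip"
    RETRY = "retry"


@dataclass
class MiniProtocol:
    timeout_seconds: Optional[float] = None
    timeout_action: Action = Action.SKIP  # assumed default
    max_retries: int = 0                  # assumed default
    priority: int = 0                     # assumed default
    tags: Dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> Dict[str, Any]:
        data = asdict(self)
        # Enum members are not JSON-serializable, so store their string values.
        data["timeout_action"] = self.timeout_action.value
        return data
```

The resulting dictionary can be passed straight to json.dumps, provided the values stored in tags are themselves JSON-serializable.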

TimeoutAction

Bases: Enum

Action to take when a task timeout occurs.

Task Queues

Task queues determine the order in which tasks are executed. Pass a queue to Benchmark.run(queue=...) to customize scheduling.

BaseTaskQueue

Bases: ABC, Sequence

Abstract base class for task scheduling strategies.

BaseTaskQueue provides a sequence-like interface for task execution. Concrete implementations can reorder tasks, skip tasks, or terminate early based on execution outcomes.

Subclasses must implement __iter__ to define the iteration order. For adaptive behavior based on task results, use AdaptiveTaskQueue which integrates with the benchmark callback system.

ATTRIBUTE DESCRIPTION
_tasks

Internal list of tasks.

TYPE: List[Task]

Example
queue = SequentialTaskQueue(tasks)

for task in queue:
    report = execute_task(task)
    # Iterator handles termination automatically

__getitem__

__getitem__(idx: int) -> Task
__getitem__(idx: slice) -> BaseTaskQueue
__getitem__(
    idx: Union[int, slice],
) -> Union[Task, BaseTaskQueue]

Get a task by index or a slice of tasks.

PARAMETER DESCRIPTION
idx

Integer index or slice object.

TYPE: Union[int, slice]

RETURNS DESCRIPTION
Union[Task, BaseTaskQueue]

A single Task for integer index, or a new queue instance for slices.
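
The int-versus-slice behaviour can be sketched with a toy container. MiniQueue is a hypothetical stand-in that holds plain strings; using type(self) to build the sliced queue is an assumption about how a subclass-preserving slice might be written, not a statement about MASEval's code.

```python
from typing import Iterable, List, Union, overload


class MiniQueue:
    """Toy queue over plain strings; not the MASEval class."""

    def __init__(self, items: Iterable[str]) -> None:
        self._items: List[str] = list(items)

    @overload
    def __getitem__(self, idx: int) -> str: ...
    @overload
    def __getitem__(self, idx: slice) -> "MiniQueue": ...

    def __getitem__(self, idx: Union[int, slice]) -> Union[str, "MiniQueue"]:
        if isinstance(idx, slice):
            # A slice yields a new queue rather than a bare list.
            return type(self)(self._items[idx])
        return self._items[idx]

    def __len__(self) -> int:
        return len(self._items)
```

With this shape, queue[0] returns a single element while queue[1:] returns another queue that supports the same interface.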

__init__

__init__(tasks: Iterable[Task]) -> None

Initialize the task queue.

PARAMETER DESCRIPTION
tasks

An iterable of Task objects to schedule.

TYPE: Iterable[Task]

__iter__ abstractmethod

__iter__() -> Iterator[Task]

Yield tasks in the scheduled execution order.

RETURNS DESCRIPTION
Iterator[Task]

Iterator yielding Task objects.

__len__

__len__() -> int

Return the total number of tasks in the queue.

append

append(task: Task) -> None

Add a task to the end of the queue.

PARAMETER DESCRIPTION
task

The task to append.

TYPE: Task

extend

extend(tasks: Iterable[Task]) -> None

Add multiple tasks to the end of the queue.

PARAMETER DESCRIPTION
tasks

An iterable of tasks to append.

TYPE: Iterable[Task]

from_json_file classmethod

from_json_file(
    path: Union[str, Path], *, limit: Optional[int] = None
) -> BaseTaskQueue

Load tasks from a JSON file.

This helper understands the example file format used in examples/data.json where the top-level object has a data list and optional metadata.

PARAMETER DESCRIPTION
path

Path to the JSON file.

TYPE: Union[str, Path]

limit

Optional limit to the number of tasks to load.

TYPE: Optional[int] DEFAULT: None

RETURNS DESCRIPTION
BaseTaskQueue

A new queue instance containing the loaded tasks.
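
The file layout described above (a top-level object with a data list and optional metadata) can be read with a few lines of standard-library code. This sketch only returns the raw item dicts; load_task_dicts is a hypothetical helper, and the keys inside each item are not specified here, so none are assumed.

```python
import json
from pathlib import Path
from typing import Any, Dict, List, Optional, Union


def load_task_dicts(
    path: Union[str, Path], *, limit: Optional[int] = None
) -> List[Dict[str, Any]]:
    """Read the top-level "data" list, optionally truncated to `limit` items."""
    with open(path, encoding="utf-8") as fh:
        payload = json.load(fh)
    items = payload["data"]
    # limit=None means "load everything".
    return items if limit is None else items[:limit]
```

Passing a small limit is a convenient way to smoke-test a benchmark on the first few tasks before a full run.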

from_list classmethod

from_list(
    data: Iterable[Union[Task, dict]],
) -> BaseTaskQueue

Create a queue from an iterable of Tasks or dicts.

PARAMETER DESCRIPTION
data

An iterable of Task objects or dicts that can be converted to Tasks.

TYPE: Iterable[Union[Task, dict]]

RETURNS DESCRIPTION
BaseTaskQueue

A new queue instance containing the tasks.

RAISES DESCRIPTION
TypeError

If an item is neither a Task nor a dict.
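
A minimal sketch of the accept-Task-or-dict logic, using a stand-in dataclass. MiniTask and tasks_from_list are hypothetical, and the sketch assumes dict keys map directly onto field names, which may not match how MASEval converts dicts.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, Iterable, List, Union


@dataclass
class MiniTask:
    query: str
    metadata: Dict[str, Any] = field(default_factory=dict)


def tasks_from_list(data: Iterable[Union[MiniTask, dict]]) -> List[MiniTask]:
    tasks: List[MiniTask] = []
    for item in data:
        if isinstance(item, MiniTask):
            tasks.append(item)  # already a task: keep as-is
        elif isinstance(item, dict):
            tasks.append(MiniTask(**item))  # assumes keys match field names
        else:
            raise TypeError(f"expected MiniTask or dict, got {type(item).__name__}")
    return tasks
```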

to_list

to_list() -> List[Task]

Return a copy of the internal task list.

RETURNS DESCRIPTION
List[Task]

List of all tasks in the queue.

SequentialTaskQueue

Bases: BaseTaskQueue

Execute tasks in their original order.

This queue preserves the plain sequential execution model: tasks are processed in the order they appear in the input iterable. It is the default queue used when no explicit queue is provided.

Example
queue = SequentialTaskQueue(tasks)
for task in queue:
    result = execute(task)

__getitem__

__getitem__(idx: int) -> Task
__getitem__(idx: slice) -> BaseTaskQueue
__getitem__(
    idx: Union[int, slice],
) -> Union[Task, BaseTaskQueue]

Get a task by index or a slice of tasks.

PARAMETER DESCRIPTION
idx

Integer index or slice object.

TYPE: Union[int, slice]

RETURNS DESCRIPTION
Union[Task, BaseTaskQueue]

A single Task for integer index, or a new queue instance for slices.

__init__

__init__(tasks: Iterable[Task]) -> None

Initialize the task queue.

PARAMETER DESCRIPTION
tasks

An iterable of Task objects to schedule.

TYPE: Iterable[Task]

__iter__

__iter__() -> Iterator[Task]

Yield tasks in original order.

__len__

__len__() -> int

Return the total number of tasks in the queue.

append

append(task: Task) -> None

Add a task to the end of the queue.

PARAMETER DESCRIPTION
task

The task to append.

TYPE: Task

extend

extend(tasks: Iterable[Task]) -> None

Add multiple tasks to the end of the queue.

PARAMETER DESCRIPTION
tasks

An iterable of tasks to append.

TYPE: Iterable[Task]

from_json_file classmethod

from_json_file(
    path: Union[str, Path], *, limit: Optional[int] = None
) -> BaseTaskQueue

Load tasks from a JSON file.

This helper understands the example file format used in examples/data.json where the top-level object has a data list and optional metadata.

PARAMETER DESCRIPTION
path

Path to the JSON file.

TYPE: Union[str, Path]

limit

Optional limit to the number of tasks to load.

TYPE: Optional[int] DEFAULT: None

RETURNS DESCRIPTION
BaseTaskQueue

A new queue instance containing the loaded tasks.

from_list classmethod

from_list(
    data: Iterable[Union[Task, dict]],
) -> BaseTaskQueue

Create a queue from an iterable of Tasks or dicts.

PARAMETER DESCRIPTION
data

An iterable of Task objects or dicts that can be converted to Tasks.

TYPE: Iterable[Union[Task, dict]]

RETURNS DESCRIPTION
BaseTaskQueue

A new queue instance containing the tasks.

RAISES DESCRIPTION
TypeError

If an item is neither a Task nor a dict.

to_list

to_list() -> List[Task]

Return a copy of the internal task list.

RETURNS DESCRIPTION
List[Task]

List of all tasks in the queue.

InformativeSubsetQueue

Bases: SequentialTaskQueue

Evaluates an informative subset of tasks in a specified order.

Used for efficient evaluation where a carefully selected subset of tasks can predict performance on the full dataset. The subset is defined by indices — integer positions into the original task list. Only tasks at those positions are yielded, in the order given by indices.

The informativeness criterion (how the indices were chosen) is determined by the caller or by a subclass. This base class is criterion-agnostic.

When indices is None, all tasks are yielded in their original order (equivalent to SequentialTaskQueue).

ATTRIBUTE DESCRIPTION
_all_tasks

The complete, unfiltered task list.

TYPE: List[Task]

_indices

The subset indices, or None.

TYPE: Optional[List[int]]

Example
# Evaluate only tasks at indices 0, 5, 12
queue = InformativeSubsetQueue(tasks, indices=[0, 5, 12])

for task in queue:
    result = execute(task)  # Only 3 tasks
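
The selection rule itself is small enough to sketch directly. select_subset is a hypothetical helper, not part of the library; in this sketch an out-of-range index simply raises IndexError, and the real queue's handling of invalid indices is not documented here.

```python
from typing import List, Optional, Sequence, TypeVar

T = TypeVar("T")


def select_subset(all_tasks: Sequence[T], indices: Optional[List[int]] = None) -> List[T]:
    """Pick the items at `indices`, preserving the order of `indices`."""
    if indices is None:
        # No subset given: fall back to every task in original order.
        return list(all_tasks)
    return [all_tasks[i] for i in indices]
```

Because the output follows the order of indices rather than the order of the task list, the caller controls both which tasks run and in what sequence.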

__getitem__

__getitem__(idx: int) -> Task
__getitem__(idx: slice) -> BaseTaskQueue
__getitem__(
    idx: Union[int, slice],
) -> Union[Task, BaseTaskQueue]

Get a task by index or a slice of tasks.

PARAMETER DESCRIPTION
idx

Integer index or slice object.

TYPE: Union[int, slice]

RETURNS DESCRIPTION
Union[Task, BaseTaskQueue]

A single Task for integer index, or a new queue instance for slices.

__init__

__init__(
    tasks: Iterable[Task],
    indices: Optional[List[int]] = None,
) -> None

Initialize informative-subset task queue.

PARAMETER DESCRIPTION
tasks

Full list of tasks (ordered by index).

TYPE: Iterable[Task]

indices

Positions into tasks selecting which tasks to evaluate and in what order. If None, evaluates all tasks in order.

TYPE: Optional[List[int]] DEFAULT: None

__iter__

__iter__() -> Iterator[Task]

Yield the selected tasks in the order given by indices, or all tasks in their original order when indices is None.

__len__

__len__() -> int

Return the total number of tasks in the queue.

append

append(task: Task) -> None

Add a task to the end of the queue.

PARAMETER DESCRIPTION
task

The task to append.

TYPE: Task

extend

extend(tasks: Iterable[Task]) -> None

Add multiple tasks to the end of the queue.

PARAMETER DESCRIPTION
tasks

An iterable of tasks to append.

TYPE: Iterable[Task]

from_json_file classmethod

from_json_file(
    path: Union[str, Path], *, limit: Optional[int] = None
) -> BaseTaskQueue

Load tasks from a JSON file.

This helper understands the example file format used in examples/data.json where the top-level object has a data list and optional metadata.

PARAMETER DESCRIPTION
path

Path to the JSON file.

TYPE: Union[str, Path]

limit

Optional limit to the number of tasks to load.

TYPE: Optional[int] DEFAULT: None

RETURNS DESCRIPTION
BaseTaskQueue

A new queue instance containing the loaded tasks.

from_list classmethod

from_list(
    data: Iterable[Union[Task, dict]],
) -> BaseTaskQueue

Create a queue from an iterable of Tasks or dicts.

PARAMETER DESCRIPTION
data

An iterable of Task objects or dicts that can be converted to Tasks.

TYPE: Iterable[Union[Task, dict]]

RETURNS DESCRIPTION
BaseTaskQueue

A new queue instance containing the tasks.

RAISES DESCRIPTION
TypeError

If an item is neither a Task nor a dict.

to_list

to_list() -> List[Task]

Return a copy of the internal task list.

RETURNS DESCRIPTION
List[Task]

List of all tasks in the queue.

DISCOQueue

Bases: InformativeSubsetQueue

Diversity-based informative subset using DISCO anchor points.

Selects a diverse subset of tasks (anchor points) for evaluation. Full benchmark performance is then predicted from results on this subset using DISCO (Diversifying Sample Condensation for Efficient Model Evaluation).

The informativeness criterion is diversity: anchor points are chosen to maximise disagreement across models, so that a small evaluation set captures the discriminative structure of the full benchmark.

Reference: DISCO: Diversifying Sample Condensation for Efficient Model Evaluation (https://arxiv.org/abs/2510.07959)

Example
queue = DISCOQueue(tasks, anchor_points=[0, 5, 12])
# or load from file:
queue = DISCOQueue(tasks, anchor_points_path="anchor_points.pkl")

for task in queue:
    result = execute(task)  # Only anchor-point tasks

__getitem__

__getitem__(idx: int) -> Task
__getitem__(idx: slice) -> BaseTaskQueue
__getitem__(
    idx: Union[int, slice],
) -> Union[Task, BaseTaskQueue]

Get a task by index or a slice of tasks.

PARAMETER DESCRIPTION
idx

Integer index or slice object.

TYPE: Union[int, slice]

RETURNS DESCRIPTION
Union[Task, BaseTaskQueue]

A single Task for integer index, or a new queue instance for slices.

__init__

__init__(
    tasks: Iterable[Task],
    anchor_points: Optional[List[int]] = None,
    anchor_points_path: Optional[Union[str, Path]] = None,
) -> None

Initialize DISCO task queue.

Anchor points can be supplied directly via anchor_points or loaded from a file via anchor_points_path. Providing both is an error.

PARAMETER DESCRIPTION
tasks

Full list of tasks (ordered by index).

TYPE: Iterable[Task]

anchor_points

Diversity-selected indices into tasks. Typically downloaded from a HuggingFace DISCO model repo. If None and anchor_points_path is also None, evaluates all tasks in order.

TYPE: Optional[List[int]] DEFAULT: None

anchor_points_path

Path to a .json or .pkl file containing anchor-point indices. Mutually exclusive with anchor_points.

TYPE: Optional[Union[str, Path]] DEFAULT: None

__iter__

__iter__() -> Iterator[Task]

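Yield the anchor-point tasks in the order given by the anchor points, or all tasks in their original order when no anchor points are set.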

__len__

__len__() -> int

Return the total number of tasks in the queue.

append

append(task: Task) -> None

Add a task to the end of the queue.

PARAMETER DESCRIPTION
task

The task to append.

TYPE: Task

extend

extend(tasks: Iterable[Task]) -> None

Add multiple tasks to the end of the queue.

PARAMETER DESCRIPTION
tasks

An iterable of tasks to append.

TYPE: Iterable[Task]

from_json_file classmethod

from_json_file(
    path: Union[str, Path], *, limit: Optional[int] = None
) -> BaseTaskQueue

Load tasks from a JSON file.

This helper understands the example file format used in examples/data.json where the top-level object has a data list and optional metadata.

PARAMETER DESCRIPTION
path

Path to the JSON file.

TYPE: Union[str, Path]

limit

Optional limit to the number of tasks to load.

TYPE: Optional[int] DEFAULT: None

RETURNS DESCRIPTION
BaseTaskQueue

A new queue instance containing the loaded tasks.

from_list classmethod

from_list(
    data: Iterable[Union[Task, dict]],
) -> BaseTaskQueue

Create a queue from an iterable of Tasks or dicts.

PARAMETER DESCRIPTION
data

An iterable of Task objects or dicts that can be converted to Tasks.

TYPE: Iterable[Union[Task, dict]]

RETURNS DESCRIPTION
BaseTaskQueue

A new queue instance containing the tasks.

RAISES DESCRIPTION
TypeError

If an item is neither a Task nor a dict.

load_anchor_points staticmethod

load_anchor_points(path: Union[str, Path]) -> List[int]

Load anchor points from a .json or .pkl file.

PARAMETER DESCRIPTION
path

Path to anchor points file. JSON files should contain a list of integer indices. Pickle files may contain a list or a numpy array.

TYPE: Union[str, Path]

RETURNS DESCRIPTION
List[int]

List of integer anchor-point indices.

RAISES DESCRIPTION
FileNotFoundError

If the file does not exist.
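
A loader with the documented behaviour might look roughly like the following. read_anchor_points is a hypothetical stand-in; raising ValueError for other extensions is an assumption, and numpy arrays are handled by duck-typing on tolist() so the sketch has no numpy dependency.

```python
import json
import pickle
from pathlib import Path
from typing import List, Union


def read_anchor_points(path: Union[str, Path]) -> List[int]:
    path = Path(path)
    if not path.exists():
        raise FileNotFoundError(f"anchor points file not found: {path}")
    if path.suffix == ".json":
        with path.open(encoding="utf-8") as fh:
            raw = json.load(fh)
    elif path.suffix == ".pkl":
        with path.open("rb") as fh:
            raw = pickle.load(fh)  # only unpickle files from trusted sources
    else:
        # Assumption: anything other than .json/.pkl is rejected.
        raise ValueError(f"unsupported extension: {path.suffix!r}")
    # numpy arrays expose tolist(); plain lists skip this step.
    if hasattr(raw, "tolist"):
        raw = raw.tolist()
    return [int(i) for i in raw]
```

Since pickle can execute arbitrary code on load, the JSON format is the safer choice when anchor points come from a source you do not control.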

to_list

to_list() -> List[Task]

Return a copy of the internal task list.

RETURNS DESCRIPTION
List[Task]

List of all tasks in the queue.

PriorityTaskQueue

Bases: BaseTaskQueue

Execute tasks ordered by priority.

Tasks are sorted by task.protocol.priority at construction time. Higher priority values are executed first by default. Tasks with equal priority maintain their relative order from the original input (stable sort).

This queue uses task.protocol.priority as the sole source of priority. Pre-compute priority values and assign them to tasks before creating the queue.

PARAMETER DESCRIPTION
tasks

An iterable of Task objects to schedule.

TYPE: Iterable[Task]

reverse

If True (default), higher priority values execute first. If False, lower priority values execute first.

TYPE: bool DEFAULT: True

Example
# Assign priorities based on your criteria
for task in tasks:
    task.protocol.priority = compute_priority(task)

# Create queue (higher priority first)
queue = PriorityTaskQueue(tasks)

# Or lower priority first
queue = PriorityTaskQueue(tasks, reverse=False)
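
The stable-sort guarantee can be checked with Python's built-in sorted(), which keeps equal-key items in input order even when reverse=True. Item and order_by_priority are hypothetical stand-ins; whether the queue uses sorted() internally is an assumption.

```python
from dataclasses import dataclass
from typing import Iterable, List


@dataclass
class Item:
    name: str
    priority: int


def order_by_priority(items: Iterable[Item], reverse: bool = True) -> List[Item]:
    # sorted() is stable, including with reverse=True: ties keep input order.
    return sorted(items, key=lambda it: it.priority, reverse=reverse)


items = [Item("a", 1), Item("b", 5), Item("c", 1), Item("d", 5)]
high_first = [it.name for it in order_by_priority(items)]
low_first = [it.name for it in order_by_priority(items, reverse=False)]
# high_first is ["b", "d", "a", "c"]; low_first is ["a", "c", "b", "d"]
```

In both directions "b" stays ahead of "d" and "a" ahead of "c", which is what makes the ordering reproducible across runs.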

__getitem__

__getitem__(idx: int) -> Task
__getitem__(idx: slice) -> BaseTaskQueue
__getitem__(
    idx: Union[int, slice],
) -> Union[Task, BaseTaskQueue]

Get a task by index or a slice of tasks.

PARAMETER DESCRIPTION
idx

Integer index or slice object.

TYPE: Union[int, slice]

RETURNS DESCRIPTION
Union[Task, BaseTaskQueue]

A single Task for integer index, or a new queue instance for slices.

__init__

__init__(
    tasks: Iterable[Task], reverse: bool = True
) -> None

Initialize priority queue with sorted tasks.

PARAMETER DESCRIPTION
tasks

An iterable of Task objects to schedule.

TYPE: Iterable[Task]

reverse

If True (default), higher priority values execute first.

TYPE: bool DEFAULT: True

__iter__

__iter__() -> Iterator[Task]

Yield tasks in priority order.

__len__

__len__() -> int

Return the total number of tasks in the queue.

append

append(task: Task) -> None

Add a task to the end of the queue.

PARAMETER DESCRIPTION
task

The task to append.

TYPE: Task

extend

extend(tasks: Iterable[Task]) -> None

Add multiple tasks to the end of the queue.

PARAMETER DESCRIPTION
tasks

An iterable of tasks to append.

TYPE: Iterable[Task]

from_json_file classmethod

from_json_file(
    path: Union[str, Path], *, limit: Optional[int] = None
) -> BaseTaskQueue

Load tasks from a JSON file.

This helper understands the example file format used in examples/data.json where the top-level object has a data list and optional metadata.

PARAMETER DESCRIPTION
path

Path to the JSON file.

TYPE: Union[str, Path]

limit

Optional limit to the number of tasks to load.

TYPE: Optional[int] DEFAULT: None

RETURNS DESCRIPTION
BaseTaskQueue

A new queue instance containing the loaded tasks.

from_list classmethod

from_list(
    data: Iterable[Union[Task, dict]],
) -> BaseTaskQueue

Create a queue from an iterable of Tasks or dicts.

PARAMETER DESCRIPTION
data

An iterable of Task objects or dicts that can be converted to Tasks.

TYPE: Iterable[Union[Task, dict]]

RETURNS DESCRIPTION
BaseTaskQueue

A new queue instance containing the tasks.

RAISES DESCRIPTION
TypeError

If an item is neither a Task nor a dict.

to_list

to_list() -> List[Task]

Return a copy of the internal task list.

RETURNS DESCRIPTION
List[Task]

List of all tasks in the queue.

AdaptiveTaskQueue

Bases: BaseTaskQueue, BenchmarkCallback, ABC

Abstract base class for adaptive task scheduling.

AdaptiveTaskQueue enables dynamic task ordering based on execution results. It inherits from BenchmarkCallback to integrate with the benchmark's callback system, creating a clean bidirectional communication model:

  • Benchmark → Queue: Via iterator protocol (for task in queue)
  • Queue → Benchmark: Via callback (on_task_repeat_end())

The queue automatically moves completed tasks from _remaining to _completed and calls update_state() to let subclasses adapt their scheduling strategy based on task results.

Subclasses must implement
  • initial_state(): Return initial state dict for adaptive algorithm
  • select_next_task(remaining, state): Choose the next task to execute
  • update_state(task, report, state): Update and return new state

The state dict is managed by the base class: it is initialized via initial_state() at iteration start, passed to select_next_task() and update_state(), and replaced by the return value of update_state(). This functional approach keeps state flow explicit while allowing subclasses to store any data they need.

Internal state (managed by the base class; do not modify directly)
  • _remaining: Tasks not yet executed
  • _completed: Completed tasks paired with their reports
  • _state: Current adaptive state dict
  • _stop_flag: Flag to signal early termination

When used with Benchmark.run(), the queue is automatically registered as a callback and receives on_task_repeat_end() notifications.

Example
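from typing import Any, Dict, Optional, Sequence

# AdaptiveTaskQueue and Task are provided by MASEval.
class IRTTaskQueue(AdaptiveTaskQueue):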
    '''Item Response Theory-based adaptive testing.'''

    def initial_state(self) -> Dict[str, Any]:
        return {"ability": 0.0}

    def select_next_task(
        self, remaining: Sequence[Task], state: Dict[str, Any]
    ) -> Optional[Task]:
        # Select task with difficulty closest to current ability estimate
        return min(
            remaining,
            key=lambda t: abs(t.metadata.get("difficulty", 0) - state["ability"])
        )

    def update_state(
        self, task: Task, report: Dict[str, Any], state: Dict[str, Any]
    ) -> Dict[str, Any]:
        # Update ability estimate based on task result
        correct = report.get("eval", [{}])[0].get("correct", False)
        return {"ability": state["ability"] + (0.5 if correct else -0.5)}

queue = IRTTaskQueue(tasks)
results = benchmark.run(queue)  # Auto-registered as callback
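
To show how the pieces fit together, the following self-contained sketch imitates the base-class machinery with a toy queue over integers. MiniAdaptiveQueue is hypothetical and heavily simplified: the real on_task_repeat_end() also receives the benchmark and looks the task up from the report, and the report shape used here ({"task": ..., "correct": ...}) is an assumption made for illustration.

```python
from typing import Any, Dict, Iterator, List, Optional, Sequence, Tuple


class MiniAdaptiveQueue:
    """Toy adaptive queue over integer 'tasks'; not the MASEval class."""

    def __init__(self, tasks: Sequence[int]) -> None:
        self._remaining: List[int] = list(tasks)
        self._completed: List[Tuple[int, Dict[str, Any]]] = []
        self._state: Dict[str, Any] = {}
        self._stop_flag = False

    # --- hooks a subclass would implement ---
    def initial_state(self) -> Dict[str, Any]:
        return {"passed": 0}

    def select_next_task(
        self, remaining: Sequence[int], state: Dict[str, Any]
    ) -> Optional[int]:
        return min(remaining)  # smallest task first

    def update_state(
        self, task: int, report: Dict[str, Any], state: Dict[str, Any]
    ) -> Dict[str, Any]:
        passed = state["passed"] + (1 if report["correct"] else 0)
        if passed >= 2:
            self.stop()  # enough evidence: end the run early
        return {"passed": passed}

    # --- machinery the base class would provide ---
    def stop(self) -> None:
        self._stop_flag = True

    def on_task_repeat_end(self, report: Dict[str, Any]) -> None:
        task = report["task"]
        self._remaining.remove(task)
        self._completed.append((task, report))
        self._state = self.update_state(task, report, self._state)

    def __iter__(self) -> Iterator[int]:
        self._state = self.initial_state()
        while self._remaining and not self._stop_flag:
            task = self.select_next_task(tuple(self._remaining), self._state)
            if task is None:
                return
            yield task


queue = MiniAdaptiveQueue([3, 1, 2, 4])
executed = []
for task in queue:
    executed.append(task)
    # The benchmark would invoke this hook after running the task.
    queue.on_task_repeat_end({"task": task, "correct": True})
```

In this run only tasks 1 and 2 are executed: the second pass triggers stop(), and the iterator ends before a third task is selected. The sketch relies on the hook being called after every yielded task; without it, the same task would be selected again.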

__getitem__

__getitem__(idx: int) -> Task
__getitem__(idx: slice) -> BaseTaskQueue
__getitem__(
    idx: Union[int, slice],
) -> Union[Task, BaseTaskQueue]

Get a task by index or a slice of tasks.

PARAMETER DESCRIPTION
idx

Integer index or slice object.

TYPE: Union[int, slice]

RETURNS DESCRIPTION
Union[Task, BaseTaskQueue]

A single Task for integer index, or a new queue instance for slices.

__init__

__init__(tasks: Iterable[Task]) -> None

Initialize adaptive queue.

PARAMETER DESCRIPTION
tasks

An iterable of Task objects to schedule.

TYPE: Iterable[Task]

__iter__

__iter__() -> Iterator[Task]

Yield tasks selected by the adaptive algorithm.

Initializes state via initial_state() at iteration start, then continues until select_next_task() returns None, _remaining is empty, or stop() is called.

Note: select_next_task() is only called when _remaining is non-empty, so implementers don't need to check for an empty list.

__len__

__len__() -> int

Return the total number of tasks in the queue.

append

append(task: Task) -> None

Add a task to the end of the queue.

PARAMETER DESCRIPTION
task

The task to append.

TYPE: Task

extend

extend(tasks: Iterable[Task]) -> None

Add multiple tasks to the end of the queue.

PARAMETER DESCRIPTION
tasks

An iterable of tasks to append.

TYPE: Iterable[Task]

from_json_file classmethod

from_json_file(
    path: Union[str, Path], *, limit: Optional[int] = None
) -> BaseTaskQueue

Load tasks from a JSON file.

This helper understands the example file format used in examples/data.json where the top-level object has a data list and optional metadata.

PARAMETER DESCRIPTION
path

Path to the JSON file.

TYPE: Union[str, Path]

limit

Optional limit to the number of tasks to load.

TYPE: Optional[int] DEFAULT: None

RETURNS DESCRIPTION
BaseTaskQueue

A new queue instance containing the loaded tasks.

from_list classmethod

from_list(
    data: Iterable[Union[Task, dict]],
) -> BaseTaskQueue

Create a queue from an iterable of Tasks or dicts.

PARAMETER DESCRIPTION
data

An iterable of Task objects or dicts that can be converted to Tasks.

TYPE: Iterable[Union[Task, dict]]

RETURNS DESCRIPTION
BaseTaskQueue

A new queue instance containing the tasks.

RAISES DESCRIPTION
TypeError

If an item is neither a Task nor a dict.

gather_traces

gather_traces() -> dict[str, Any]

Gather execution traces from this callback.

By default, callbacks don't store traces, but subclasses can override this to provide custom tracing data.

RETURNS DESCRIPTION
dict[str, Any]

Dictionary with basic callback information. Subclasses should extend this with their own data.

initial_state abstractmethod

initial_state() -> Dict[str, Any]

Return the initial state for adaptive selection.

This state dict will be passed to select_next_task() and update_state() throughout the benchmark run. Store any data your adaptive algorithm needs (ability estimates, history, etc.).

RETURNS DESCRIPTION
Dict[str, Any]

Initial state dict. Can contain any keys/values you need.

on_event

on_event(event_name: str, **data) -> None

Handle a generic event.

on_task_repeat_end

on_task_repeat_end(
    benchmark: Benchmark, report: Dict[str, Any]
) -> None

BenchmarkCallback hook called after each task repetition completes.

This method extracts the task from the report, moves it from _remaining to _completed, and calls update_state() to let the subclass update its adaptive model.

PARAMETER DESCRIPTION
benchmark

The benchmark instance (unused in this implementation).

TYPE: Benchmark

report

The execution report containing task_id and results.

TYPE: Dict[str, Any]

select_next_task abstractmethod

select_next_task(
    remaining: Sequence[Task], state: Dict[str, Any]
) -> Optional[Task]

Select the next task to execute.

Implement this method to define your adaptive selection algorithm (e.g., IRT-based selection, uncertainty sampling, bandit algorithms).

PARAMETER DESCRIPTION
remaining

Read-only sequence of tasks not yet executed. Do not modify this sequence; the queue manages task lifecycle.

TYPE: Sequence[Task]

state

Current adaptive state from initial_state() or update_state().

TYPE: Dict[str, Any]

RETURNS DESCRIPTION
Optional[Task]

The next Task to execute from remaining, or None to signal early termination.

Note

This method is only called when remaining is non-empty, so you don't need to check for an empty sequence.

stop

stop() -> None

Signal that no more tasks should be processed.

Call this from update_state() to trigger early termination (e.g., when a confidence threshold is reached).

The _stop_flag is checked in __iter__, which will stop yielding tasks and naturally terminate the benchmark's iteration loop via Python's iterator protocol.

to_list

to_list() -> List[Task]

Return a copy of the internal task list.

RETURNS DESCRIPTION
List[Task]

List of all tasks in the queue.

update_state abstractmethod

update_state(
    task: Task,
    report: Dict[str, Any],
    state: Dict[str, Any],
) -> Dict[str, Any]

Update state after task completion.

Implement this method to update ability estimates, difficulty models, or other adaptive state based on task results.

PARAMETER DESCRIPTION
task

The task that just completed.

TYPE: Task

report

The execution report containing status and eval results.

TYPE: Dict[str, Any]

state

Current state dict.

TYPE: Dict[str, Any]

RETURNS DESCRIPTION
Dict[str, Any]

Updated state dict (can be the same dict mutated, or a new dict).

Note

Call self.stop() here to halt iteration before the next task selection.