Tasks

Collection of tasks.

The debusine.tasks module hierarchy hosts a collection of Task that are used by workers to fulfill WorkRequest sent by the debusine scheduler.

Creating a new task requires adding a new file containing a class inheriting from the Task base class. The name of the class must be unique among all child classes.

A child class must, at the very least, override the Task.execute() method.

class debusine.tasks.Task[source]

Base class for tasks.

A Task object serves two purpose: encapsulating the logic of what needs to be done to execute the task (cf configure() and execute() that are run on a worker), and supporting the scheduler by determining if a task is suitable for a given worker. That is done in a two-step process, collating metadata from each worker (with the analyze_worker() method that is run on a worker) and then, based on this metadata, see if a task is suitable (with can_run_on() that is executed on the scheduler).

TASK_DATA_SCHEMA: dict[str, Any] = {}

Can be overridden to enable jsonschema validation of the task_data parameter passed to configure().

TASK_VERSION: Optional[int] = None

Must be overridden by child classes to document the current version of the task’s code. A task will only be scheduled on a worker if its task version is the same as the one running on the scheduler.

__init__()[source]

Initialize the task.

abort()[source]

Task does not need to be executed. Once aborted cannot be changed.

property aborted: bool

Return if the task is aborted.

Tasks cannot transition from aborted -> not-aborted.

analyze_worker() dict[source]

Return dynamic metadata about the current worker.

This method is called on the worker to collect information about the worker. The information is stored as a set of key-value pairs in a dictionary.

That information is then reused on the scheduler to be fed to can_run_on() and determine if a task is suitable to be executed on the worker.

Derived objects can extend the behaviour by overriding the method, calling metadata = super().analyze_worker(), and then adding supplementary data in the dictionary.

To avoid conflicts on the names of the keys used by different tasks you should use key names obtained with self.prefix_with_task_name(...).

Returns:

a dictionary describing the worker.

Return type:

dict.

classmethod analyze_worker_all_tasks()[source]

Return dictionary with metadata for each task in Task._sub_tasks.

Subclasses of Task get registered in Task._sub_tasks. Return a dictionary with the metadata of each of the subtasks.

This method is executed in the worker when submitting the dynamic metadata.

can_run_on(worker_metadata: dict) bool[source]

Check if the specified worker can run the task.

This method shall take its decision solely based on the supplied worker_metadata and on the configured task data (self.data).

The default implementation returns always True except if there’s a mismatch between the :py:attribute:TASK_VERSION on the scheduler side and on the worker side.

Derived objects can implement further checks by overriding the method in the following way:

if not super().can_run_on(worker_metadata):
    return False

if ...:
    return False

return True
Parameters:

worker_metadata (dict) – The metadata collected from the worker by running analyze_worker() on all the tasks on the worker under consideration.

Returns:

the boolean result of the check.

Return type:

bool.

static class_from_name(sub_task_class_name: str) Type[Task][source]

Return class for :param sub_task_class_name (case-insensitive).

__init_subclass__() registers Task subclasses’ into Task._sub_tasks.

configure(task_data)[source]

Configure the task with the supplied task_data.

The supplied data is first validated against the JSON schema defined in the TASK_DATA_SCHEMA class attribute. If validation fails, a TaskConfigError is raised. Otherwise, the supplied task_data is stored in the data attribute.

Derived objects can extend the behaviour by overriding the method and calling super().configure(task_data) however the extra checks must not access any resource of the worker as the method can also be executed on the server when it tries to schedule work requests.

Parameters:

task_data (dict) – The supplied data describing the task.

Raises:

TaskConfigError – if the JSON schema is not respected.

configure_server_access(debusine: Debusine)[source]

Set the object to access the server.

create_debug_log_file(filename: str, *, mode='w') Union[TextIO, BinaryIO][source]

Create a temporary file, open and return writer.

The caller must call .close() when finished writing.

execute() bool[source]

Call the _execute() method, upload debug artifacts.

See _execute() for more information.

Returns:

result of the _execute() method.

execute_logging_exceptions() bool[source]

Execute self.execute() logging any raised exceptions.

static is_valid_task_name(task_name) bool[source]

Return True if task_name is registered (its class is imported).

logger

A logging.Logger instance that can be used in child classes when you override methods to implement the task.

name

The name of the task. It is computed by __init__() by converting the class name to lowercase.

prefix_with_task_name(text: str) str[source]
Returns:

the text prefixed with the task name and a colon.

static task_names() list[str][source]

Return list of sub-task names.

exception debusine.tasks.TaskConfigError[source]

Halt the task due to invalid configuration.