Tasks
Collection of tasks.
The debusine.tasks module hierarchy hosts a collection of Task subclasses that are used by workers to fulfill the WorkRequests sent by the debusine scheduler.
Creating a new task requires adding a new file containing a class that inherits from the Task base class. The name of the class must be unique among all child classes.
A child class must, at the very least, override the Task.execute() method.
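A minimal sketch of a new task follows. It assumes a simplified stand-in for the Task base class that only mimics two documented behaviours (the name is the lowercased class name, and child classes override execute()); the real debusine.tasks.Task does much more. The NoopTask class is hypothetical.

```python
class Task:
    """Simplified stand-in for debusine.tasks.Task (illustration only)."""

    def __init__(self) -> None:
        # Documented behaviour: the task name is the lowercased class name.
        self.name = self.__class__.__name__.lower()

    def execute(self) -> bool:
        # Child classes must override this method.
        raise NotImplementedError


class NoopTask(Task):
    """Hypothetical task that does nothing and reports success."""

    def execute(self) -> bool:
        return True


task = NoopTask()
print(task.name)       # nooptask
print(task.execute())  # True
```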
- class debusine.tasks.Task
Base class for tasks.
A Task object serves two purposes: encapsulating the logic of what needs to be done to execute the task (cf. configure() and execute(), which are run on a worker), and supporting the scheduler by determining whether a task is suitable for a given worker. The latter is done in a two-step process: first collating metadata from each worker (with the analyze_worker() method, which is run on a worker), and then, based on this metadata, checking whether a task is suitable (with can_run_on(), which is executed on the scheduler).
- TASK_DATA_SCHEMA: dict[str, Any] = {}
Can be overridden to enable jsonschema validation of the task_data parameter passed to configure().
- TASK_VERSION: Optional[int] = None
Must be overridden by child classes to document the current version of the task’s code. A task will only be scheduled on a worker if its task version is the same as the one running on the scheduler.
- property aborted: bool
Return whether the task has been aborted.
Tasks cannot transition from aborted to not aborted.
- analyze_worker() → dict
Return dynamic metadata about the current worker.
This method is called on the worker to collect information about the worker. The information is stored as a set of key-value pairs in a dictionary.
That information is then reused on the scheduler, where it is fed to can_run_on() to determine whether the task is suitable to be executed on the worker.
Derived objects can extend the behaviour by overriding the method, calling metadata = super().analyze_worker(), and then adding supplementary data to the dictionary.
To avoid conflicts between the key names used by different tasks, you should use key names obtained with self.prefix_with_task_name(...).
- Returns:
a dictionary describing the worker.
- Return type:
dict.
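The override pattern described above can be sketched as follows. This assumes a simplified stand-in base class; in particular, the "<task name>:<key>" format returned by prefix_with_task_name() and the base metadata recording TASK_VERSION are assumptions, and the ExampleTool task and its "example-tool" executable are hypothetical.

```python
import shutil
from typing import ClassVar, Optional


class Task:
    """Simplified stand-in for debusine.tasks.Task (illustration only)."""

    TASK_VERSION: ClassVar[Optional[int]] = None

    def __init__(self) -> None:
        self.name = self.__class__.__name__.lower()

    def prefix_with_task_name(self, text: str) -> str:
        # Assumed key format "<task name>:<text>"; the real helper may differ.
        return f"{self.name}:{text}"

    def analyze_worker(self) -> dict:
        # Assumption: the base metadata records the task version.
        return {self.prefix_with_task_name("version"): self.TASK_VERSION}


class ExampleTool(Task):
    """Hypothetical task that needs an external program on the worker."""

    TASK_VERSION = 1

    def analyze_worker(self) -> dict:
        metadata = super().analyze_worker()
        # Hypothetical check: is the "example-tool" executable on the PATH?
        metadata[self.prefix_with_task_name("available")] = (
            shutil.which("example-tool") is not None
        )
        return metadata


print(ExampleTool().analyze_worker())
```

Prefixing every key with the task name keeps the metadata of different tasks from colliding once it is merged into a single dictionary for the worker.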
- classmethod analyze_worker_all_tasks()
Return a dictionary with the metadata for each task in Task._sub_tasks.
Subclasses of Task get registered in Task._sub_tasks. Return a dictionary with the metadata of each of the subtasks.
This method is executed in the worker when submitting the dynamic metadata.
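The registration-then-collection flow can be sketched with a stand-in base class. Assumptions: the registry is keyed by lowercased class name, each task's metadata is a flat dictionary with prefixed keys, and the per-task dictionaries are merged into one; Alpha and Beta are hypothetical tasks.

```python
from typing import Any, ClassVar, Optional


class Task:
    """Simplified stand-in for debusine.tasks.Task (illustration only)."""

    # Registry of subclasses, keyed by lowercased class name.
    _sub_tasks: ClassVar[dict[str, type["Task"]]] = {}
    TASK_VERSION: ClassVar[Optional[int]] = None

    def __init_subclass__(cls, **kwargs: Any) -> None:
        super().__init_subclass__(**kwargs)
        Task._sub_tasks[cls.__name__.lower()] = cls

    def __init__(self) -> None:
        self.name = self.__class__.__name__.lower()

    def analyze_worker(self) -> dict:
        return {f"{self.name}:version": self.TASK_VERSION}

    @classmethod
    def analyze_worker_all_tasks(cls) -> dict:
        # Merge the metadata reported by an instance of every registered task.
        metadata: dict = {}
        for task_class in cls._sub_tasks.values():
            metadata.update(task_class().analyze_worker())
        return metadata


class Alpha(Task):
    TASK_VERSION = 1


class Beta(Task):
    TASK_VERSION = 2


print(Task.analyze_worker_all_tasks())
# {'alpha:version': 1, 'beta:version': 2}
```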
- can_run_on(worker_metadata: dict) → bool
Check if the specified worker can run the task.
This method shall take its decision solely based on the supplied worker_metadata and on the configured task data (self.data).
The default implementation always returns True, except when there is a mismatch between the TASK_VERSION on the scheduler side and on the worker side.
Derived objects can implement further checks by overriding the method in the following way:
    def can_run_on(self, worker_metadata: dict) -> bool:
        if not super().can_run_on(worker_metadata):
            return False
        if ...:
            return False
        return True
- Parameters:
worker_metadata (dict) – The metadata collected from the worker by running analyze_worker() on all the tasks on the worker under consideration.
- Returns:
the boolean result of the check.
- Return type:
bool.
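A concrete instance of that override pattern, under stated assumptions: the stand-in base compares TASK_VERSION against a "<task name>:version" key in the metadata (the key format is an assumption), and the ExampleBuild task with its "architecture" task data and "architectures" metadata key is hypothetical.

```python
from typing import ClassVar, Optional


class Task:
    """Simplified stand-in for debusine.tasks.Task (illustration only)."""

    TASK_VERSION: ClassVar[Optional[int]] = None

    def __init__(self, data: Optional[dict] = None) -> None:
        self.name = self.__class__.__name__.lower()
        self.data = data or {}

    def prefix_with_task_name(self, text: str) -> str:
        # Assumed key format; the real helper may differ.
        return f"{self.name}:{text}"

    def can_run_on(self, worker_metadata: dict) -> bool:
        # Refuse workers whose copy of the task has a different version.
        worker_version = worker_metadata.get(self.prefix_with_task_name("version"))
        return worker_version == self.TASK_VERSION


class ExampleBuild(Task):
    """Hypothetical task that also needs a matching architecture."""

    TASK_VERSION = 3

    def can_run_on(self, worker_metadata: dict) -> bool:
        if not super().can_run_on(worker_metadata):
            return False
        # Extra check based only on the metadata and self.data.
        supported = worker_metadata.get(self.prefix_with_task_name("architectures"), [])
        if self.data.get("architecture") not in supported:
            return False
        return True


task = ExampleBuild({"architecture": "amd64"})
print(task.can_run_on({"examplebuild:version": 3, "examplebuild:architectures": ["amd64"]}))  # True
print(task.can_run_on({"examplebuild:version": 2, "examplebuild:architectures": ["amd64"]}))  # False
```

The version check runs first, so a worker with a stale copy of the task is rejected before any task-specific logic is consulted.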
- static class_from_name(sub_task_class_name: str) → Type[Task]
Return the class for sub_task_class_name (case-insensitive).
__init_subclass__() registers Task subclasses into Task._sub_tasks.
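A sketch of how __init_subclass__() registration makes a case-insensitive lookup possible. It uses a stand-in base class. The choice to reject duplicate names with ValueError, the ValueError from class_from_name() for unknown names, and the case-insensitive behaviour of is_valid_task_name() are all assumptions, not the documented API. ExampleBuild is hypothetical.

```python
from typing import Any, ClassVar


class Task:
    """Simplified stand-in for debusine.tasks.Task (illustration only)."""

    _sub_tasks: ClassVar[dict[str, type["Task"]]] = {}

    def __init_subclass__(cls, **kwargs: Any) -> None:
        super().__init_subclass__(**kwargs)
        # Register under the lowercased name so lookups are case-insensitive.
        name = cls.__name__.lower()
        if name in Task._sub_tasks:
            # Assumption: duplicate names are rejected at class creation.
            raise ValueError(f"task name {name!r} is already registered")
        Task._sub_tasks[name] = cls

    @staticmethod
    def class_from_name(sub_task_class_name: str) -> type["Task"]:
        try:
            return Task._sub_tasks[sub_task_class_name.lower()]
        except KeyError:
            raise ValueError(f"{sub_task_class_name!r} is not a registered task") from None

    @staticmethod
    def is_valid_task_name(task_name: str) -> bool:
        return task_name.lower() in Task._sub_tasks


class ExampleBuild(Task):
    pass


print(Task.class_from_name("EXAMPLEBUILD") is ExampleBuild)  # True
print(Task.is_valid_task_name("missing"))                    # False
```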
- configure(task_data)
Configure the task with the supplied task_data.
The supplied data is first validated against the JSON schema defined in the TASK_DATA_SCHEMA class attribute. If validation fails, a TaskConfigError is raised. Otherwise, the supplied task_data is stored in the data attribute.
Derived objects can extend the behaviour by overriding the method and calling super().configure(task_data); however, the extra checks must not access any resource of the worker, as the method can also be executed on the server when it tries to schedule work requests.
- Parameters:
task_data (dict) – The supplied data describing the task.
- Raises:
TaskConfigError – if the JSON schema is not respected.
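A sketch of a TASK_DATA_SCHEMA declaration and a configure() override. To stay dependency-free, the stand-in base class below does not use jsonschema: it only enforces the schema's "required" keys, which is a deliberate simplification of full JSON schema validation. The ExampleBuild task and its "architecture" and "jobs" fields are hypothetical.

```python
from typing import Any, ClassVar


class TaskConfigError(Exception):
    """Stand-in for the exception raised on invalid task_data."""


class Task:
    """Simplified stand-in for debusine.tasks.Task (illustration only)."""

    TASK_DATA_SCHEMA: ClassVar[dict[str, Any]] = {}

    def __init__(self) -> None:
        self.data: dict = {}

    def configure(self, task_data: dict) -> None:
        # Simplification: check only the schema's "required" keys
        # instead of running a full jsonschema validation.
        required = self.TASK_DATA_SCHEMA.get("required", [])
        missing = [key for key in required if key not in task_data]
        if missing:
            raise TaskConfigError(f"missing required keys: {missing}")
        self.data = task_data


class ExampleBuild(Task):
    TASK_DATA_SCHEMA = {
        "type": "object",
        "properties": {
            "architecture": {"type": "string"},
            "jobs": {"type": "integer"},
        },
        "required": ["architecture"],
    }

    def configure(self, task_data: dict) -> None:
        super().configure(task_data)
        # Extra check that uses only task_data, never worker resources,
        # because configure() may also run on the scheduler.
        if task_data.get("jobs", 1) < 1:
            raise TaskConfigError("jobs must be at least 1")


task = ExampleBuild()
task.configure({"architecture": "amd64", "jobs": 4})
print(task.data["jobs"])  # 4
```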
- create_debug_log_file(filename: str, *, mode='w') → Union[TextIO, BinaryIO]
Create a temporary file, open it, and return a writer.
The caller must call .close() when finished writing.
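Because the caller is responsible for closing the returned file, a with statement is a convenient way to honour that contract. The stand-in below assumes debug logs live in a per-task temporary directory created with tempfile.mkdtemp(); that directory and the _debug_log_dir attribute are hypothetical.

```python
import os
import tempfile
from typing import BinaryIO, TextIO, Union


class Task:
    """Simplified stand-in for debusine.tasks.Task (illustration only)."""

    def __init__(self) -> None:
        # Hypothetical directory collecting debug logs for later upload.
        self._debug_log_dir = tempfile.mkdtemp(prefix="debug-logs-")

    def create_debug_log_file(
        self, filename: str, *, mode: str = "w"
    ) -> Union[TextIO, BinaryIO]:
        # Open a new file inside the temporary debug-log directory.
        return open(os.path.join(self._debug_log_dir, filename), mode)


task = Task()
# The with block closes the file, satisfying the caller's obligation.
with task.create_debug_log_file("cmd-output.log") as log:
    log.write("build started\n")
```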
- execute() → bool
Call the _execute() method and upload debug artifacts.
See _execute() for more information.
- Returns:
result of the _execute() method.
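The split between execute() and _execute() can be sketched as a template method: execute() wraps the task-specific _execute() and handles the debug-artifact step. Uploading in a finally clause, so that artifacts are recorded even when _execute() raises, is a design choice of this sketch and is not confirmed by the documentation. The _upload_debug_artifacts() helper and uploaded list are hypothetical placeholders for the real upload. The sketch also uses the logger attribute described below.

```python
import logging


class Task:
    """Simplified stand-in for debusine.tasks.Task (illustration only)."""

    def __init__(self) -> None:
        self.name = self.__class__.__name__.lower()
        self.logger = logging.getLogger(f"example.tasks.{self.name}")
        self.uploaded: list[str] = []

    def _execute(self) -> bool:
        # Task-specific logic goes here in child classes.
        raise NotImplementedError

    def _upload_debug_artifacts(self) -> None:
        # Hypothetical placeholder for the real upload step.
        self.uploaded.append(f"{self.name}-debug-logs")

    def execute(self) -> bool:
        # Run the task-specific logic, then upload debug artifacts,
        # even if _execute() raised.
        try:
            return self._execute()
        finally:
            self._upload_debug_artifacts()


class ExampleBuild(Task):
    def _execute(self) -> bool:
        self.logger.info("running %s", self.name)
        return True


task = ExampleBuild()
print(task.execute())  # True
print(task.uploaded)   # ['examplebuild-debug-logs']
```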
- static is_valid_task_name(task_name) → bool
Return True if task_name is registered (its class is imported).
- logger
A logging.Logger instance that can be used in child classes when you override methods to implement the task.
- name
The name of the task. It is computed by __init__() by converting the class name to lowercase.