# Copyright 2021-2022 The Debusine Developers
# See the AUTHORS file at the top-level directory of this distribution
#
# This file is part of Debusine. It is subject to the license terms
# in the LICENSE file found in the top-level directory of this
# distribution. No part of Debusine, including this file, may be copied,
# modified, propagated, or distributed except according to the terms
# contained in the LICENSE file.
"""
Collection of tasks.

The debusine.tasks module hierarchy hosts a collection of :class:`Task`
subclasses that are used by workers to fulfill WorkRequests sent by the
debusine scheduler.

Creating a new task requires adding a new file containing a class inheriting
from the :class:`Task` base class. The name of the class must be unique among
all child classes.

A child class must, at the very least, override the :py:meth:`Task._execute`
method.
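
A minimal, hypothetical task could look like this::

    class Noop(Task):
        # Do nothing and report success
        TASK_VERSION = 1

        def _execute(self) -> bool:
            return True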
"""
import copy
import logging
import tempfile
from pathlib import Path
from typing import Any, BinaryIO, Optional, TextIO, Type, Union
import jsonschema
from debusine.client.debusine import Debusine
from debusine.client.local_artifact import WorkRequestDebugLogs


class TaskConfigError(Exception):
"""Halt the task due to invalid configuration."""


class Task:
"""
    Base class for tasks.

    A Task object serves two purposes: encapsulating the logic of what
    needs to be done to execute the task (cf. :py:meth:`configure` and
    :py:meth:`execute`, which are run on a worker), and supporting the
    scheduler in determining whether a task is suitable for a given worker.
    That is done in a two-step process: collating metadata from each worker
    (with the :py:meth:`analyze_worker` method, which is run on a worker)
    and then, based on this metadata, checking whether a task is suitable
    (with :py:meth:`can_run_on`, which is executed on the scheduler).
"""
#: Must be overridden by child classes to document the current version of
#: the task's code. A task will only be scheduled on a worker if its task
#: version is the same as the one running on the scheduler.
TASK_VERSION: Optional[int] = None
#: Can be overridden to enable jsonschema validation of the ``task_data``
#: parameter passed to :py:meth:`configure`.
TASK_DATA_SCHEMA: dict[str, Any] = {}
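
    # As an illustrative, hypothetical example, a child class could set:
    #
    #     TASK_DATA_SCHEMA = {
    #         "type": "object",
    #         "properties": {"input": {"type": "string"}},
    #         "required": ["input"],
    #     }
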
_TASK_DATA_NOTIFICATIONS_SCHEMA = {
"type": "object",
"properties": {
"on_failure": {
"type": "array",
"items": {
"type": "object",
"properties": {
"channel": {"type": "string"},
"data": {"type": "object"},
},
},
"required": ["channel"],
}
},
"required": ["on_failure"],
}
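
    # For reference, a "notifications" entry satisfying the schema above
    # could look like this (illustrative values):
    #
    #     {
    #         "on_failure": [
    #             {"channel": "email", "data": {}},
    #         ],
    #     }
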
_sub_tasks: dict[str, Type["Task"]] = {}

    def __init_subclass__(cls, **kwargs):
"""
        Register the subclass into Task._sub_tasks.

        Used by Task.class_from_name() to return the class given its name.
"""
super().__init_subclass__(**kwargs)
sub_task_name_lowercase = cls.__name__.lower()
# The same sub-task could register twice
# (but assert that is the *same* class, not a different
# subtask with a name with a different capitalisation)
assert (
sub_task_name_lowercase not in cls._sub_tasks
or cls._sub_tasks[sub_task_name_lowercase] == cls
)
cls._sub_tasks[sub_task_name_lowercase] = cls

    def __init__(self):
"""Initialize the task."""
        #: Validated task data submitted through :py:meth:`configure` without
        #: Task generic data (e.g. "notifications")
self.data = None
#: The name of the task. It is computed by :py:meth:`__init__` by
#: converting the class name to lowercase.
self.name = self.__class__.__name__.lower()
#: A :class:`logging.Logger` instance that can be used in child classes
#: when you override methods to implement the task.
self.logger = logging.getLogger("debusine.tasks")
# Task is aborted: the task does not need to be executed, and can be
# stopped if it is already running
self._aborted = False
        self.work_request: Optional[int] = None
        self.debusine: Optional[Debusine] = None
        self._debug_log_files_directory: Optional[
            tempfile.TemporaryDirectory
        ] = None
        # Used by _upload_work_request_debug_logs(); expected to be set
        # during configuration or by subclasses when relevant
        self._workspace: Optional[str] = None
        self._source_artifact_id: Optional[int] = None

    def create_debug_log_file(
        self, filename: str, *, mode: str = "w"
) -> Union[TextIO, BinaryIO]:
"""
        Create a temporary file, open it and return the writer.

        The caller must call .close() when finished writing.
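
        For example (the file name is illustrative)::

            log_file = self.create_debug_log_file("stage-output.log")
            log_file.write("some debug information")
            log_file.close()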
"""
if self._debug_log_files_directory is None:
self._debug_log_files_directory = tempfile.TemporaryDirectory(
prefix="debusine-task-debug-log-files-"
)
debug_file = Path(self._debug_log_files_directory.name) / filename
return debug_file.open(mode)

    def prefix_with_task_name(self, text: str) -> str:
""":return: the ``text`` prefixed with the task name and a colon."""
return f"{self.name}:{text}"

    def analyze_worker(self) -> dict:
"""
        Return dynamic metadata about the current worker.

        This method is called on the worker to collect information about the
        worker. The information is stored as a set of key-value pairs in a
        dictionary.

        That information is then reused on the scheduler to be fed to
        :py:meth:`can_run_on` and determine if a task is suitable to be
        executed on the worker.

        Derived objects can extend the behaviour by overriding
        the method, calling ``metadata = super().analyze_worker()``,
        and then adding supplementary data to the dictionary.

        To avoid conflicts on the names of the keys used by different tasks
        you should use key names obtained with
        ``self.prefix_with_task_name(...)``.
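
        A hypothetical override could look like this::

            def analyze_worker(self) -> dict:
                metadata = super().analyze_worker()
                # "supports-foo" is an illustrative, task-specific key
                metadata[self.prefix_with_task_name("supports-foo")] = True
                return metadata
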
:return: a dictionary describing the worker.
:rtype: dict.
"""
version_key_name = self.prefix_with_task_name("version")
return {
version_key_name: self.TASK_VERSION,
}

    @classmethod
def analyze_worker_all_tasks(cls):
"""
        Return a dictionary with metadata for each task in Task._sub_tasks.

        Subclasses of Task get registered in Task._sub_tasks. Return
        a dictionary with the metadata of each of the subtasks.

        This method is executed on the worker when submitting the dynamic
        metadata.
"""
metadata = {}
for task_class in cls._sub_tasks.values():
task = task_class()
metadata.update(task.analyze_worker())
return metadata

    def can_run_on(self, worker_metadata: dict) -> bool:
"""
        Check if the specified worker can run the task.

        This method shall take its decision solely based on the supplied
        ``worker_metadata`` and on the configured task data (``self.data``).

        The default implementation always returns True except when there is
        a mismatch between the :py:attr:`TASK_VERSION` on the scheduler
        side and on the worker side.

        Derived objects can implement further checks by overriding the method
        in the following way::
if not super().can_run_on(worker_metadata):
return False
if ...:
return False
return True

        :param dict worker_metadata: The metadata collected from the worker by
running :py:meth:`analyze_worker` on all the tasks on the worker
under consideration.
:return: the boolean result of the check.
:rtype: bool.
"""
version_key_name = self.prefix_with_task_name("version")
if worker_metadata.get(version_key_name) != self.TASK_VERSION:
return False
return True

    def execute_logging_exceptions(self) -> bool:
        """Execute self.execute(), logging any raised exceptions."""
try:
return self.execute()
        except Exception:
            self.logger.exception("Exception in Task %s", self.name)
            raise

    def execute(self) -> bool:
        """
        Call the :py:meth:`_execute` method and upload debug artifacts.

        See :py:meth:`_execute` for more information.

        :return: result of the :py:meth:`_execute` method.
        """  # noqa: D402
result = self._execute()
self._upload_work_request_debug_logs()
return result

    def _execute(self) -> bool:
"""
        Execute the requested task.

        The task must first have been configured. It is allowed to take
        as much time as required. This method will only be run on a worker. It
        is thus allowed to access resources local to the worker.

        It is recommended to fail early by raising a :py:exc:`TaskConfigError`
        if the parameters of the task let you anticipate that it has no chance
        of completing successfully.

        :return: True to indicate success, False for a failure.
:rtype: bool.
:raises TaskConfigError: if the parameters of the work request are
incompatible with the worker.
"""
raise NotImplementedError()

    def abort(self):
        """
        Mark the task as aborted: it does not need to be executed.

        Once aborted, the task cannot be un-aborted.
        """
        self._aborted = True

    @property
def aborted(self) -> bool:
"""
        Return whether the task has been aborted.

        Tasks cannot transition from aborted to not-aborted.
"""
return self._aborted

    @staticmethod
def class_from_name(sub_task_class_name: str) -> Type["Task"]:
"""
        Return the task class for ``sub_task_class_name`` (case-insensitive).

        __init_subclass__() registers Task subclasses into Task._sub_tasks.
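
        For example, assuming a registered task class named ``Noop``
        (hypothetical)::

            Task.class_from_name("noop")  # returns the Noop class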
"""
sub_task_class_name_lowercase = sub_task_class_name.lower()
if sub_task_class_name_lowercase in Task._sub_tasks:
return Task._sub_tasks[sub_task_class_name_lowercase]
raise ValueError(
f"'{sub_task_class_name_lowercase}' is not a registered task_name"
)

    @staticmethod
    def is_valid_task_name(task_name: str) -> bool:
"""Return True if task_name is registered (its class is imported)."""
return task_name.lower() in Task._sub_tasks

    @staticmethod
def task_names() -> list[str]:
"""Return list of sub-task names."""
return sorted(Task._sub_tasks)

    def _upload_work_request_debug_logs(self):
        """
        Create a WorkRequestDebugLogs artifact and upload the log files.

        The log files, if any, are in self._debug_log_files_directory and
        were added via self.create_debug_log_file().

        If self._source_artifact_id is not None: create a relation from the
        created WorkRequestDebugLogs artifact to it.
"""
if self._debug_log_files_directory is None:
return
work_request_debug_logs_artifact = WorkRequestDebugLogs.create(
files=Path(self._debug_log_files_directory.name).glob("*")
)
remote_artifact = self.debusine.upload_artifact(
work_request_debug_logs_artifact,
workspace=self._workspace,
work_request=self.work_request,
)
if self._source_artifact_id is not None:
self.debusine.relation_create(
remote_artifact.id,
self._source_artifact_id,
"relates-to",
)
self._debug_log_files_directory.cleanup()
self._debug_log_files_directory = None