Source code for debusine.tasks._task

# Copyright 2021-2022 The Debusine Developers
# See the AUTHORS file at the top-level directory of this distribution
#
# This file is part of Debusine. It is subject to the license terms
# in the LICENSE file found in the top-level directory of this
# distribution. No part of Debusine, including this file, may be copied,
# modified, propagated, or distributed except according to the terms
# contained in the LICENSE file.
"""
Collection of tasks.

The debusine.tasks module hierarchy hosts a collection of :class:`Task`
subclasses that are used by workers to fulfill the WorkRequests sent by the
debusine scheduler.

Creating a new task requires adding a new file containing a class inheriting
from the :class:`Task` base class. The name of the class must be unique among
all child classes.

A child class must, at the very least, override the :py:meth:`Task.execute`
method.
"""
import copy
import logging
import tempfile
from pathlib import Path
from typing import Any, BinaryIO, Optional, TextIO, Type, Union

import jsonschema

from debusine.client.debusine import Debusine
from debusine.client.local_artifact import WorkRequestDebugLogs




class TaskConfigError(Exception):
    """Halt the task due to invalid configuration."""


class Task:
    """
    Base class for tasks.

    A Task object serves two purposes: encapsulating the logic of what
    needs to be done to execute the task (cf. :py:meth:`configure` and
    :py:meth:`execute`, which are run on a worker), and supporting the
    scheduler by determining whether a task is suitable for a given worker.
    That is done in a two-step process: collating metadata from each worker
    (with the :py:meth:`analyze_worker` method, which is run on a worker)
    and then, based on this metadata, checking whether a task is suitable
    (with :py:meth:`can_run_on`, which is executed on the scheduler).
    """

    #: Must be overridden by child classes to document the current version of
    #: the task's code. A task will only be scheduled on a worker if its task
    #: version is the same as the one running on the scheduler.
    TASK_VERSION: Optional[int] = None

    #: Can be overridden to enable jsonschema validation of the ``task_data``
    #: parameter passed to :py:meth:`configure`.
    TASK_DATA_SCHEMA: dict[str, Any] = {}

    _TASK_DATA_NOTIFICATIONS_SCHEMA = {
        "type": "object",
        "properties": {
            "on_failure": {
                "type": "array",
                "items": {
                    "type": "object",
                    "properties": {
                        "channel": {"type": "string"},
                        "data": {"type": "object"},
                    },
                    "required": ["channel"],
                },
            }
        },
        "required": ["on_failure"],
    }

    _sub_tasks: dict[str, Type["Task"]] = {}

    def __init_subclass__(cls, **kwargs):
        """
        Register the subclass into Task._sub_tasks.

        Used by Task.class_from_name() to return the class given the name.
        """
        super().__init_subclass__(**kwargs)
        sub_task_name_lowercase = cls.__name__.lower()

        # The same sub-task could register twice
        # (but assert that it is the *same* class, not a different
        # subtask with a name with a different capitalisation)
        assert (
            sub_task_name_lowercase not in cls._sub_tasks
            or cls._sub_tasks[sub_task_name_lowercase] == cls
        )

        cls._sub_tasks[sub_task_name_lowercase] = cls

    def __init__(self):
        """Initialize the task."""
        #: Validated task data submitted through :py:meth:`configure`, without
        #: the generic task data (e.g. "notifications")
        self.data = None

        #: The name of the task. It is computed by :py:meth:`__init__` by
        #: converting the class name to lowercase.
        self.name = self.__class__.__name__.lower()

        #: A :class:`logging.Logger` instance that can be used in child
        #: classes when you override methods to implement the task.
        self.logger = logging.getLogger("debusine.tasks")

        # Task is aborted: the task does not need to be executed, and can be
        # stopped if it is already running
        self._aborted = False

        self.work_request: Optional[int] = None
        self.debusine: Optional[Debusine] = None

        self._debug_log_files_directory: Optional[
            tempfile.TemporaryDirectory
        ] = None

    def configure_server_access(self, debusine: Debusine):
        """Set the object to access the server."""
        self.debusine = debusine

    def create_debug_log_file(
        self, filename: str, *, mode="w"
    ) -> Union[TextIO, BinaryIO]:
        """
        Create a temporary file, open it and return the writer.

        The caller must call .close() when finished writing.
        """
        if self._debug_log_files_directory is None:
            self._debug_log_files_directory = tempfile.TemporaryDirectory(
                prefix="debusine-task-debug-log-files-"
            )

        debug_file = Path(self._debug_log_files_directory.name) / filename

        return debug_file.open(mode)
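
    # Illustrative sketch (not part of the upstream module): a task
    # implementation could record extra debugging information like this; the
    # file name and content are hypothetical. The resulting files are later
    # uploaded by _upload_work_request_debug_logs().
    #
    #     log_file = self.create_debug_log_file("cmd-output.log")
    #     log_file.write("some debug output\n")
    #     log_file.close()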

    def prefix_with_task_name(self, text: str) -> str:
        """:return: the ``text`` prefixed with the task name and a colon."""
        return f"{self.name}:{text}"

    def analyze_worker(self) -> dict:
        """
        Return dynamic metadata about the current worker.

        This method is called on the worker to collect information about the
        worker. The information is stored as a set of key-value pairs in a
        dictionary.

        That information is then reused on the scheduler, where it is fed to
        :py:meth:`can_run_on` to determine whether a task is suitable to be
        executed on the worker.

        Derived objects can extend the behaviour by overriding the method,
        calling ``metadata = super().analyze_worker()``, and then adding
        supplementary data to the dictionary.

        To avoid conflicts on the names of the keys used by different tasks,
        you should use key names obtained with
        ``self.prefix_with_task_name(...)``.

        :return: a dictionary describing the worker.
        :rtype: dict.
        """
        version_key_name = self.prefix_with_task_name("version")

        return {
            version_key_name: self.TASK_VERSION,
        }
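
    # Illustrative sketch (not part of the upstream module): a derived task
    # would typically extend analyze_worker() along these lines, using
    # prefix_with_task_name() to namespace its keys. The "available" key and
    # its value are hypothetical.
    #
    #     def analyze_worker(self) -> dict:
    #         metadata = super().analyze_worker()
    #         metadata[self.prefix_with_task_name("available")] = True
    #         return metadata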

    @classmethod
    def analyze_worker_all_tasks(cls):
        """
        Return a dictionary with metadata for each task in Task._sub_tasks.

        Subclasses of Task get registered in Task._sub_tasks. Return a
        dictionary with the metadata of each of those subtasks.

        This method is executed in the worker when submitting the dynamic
        metadata.
        """
        metadata = {}

        for task_class in cls._sub_tasks.values():
            task = task_class()
            metadata.update(task.analyze_worker())

        return metadata

    def can_run_on(self, worker_metadata: dict) -> bool:
        """
        Check if the specified worker can run the task.

        This method shall take its decision solely based on the supplied
        ``worker_metadata`` and on the configured task data (``self.data``).

        The default implementation always returns True, except when there is
        a mismatch between the :py:attr:`TASK_VERSION` on the scheduler side
        and on the worker side.

        Derived objects can implement further checks by overriding the method
        in the following way::

            if not super().can_run_on(worker_metadata):
                return False

            if ...:
                return False

            return True

        :param dict worker_metadata: The metadata collected from the worker by
            running :py:meth:`analyze_worker` on all the tasks on the worker
            under consideration.
        :return: the boolean result of the check.
        :rtype: bool.
        """
        version_key_name = self.prefix_with_task_name("version")
        if worker_metadata.get(version_key_name) != self.TASK_VERSION:
            return False

        return True

    def configure(self, task_data):
        """
        Configure the task with the supplied ``task_data``.

        The supplied data is first validated against the JSON schema defined
        in the :py:attr:`TASK_DATA_SCHEMA` class attribute. If validation
        fails, a :py:exc:`TaskConfigError` is raised. Otherwise, the supplied
        ``task_data`` is stored in the ``data`` attribute.

        Derived objects can extend the behaviour by overriding the method and
        calling ``super().configure(task_data)``; however, the extra checks
        must not access any resource of the worker, as the method can also be
        executed on the server when it tries to schedule work requests.

        :param dict task_data: The supplied data describing the task.
        :raises TaskConfigError: if the JSON schema is not respected.
        """
        task_data = copy.deepcopy(task_data)
        data_notifications = task_data.pop("notifications", None)

        if data_notifications is not None:
            try:
                jsonschema.validate(
                    data_notifications,
                    self._TASK_DATA_NOTIFICATIONS_SCHEMA,
                )
            except jsonschema.ValidationError as exc:
                raise TaskConfigError(exc.message)

        try:
            jsonschema.validate(task_data, self.TASK_DATA_SCHEMA)
        except jsonschema.ValidationError as exc:
            raise TaskConfigError(exc.message)

        self.data = task_data
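
    # Illustrative sketch (not part of the upstream module): the task_data
    # accepted by configure() could look like the dictionary below. The
    # "notifications" entry is validated against
    # _TASK_DATA_NOTIFICATIONS_SCHEMA and stripped before the rest is checked
    # against the subclass's TASK_DATA_SCHEMA. All keys other than
    # "notifications" are hypothetical.
    #
    #     task.configure({
    #         "result": True,
    #         "notifications": {
    #             "on_failure": [{"channel": "admins", "data": {}}],
    #         },
    #     })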

    def execute_logging_exceptions(self) -> bool:
        """Execute self.execute(), logging any raised exceptions."""
        try:
            return self.execute()
        except Exception as exc:
            self.logger.exception("Exception in Task %s", self.name)
            raise exc

    def execute(self) -> bool:
        """
        Call the _execute() method, upload debug artifacts.

        See _execute() for more information.

        :return: result of the _execute() method.
        """  # noqa: D402
        result = self._execute()
        self._upload_work_request_debug_logs()

        return result
    def _execute(self) -> bool:
        """
        Execute the requested task.

        The task must first have been configured. It is allowed to take as
        much time as required.

        This method will only be run on a worker. It is thus allowed to
        access resources local to the worker.

        It is recommended to fail early by raising a
        :py:exc:`TaskConfigError` if the parameters of the task let you
        anticipate that it has no chance of completing successfully.

        :return: True to indicate success, False for a failure.
        :rtype: bool.
        :raises TaskConfigError: if the parameters of the work request are
            incompatible with the worker.
        """
        raise NotImplementedError()

    def abort(self):
        """
        Mark the task as aborted: it does not need to be executed.

        Once aborted, the task cannot be un-aborted.
        """
        self._aborted = True

    @property
    def aborted(self) -> bool:
        """
        Return whether the task is aborted.

        Tasks cannot transition from aborted -> not-aborted.
        """
        return self._aborted

    @staticmethod
    def class_from_name(sub_task_class_name: str) -> Type["Task"]:
        """
        Return the task class registered for ``sub_task_class_name``.

        The lookup is case-insensitive: __init_subclass__() registers Task
        subclasses into Task._sub_tasks using lowercase names.
        """
        sub_task_class_name_lowercase = sub_task_class_name.lower()

        if sub_task_class_name_lowercase in Task._sub_tasks:
            return Task._sub_tasks[sub_task_class_name_lowercase]

        raise ValueError(
            f"'{sub_task_class_name_lowercase}' is not a registered task_name"
        )
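
    # Illustrative sketch (not part of the upstream module): the lookup is
    # case-insensitive, so for a registered subclass named "Noop" (a
    # hypothetical name) both calls return the same class:
    #
    #     Task.class_from_name("Noop")
    #     Task.class_from_name("noop")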

    @staticmethod
    def is_valid_task_name(task_name) -> bool:
        """Return True if task_name is registered (its class is imported)."""
        return task_name.lower() in Task._sub_tasks

    @staticmethod
    def task_names() -> list[str]:
        """Return list of sub-task names."""
        return sorted(Task._sub_tasks)

    def _upload_work_request_debug_logs(self):
        """
        Create a WorkRequestDebugLogs artifact and upload the logs.

        The log files, if any, are in self._debug_log_files_directory and
        were added via self.create_debug_log_file().

        If self._source_artifact_id is not None: create a relation from the
        created WorkRequestDebugLogs artifact to it.
        """
        if self._debug_log_files_directory is None:
            return

        work_request_debug_logs_artifact = WorkRequestDebugLogs.create(
            files=Path(self._debug_log_files_directory.name).glob("*")
        )

        remote_artifact = self.debusine.upload_artifact(
            work_request_debug_logs_artifact,
            workspace=self._workspace,
            work_request=self.work_request,
        )

        if self._source_artifact_id is not None:
            self.debusine.relation_create(
                remote_artifact.id,
                self._source_artifact_id,
                "relates-to",
            )

        self._debug_log_files_directory.cleanup()
        self._debug_log_files_directory = None
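

# Illustrative sketch (not part of the upstream module) of how worker-side
# code could drive the API above; the task name, task data and the
# ``debusine`` client instance are hypothetical, and the real worker loop may
# differ:
#
#     task_class = Task.class_from_name("noop")
#     task = task_class()
#     task.configure({"result": True})
#     task.configure_server_access(debusine)
#     task.work_request = 1
#     success = task.execute_logging_exceptions()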