Writing a new worker task for Debusine

Tutorial on how to write a worker task for Debusine

Debusine is a tool designed for Debian developers and Operating System developers in general. You can try out Debusine on debusine.debian.net, and follow its development on salsa.debian.org.

This post describes how to write a new worker task for Debusine. It can be used to add tasks to a self-hosted Debusine instance, or to contribute new tasks that add capabilities to the Debusine project.

Tasks are the lower-level pieces of Debusine workflows. Examples of tasks are Sbuild, Lintian, Debdiff (see the available tasks).

This post will document the steps to write a new basic worker task. The example will add a worker task that runs reprotest and creates an artifact of the new type ReprotestArtifact with the reprotest log.

Tasks are usually used by workflows. Workflows solve high-level goals by creating and orchestrating different tasks (e.g. a Sbuild workflow would create different Sbuild tasks, one for each architecture).

Overview of tasks

A task usually does the following:

  • It receives structured data defining its input artifacts and configuration
  • Input artifacts are downloaded
  • A process is run by the worker (e.g. lintian, debdiff, etc.). In this blog post, it will run reprotest
  • The output (files, logs, exit code, etc.) is analyzed, artifacts and relations might be generated, and the work request is marked as completed, either with Success or Failure
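The steps above can be pictured as a small pipeline. The sketch below is a stand-alone toy, not Debusine's actual base class: `ToyTask` and `run_task` are hypothetical names used only to show the order in which the hooks run, and the "process" is a tiny Python one-liner instead of a real tool. The hook names mirror the ones implemented later in this post.

```python
"""Toy model of the task life cycle (not Debusine's real base classes)."""

import subprocess
import sys
import tempfile
from pathlib import Path


class ToyTask:
    """Hypothetical stand-in that mimics the hooks a worker task implements."""

    CAPTURE_OUTPUT_FILENAME = "output.log"

    def __init__(self, task_data: dict[str, str]) -> None:
        # Step 1: structured data describing inputs and configuration.
        self.data = task_data

    def fetch_input(self, destination: Path) -> bool:
        # Step 2: a real task downloads artifacts; here we just write a file.
        (destination / "input.txt").write_text(self.data["message"])
        return True

    def _cmdline(self, download_directory: Path) -> list[str]:
        # Step 3: the command that the worker runs.
        script = "import sys; print(open(sys.argv[1]).read())"
        input_file = str(download_directory / "input.txt")
        return [sys.executable, "-c", script, input_file]

    def task_result(self, returncode: int, execute_directory: Path) -> str:
        # Step 4: analyse the output and decide between success and failure.
        log = execute_directory / self.CAPTURE_OUTPUT_FILENAME
        return "success" if log.exists() and returncode == 0 else "failure"


def run_task(task: ToyTask) -> str:
    """Drive the hooks in the same order as the list above."""
    with tempfile.TemporaryDirectory() as tmp:
        directory = Path(tmp)
        if not task.fetch_input(directory):
            return "failure"
        completed = subprocess.run(
            task._cmdline(directory),
            capture_output=True,
            text=True,
            check=False,
        )
        # Keep the captured output as the task's log file.
        log = directory / task.CAPTURE_OUTPUT_FILENAME
        log.write_text(completed.stdout)
        return task.task_result(completed.returncode, directory)
```

Calling `run_task(ToyTask({"message": "hello"}))` walks through all four steps in a temporary directory. In Debusine the driving loop is provided by the base classes, so a new task only has to fill in the hooks.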

If you want to follow the tutorial and add the Reprotest task, your Debusine development instance should have at least one worker, one user, a debusine client set up, and permissions for the client to create tasks. All of this can be set up following the steps in the Contribute section of the documentation.

This blog post shows a functional Reprotest task. This task is not currently part of Debusine. The Reprotest task implementation is simplified (no error handling, unit tests, specific view, docs, some shortcuts in the environment preparation, etc.). At some point, in Debusine, we might add a debrebuild task which is based on buildinfo files and uses snapshot.debian.org to recreate the binary packages.

Defining the inputs of the task

The input of the reprotest task will be a source artifact (a Debian source package). We model the input with pydantic in debusine/tasks/models.py:

class ReprotestData(BaseTaskDataWithExecutor):
    """Data for Reprotest task."""

    source_artifact: LookupSingle

class ReprotestDynamicData(BaseDynamicTaskDataWithExecutor):
    """Reprotest dynamic data."""

    source_artifact_id: int | None = None

The ReprotestData is what the user will input. A LookupSingle is a lookup that resolves to a single artifact.

We would also have configuration for the desired variations to test, but we have left that out of this example for simplicity. Configuring variations is left as an exercise for the reader.
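As a starting point for that exercise, the helper below shows one way to turn a list of disabled variations into the `--vary` argument that `_cmdline` passes to reprotest. It is only a sketch: `build_vary_arguments` and its `disabled_variations` parameter are hypothetical names, not part of Debusine or reprotest, and the helper does not check that the names are variations reprotest actually knows.

```python
def build_vary_arguments(disabled_variations: list[str]) -> list[str]:
    """Return the reprotest arguments that disable the given variations."""
    if not disabled_variations:
        # Nothing to disable: pass no option and keep reprotest's defaults.
        return []
    # A variation is disabled by prefixing its name with "-".
    disabled = ",".join(f"-{name}" for name in disabled_variations)
    return [f"--vary={disabled}"]
```

With `["time", "user_group", "fileordering", "domain_host"]` as input, the helper produces the same `--vary` option that is hard-coded in this post's command line. The list of names could then come from a new field in ReprotestData.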

Since ReprotestData is a subclass of BaseTaskDataWithExecutor, it also contains an environment field, where the user can specify the environment in which the task will run. The environment is an artifact with a Debian image.

The ReprotestDynamicData holds the resolution of all lookups. These can be seen in the “Internals” tab of the work request view.
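The split between the two classes can be illustrated with a stand-alone toy. All names below (`ToyData`, `ToyDynamicData`, `resolve` and the in-memory `ARTIFACTS` table) are invented for this sketch and are not Debusine APIs; real lookups are resolved by Debusine against its database.

```python
from __future__ import annotations

from dataclasses import dataclass

# Hypothetical in-memory "database": lookup string -> artifact ID.
ARTIFACTS = {"hello_2.10-5": 42}


@dataclass
class ToyData:
    """What the user submits: a lookup, given as an ID or as a string."""

    source_artifact: int | str


@dataclass
class ToyDynamicData:
    """What the worker receives: the lookup resolved to a plain ID."""

    source_artifact_id: int | None = None


def resolve(data: ToyData) -> ToyDynamicData:
    """Resolve the lookup before the task is dispatched to a worker."""
    lookup = data.source_artifact
    # An integer is already an artifact ID; a string has to be looked up.
    artifact_id = lookup if isinstance(lookup, int) else ARTIFACTS[lookup]
    return ToyDynamicData(source_artifact_id=artifact_id)
```

The point of the design is that the worker side only ever sees `ToyDynamicData`, so it needs no access to the table that resolved the lookup.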

Add the new Reprotest artifact data class

For the reprotest task to create a new artifact of category debian:reprotest, holding the log and the output metadata, first add the new category to ArtifactCategory in debusine/artifacts/models.py:

    REPROTEST = "debian:reprotest"

In the same file add the DebianReprotest class:

class DebianReprotest(ArtifactData):
    """Data for debian:reprotest artifacts."""

    reproducible: bool | None = None

    def get_label(self) -> str:
        """Return a short human-readable label for the artifact."""
        return "reprotest analysis"

It could also include the package name or version.

In order to have the category listed in the work request output artifacts table, edit the file debusine/db/models/artifacts.py: In ARTIFACT_CATEGORY_ICON_NAMES add ArtifactCategory.REPROTEST: "folder", and in ARTIFACT_CATEGORY_SHORT_NAMES add ArtifactCategory.REPROTEST: "reprotest",.
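Concretely, the two additions could look like the fragment below. To keep it runnable on its own, `ArtifactCategory` is redefined here as a one-member stand-in and the two dictionaries contain only the new entries; in Debusine they already exist and hold one entry per category.

```python
from enum import Enum


class ArtifactCategory(str, Enum):
    """Stand-in for debusine.artifacts.models.ArtifactCategory."""

    REPROTEST = "debian:reprotest"


# In debusine/db/models/artifacts.py these dictionaries already exist;
# only the REPROTEST entries are new.
ARTIFACT_CATEGORY_ICON_NAMES = {
    ArtifactCategory.REPROTEST: "folder",
}

ARTIFACT_CATEGORY_SHORT_NAMES = {
    ArtifactCategory.REPROTEST: "reprotest",
}
```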

Create the new Task class

In debusine/tasks/ create a new file reprotest.py.

reprotest.py
# Copyright © The Debusine Developers
# See the AUTHORS file at the top-level directory of this distribution
#
# This file is part of Debusine. It is subject to the license terms
# in the LICENSE file found in the top-level directory of this
# distribution. No part of Debusine, including this file, may be copied,
# modified, propagated, or distributed except according to the terms
# contained in the LICENSE file.

"""Task to use reprotest in debusine."""

from pathlib import Path
from typing import Any

from debusine import utils
from debusine.artifacts.local_artifact import ReprotestArtifact
from debusine.artifacts.models import (
    ArtifactCategory,
    CollectionCategory,
    DebianSourcePackage,
    DebianUpload,
    WorkRequestResults,
    get_source_package_name,
    get_source_package_version,
)
from debusine.client.models import RelationType
from debusine.tasks import BaseTaskWithExecutor, RunCommandTask
from debusine.tasks.models import ReprotestData, ReprotestDynamicData
from debusine.tasks.server import TaskDatabaseInterface


class Reprotest(
    RunCommandTask[ReprotestData, ReprotestDynamicData],
    BaseTaskWithExecutor[ReprotestData, ReprotestDynamicData],
):
    """Task to use reprotest in debusine."""

    TASK_VERSION = 1

    CAPTURE_OUTPUT_FILENAME = "reprotest.log"

    def __init__(
        self,
        task_data: dict[str, Any],
        dynamic_task_data: dict[str, Any] | None = None,
    ) -> None:
        """Initialize object."""
        super().__init__(task_data, dynamic_task_data)

        self._reprotest_target: Path | None = None

    def build_dynamic_data(
        self, task_database: TaskDatabaseInterface
    ) -> ReprotestDynamicData:
        """Compute and return ReprotestDynamicData."""
        input_source_artifact = task_database.lookup_single_artifact(
            self.data.source_artifact
        )

        assert input_source_artifact is not None
        self.ensure_artifact_categories(
            configuration_key="input.source_artifact",
            category=input_source_artifact.category,
            expected=(
                ArtifactCategory.SOURCE_PACKAGE,
                ArtifactCategory.UPLOAD,
            ),
        )
        assert isinstance(
            input_source_artifact.data, (DebianSourcePackage, DebianUpload)
        )
        subject = get_source_package_name(input_source_artifact.data)
        version = get_source_package_version(input_source_artifact.data)

        assert self.data.environment is not None

        environment = self.get_environment(
            task_database,
            self.data.environment,
            default_category=CollectionCategory.ENVIRONMENTS,
        )

        return ReprotestDynamicData(
            source_artifact_id=input_source_artifact.id,
            subject=subject,
            parameter_summary=f"{subject}_{version}",
            environment_id=environment.id,
        )

    def get_input_artifacts_ids(self) -> list[int]:
        """Return the list of input artifact IDs used by this task."""
        if not self.dynamic_data:
            return []

        return [
            self.dynamic_data.source_artifact_id,
            self.dynamic_data.environment_id,
        ]

    def fetch_input(self, destination: Path) -> bool:
        """Download the required artifacts."""
        assert self.dynamic_data

        artifact_id = self.dynamic_data.source_artifact_id
        assert artifact_id is not None
        self.fetch_artifact(artifact_id, destination)

        return True

    def configure_for_execution(self, download_directory: Path) -> bool:
        """
        Prepare the executor and find a .dsc in download_directory.

        Install reprotest and other utilities used in _cmdline.
        Set self._reprotest_target to the .dsc that was found.

        :param download_directory: where to search the files
        :return: True if valid files were found
        """
        self._prepare_executor_instance()

        if self.executor_instance is None:
            raise AssertionError("self.executor_instance cannot be None")

        self.run_executor_command(
            ["apt-get", "update"],
            log_filename="install.log",
            run_as_root=True,
            check=True,
        )
        self.run_executor_command(
            [
                "apt-get",
                "--yes",
                "--no-install-recommends",
                "install",
                "reprotest",
                "dpkg-dev",
                "devscripts",
                "equivs",
                "sudo",
            ],
            log_filename="install.log",
            run_as_root=True,
        )

        self._reprotest_target = utils.find_file_suffixes(
            download_directory, [".dsc"]
        )
        return True

    def _cmdline(self) -> list[str]:
        """
        Build the reprotest command line.

        Use configuration of self.data and self._reprotest_target.
        """
        target = self._reprotest_target
        assert target is not None

        cmd = [
            "bash",
            "-c",
            f"TMPDIR=/tmp ; cd /tmp ; dpkg-source -x {target} package/; "
            "cd package/ ; mk-build-deps ; apt-get install --yes ./*.deb ; "
            "rm *.deb ; "
            "reprotest --vary=-time,-user_group,-fileordering,-domain_host .",
        ]

        return cmd

    @staticmethod
    def _cmdline_as_root() -> bool:
        r"""apt-get install --yes ./\*.deb must be run as root."""
        return True

    def task_result(
        self,
        returncode: int | None,
        execute_directory: Path,  # noqa: U100
    ) -> WorkRequestResults:
        """
        Evaluate task output and return success.

        For a successful run of reprotest:
        -must have the output file
        -exit code is 0

        :return: WorkRequestResults.SUCCESS or WorkRequestResults.FAILURE.
        """
        reprotest_file = execute_directory / self.CAPTURE_OUTPUT_FILENAME

        if reprotest_file.exists() and returncode == 0:
            return WorkRequestResults.SUCCESS

        return WorkRequestResults.FAILURE

    def upload_artifacts(
        self, exec_directory: Path, *, execution_result: WorkRequestResults
    ) -> None:
        """Upload the ReprotestArtifact with the files and relationships."""
        if not self.debusine:
            raise AssertionError("self.debusine not set")

        assert self.dynamic_data is not None
        assert self.dynamic_data.parameter_summary is not None

        reprotest_artifact = ReprotestArtifact.create(
            reprotest_output=exec_directory / self.CAPTURE_OUTPUT_FILENAME,
            reproducible=execution_result == WorkRequestResults.SUCCESS,
            package=self.dynamic_data.parameter_summary,
        )

        uploaded = self.debusine.upload_artifact(
            reprotest_artifact,
            workspace=self.workspace_name,
            work_request=self.work_request_id,
        )

        assert self.dynamic_data is not None
        assert self.dynamic_data.source_artifact_id is not None
        self.debusine.relation_create(
            uploaded.id,
            self.dynamic_data.source_artifact_id,
            RelationType.RELATES_TO,
        )

In order for Debusine to discover the task, add "Reprotest" to the __all__ list in debusine/tasks/__init__.py.

Let’s go through the main methods of the Reprotest class:

build_dynamic_data method

The worker has no access to Debusine’s database. Lookups are all resolved before the task gets dispatched to a worker, so all the worker has to do is download the specified input artifacts.

The build_dynamic_data method looks up the artifact, asserts that it has a valid category, extracts the package name and version, and gets the environment in which the task will be executed.

The environment is needed to run the task (reprotest will run in a container using unshare, incus…).

    def build_dynamic_data(
        self, task_database: TaskDatabaseInterface
    ) -> ReprotestDynamicData:
        """Compute and return ReprotestDynamicData."""
        input_source_artifact = task_database.lookup_single_artifact(
            self.data.source_artifact
        )

        assert input_source_artifact is not None
        self.ensure_artifact_categories(
            configuration_key="input.source_artifact",
            category=input_source_artifact.category,
            expected=(
                ArtifactCategory.SOURCE_PACKAGE,
                ArtifactCategory.UPLOAD,
            ),
        )
        assert isinstance(
            input_source_artifact.data, (DebianSourcePackage, DebianUpload)
        )
        subject = get_source_package_name(input_source_artifact.data)
        version = get_source_package_version(input_source_artifact.data)

        assert self.data.environment is not None

        environment = self.get_environment(
            task_database,
            self.data.environment,
            default_category=CollectionCategory.ENVIRONMENTS,
        )

        return ReprotestDynamicData(
            source_artifact_id=input_source_artifact.id,
            subject=subject,
            parameter_summary=f"{subject}_{version}",
            environment_id=environment.id,
        )

get_input_artifacts_ids method

Used to list the task’s input artifacts in the web UI.

    def get_input_artifacts_ids(self) -> list[int]:
        """Return the list of input artifact IDs used by this task."""
        if not self.dynamic_data:
            return []

        return [
            self.dynamic_data.source_artifact_id,
            self.dynamic_data.environment_id,
        ]

fetch_input method

Download the required artifacts on the worker.

    def fetch_input(self, destination: Path) -> bool:
        """Download the required artifacts."""
        assert self.dynamic_data

        artifact_id = self.dynamic_data.source_artifact_id
        assert artifact_id is not None
        self.fetch_artifact(artifact_id, destination)

        return True

configure_for_execution method

Install the packages needed by the task and set _reprotest_target, which is used to build the task’s command line.

    def configure_for_execution(self, download_directory: Path) -> bool:
        """
        Prepare the executor and find a .dsc in download_directory.

        Install reprotest and other utilities used in _cmdline.
        Set self._reprotest_target to the .dsc that was found.

        :param download_directory: where to search the files
        :return: True if valid files were found
        """
        self._prepare_executor_instance()

        if self.executor_instance is None:
            raise AssertionError("self.executor_instance cannot be None")

        self.run_executor_command(
            ["apt-get", "update"],
            log_filename="install.log",
            run_as_root=True,
            check=True,
        )
        self.run_executor_command(
            [
                "apt-get",
                "--yes",
                "--no-install-recommends",
                "install",
                "reprotest",
                "dpkg-dev",
                "devscripts",
                "equivs",
                "sudo",
            ],
            log_filename="install.log",
            run_as_root=True,
        )

        self._reprotest_target = utils.find_file_suffixes(
            download_directory, [".dsc"]
        )
        return True

_cmdline method

Return the command line to run the task.

In this case, and to keep the example simple, we will run reprotest directly in the worker’s executor VM/container, without giving it an isolated virtual server.

So, this command installs the build dependencies required by the package (so reprotest can build it) and runs reprotest itself.

    def _cmdline(self) -> list[str]:
        """
        Build the reprotest command line.

        Use configuration of self.data and self._reprotest_target.
        """
        target = self._reprotest_target
        assert target is not None

        cmd = [
            "bash",
            "-c",
            f"TMPDIR=/tmp ; cd /tmp ; dpkg-source -x {target} package/; "
            "cd package/ ; mk-build-deps ; apt-get install --yes ./*.deb ; "
            "rm *.deb ; "
            "reprotest --vary=-time,-user_group,-fileordering,-domain_host .",
        ]

        return cmd

Some reprotest variations are disabled to keep the example simple, both in the set of packages to install and in the reprotest features used.
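The command string above joins its steps with `;`, so a failing step does not stop the ones after it, and the path of the .dsc is interpolated without quoting. One possible variation, shown here as a stand-alone sketch, quotes the path with `shlex.quote`, chains the steps with `&&` and exports TMPDIR so that child processes see it. `build_shell_command` is a hypothetical helper, not part of Debusine; the steps themselves are the same as in `_cmdline`.

```python
import shlex
from pathlib import Path


def build_shell_command(target: Path, vary: str) -> list[str]:
    """Build a bash invocation that stops at the first failing step."""
    steps = [
        "export TMPDIR=/tmp",
        "cd /tmp",
        # Quote the path so that spaces or shell metacharacters are harmless.
        f"dpkg-source -x {shlex.quote(str(target))} package/",
        "cd package/",
        "mk-build-deps",
        "apt-get install --yes ./*.deb",
        "rm *.deb",
        f"reprotest {shlex.quote(vary)} .",
    ]
    # With "&&", bash runs a step only if the previous one succeeded.
    return ["bash", "-c", " && ".join(steps)]
```

With this variant a failure in dpkg-source or in the installation of the build dependencies makes the whole command fail early, instead of letting reprotest run in an incomplete tree.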

_cmdline_as_root method

Since the command needs to install packages during its execution, it must run as root (inside the container):

    @staticmethod
    def _cmdline_as_root() -> bool:
        r"""apt-get install --yes ./\*.deb must be run as root."""
        return True

task_result method

The task succeeds if a log is generated and the return code is 0.

    def task_result(
        self,
        returncode: int | None,
        execute_directory: Path,  # noqa: U100
    ) -> WorkRequestResults:
        """
        Evaluate task output and return success.

        For a successful run of reprotest:
        -must have the output file
        -exit code is 0

        :return: WorkRequestResults.SUCCESS or WorkRequestResults.FAILURE.
        """
        reprotest_file = execute_directory / self.CAPTURE_OUTPUT_FILENAME

        if reprotest_file.exists() and returncode == 0:
            return WorkRequestResults.SUCCESS

        return WorkRequestResults.FAILURE
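The decision rule is small enough to exercise in isolation. The sketch below copies it into a free function so that it can be run without Debusine; `evaluate_run`, `demo` and the plain string results are stand-ins for the method and for WorkRequestResults.

```python
from __future__ import annotations

import tempfile
from pathlib import Path

LOG_FILENAME = "reprotest.log"


def evaluate_run(returncode: int | None, execute_directory: Path) -> str:
    """Return "success" only if the log exists and the exit code is 0."""
    log_file = execute_directory / LOG_FILENAME
    if log_file.exists() and returncode == 0:
        return "success"
    return "failure"


def demo() -> list[str]:
    """Evaluate three runs: missing log, non-zero exit, and a clean run."""
    with tempfile.TemporaryDirectory() as tmp:
        directory = Path(tmp)
        results = [evaluate_run(0, directory)]  # no log written yet
        (directory / LOG_FILENAME).write_text("log contents\n")
        results.append(evaluate_run(1, directory))  # log, but exit code 1
        results.append(evaluate_run(0, directory))  # log and exit code 0
    return results
```

A missing return code (None) also ends up as a failure, because it does not compare equal to 0.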

upload_artifacts method

Create the ReprotestArtifact with the log and the reproducible boolean, upload it, and then add a relation between the ReprotestArtifact and the source package:

    def upload_artifacts(
        self, exec_directory: Path, *, execution_result: WorkRequestResults
    ) -> None:
        """Upload the ReprotestArtifact with the files and relationships."""
        if not self.debusine:
            raise AssertionError("self.debusine not set")

        assert self.dynamic_data is not None
        assert self.dynamic_data.parameter_summary is not None

        reprotest_artifact = ReprotestArtifact.create(
            reprotest_output=exec_directory / self.CAPTURE_OUTPUT_FILENAME,
            reproducible=execution_result == WorkRequestResults.SUCCESS,
            package=self.dynamic_data.parameter_summary,
        )

        uploaded = self.debusine.upload_artifact(
            reprotest_artifact,
            workspace=self.workspace_name,
            work_request=self.work_request_id,
        )

        assert self.dynamic_data is not None
        assert self.dynamic_data.source_artifact_id is not None
        self.debusine.relation_create(
            uploaded.id,
            self.dynamic_data.source_artifact_id,
            RelationType.RELATES_TO,
        )

Execution example

To run this task in a local Debusine instance (with an environment, permissions and users already created, see the steps in the documentation) you can do:

$ python3 -m debusine.client artifact import-debian -w System http://deb.debian.org/debian/pool/main/h/hello/hello_2.10-5.dsc

(get the artifact ID from the output of that command)

The artifact can be seen in http://$DEBUSINE/debusine/System/artifact/$ARTIFACT_ID/.

Then create a reprotest.yaml:

$ cat <<EOF > reprotest.yaml
source_artifact: $ARTIFACT_ID
environment: "debian/match:codename=bookworm"
EOF

Instead of the debian/match:codename=bookworm lookup, the environment could also be given by its artifact ID.

Finally, create the work request to run the task:

$ python3 -m debusine.client create-work-request -w System reprotest --data reprotest.yaml

In the Debusine web interface you can see the work request, which should go to Running status and then to Completed with Success or Failure (depending on whether reprotest could reproduce the package). The Output tab shows an artifact of type debian:reprotest with one file: the log. The Metadata tab of the artifact shows its data: the package name and reproducible (true or false).

What is left to do?

This was a simple example of creating a task. Other things that could be done:

  • unit tests
  • documentation
  • configurable variations
  • running reprotest directly on the worker host, using the executor environment as a reprotest “virtual server”
  • in this specific example, the command line might be doing too many things that could maybe be done by other parts of the task, such as prepare_environment.
  • integrate it in a workflow so it’s easier to use (e.g. part of QaWorkflow)
  • extract more from the log than just pass/fail
  • display the output in a more useful way (implement an artifact specialized view)
