Writing a new worker task for Debusine

Tutorial on how to write a worker task for Debusine

Debusine is a tool designed for Debian developers and Operating System developers in general. You can try out Debusine on debusine.debian.net, and follow its development on salsa.debian.org.

This post describes how to write a new worker task for Debusine. You can use it to add tasks to a self-hosted Debusine instance, or to contribute new tasks that add capabilities to the Debusine project.

Tasks are Debusine’s unit of work, and the lower-level pieces of Debusine workflows. Examples of tasks are Sbuild, Lintian, Debdiff (see the available tasks).

This post will document the steps to write a new basic worker task. The example will add a worker task that runs reprotest and creates an artifact of the new type ReprotestArtifact with the reprotest log.

Tasks are usually used by workflows. Workflows solve high-level goals by creating and orchestrating different tasks (e.g. an Sbuild workflow would create different Sbuild tasks, one for each architecture).

Overview of tasks

A task usually does the following:

  • It receives structured data defining its input artifacts and configuration
  • Input artifacts are downloaded
  • A process is run by the worker (e.g. lintian, debdiff, etc.). In this blog post, it will run reprotest
  • The output (files, logs, exit code, etc.) is analyzed, artifacts and relations might be generated, and the work request is marked as completed, either with Success or Failure
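
The steps above can be sketched as a small plain-Python driver. This is only an illustration of the order in which the steps happen: ToyTask and run_task are hypothetical stand-ins, not Debusine classes, although fetch_input and task_result borrow the names of the methods implemented later in this post.

```python
import tempfile
from pathlib import Path


class ToyTask:
    """Hypothetical stand-in for a worker task; not a Debusine class."""

    def __init__(self, task_data: dict[str, str]) -> None:
        self.task_data = task_data  # structured input and configuration
        self.steps: list[str] = []  # records the order of the calls

    def fetch_input(self, destination: Path) -> bool:
        # A real task downloads its input artifacts here.
        (destination / "input.txt").write_text(self.task_data["text"])
        self.steps.append("fetch_input")
        return True

    def run(self, directory: Path) -> int:
        # A real task runs an external process (lintian, reprotest, ...).
        content = (directory / "input.txt").read_text()
        (directory / "output.log").write_text(content.upper())
        self.steps.append("run")
        return 0  # exit code of the "process"

    def task_result(self, returncode: int, directory: Path) -> str:
        # Analyze the output and decide between success and failure.
        self.steps.append("task_result")
        ok = returncode == 0 and (directory / "output.log").exists()
        return "success" if ok else "failure"


def run_task(task: ToyTask) -> str:
    """Drive the task through its steps in a scratch directory."""
    with tempfile.TemporaryDirectory() as tmp:
        directory = Path(tmp)
        if not task.fetch_input(directory):
            return "failure"
        returncode = task.run(directory)
        return task.task_result(returncode, directory)
```

In Debusine the worker plays the role of run_task, and the task class only fills in the individual steps.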

If you want to follow the tutorial and add the Reprotest task, your Debusine development instance should have at least one worker, one user, a debusine client set up, and permissions for the client to create tasks. All of this can be set up by following the steps in the Contribute section of the documentation.

This blog post shows a functional Reprotest task. This task is not currently part of Debusine. The Reprotest task implementation is simplified (no error handling, unit tests, specific view, docs, some shortcuts in the environment preparation, etc.). At some point, in Debusine, we might add a debrebuild task which is based on buildinfo files and uses snapshot.debian.org to recreate the binary packages.

Defining the inputs of the task

The input of the reprotest task will be a source artifact (a Debian source package). We model the input with pydantic in debusine/tasks/models.py:

class ReprotestData(BaseTaskDataWithExecutor):
    """Data for Reprotest task."""

    source_artifact: LookupSingle

ReprotestData is what the user provides as input. A LookupSingle is a lookup that resolves to a single artifact.

We would also have configuration for the desired variations to test, but we have left that out of this example for simplicity. Configuring variations is left as an exercise for the reader.
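
As a starting point for that exercise, here is a minimal sketch of how a list of variations to switch off could be turned into reprotest's --vary option, using the same syntax as the command line shown later in this post. The function name vary_args and the idea of a disabled_variations list in the task data are assumptions for illustration; neither exists in Debusine.

```python
def vary_args(disabled_variations: list[str]) -> list[str]:
    """Build the --vary argument from the variations to switch off.

    Returns an empty list when nothing is disabled, so the result can be
    appended to a command line unconditionally.
    """
    if not disabled_variations:
        return []
    return ["--vary=" + ",".join(f"-{name}" for name in disabled_variations)]


print(vary_args(["time", "user_group", "fileordering", "domain_host"]))
# prints: ['--vary=-time,-user_group,-fileordering,-domain_host']
```

A real implementation would also want to validate the names against the variations that reprotest supports.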

Since ReprotestData is a subclass of BaseTaskDataWithExecutor, it also contains an environment field, where the user can specify the environment in which the task will run. The environment is an artifact with a Debian image.

Add the new Reprotest artifact data class

For the reprotest task to create a new artifact of the type DebianReprotest, holding the log and output metadata, add the new category to ArtifactCategory in debusine/artifacts/models.py:

    REPROTEST = "debian:reprotest"

In the same file add the DebianReprotest class:

class DebianReprotest(ArtifactData):
    """Data for debian:reprotest artifacts."""

    reproducible: bool | None = None

    def get_label(self) -> str:
        """Return a short human-readable label for the artifact."""
        return "reprotest analysis"

It could also include the package name or version.

In the same file, map the REPROTEST category to its data model:

#: Index ArtifactData model classes by category
ARTIFACT_DATA_MODELS_BY_CATEGORY: dict[ArtifactCategory, type[ArtifactData]] = {
    ArtifactCategory.REPROTEST: DebianReprotest,
}

(DebianReprotest must be defined above the ARTIFACT_DATA_MODELS_BY_CATEGORY dictionary)

To have the category listed in the work request output artifacts table, edit the file debusine/db/models/artifacts.py: in ARTIFACT_CATEGORY_ICON_NAMES add the entry ArtifactCategory.REPROTEST: "folder", and in ARTIFACT_CATEGORY_SHORT_NAMES add the entry ArtifactCategory.REPROTEST: "reprotest".
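
For reference, the two new entries would look roughly like this. To keep the snippet self-contained, ArtifactCategory is redefined here as a one-member stand-in and the dictionaries contain only the new entry; in Debusine you would add the entries to the existing dictionaries instead of redefining them.

```python
from enum import Enum


class ArtifactCategory(str, Enum):
    """Stand-in for debusine.artifacts.models.ArtifactCategory."""

    REPROTEST = "debian:reprotest"


# Stand-ins for the dictionaries in debusine/db/models/artifacts.py.
# The real ones already hold entries for the other categories.
ARTIFACT_CATEGORY_ICON_NAMES = {
    ArtifactCategory.REPROTEST: "folder",
}
ARTIFACT_CATEGORY_SHORT_NAMES = {
    ArtifactCategory.REPROTEST: "reprotest",
}
```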

Create the new Task class

In debusine/tasks/ create a new file reprotest.py.

reprotest.py
# Copyright © The Debusine Developers
# See the AUTHORS file at the top-level directory of this distribution
#
# This file is part of Debusine. It is subject to the license terms
# in the LICENSE file found in the top-level directory of this
# distribution. No part of Debusine, including this file, may be copied,
# modified, propagated, or distributed except according to the terms
# contained in the LICENSE file.

"""Task to use reprotest in debusine."""

from pathlib import Path
from typing import Any, override

from debusine import utils
from debusine.artifacts.local_artifact import ReprotestArtifact
from debusine.artifacts.models import (
    ArtifactCategory,
    DebianSourcePackage,
    DebianUpload,
    WorkRequestResults,
    get_source_package_name,
    get_source_package_version,
)
from debusine.client.models import RelationType
from debusine.tasks import BaseTaskWithExecutor, RunCommandTask, inputs
from debusine.tasks.inputs import Stage
from debusine.tasks.models import (
    BaseDynamicTaskDataWithExecutor,
    ReprotestData,
)


class Reprotest(
    RunCommandTask[ReprotestData, BaseDynamicTaskDataWithExecutor],
    BaseTaskWithExecutor[ReprotestData, BaseDynamicTaskDataWithExecutor],
):
    """Task to use reprotest in debusine."""

    TASK_VERSION = 1

    CAPTURE_OUTPUT_FILENAME = "reprotest.log"

    # Resolve environment from task data
    environment = inputs.EnvironmentInput(stage=Stage.PENDING)

    # Resolve source_artifact from task data into a specific artifact
    source_artifact = inputs.SingleInput(
        field="source_artifact",
        categories=(
            ArtifactCategory.SOURCE_PACKAGE,
            ArtifactCategory.UPLOAD,
        ),
        stage=Stage.PENDING,
    )

    def __init__(
        self,
        task_data: dict[str, Any],
        dynamic_task_data: dict[str, Any] | None = None,
    ) -> None:
        """Initialize object."""
        super().__init__(task_data, dynamic_task_data)

        self._reprotest_target: Path | None = None

    @override
    def get_subject(self) -> str | None:
        """Return the subject used to look up task configuration."""
        assert isinstance(
            self.source_artifact.data, (DebianSourcePackage, DebianUpload)
        )
        return get_source_package_name(self.source_artifact.data)

    def compute_dynamic_data(self) -> BaseDynamicTaskDataWithExecutor:
        """Compute and return BaseDynamicTaskData."""
        source_artifact = self.source_artifact
        assert isinstance(
            source_artifact.data, (DebianSourcePackage, DebianUpload)
        )
        package_name = get_source_package_name(source_artifact.data)
        version = get_source_package_version(source_artifact.data)

        return BaseDynamicTaskDataWithExecutor(
            environment_id=self.environment.artifact_id,
            subject=self.get_subject(),
            parameter_summary=f"{package_name}_{version}",
        )

    def fetch_input(self, destination: Path) -> bool:
        """Download the required artifacts."""
        assert self.dynamic_data

        artifact_id = self.source_artifact.artifact_id
        self.fetch_artifact(artifact_id, destination)

        return True

    def configure_for_execution(self, download_directory: Path) -> bool:
        """
        Prepare the executor and locate the source package.

        Install reprotest and other utilities used in _cmdline.
        Find a .dsc in download_directory and set
        self._reprotest_target to it.

        :param download_directory: where to search the files
        :return: True if valid files were found
        """
        self._prepare_executor_instance()

        if self.executor_instance is None:
            raise AssertionError("self.executor_instance cannot be None")

        self.run_executor_command(
            ["apt-get", "update"],
            log_filename="install.log",
            run_as_root=True,
            check=True,
        )
        self.run_executor_command(
            [
                "apt-get",
                "--yes",
                "--no-install-recommends",
                "install",
                "reprotest",
                "dpkg-dev",
                "devscripts",
                "equivs",
                "sudo",
            ],
            log_filename="install.log",
            run_as_root=True,
        )

        self._reprotest_target = utils.find_file_suffixes(
            download_directory, [".dsc"]
        )
        return True

    def _cmdline(self) -> list[str]:
        """
        Build the reprotest command line.

        Use configuration of self.data and self._reprotest_target.
        """
        target = self._reprotest_target
        assert target is not None

        cmd = [
            "bash",
            "-c",
            f"TMPDIR=/tmp ; cd /tmp ; dpkg-source -x {target} package/; "
            "cd package/ ; mk-build-deps ; apt-get install --yes ./*.deb ; "
            "rm *.deb ; "
            "reprotest --vary=-time,-user_group,-fileordering,-domain_host .",
        ]

        return cmd

    @staticmethod
    def _cmdline_as_root() -> bool:
        r"""apt-get install --yes ./\*.deb must be run as root."""
        return True

    def task_result(
        self,
        returncode: int | None,
        execute_directory: Path,  # noqa: U100
    ) -> WorkRequestResults:
        """
        Evaluate task output and return success.

        For a successful run of reprotest:
        -must have the output file
        -exit code is 0

        :return: WorkRequestResults.SUCCESS or WorkRequestResults.FAILURE.
        """
        reprotest_file = execute_directory / self.CAPTURE_OUTPUT_FILENAME

        if reprotest_file.exists() and returncode == 0:
            return WorkRequestResults.SUCCESS

        return WorkRequestResults.FAILURE

    def upload_artifacts(
        self, exec_directory: Path, *, execution_result: WorkRequestResults
    ) -> None:
        """Upload the ReprotestArtifact with the files and relationships."""
        if not self.debusine:
            raise AssertionError("self.debusine not set")

        assert self.dynamic_data is not None
        assert self.dynamic_data.parameter_summary is not None

        reprotest_artifact = ReprotestArtifact.create(
            reprotest_output=exec_directory / self.CAPTURE_OUTPUT_FILENAME,
            reproducible=execution_result == WorkRequestResults.SUCCESS,
        )

        uploaded = self.debusine.upload_artifact(
            reprotest_artifact,
            workspace=self.workspace_name,
            work_request=self.work_request_id,
        )

        self.debusine.relation_create(
            uploaded.id,
            self.source_artifact.artifact_id,
            RelationType.RELATES_TO,
        )

Below are the main parts with some basic explanation.

For Debusine to discover the task, add from debusine.tasks.reprotest import Reprotest to the file debusine/tasks/__init__.py, then add "Reprotest" to the __all__ list.
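
Why does an import matter for discovery? The following is a generic Python illustration, not Debusine's actual mechanism: a base class can only know about subclasses whose modules have been imported, because the class body runs at import time. BaseTask, its registry and task_class_for are hypothetical names used for this sketch only.

```python
class BaseTask:
    """Hypothetical base class that keeps a registry of its subclasses."""

    registry: dict[str, type["BaseTask"]] = {}

    def __init_subclass__(cls) -> None:
        super().__init_subclass__()
        # Runs when a subclass is defined, i.e. when its module is imported.
        # Tasks living in modules that are never imported stay unknown.
        BaseTask.registry[cls.__name__.lower()] = cls


class Reprotest(BaseTask):
    """Defining (importing) the class is what registers it."""


def task_class_for(name: str) -> type[BaseTask]:
    """Look up a task class by the name used in a work request."""
    return BaseTask.registry[name.lower()]
```

With a scheme like this, a task requested by name (such as reprotest) can only be found once the module defining it has been imported somewhere.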

Let’s explain the different parts of the Reprotest class:

Resolving inputs: input fields and dynamic data

The worker has no access to Debusine’s database. Lookups are all resolved before the task gets dispatched to a worker, so all it has to do is download the specified input artifacts.

The resolution is performed automatically by task input fields, which make the result of the resolution available as attributes of the task object.

Reprotest defines two members as input fields: environment and source_artifact, which resolve into InputArtifactSingle structures.

The get_subject method is used to compute the subject for looking up possible task configuration entries for this task, by representing the significant aspect of the task’s input: in this case, the source package name.

The compute_dynamic_data method is used when the task gets ready to be run, to perform the final consistency checks on the input fields and populate the rest of the dynamic task data. This structure holds information useful for displaying the task in the UI (like parameter_summary), for inspecting the lifetime of the task (subject, configuration_context), and for statistics (runtime_context). Before task input fields were introduced, it was also used to hold the IDs of artifacts.

    def compute_dynamic_data(self) -> BaseDynamicTaskDataWithExecutor:
        """Compute and return BaseDynamicTaskData."""
        source_artifact = self.source_artifact
        assert isinstance(
            source_artifact.data, (DebianSourcePackage, DebianUpload)
        )
        package_name = get_source_package_name(source_artifact.data)
        version = get_source_package_version(source_artifact.data)

        return BaseDynamicTaskDataWithExecutor(
            environment_id=self.environment.artifact_id,
            subject=self.get_subject(),
            parameter_summary=f"{package_name}_{version}",
        )

fetch_input method

Download the required artifacts on the worker.

    def fetch_input(self, destination: Path) -> bool:
        """Download the required artifacts."""
        assert self.dynamic_data

        artifact_id = self.source_artifact.artifact_id
        self.fetch_artifact(artifact_id, destination)

        return True

configure_for_execution method

Install the packages needed by the task and set _reprotest_target, which is used to build the task’s command line.

    def configure_for_execution(self, download_directory: Path) -> bool:
        """
        Prepare the executor and locate the source package.

        Install reprotest and other utilities used in _cmdline.
        Find a .dsc in download_directory and set
        self._reprotest_target to it.

        :param download_directory: where to search the files
        :return: True if valid files were found
        """
        self._prepare_executor_instance()

        if self.executor_instance is None:
            raise AssertionError("self.executor_instance cannot be None")

        self.run_executor_command(
            ["apt-get", "update"],
            log_filename="install.log",
            run_as_root=True,
            check=True,
        )
        self.run_executor_command(
            [
                "apt-get",
                "--yes",
                "--no-install-recommends",
                "install",
                "reprotest",
                "dpkg-dev",
                "devscripts",
                "equivs",
                "sudo",
            ],
            log_filename="install.log",
            run_as_root=True,
        )

        self._reprotest_target = utils.find_file_suffixes(
            download_directory, [".dsc"]
        )
        return True

_cmdline method

Return the command line to run the task.

In this case, and to keep the example simple, we will run reprotest directly in the worker’s executor VM/container, without giving it an isolated virtual server.

So, this command installs the build dependencies required by the package (so reprotest can build it) and runs reprotest itself.

    def _cmdline(self) -> list[str]:
        """
        Build the reprotest command line.

        Use configuration of self.data and self._reprotest_target.
        """
        target = self._reprotest_target
        assert target is not None

        cmd = [
            "bash",
            "-c",
            f"TMPDIR=/tmp ; cd /tmp ; dpkg-source -x {target} package/; "
            "cd package/ ; mk-build-deps ; apt-get install --yes ./*.deb ; "
            "rm *.deb ; "
            "reprotest --vary=-time,-user_group,-fileordering,-domain_host .",
        ]

        return cmd

Some reprotest variations are disabled. This keeps the example simple, both in the set of packages to install and in the reprotest features used.
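
The command line above interpolates the .dsc path unquoted and chains the steps with ";", so a failing step does not stop the ones after it. One possible variation is sketched below as a standalone function: it quotes the path with shlex.quote, joins the steps with "&&" so the first failure aborts the script with a non-zero exit code, and exports TMPDIR so that child processes see it. The name build_cmdline is hypothetical, and this is a sketch of the idea rather than the code used by the task.

```python
import shlex
from pathlib import Path


def build_cmdline(target: Path) -> list[str]:
    """Return a bash command line that unpacks the source and runs reprotest."""
    steps = [
        "export TMPDIR=/tmp",
        "cd /tmp",
        # shlex.quote protects paths containing spaces or shell characters.
        f"dpkg-source -x {shlex.quote(str(target))} package/",
        "cd package/",
        "mk-build-deps",
        "apt-get install --yes ./*.deb",
        "rm *.deb",
        "reprotest --vary=-time,-user_group,-fileordering,-domain_host .",
    ]
    # "&&" stops at the first failing step instead of carrying on.
    return ["bash", "-c", " && ".join(steps)]
```

With this variation, a failure while installing build dependencies would surface as a non-zero return code in task_result, instead of reprotest being run in a half-prepared environment.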

_cmdline_as_root method

Since packages need to be installed during the execution, the command is run as root (in the container):

    @staticmethod
    def _cmdline_as_root() -> bool:
        r"""apt-get install --yes ./\*.deb must be run as root."""
        return True

task_result method

The task succeeds if a log is generated and the return code is 0.

    def task_result(
        self,
        returncode: int | None,
        execute_directory: Path,  # noqa: U100
    ) -> WorkRequestResults:
        """
        Evaluate task output and return success.

        For a successful run of reprotest:
        -must have the output file
        -exit code is 0

        :return: WorkRequestResults.SUCCESS or WorkRequestResults.FAILURE.
        """
        reprotest_file = execute_directory / self.CAPTURE_OUTPUT_FILENAME

        if reprotest_file.exists() and returncode == 0:
            return WorkRequestResults.SUCCESS

        return WorkRequestResults.FAILURE

upload_artifacts method

Create the ReprotestArtifact with the log and the reproducible boolean, upload it, and then add a relation between the ReprotestArtifact and the source package:

    def upload_artifacts(
        self, exec_directory: Path, *, execution_result: WorkRequestResults
    ) -> None:
        """Upload the ReprotestArtifact with the files and relationships."""
        if not self.debusine:
            raise AssertionError("self.debusine not set")

        assert self.dynamic_data is not None
        assert self.dynamic_data.parameter_summary is not None

        reprotest_artifact = ReprotestArtifact.create(
            reprotest_output=exec_directory / self.CAPTURE_OUTPUT_FILENAME,
            reproducible=execution_result == WorkRequestResults.SUCCESS,
        )

        uploaded = self.debusine.upload_artifact(
            reprotest_artifact,
            workspace=self.workspace_name,
            work_request=self.work_request_id,
        )

        self.debusine.relation_create(
            uploaded.id,
            self.source_artifact.artifact_id,
            RelationType.RELATES_TO,
        )

Execution example

To run this task in a local Debusine (see steps to have it ready with an environment, permissions and users created) you can do:

$ python3 -m debusine.client artifact import-debian -w System http://deb.debian.org/debian/pool/main/h/hello/hello_2.10-5.dsc

(get the artifact ID from the output of that command)

The artifact can be seen at http://$DEBUSINE/debusine/System/artifact/$ARTIFACT_ID/.

Then create a reprotest.yaml:

$ cat <<EOF > reprotest.yaml
source_artifact: $ARTIFACT_ID
environment: "debian/match:codename=bookworm"
EOF

Instead of debian/match:codename=bookworm, the environment could be given as an artifact ID.

Finally, create the work request to run the task:

$ python3 -m debusine.client create-work-request -w System reprotest --data reprotest.yaml

In the Debusine web interface you can see the work request, which should go to Running status and then to Completed with Success or Failure (depending on whether reprotest could reproduce the package). The Output tab lists an artifact of type debian:reprotest with one file: the log. The Metadata tab of the artifact shows its Data: the package name and reproducible (true or false).

What is left to do?

This was a simple example of creating a task. Other things that could be done:

  • unit tests
  • documentation
  • configurable variations
  • running reprotest directly on the worker host, using the executor environment as a reprotest “virtual server”
  • in this specific example, the command line might be doing too many things that could maybe be done by other parts of the task, such as prepare_environment.
  • integrate it in a workflow so it’s easier to use (e.g. part of QaWorkflow)
  • extract more from the log than just pass/fail
  • display the output in a more useful way (implement an artifact specialized view)
