Source code for debusine.signing.models

# Copyright 2024 The Debusine Developers
# See the AUTHORS file at the top-level directory of this distribution
#
# This file is part of Debusine. It is subject to the license terms
# in the LICENSE file found in the top-level directory of this
# distribution. No part of Debusine, including this file, may be copied,
# modified, propagated, or distributed except according to the terms
# contained in the LICENSE file.

"""Signing service data models."""

import base64
import re
import subprocess
from collections.abc import Iterable
from enum import StrEnum
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import BinaryIO, Self, TYPE_CHECKING, TypeAlias

from django.conf import settings
from django.db import models

try:
    import pydantic.v1 as pydantic
except ImportError:
    import pydantic as pydantic  # type: ignore

from debusine.utils import calculate_hash

if TYPE_CHECKING:
    from django_stubs_ext.db.models import TypedModelMeta
    from nacl.public import PrivateKey, PublicKey
else:
    TypedModelMeta = object


[docs]class ProtectedKeyError(Exception): """An error handling a protected key."""
[docs]class ProtectedKeyStorage(StrEnum): """Possible values for ProtectedKey.storage.""" NACL = "nacl"
[docs]class ProtectedKeyNaCl(pydantic.BaseModel): """Data for a private key encrypted in software using NaCl.""" public_key: str encrypted: str
[docs] def __init__(self, public_key: str | bytes, encrypted: str | bytes) -> None: """Construct a :py:class:`ProtectedKeyNaCl`.""" return super().__init__( public_key=( public_key if isinstance(public_key, str) else base64.b64encode(public_key).decode() ), encrypted=( encrypted if isinstance(encrypted, str) else base64.b64encode(encrypted).decode() ), )
@property def public_key_bytes(self) -> bytes: """Return the base64-decoded public key.""" return base64.b64decode(self.public_key.encode()) @property def encrypted_bytes(self) -> bytes: """Return the base64-decoded encrypted data.""" return base64.b64decode(self.encrypted.encode())
[docs] @classmethod def encrypt(cls, public_key: "PublicKey", data: bytes) -> Self: """Encrypt data using a NaCl public key.""" from nacl.public import SealedBox try: encrypted = SealedBox(public_key).encrypt(data) except Exception as e: raise ProtectedKeyError(str(e)) from e return cls(public_key=bytes(public_key), encrypted=encrypted)
[docs] def decrypt(self, private_keys: Iterable["PrivateKey"]) -> bytes: """ Decrypt data using any of an iterable of NaCl private keys. This uses the private key that matches the stored public key, if one exists. This allows for key rotation. :raises ProtectedKeyError: if none of the given private keys match the stored public key. """ from nacl.exceptions import CryptoError from nacl.public import SealedBox public_key_bytes = self.public_key_bytes for private_key in private_keys: if bytes(private_key.public_key) == public_key_bytes: try: return SealedBox(private_key).decrypt(self.encrypted_bytes) except CryptoError as e: raise ProtectedKeyError(str(e)) from e raise ProtectedKeyError( "Key not encrypted using any of the given private keys" )
[docs]class ProtectedKey(pydantic.BaseModel): """A protected private key.""" storage: ProtectedKeyStorage # This should be a discriminated union, but we can't do that until we # have more than one implementation. (The next obvious choice would be # a handle for a private key in an HSM.) data: ProtectedKeyNaCl
def _openssl_generate( common_name: str, certificate_days: int, log_file: BinaryIO | None = None ) -> tuple[bytes, bytes]: """Generate a new private key and certificate using OpenSSL.""" with TemporaryDirectory() as tmp: key = Path(tmp, "tmp.key") certificate = Path(tmp, "tmp.crt") # openssl-req(1ssl) says: 'Special characters may be escaped by "\" # (backslash), whitespace is retained.' "/" and "=" are special # since they are used to delimit successive types and values. quoted_common_name = re.sub(r'([/=])', r'\\\1', common_name) subprocess.run( [ "openssl", "req", "-new", "-newkey", "rsa:2048", "-x509", "-subj", f"/CN={quoted_common_name}/", "-days", str(certificate_days), "-noenc", "-sha256", "-keyout", key, "-out", certificate, ], check=True, stdout=log_file, stderr=log_file, ) return key.read_bytes(), certificate.read_bytes() def _x509_fingerprint( certificate: bytes, log_file: BinaryIO | None = None ) -> str: """Get the SHA-256 fingerprint of an X.509 certificate.""" return ( subprocess.run( [ "openssl", "x509", "-inform", "PEM", "-noout", "-fingerprint", "-sha256", ], check=True, input=certificate, stdout=subprocess.PIPE, stderr=log_file, ) .stdout.decode() .rstrip("\n") # "sha256 Fingerprint=AA:BB:CC:..." .split("=", 1)[1] .replace(":", "") ) # TODO: The signing worker's systemd unit must use something along the lines # of "TemporaryFileSystem=/tmp:noswap /var/tmp:noswap" so that private keys # don't end up on disk. The "noswap" option was added in Linux 6.4, so we'd # either have to skip it and accept the possibility that private keys might # be swapped to disk under memory pressure, or use a backported kernel for # signing workers. def _sbsign( private_key: ProtectedKey, public_key: bytes, data_path: Path, signature_path: Path, log_file: BinaryIO | None = None, ): """Sign a UEFI boot image using `sbsign`.""" with TemporaryDirectory() as tmp: key = Path(tmp, "tmp.key") certificate = Path(tmp, "tmp.crt") assert private_key.storage == ProtectedKeyStorage.NACL key.write_bytes( private_key.data.decrypt(settings.DEBUSINE_SIGNING_PRIVATE_KEYS) ) certificate.write_bytes(public_key) subprocess.run( [ "sbsign", "--key", key, "--cert", certificate, "--output", signature_path, data_path, ], check=True, stdout=log_file, stderr=log_file, ) class _KeyPurpose(models.TextChoices): """Choices for Key.purpose.""" UEFI = "uefi", "UEFI Secure Boot"
[docs]class AuditLog(models.Model): """An audit log entry.""" objects = models.Manager["AuditLog"]()
[docs] class Event(models.TextChoices): GENERATE = "generate", "Generate" SIGN = "sign", "Sign"
purpose = models.CharField(max_length=7, choices=_KeyPurpose.choices) fingerprint = models.CharField(max_length=64) event = models.CharField(max_length=8, choices=Event.choices) data = models.JSONField() created_at = models.DateTimeField(auto_now_add=True) created_by_work_request_id = models.IntegerField() class Meta(TypedModelMeta): app_label = "signing" indexes = [ models.Index( "purpose", "fingerprint", name="%(app_label)s_%(class)s_key_idx", ), models.Index( "created_by_work_request_id", name="%(app_label)s_%(class)s_wr_idx", ), ]
[docs]class KeyManager(models.Manager["Key"]): """Manager for the Key model."""
[docs] def generate( self, purpose: _KeyPurpose, description: str, created_by_work_request_id: int, log_file: BinaryIO | None = None, ) -> "Key": """Generate a new key.""" match purpose: case _KeyPurpose.UEFI: # For now we just hardcode the certificate lifetime: 15 # years is enough to extend from the start of the # development period of a Debian release to the end of # Freexian's extended LTS period. days = 15 * 365 private_key, public_key = _openssl_generate( description, days, log_file=log_file ) fingerprint = _x509_fingerprint(public_key, log_file=log_file) case _ as unreachable: raise AssertionError(f"Unexpected purpose: {unreachable}") key = self.create( purpose=purpose, fingerprint=fingerprint, private_key=ProtectedKey( storage=ProtectedKeyStorage.NACL, data=ProtectedKeyNaCl.encrypt( settings.DEBUSINE_SIGNING_PRIVATE_KEYS[0].public_key, private_key, ), ).dict(), public_key=public_key, ) AuditLog.objects.create( event=AuditLog.Event.GENERATE, purpose=purpose, fingerprint=fingerprint, data={"description": description}, created_by_work_request_id=created_by_work_request_id, ) return key
[docs]class Key(models.Model): """A key managed by the debusine signing service.""" objects = KeyManager() Purpose: TypeAlias = _KeyPurpose purpose = models.CharField(max_length=7, choices=Purpose.choices) fingerprint = models.CharField(max_length=64) private_key = models.JSONField() public_key = models.BinaryField() created_at = models.DateTimeField(auto_now_add=True) updated_at = models.DateTimeField(blank=True, null=True) class Meta(TypedModelMeta): app_label = "signing" constraints = [ models.UniqueConstraint( fields=("purpose", "fingerprint"), name="%(app_label)s_%(class)s_unique_purpose_fingerprint", ) ] @property def stored_private_key(self) -> ProtectedKey: """Access private_key as a pydantic model.""" return ProtectedKey(**self.private_key)
[docs] def sign( self, data_path: Path, signature_path: Path, created_by_work_request_id: int, log_file: BinaryIO | None = None, ) -> None: """ Sign data in `data_path` using this key. `signature_path` will be overwritten with the resulting signature. """ match self.purpose: case _KeyPurpose.UEFI: _sbsign( self.stored_private_key, self.public_key, data_path=data_path, signature_path=signature_path, log_file=log_file, ) case _ as unreachable: raise AssertionError(f"Unexpected purpose: {unreachable}") AuditLog.objects.create( event=AuditLog.Event.SIGN, purpose=self.purpose, fingerprint=self.fingerprint, data={"data_sha256": calculate_hash(data_path, "sha256").hex()}, created_by_work_request_id=created_by_work_request_id, )