# Copyright 2019, 2021-2024 The Debusine Developers
# See the AUTHORS file at the top-level directory of this distribution
#
# This file is part of Debusine. It is subject to the license terms
# in the LICENSE file found in the top-level directory of this
# distribution. No part of Debusine, including this file, may be copied,
# modified, propagated, or distributed except according to the terms
# contained in the LICENSE file.
"""Data models for db workers."""
import copy
import hashlib
from typing import Any, Optional, TYPE_CHECKING
from django.db import IntegrityError, models, transaction
from django.db.models import (
CheckConstraint,
Count,
F,
JSONField,
Q,
QuerySet,
)
from django.utils import timezone
from django.utils.text import slugify
from debusine.db.models.auth import Token
from debusine.tasks.models import WorkerType
# TypedModelMeta is only needed by static type checkers. At runtime the
# django_stubs_ext import is skipped and the name is bound to ``object``,
# so ``class Meta(TypedModelMeta)`` is a plain class.
if TYPE_CHECKING:
    from django_stubs_ext.db.models import TypedModelMeta
else:
    TypedModelMeta = object
class WorkerManager(models.Manager["Worker"]):
    """Manager for Worker model."""

    def connected(self) -> QuerySet["Worker"]:
        """Return connected workers, ordered by ``connected_at``."""
        return self.filter(connected_at__isnull=False).order_by(
            "connected_at"
        )

    def waiting_for_work_request(self) -> QuerySet["Worker"]:
        """
        Return workers that can be assigned a new work request.

        The workers with fewer associated pending or running work requests
        than their concurrency level could take more work right now and are
        thus waiting for a work request.

        Only connected workers are returned, ordered by ``connected_at``.
        Worker's token must be enabled, except for Celery workers.
        """
        # Import here to prevent circular imports
        from debusine.db.models.work_requests import WorkRequest

        running_work_request_count = Count(
            "assigned_work_requests",
            filter=Q(
                assigned_work_requests__status__in=[
                    WorkRequest.Statuses.RUNNING,
                    WorkRequest.Statuses.PENDING,
                ]
            ),
        )
        return (
            self.filter(connected_at__isnull=False)
            .order_by("connected_at")
            .annotate(count_running=running_work_request_count)
            .filter(count_running__lt=F("concurrency"))
            .filter(Q(worker_type=WorkerType.CELERY) | Q(token__enabled=True))
        )

    @staticmethod
    def _generate_unique_name(name: str, counter: int) -> str:
        """Return name slugified adding "-counter" if counter != 1."""
        # Dots are turned into dashes first so that the labels of an FQDN
        # stay separated in the resulting slug.
        new_name = slugify(name.replace(".", "-"))
        if counter != 1:
            new_name += f"-{counter}"
        return new_name

    @classmethod
    def create_with_fqdn(
        cls,
        fqdn: str,
        token: Token,
        worker_type: WorkerType = WorkerType.EXTERNAL,
    ) -> "Worker":
        """
        Return a new Worker with its name based on fqdn, with token.

        The name is the slugified fqdn; if it is already taken, "-2", "-3",
        ... is appended until a free name is found.

        :raises IntegrityError: if the worker cannot be created for a reason
          other than a name collision (for example, the token is already
          associated with another worker).
        """
        counter = 1
        while True:
            name = cls._generate_unique_name(fqdn, counter)
            try:
                # The savepoint keeps an enclosing transaction usable after
                # a failed INSERT.
                with transaction.atomic():
                    return Worker.objects.create(
                        name=name,
                        token=token,
                        worker_type=worker_type,
                        registered_at=timezone.now(),
                    )
            except IntegrityError:
                # Only a name collision can be solved by trying the next
                # candidate. Any other violation would fail again on every
                # attempt, so retrying would loop forever.
                if not Worker.objects.filter(name=name).exists():
                    raise
                counter += 1

    @classmethod
    def get_or_create_celery(cls) -> "Worker":
        """
        Return the Worker representing the Celery task queue.

        The worker is created if it does not exist yet.
        """
        # get_or_create() copes with a concurrent caller creating the same
        # row between the lookup and the insert.
        worker, _ = Worker.objects.get_or_create(
            name="celery",
            worker_type=WorkerType.CELERY,
            defaults={"registered_at": timezone.now()},
        )
        return worker

    def get_worker_by_token_key_or_none(
        self, token_key: str
    ) -> Optional["Worker"]:
        """
        Return a Worker identified by its associated secret token.

        The lookup is done on the SHA-256 hex digest of token_key. Return
        None if no worker matches.
        """
        token_hash = hashlib.sha256(token_key.encode()).hexdigest()
        try:
            return self.get(token__hash=token_hash)
        except Worker.DoesNotExist:
            return None

    def get_worker_or_none(self, worker_name: str) -> Optional["Worker"]:
        """Return the worker with worker_name or None."""
        try:
            return self.get(name=worker_name)
        except Worker.DoesNotExist:
            return None
class Worker(models.Model):
    """Database model of a worker."""

    name = models.SlugField(
        unique=True,
        help_text='Human readable name of the worker based on the FQDN',
    )
    registered_at = models.DateTimeField()
    # Set by mark_connected(), reset to None by mark_disconnected().
    connected_at = models.DateTimeField(blank=True, null=True)

    # This is the token used by the Worker to authenticate
    # Users have their own tokens - this is specific to a single worker.
    token = models.OneToOneField(
        Token, null=True, on_delete=models.PROTECT, related_name="worker"
    )

    static_metadata = JSONField(default=dict, blank=True)
    dynamic_metadata = JSONField(default=dict, blank=True)
    dynamic_metadata_updated_at = models.DateTimeField(blank=True, null=True)

    worker_type = models.CharField(
        max_length=8,
        choices=WorkerType.choices,
        default=WorkerType.EXTERNAL,
        editable=False,
    )

    concurrency = models.PositiveIntegerField(
        default=1,
        help_text="Number of tasks this worker can run simultaneously",
    )

    class Meta(TypedModelMeta):
        constraints = [
            # Non-Celery workers must have a token.
            CheckConstraint(
                name="%(app_label)s_%(class)s_celery_or_token",
                check=Q(worker_type=WorkerType.CELERY) | Q(token__isnull=False),
            )
        ]

    def mark_disconnected(self) -> None:
        """Update and save relevant Worker fields after disconnecting."""
        self.connected_at = None
        self.save()

    def mark_connected(self) -> None:
        """Update and save relevant Worker fields after connecting."""
        self.connected_at = timezone.now()
        self.save()

    def connected(self) -> bool:
        """Return True if the Worker is connected."""
        return self.connected_at is not None

    def is_busy(self) -> bool:
        """
        Return True if the Worker is busy with work requests.

        A Worker is busy if it has at least as many running or pending work
        requests as its concurrency level.
        """
        # Import here to prevent circular imports
        from debusine.db.models.work_requests import WorkRequest

        # The two querysets are combined with "|" so that a single count
        # covers both statuses.
        return (
            WorkRequest.objects.running(worker=self)
            | WorkRequest.objects.pending(worker=self)
        ).count() >= self.concurrency

    def __str__(self) -> str:
        """Return the id and name of the Worker."""
        return f"Id: {self.id} Name: {self.name}"

    objects = WorkerManager()