Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions vulnerabilities/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -2387,6 +2387,11 @@ def all_runs(self):
def latest_run(self):
    """Return the most recent pipeline run, or None when there are no runs."""
    # QuerySet.first() already returns None for an empty queryset, so the
    # previous exists() pre-check only added a redundant extra DB query.
    return self.pipelineruns.first()

@property
def latest_successful_run(self):
    """
    Return the most recent run that completed successfully, or None.

    A run counts as successful when it has finished (run_end_date is set)
    and exited with code 0.
    """
    # QuerySet.first() returns None on an empty queryset, so no exists()
    # pre-check (which would issue an extra DB query) is needed.
    return self.pipelineruns.filter(run_end_date__isnull=False, run_exitcode=0).first()

@property
def earliest_run(self):
    # Return the oldest run by start date, or None when there are no runs.
    # The exists() guard is required here: unlike first(), earliest() raises
    # DoesNotExist on an empty queryset.
    return self.pipelineruns.earliest("run_start_date") if self.pipelineruns.exists() else None
Expand Down
71 changes: 64 additions & 7 deletions vulnerabilities/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,24 +9,23 @@


import logging
from collections import Counter
from contextlib import suppress
from io import StringIO
from traceback import format_exc as traceback_format_exc

import django_rq
from redis.exceptions import ConnectionError
from rq import Worker

from vulnerabilities import models
from vulnerabilities.importer import Importer
from vulnerabilities.improver import Improver
from vulnerablecode.settings import RQ_QUEUES

logger = logging.getLogger(__name__)

default_queue = django_rq.get_queue("default")
high_queue = django_rq.get_queue("high")

queues = {
"default": django_rq.get_queue("default"),
"high": django_rq.get_queue("high"),
}
queues = {queue: django_rq.get_queue(queue) for queue in RQ_QUEUES.keys()}


def execute_pipeline(pipeline_id, run_id):
Expand Down Expand Up @@ -151,3 +150,61 @@ def dequeue_job(job_id):
for queue in queues.values():
if job_id in queue.jobs:
queue.remove(job_id)


def compute_queue_load_factor():
    """
    Compute worker load per queue.

    Load factor is the ratio of the total compute required to run all active pipelines
    in a queue to the available worker capacity for that queue over a 24-hour period.
    A value greater than 1 indicates that the number of workers is insufficient to
    run all pipelines within the schedule.

    Also compute the additional workers needed to balance each queue.

    Return a dict keyed by queue name (reverse-alphabetical order) with values of
    the form ``{"load_factor": float | "no_worker", "additional_worker": int}``.
    Queues with no scheduled compute are omitted.
    """
    from math import ceil

    field = models.PipelineSchedule._meta.get_field("run_priority")
    label_to_value = {label: value for value, label in field.choices}
    total_compute_seconds_per_queue = {}
    worker_per_queue = {}
    load_per_queue = {}
    seconds_in_24_hr = 86400

    # Count live workers per queue. If Redis is unreachable we silently proceed
    # with zero workers so the dashboard still renders.
    with suppress(ConnectionError):
        redis_conn = django_rq.get_connection()
        queue_names = [
            w.queue_names()[0] for w in Worker.all(connection=redis_conn) if w.queue_names()
        ]
        worker_per_queue = dict(Counter(queue_names))

    for queue in RQ_QUEUES.keys():
        # Daily compute demand for the queue: each pipeline's last successful
        # runtime scaled by how often it runs per day (run_interval appears to
        # be in hours — runtime / (run_interval / 24) == runtime * runs-per-day;
        # TODO confirm the unit against the PipelineSchedule model).
        # NOTE(review): latest_successful_run issues one query per schedule
        # (N+1); acceptable for a cached dashboard stat, but worth prefetching
        # if the schedule count grows.
        total_compute_seconds_per_queue[queue] = sum(
            (p.latest_successful_run.runtime / (p.run_interval / 24))
            for p in models.PipelineSchedule.objects.filter(
                is_active=True, run_priority=label_to_value[queue]
            )
            if p.latest_successful_run
        )
        worker_per_queue.setdefault(queue, 0)

    for queue_name, worker_count in worker_per_queue.items():
        # "no_worker" is a sentinel the dashboard template renders as a warning
        # when a loaded queue has no live workers.
        net_load_on_queue = "no_worker"
        total_compute = total_compute_seconds_per_queue.get(queue_name, 0)
        if total_compute == 0:
            # No scheduled work for this queue; nothing to report.
            continue

        unit_load_on_queue = total_compute / seconds_in_24_hr

        # Use ceil, not round: a load of e.g. 1.4 needs 2 workers to bring the
        # per-worker load below 1, but round(1.4) == 1 would report that 0
        # additional workers are needed for an overloaded single-worker queue.
        workers_for_balanced_queue = ceil(unit_load_on_queue)
        additional_worker_needed = max(workers_for_balanced_queue - worker_count, 0)

        if worker_count > 0:
            net_load_on_queue = unit_load_on_queue / worker_count

        load_per_queue[queue_name] = {
            "load_factor": net_load_on_queue,
            "additional_worker": additional_worker_needed,
        }

    return dict(sorted(load_per_queue.items(), key=lambda x: x[0], reverse=True))
76 changes: 71 additions & 5 deletions vulnerabilities/templates/pipeline_dashboard.html
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
{% extends "base.html" %}

{% load utils %}

{% block title %}
Pipeline Dashboard
{% endblock %}
Expand All @@ -22,6 +24,18 @@
.column {
word-break: break-word;
}

.has-text-orange {
color: #ff8c42 !important;
}

.has-tooltip-orange::before {
background-color: #ff8c42 !important;
}

.has-tooltip-orange::after {
border-top-color: #ff8c42 !important;
}
</style>
{% endblock %}

Expand All @@ -48,11 +62,63 @@ <h1>Pipeline Dashboard</h1>
</form>

<div class="box">
<div class="column has-text-right">
<p class="has-text-weight-semibold">
{{ active_pipeline_count|default:0 }} active pipeline{{ active_pipeline_count|default:0|pluralize }},
{{ disabled_pipeline_count|default:0 }} disabled pipeline{{ disabled_pipeline_count|default:0|pluralize }}
</p>
<div class="columns is-multiline is-vcentered mb-0">
<div class="column is-half has-text-left">
{% if load_per_queue %}
<p class="ml-3">
<span class="has-text-weight-bold has-text-black is-size-6 has-tooltip-arrow has-tooltip-multiline"
data-tooltip="Load factor is the ratio of the total compute required to run all active pipelines
in a queue to the available worker capacity for that queue over a 24-hour period.
A value greater than 1 indicates that the number of workers is insufficient to
run all pipelines within the schedule.">
Load Factor:
</span>
{% for queue_name, values in load_per_queue.items %}

<span class="has-text-weight-bold is-size-6 has-tooltip-arrow has-tooltip-multiline"
data-tooltip="{{ queue_name|capfirst }} priority pipeline queue.">
{{ queue_name|capfirst }}
</span>
{% with load_factor=values|get_item:"load_factor" additional=values|get_item:"additional_worker" %}
{% if load_factor == "no_worker" %}
<span class="has-text-weight-bold is-size-6 has-text-danger has-tooltip-arrow has-tooltip-multiline has-tooltip-danger"
data-tooltip="All workers in the {{ queue_name }} queue are down. Please run {{ additional }}
worker{{ additional|pluralize }} for the {{ queue_name }} queue.">
<span class="icon"><i class="fa fa-exclamation-triangle"></i></span>
</span>
{% elif load_factor < 1 %}
<span class="has-text-weight-bold is-size-6 has-text-success has-tooltip-arrow has-tooltip-multiline has-tooltip-success"
data-tooltip="{{ queue_name|capfirst }} queue perfectly balanced.">
{{ load_factor|floatformat:2 }}
<span class="icon"><i class="fa fa-check-circle"></i></span>
</span>
{% elif load_factor < 1.6 %}
<span class="has-text-weight-bold is-size-6 has-text-orange has-tooltip-arrow has-tooltip-multiline has-tooltip-orange"
data-tooltip="Consider adding {{ additional }} additional worker{{ additional|pluralize }} to the {{ queue_name }} queue.">
{{ load_factor|floatformat:2 }}
<span class="icon"><i class="fa fa-info-circle"></i></span>
</span>
{% else %}
<span class="has-text-weight-bold is-size-6 has-text-danger has-tooltip-arrow has-tooltip-multiline has-tooltip-danger"
data-tooltip="Consider adding {{ additional }} additional worker{{ additional|pluralize }} to the {{ queue_name }} queue.">
{{ load_factor|floatformat:2 }}
<span class="icon"><i class="fa fa-exclamation-circle"></i></span>
</span>
{% endif %}
{% endwith %}

{% if not forloop.last %} &bull; {% endif %}

{% endfor %}
</p>
{% endif %}
</div>
<div class="column is-half has-text-right">
<p class="has-text-grey-dark has-text-weight-semibold mr-3">
{{ active_pipeline_count|default:0 }} active pipeline{{ active_pipeline_count|default:0|pluralize }},
{{ disabled_pipeline_count|default:0 }} disabled pipeline{{ disabled_pipeline_count|default:0|pluralize }}
</p>
</div>
</div>
<table class="table is-striped is-hoverable is-fullwidth">
<thead>
Expand Down
11 changes: 11 additions & 0 deletions vulnerabilities/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from cvss.exceptions import CVSS4MalformedError
from django.contrib import messages
from django.contrib.auth.views import LoginView
from django.core.cache import cache
from django.core.exceptions import ValidationError
from django.core.mail import send_mail
from django.db.models import Exists
Expand Down Expand Up @@ -48,6 +49,7 @@
from vulnerabilities.pipelines.v2_importers.epss_importer_v2 import EPSSImporterPipeline
from vulnerabilities.severity_systems import EPSS
from vulnerabilities.severity_systems import SCORING_SYSTEMS
from vulnerabilities.tasks import compute_queue_load_factor
from vulnerabilities.throttling import AnonUserUIThrottle
from vulnerabilities.utils import TYPES_WITH_MULTIPLE_IMPORTERS
from vulnerabilities.utils import get_advisories_from_groups
Expand All @@ -57,6 +59,8 @@

PAGE_SIZE = 10

CACHE_TIMEOUT = 60 * 5


class VulnerableCodeView(View):
"""
Expand Down Expand Up @@ -961,6 +965,13 @@ def get_queryset(self):

def get_context_data(self, **kwargs):
context = super().get_context_data(**kwargs)
load_per_queue = cache.get("load_per_queue")

if load_per_queue is None:
load_per_queue = compute_queue_load_factor()
cache.set("load_per_queue", load_per_queue, CACHE_TIMEOUT)

context["load_per_queue"] = load_per_queue
context["active_pipeline_count"] = PipelineSchedule.objects.filter(is_active=True).count()
context["disabled_pipeline_count"] = PipelineSchedule.objects.filter(
is_active=False
Expand Down