An image of a data stream converging.

Monitoring Job Queues with Streaming Analytics

How to create proper insights into a distributed job queue using streaming analytics.

Technologies
Apache Kafka
Event-driven Architecture
Streaming Analytics

Introduction

“A distributed system is one in which the failure of a computer you didn’t even know existed can render your own computer unusable.” – Leslie Lamport

Managing a distributed system is hard. Two-phase commits, silent failures and phased deployment strategies are just a subset of challenges teams have to face. Unfortunately, scaling vertically has its limits, so many companies do inevitably end up with a distributed system.

A popular way to scale horizontally is to introduce a job queue and start application workers. This article discusses how one can add crucial insights into a job queue. In particular, we discuss how to monitor performance and keep track of causality for jobs using streaming analytics.

Job Queues

Figure 1: Schematic of a job queue.

Background on Distributed Systems

For the sake of this post, we assume that a distributed system is a set of instances (nodes) that do work and can communicate through messages. Such a system might serve web pages, provide APIs or process data. It can do all this in parallel as well. Examples of distributed systems are:

  • A CDN: the workload of a node either being redirecting requests or serving content.
  • Many data stores are built with a distributed architecture in mind, such as Apache Kafka and Cassandra. Replication reduces the risk of losing data, and spreading the query load across nodes improves performance.
  • Many data analytics pipelines have a separation of compute and storage and leverage cloud computing, so you only pay for what you process.
  • Scaling horizontally can be done by using a job queue. This makes it easy to add workers and distribute the workload evenly.

In recent years, a lot of cloud native tooling has become available to help companies set up their scalable software solution professionally. In terms of writing software, Kubernetes and Docker have made it really easy to deploy a distributed system in the cloud. In this day and age, not losing sight of communication flows in your system has become a big challenge.

A common way to distribute the load of a system is the last approach in the list above: a job queue. Popular backend application frameworks such as Laravel and Django make it easy to implement one. A job defines a unit of work that can be dispatched and handled by a worker. A worker could be an application process on a different machine, but also your favorite cloud native tooling (think AWS Lambda and Google’s Cloud Functions). You can use jobs to asynchronously send emails to users, update metrics or transform a batch of data.

Workflows

Figure 2: A GitHub Actions workflow.

Workers can often dispatch new jobs while running another job. This creates a chain of jobs that together perform a workflow. A well-known example is a workflow defined in GitHub Actions, see Figure 2.

Finishing the “Build the app” job triggers both the “Check code” job and the “Run test suite” job, and so on. Another popular tool that uses workflows is Apache Airflow, see Figure 3.

Figure 3: Apache Airflow DAG example. [Source]

Apache Airflow helps you build data pipelines using what they call DAGs, short for directed acyclic graphs.

Simply put, a directed acyclic graph is a set of nodes and arrows connecting the nodes that contains no cycle, i.e. the workflow has no loop that would make it run forever.

As an example: when you add an order to a booking system, you expect it to send a confirmation email, create an invoice, transfer the order to a shipping service, and perhaps update some analytics on the system.
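The order workflow above can be written down as a small DAG. The following is a minimal sketch using Python's standard `graphlib` module (Python 3.9+); the job names are hypothetical and only illustrate the idea, they are not taken from a real booking system.

```python
from graphlib import CycleError, TopologicalSorter

# Hypothetical job names. Each key maps to the set of jobs that must finish first.
ORDER_WORKFLOW = {
    "SendConfirmationEmail": {"AddOrder"},
    "CreateInvoice": {"AddOrder"},
    "TransferToShipping": {"AddOrder"},
    "UpdateAnalytics": {"AddOrder"},
}


def execution_order(workflow: dict[str, set[str]]) -> list[str]:
    """Return one valid order to run the jobs in; raises CycleError on a cycle."""
    return list(TopologicalSorter(workflow).static_order())


def is_acyclic(workflow: dict[str, set[str]]) -> bool:
    """Check that the workflow is a DAG, i.e. that it cannot loop forever."""
    try:
        execution_order(workflow)
    except CycleError:
        return False
    return True
```

Here `execution_order(ORDER_WORKFLOW)` always starts with "AddOrder", since every other job depends on it, while a workflow in which two jobs wait for each other is rejected by `is_acyclic`.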

Job and Workflow Metrics

Now that we know the fundamentals of a job queue, what are the most important metrics to keep track of? There are two main categories, and each answers its own set of questions. The first one is what we call Job statistics, which focuses on metrics of a single job, such as:

  • The average runtimes per job type
  • The average queueing time per queue
  • The longest jobs per job type
  • The total runtimes and queueing times of the system
  • The number of jobs in the queue
  • Etc.

The other one is Job causality, also known as tracing, which focuses on metrics and insights into workflows:

  • How many new jobs were triggered inside a job
  • The average time it takes to complete a workflow
  • The number of workflows completed each day
  • Etc.

Such insights would allow organizations to monitor their system on a granular level and optimize workflows across their business. In the order example, you also want to know which order triggered the creation of which invoice, but be able to create invoices manually at times as well. Moreover, once customers start complaining about orders not arriving, you are often too late. Ideally, you want to spot clogs in the pipeline as soon as they occur.

In addition to such a live view, you want to see each job’s impact on overall performance and trace jobs back to the original job that dispatched them. And you want to see it in real time.

A lot of tooling around message streaming and data workflows (such as Apache Airflow) provides insights out of the box. However, many job queues are not designed to handle a high-volume stream of messages that provides real-time analytics and workflow analysis.

Streaming Analytics

How do you add these insights to the system? We can deduce a reasonable set of requirements for such a solution:

  • Low overhead, since this affects all jobs in an application, spanning multiple services
  • Application-level transparency, to keep application-level requirements to a minimum
  • Scalability, as data is collected from all nodes and services in a cluster

This is often how you want your analytics to work: with minimal impact on crucial systems.

One technique that fits these requirements is event streaming. Storing events is a popular way to create granular insight into a system, since it shows you the changes in your application and not only the current state (which is what a relational database usually holds). Storage is cheap these days, and computers are fast. Hence, storing and querying all relevant changes to a system is feasible. This also enables you to create real-time insights: every event can be processed immediately to trigger new jobs, update a view or update a statistic.

For the insights into job queues, it is also possible to use event streaming. You can keep track of counters, relations and aggregates of jobs that update with every event coming in. This technique is better known as streaming analytics. Streaming analytics requires you to rethink what it means to do analytics, since you do not have all data available at once.

Example Implementation

But how do we implement such streaming analytics in our use case? To keep track of both job statistics and causality, we need to record several events that happen in a system with a job queue. Three important steps in the life cycle of a queued job are: when it is dispatched, when it starts and when it ends. From these we can easily derive the queueing time (start_time - dispatch_time) and the runtime (end_time - start_time). Hence, emitting an event at each of these stages allows us to keep track of all of the job statistics we defined earlier.
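As a minimal sketch of these two formulas, the helper names and the timestamps below are made up for illustration:

```python
from datetime import datetime


def queueing_time_seconds(dispatch_time: datetime, start_time: datetime) -> float:
    """Time a job spent waiting in the queue: start_time - dispatch_time."""
    return (start_time - dispatch_time).total_seconds()


def runtime_seconds(start_time: datetime, end_time: datetime) -> float:
    """Time a worker spent executing the job: end_time - start_time."""
    return (end_time - start_time).total_seconds()


# Illustrative timestamps for one job.
dispatched = datetime.fromisoformat("2022-10-10T09:00:00")
started = datetime.fromisoformat("2022-10-10T09:00:05")
ended = datetime.fromisoformat("2022-10-10T09:00:17")

print(queueing_time_seconds(dispatched, started))  # 5.0
print(runtime_seconds(started, ended))  # 12.0
```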

However, this does not solve the issue of tracking the causality between jobs. For that, we need to add three IDs per job to the data stream:

  • job_id: unique identifier for a job
  • parent_job_id: the job_id of the parent job that dispatched the current job
  • trace_id: identifier shared among jobs connected through parent-child relations

The last two identifiers allow us to gather all jobs that were triggered from a single source. We could group by all jobs that were triggered through a client placing an order, for example.
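To illustrate how these identifiers can be used, here is a minimal sketch that groups jobs per trace and reconstructs the parent-child relations. It assumes each job is a plain dictionary with the fields job_id, parent_job_id and trace_id; the function names are hypothetical.

```python
from collections import defaultdict


def jobs_by_trace(jobs: list[dict]) -> dict[str, list[str]]:
    """Group job ids by the trace they belong to."""
    traces = defaultdict(list)
    for job in jobs:
        traces[job["trace_id"]].append(job["job_id"])
    return dict(traces)


def children_by_parent(jobs: list[dict]) -> dict[str, list[str]]:
    """Map each parent job id to the ids of the jobs it dispatched."""
    children = defaultdict(list)
    for job in jobs:
        # Root jobs have no parent and therefore do not appear as a child.
        if job["parent_job_id"] is not None:
            children[job["parent_job_id"]].append(job["job_id"])
    return dict(children)


# Illustrative data: one order that triggers an invoice and an email.
example_jobs = [
    {"job_id": "order-1", "parent_job_id": None, "trace_id": "trace-a"},
    {"job_id": "invoice-1", "parent_job_id": "order-1", "trace_id": "trace-a"},
    {"job_id": "email-1", "parent_job_id": "order-1", "trace_id": "trace-a"},
    {"job_id": "order-2", "parent_job_id": None, "trace_id": "trace-b"},
]
```

With this data, `children_by_parent(example_jobs)` tells us that "order-1" dispatched both "invoice-1" and "email-1", and `jobs_by_trace(example_jobs)` collects those three jobs under "trace-a".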

Note: Jobs are often defined as either a class implementing some interface or a function in the code. But there usually is some unique name associated with every job that can be run. This could for example be “SendEmail”, where each “SendEmail” job has an email address and body as parameters. We call “SendEmail” the job type. A job type defines a piece of code you can monitor and optimize if needed, so it naturally defines the language of the insights.

Once this data is determined, it is time to connect to a platform that supports event streams, such as Apache Kafka or RabbitMQ. An example log that includes the required data would have the following fields:

{
  "job_id": "uuid",
  "parent_job_id": "uuid",
  "trace_id": "uuid",
  "job_type": "DoSomethingRelevantInDomain",
  "dispatch_time": "2022-10-10T10:10:10.10000",
  "start_time": null,
  "end_time": null
}

In a dispatch log, both start_time and end_time are empty. In a start log, only end_time is empty.

A job that triggers a pipeline of other jobs is collected into a trace. All jobs in a trace (one run of a workflow) have the same trace_id. The parent_job_id points to the job that dispatched the current job.
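The sketch below shows one way the three events could be built before they are sent to the event stream. The function names are hypothetical, and the actual sending (for instance through a Kafka producer) is deliberately left out, since that depends on the client library you use.

```python
import json
import uuid
from datetime import datetime, timezone
from typing import Optional


def now_iso() -> str:
    return datetime.now(timezone.utc).isoformat()


def dispatch_event(job_type: str, parent: Optional[dict] = None) -> dict:
    """Build a dispatch log. A child job inherits the trace_id of its parent."""
    return {
        "job_id": str(uuid.uuid4()),
        "parent_job_id": parent["job_id"] if parent else None,
        # A root job starts a new trace.
        "trace_id": parent["trace_id"] if parent else str(uuid.uuid4()),
        "job_type": job_type,
        "dispatch_time": now_iso(),
        "start_time": None,
        "end_time": None,
    }


def start_event(event: dict) -> dict:
    """Build the start log from the previous log of the same job."""
    return {**event, "start_time": now_iso()}


def end_event(event: dict) -> dict:
    """Build the end log from the start log of the same job."""
    return {**event, "end_time": now_iso()}


root = dispatch_event("CreateOrder")
child = dispatch_event("SendEmail", parent=root)
payload = json.dumps(end_event(start_event(child)))  # ready to be sent
```

Passing the parent log along when dispatching a child is what keeps the trace_id and parent_job_id consistent across the whole workflow.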

Consuming the Data

Of course, producing the data is not enough. We have to consume and transform the data to make it available for users. (In a later blog post, we go into more detail about writing a dispatcher and consumer for these kinds of events, including simple ways to store such data.)

A simplistic overview of the added system would be as follows:

Figure 4: A schematic of a system where workers (w) send events to Apache Kafka, while a consumer transforms and saves the data to make it available for users.

The job of the consumer is to process all events by updating the relevant statistics, and saving these statistics to disk. When it encounters an event signalling that a job has ended, for example:

{
  "job_id": "843da420-b233-4bd3-b045-2a15c9d4f72a",
  "parent_job_id": "852fa24c-c01d-40bb-88fa-3189e63b75ef",
  "trace_id": "635ba6c5-e846-4da6-a1db-a6cbc4f20480",
  "job_type": "DoSomethingRelevantInDomain",
  "dispatch_time": "2022-10-10T00:00:00.00000",
  "start_time": "2022-10-10T00:00:10.00000",
  "end_time": "2022-10-10T00:00:30.00000"
}

the consumer can maintain the average runtime by keeping track of the total number of jobs it has processed and the total runtime of these jobs. Averages can then be calculated from these two totals, while anomalies (the longest jobs, for instance) can be tracked separately. The same holds for the (average) queueing time and the number of jobs in the queue. In this example, we add 10 seconds to the total queueing time and 20 seconds to the total runtime. It is probably wise to aggregate this per job_type.

In Python, a basic implementation of a self-updating data class would be:

from dataclasses import dataclass
from datetime import datetime
from typing import Optional


@dataclass
class Log:
    trace_id: str
    parent_job_id: Optional[str]  # Not all jobs have a parent!
    job_id: str

    job_type: str

    dispatch_time: datetime
    start_time: Optional[datetime]
    end_time: Optional[datetime]


@dataclass
class AggregatedJobTypesData:
    total_count: int = 0
    total_runtime: float = 0.0
    total_queue_time: float = 0.0

    def update(self, end_log: Log):  # Only "end-logs" reach this method
        self.total_count += 1

        added_runtime = end_log.end_time - end_log.start_time
        self.total_runtime += added_runtime.total_seconds()

        added_queue_time = end_log.start_time - end_log.dispatch_time
        self.total_queue_time += added_queue_time.total_seconds()


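aggregated_data = AggregatedJobTypesData()


def handle_log(log: Log) -> None:
    # Only end-logs carry both the queueing time and the runtime.
    if log.end_time is not None:
        aggregated_data.update(log)


def get_average_runtime() -> float:
    # Return 0.0 instead of dividing by zero before the first job has ended.
    if aggregated_data.total_count == 0:
        return 0.0
    return aggregated_data.total_runtime / aggregated_data.total_count


def get_average_queueing_time() -> float:
    if aggregated_data.total_count == 0:
        return 0.0
    return aggregated_data.total_queue_time / aggregated_data.total_count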

As can be seen, it is easy to extract the relevant metrics. Of course, this gets more complicated when you introduce aggregations and keep track of anomalies. Making separate aggregation classes that can handle logs independently keeps responsibilities nicely separated in this case (also see the observer pattern).
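As a sketch of what such a separate aggregation class could look like, the example below keeps statistics per job_type and tracks the longest job as a simple anomaly. The class and function names are hypothetical, and for brevity it takes the runtime in seconds directly instead of a full log.

```python
from collections import defaultdict
from dataclasses import dataclass
from typing import Optional


@dataclass
class JobTypeStats:
    count: int = 0
    total_runtime: float = 0.0
    longest_runtime: float = 0.0
    longest_job_id: Optional[str] = None

    def update(self, job_id: str, runtime: float) -> None:
        self.count += 1
        self.total_runtime += runtime
        # Remember the slowest job seen so far for this job type.
        if runtime > self.longest_runtime:
            self.longest_runtime = runtime
            self.longest_job_id = job_id

    @property
    def average_runtime(self) -> float:
        return self.total_runtime / self.count if self.count else 0.0


def new_stats() -> defaultdict:
    """One JobTypeStats per job_type, created on first use."""
    return defaultdict(JobTypeStats)


def handle_end_log(stats: defaultdict, job_type: str, job_id: str, runtime: float) -> None:
    stats[job_type].update(job_id, runtime)
```

Each incoming end-log only touches the statistics of its own job type, so the averages and the longest job per type stay up to date without storing the individual logs.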

For the collection of traces, we need to keep track of how many jobs have not ended yet for a single trace_id. This can be done by comparing the number of dispatch-logs with the number of end-logs, assuming that we receive all dispatch-logs of child jobs before receiving the end-log of their parent job.
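A minimal sketch of this counting approach is given below. It relies on the same ordering assumption, takes logs as plain dictionaries with the fields shown earlier, and uses hypothetical names (`log_kind`, `TraceTracker`).

```python
from collections import defaultdict


def log_kind(log: dict) -> str:
    """Classify a log by which timestamps are filled in."""
    if log.get("end_time") is not None:
        return "end"
    if log.get("start_time") is not None:
        return "start"
    return "dispatch"


class TraceTracker:
    def __init__(self) -> None:
        # Number of dispatched jobs per trace_id that have not ended yet.
        self.open_jobs: dict[str, int] = defaultdict(int)

    def handle(self, log: dict) -> bool:
        """Process one log; return True when this log completes its trace."""
        trace_id = log["trace_id"]
        kind = log_kind(log)
        if kind == "dispatch":
            self.open_jobs[trace_id] += 1
        elif kind == "end":
            self.open_jobs[trace_id] -= 1
            if self.open_jobs[trace_id] == 0:
                del self.open_jobs[trace_id]
                return True
        return False
```

If a child's dispatch-log could arrive after its parent's end-log, the counter could reach zero too early, so in that case a more careful scheme would be needed.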

The metrics can be saved to disk frequently to provide a live view to the user from a dashboard.

Conclusion

Although these events are fairly simple, they can provide a significant amount of value when used properly. Using the regular statistics together with the parent-child relationship, we can start showing the user:

  • The last time a job of a specific type was run, and what triggered it
  • The average wall-clock time it takes for a whole trace to finish, starting from the “root” job of a trace.
  • The type of jobs (in a trace) that take up the most time.

Knowing these three properties for all jobs allows us to optimize appropriately and respond to queues piling up. Moreover, since these numbers are provided in real time, it is possible to build an alerting system into the pipeline: the relevant people could be notified that something is wrong, and act accordingly.

Next Steps

We will soon add a post about a specific implementation of a dispatcher that sends out the three events in Laravel. This will show what implementation details to keep in mind when adding such functionality to your application. Stay tuned!

About the author

Donny Peeters

Donny is a mathematician who started freelancing as a software and data engineer before founding BiteStreams. He excels at designing systems that meet the (complex) needs of the customer. In his spare time he exercises, reads, and likes to go for a drink with friends.
