Building a Job Queue with NATS.io and Go
Job queues are essential components in software architecture, designed to manage and process tasks asynchronously. A job queue allows applications to defer work that doesn't need to be completed immediately: instead of handling tasks synchronously and potentially blocking the main application flow, tasks are placed in a queue and processed asynchronously by worker processes. Common applications of job queues include processing large batches of data, sending emails or notifications, generating reports, and handling long-running computations. Job queues can also decouple different parts of a system, allowing each component to operate independently and communicate through queued tasks.
This article will cover how to create a lightweight and performant job queue, along with a dead letter queue. We'll explain how to implement this using the Go programming language and NATS, a high-performance messaging system designed for cloud-native applications, IoT messaging, and microservices architectures.
What Is a Job Queue?
At its core, a job queue is a data structure that stores tasks to be executed asynchronously. Jobs are placed into the queue by producers (parts of the application that generate work) and pulled from the queue by consumers (worker processes that perform the actual work). This decoupling of job production and consumption allows for flexible scaling, as the number of workers can be adjusted based on the workload to ensure efficient resource utilization. Job queues often come with additional features like persistence to prevent data loss in case of system failures, retry mechanisms for failed jobs, and monitoring capabilities to track queue health and performance.
Consider a web application. When a user uploads a file, the application can quickly acknowledge the upload and offload the file processing (like resizing images or extracting metadata) to a job queue. For sending notifications, such as emails or SMS messages, the main application only needs to enqueue the message request, leaving the actual sending to be handled by a background worker. In an e-commerce application, the order processing system can place new orders into a job queue while a separate inventory service retrieves and processes these orders independently. This decoupling ensures that the order placement and inventory update processes do not directly depend on each other and can be scaled independently. The failure of one service also doesn't affect the other.
Job Queue Implementation with Go and NATS
NATS is used for real-time data streaming and communication between distributed systems. The core component of NATS offers reliable publish-subscribe, request-reply, and queuing messaging patterns. NATS JetStream is an extension of the NATS messaging system that provides advanced features like persistent storage, message replay, and streaming. JetStream is built into nats-server itself; it's a feature that you can enable or disable according to your requirements. It adds persistence for streams, increasing system resilience and reliability, and provides additional capabilities such as message replay.
Solution Overview
To create our job queue and dead letter queue, we'll build a sample application with three components, consisting of a publisher and two stream consumers:
- Publisher: Sends data to a NATS subject
- Consumer (worker): Receives and processes data from the NATS stream; we can run multiple instances to load balance the processing among them
- Dead letter queue processor: Handles failed messages that could not be processed by the consumer
We will develop these components using the NATS Go client. NATS clients are used to connect to and communicate with the NATS server; the ones in the nats-io GitHub organization are official clients maintained by the NATS authors.
Note that the NATS Go client is JetStream-enabled, so we don't need a different library for JetStream-specific operations.
In this article, we'll use Synadia Cloud, which offers a globally accessible, fully managed NATS.io platform that works across multiple clouds and includes a user-friendly admin portal and API.
Prerequisites
To follow along, make sure you have a recent version of the Go programming language installed for your operating system.
You'll also need natscli to communicate with the NATS server. Follow the steps in the documentation to install natscli for your operating system.
Setting Up Synadia Cloud
Before you begin, sign up for Synadia Cloud.
After you sign up, select "NGS (NATS Global System)". There should be a default account created when you log in.
Inside the account, navigate to Users. You should have a "CLI" user created in advance. You'll use this user for the sample application in this guide.
To download the user credentials, click the user details and choose Get Connected > Download Credentials to save the .creds file to your machine.
Save the Synadia Cloud connection configuration as a context. Replace ENTER_PATH_TO_CREDS_FILE with the path where you saved the .creds file:
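The command below shows what this can look like. The context name (synadia) and the tls://connect.ngs.global endpoint are assumptions; use the values shown in your Synadia Cloud "Get Connected" dialog if they differ:

```bash
# Save a NATS CLI context pointing at Synadia Cloud (the context name is arbitrary)
nats context save synadia \
  --server tls://connect.ngs.global \
  --creds ENTER_PATH_TO_CREDS_FILE

# Make the new context the active one
nats context select synadia
```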
Once the configuration is successfully saved, you should see this output:
To confirm that you are connected to Synadia Cloud, execute the command below to get your account details:
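Assuming the context above is selected:

```bash
nats account info
```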
Creating a NATS JetStream
There are multiple ways to create a stream. We can use the Synadia Cloud console, NATS client SDKs, or natscli.
Let's create one using the stream subcommand of natscli:
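Here is a sketch of the command, reconstructed from the flag walkthrough below (natscli may prompt for any settings not supplied, and the deny-delete/deny-purge toggles can be added according to your needs):

```bash
nats stream add orders_stream \
  --subjects "orders.*" \
  --storage file \
  --replicas 1 \
  --retention work \
  --discard new \
  --max-msgs=-1 \
  --max-msgs-per-subject=-1 \
  --max-bytes=256MB \
  --max-age=-1 \
  --max-msg-size=1MiB \
  --dupe-window=2m \
  --allow-direct
```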
JetStream allows for a lot of flexibility via configurations. Let's walk through the ones used here:
- --subjects is a list of subjects that the stream will consume from. The wildcard orders.* means the stream will include any subject beginning with the orders. prefix (for example, orders.new).
- --storage specifies the persistence mechanism for messages. file means that the messages will be persisted on disk.
- --replicas specifies the number of replicas in clustered mode. A value of 1 means there is no replication.
- --retention sets how messages are retained. The work retention policy keeps each message until it is delivered to one consumer that explicitly acknowledges receipt, after which it is removed from the stream.
- --discard determines what happens when a stream reaches its limits of size or messages. new means new messages will be discarded if the limits are reached.
- --max-msgs defines the number of messages to retain in the store for this stream. A value of -1 means there is no limit on the number of messages.
- --max-msgs-per-subject specifies the number of messages to retain in the store for this stream per unique subject. A value of -1 means there is no limit on the number of messages per subject.
- --max-bytes is the maximum combined size of all messages in a stream. Here, the limit is 256 megabytes.
- --max-age defines the maximum age of messages that can be stored in the stream. A value of -1 means messages do not expire based on age.
- --max-msg-size is the maximum size of any single message accepted by the stream. The Synadia Cloud account limit defaults to a 1.0MiB message size, which is what we use here.
- --dupe-window defines the window for identifying duplicate messages. Here, a window of 2 minutes is set, meaning duplicates within this time frame will be detected if the message was published with a unique MsgId.
- --deny-delete enables or disables message deletion via the API.
- --deny-purge disallows purging the entire stream or a subject via the API.
- --allow-direct allows direct access to stream data via the direct get API.

Finally, we include the name of the stream (orders_stream).
Creating a Dead Letter Queue
A dead letter queue (DLQ) is a special type of stream that is used to temporarily store messages that failed to deliver. A DLQ is often used to troubleshoot any issues that might cause message delivery to fail.
Create the orders_unprocessed JetStream that will function as a dead letter queue:
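The command sketch below simply mirrors the orders_stream configuration, which is an assumption; adjust the limits and retention to your needs. Note the single quotes around the subject so the shell doesn't expand the $JS prefix:

```bash
nats stream add orders_unprocessed \
  --subjects '$JS.EVENT.ADVISORY.CONSUMER.MAX_DELIVERIES.orders_stream.*' \
  --storage file \
  --replicas 1 \
  --retention work \
  --discard new \
  --max-msgs=-1 \
  --max-msgs-per-subject=-1 \
  --max-bytes=256MB \
  --max-age=-1 \
  --max-msg-size=1MiB \
  --dupe-window=2m \
  --allow-direct
```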
Writing the JetStream Publisher
Now that we've created the stream, we can create our application.
Create a Go project:
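For example (the module path is a placeholder; use your own):

```bash
mkdir nats-job-queue && cd nats-job-queue
go mod init example.com/nats-job-queue
```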
Install the required dependencies:
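The NATS Go client is the only external dependency (the jetstream package ships inside it):

```bash
go get github.com/nats-io/nats.go
```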
Create a file at publisher/publisher.go, then start by importing the necessary modules:
In the main function, get the credentials file path from the environment variable:
Use the nats.Connect function to connect to the Synadia Cloud NATS server. A NATS connection will attempt to remain always connected, with built-in reconnection logic in case of network failures or server restarts:
Create an instance of jetstream.JetStream with jetstream.New:
Use the Publish function on the JetStream instance in a loop to send twenty messages to the subject:
Start the publisher to send data to the orders.new subject. Replace ENTER_PATH_TO_CREDS_FILE with the path where we saved the .creds file:
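Assuming the publisher reads the credentials path from the NATS_CREDS environment variable (a hypothetical name):

```bash
NATS_CREDS=ENTER_PATH_TO_CREDS_FILE go run publisher/publisher.go
```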
The application will send twenty messages to the subject before terminating. The application logs should look something like this:
Writing the JetStream Consumer
Every message that is sent to the subject(s) configured with the stream will be automatically available in the stream. A JetStream consumer client can consume a subset of messages stored in a stream. On the server side, NATS will keep track of which messages were delivered and acknowledged by clients.
Create consumer/consumer.go and import the modules:
As before, fetch the credentials file from the environment variable in the main function:
The consumer receives two arguments: the name of the stream to consume messages from and a subject filter that restricts which subjects the consumer reads from:
As before, connect to the NATS server and create a JetStream:
After connecting to the Synadia Cloud NATS server and creating the JetStream instance, use the CreateOrUpdateConsumer function to create a jetstream.Consumer instance.
Take note of the configuration, specifically MaxDeliver, BackOff, and FilterSubject:
Invoke the Consume function on the jetstream.Consumer instance with a callback. Messages from the stream will be continuously received and processed with the provided callback logic:
The processing logic here is simplified on purpose; it simply logs the message payload along with the message sequence number. Take note of the if rand.Intn(10) < 5 section, which purposely creates a situation where the message is not acknowledged (hence it will be redelivered). In this case, redelivery will only happen twice (as per the consumer configuration), after which NATS stops redelivering the message and publishes an advisory event.
We'll explore this further in the next section, where we'll implement a dead letter queue to make sure unprocessed messages are not lost.
Start the consumer application to process data from the stream. Replace ABSOLUTE_PATH_TO_CREDS_FILE with the path where we saved the .creds file:
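Assuming the same hypothetical NATS_CREDS environment variable, and passing the stream name and a subject filter (here orders.new, the subject the publisher writes to, which is an assumption) as arguments:

```bash
NATS_CREDS=ABSOLUTE_PATH_TO_CREDS_FILE go run consumer/consumer.go orders_stream orders.new
```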
The application logs should look something like this:
A few messages (orders) are received and processed successfully, as evident in the logs: received order-0 from stream sequence # 448. The messages that fail to be processed (more on this in the upcoming sections) are also highlighted in the logs, such as error processing order-2.
Keep the consumer running, as we'll use it again later.
Implementing a Dead Letter Queue Using JetStream
If a message reaches its maximum number of delivery attempts (as per the MaxDeliver configuration), an advisory message is published on the $JS.EVENT.ADVISORY.CONSUMER.MAX_DELIVERIES.<STREAM>.<CONSUMER> subject. The advisory message payload contains a stream_seq field with the sequence number of the message in the stream, which can be used to fetch the message from the stream and reprocess it. This forms the foundation of a DLQ implementation. We've already created the orders_unprocessed stream, which reads from the $JS.EVENT.ADVISORY.CONSUMER.MAX_DELIVERIES.orders_stream.* subject and acts as the DLQ.
Create dlq/consumer.go and add the starting code:
The program will receive two arguments, consisting of the name of the DLQ stream and the name of the original stream:
As before, connect to the NATS server:
Create a consumer for the DLQ:
Get a reference to the original stream:
The DLQ handler implementation is similar to the consumer component. The application subscribes to the $JS.EVENT.ADVISORY.CONSUMER.MAX_DELIVERIES.orders_stream.* subject and:
- Extracts the stream sequence from the event JSON payload
- Uses the GetMsg function on the jetstream.Stream (of orders_stream) to get the message payload
- Processes the message (in this case, logs it to standard out) and acknowledges it (Ack())
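A consolidated dlq/consumer.go sketch along those lines is shown below. The advisory struct only maps the stream_seq field, and the durable consumer name and environment variable are assumptions:

```go
package main

import (
	"context"
	"encoding/json"
	"log"
	"os"
	"time"

	"github.com/nats-io/nats.go"
	"github.com/nats-io/nats.go/jetstream"
)

// maxDeliveriesAdvisory maps only the stream_seq field of the
// MAX_DELIVERIES advisory payload.
type maxDeliveriesAdvisory struct {
	StreamSeq uint64 `json:"stream_seq"`
}

func main() {
	// NATS_CREDS is a hypothetical env var name holding the path to the .creds file.
	credsFile := os.Getenv("NATS_CREDS")
	if credsFile == "" {
		log.Fatal("missing NATS_CREDS environment variable")
	}

	// Two arguments: the DLQ stream name and the original stream name.
	if len(os.Args) != 3 {
		log.Fatalf("usage: %s <dlq-stream> <original-stream>", os.Args[0])
	}
	dlqStream, originalStream := os.Args[1], os.Args[2]

	nc, err := nats.Connect("tls://connect.ngs.global", nats.UserCredentials(credsFile))
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Drain()

	js, err := jetstream.New(nc)
	if err != nil {
		log.Fatal(err)
	}

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	// Durable consumer on the DLQ stream; the name is an assumption.
	dlqConsumer, err := js.CreateOrUpdateConsumer(ctx, dlqStream, jetstream.ConsumerConfig{
		Durable:   "dlq_processor",
		AckPolicy: jetstream.AckExplicitPolicy,
	})
	if err != nil {
		log.Fatal(err)
	}

	// Reference to the original stream, used to look up the failed message.
	stream, err := js.Stream(ctx, originalStream)
	if err != nil {
		log.Fatal(err)
	}

	cc, err := dlqConsumer.Consume(func(msg jetstream.Msg) {
		// Extract the stream sequence from the advisory JSON payload.
		var advisory maxDeliveriesAdvisory
		if err := json.Unmarshal(msg.Data(), &advisory); err != nil {
			log.Printf("failed to parse advisory: %v", err)
			return
		}

		// Fetch the original message from the orders stream by sequence number.
		raw, err := stream.GetMsg(context.Background(), advisory.StreamSeq)
		if err != nil {
			log.Printf("failed to get message %d: %v", advisory.StreamSeq, err)
			return
		}

		// "Process" the failed message (here, just log it) and acknowledge the advisory.
		log.Printf("processing failed message from DLQ: %s (stream sequence # %d)", string(raw.Data), advisory.StreamSeq)
		if err := msg.Ack(); err != nil {
			log.Printf("failed to ack advisory: %v", err)
		}
	})
	if err != nil {
		log.Fatal(err)
	}
	defer cc.Stop()

	select {}
}
```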
Start the dead letter queue processor to process data from the stream. Replace ABSOLUTE_PATH_TO_CREDS_FILE with the path where you saved the .creds file:
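Again assuming the hypothetical NATS_CREDS environment variable, with the DLQ stream and original stream names as arguments:

```bash
NATS_CREDS=ABSOLUTE_PATH_TO_CREDS_FILE go run dlq/consumer.go orders_unprocessed orders_stream
```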
The application logs should look something like this:
Load Balanced Processing
You can run multiple instances of the consumer application to balance the load among them. Load balancing comes out of the box with NATS: when multiple consumers listen on the same subject, NATS automatically distributes the messages across them, ensuring that no single consumer is overwhelmed with too much work while others sit idle. Since NATS handles load balancing internally, developers don't need to set up or maintain external load balancers, which reduces complexity and the chance of misconfiguration. It also allows for horizontal scaling without adjusting message routing logic.
We should already have the consumer running. Start a second instance of the consumer application. Replace ABSOLUTE_PATH_TO_CREDS_FILE with the path where we saved the .creds file:
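In a separate terminal (same hypothetical NATS_CREDS variable and arguments as before):

```bash
NATS_CREDS=ABSOLUTE_PATH_TO_CREDS_FILE go run consumer/consumer.go orders_stream orders.new
```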
Now, run the producer application again. Replace ABSOLUTE_PATH_TO_CREDS_FILE with the path where we saved the .creds file:
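For example:

```bash
NATS_CREDS=ABSOLUTE_PATH_TO_CREDS_FILE go run publisher/publisher.go
```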
Follow the application logs of both instances to verify that each of them is processing a different message from the orders_stream stream. This will be evident from the order names (for example, order-1 might be processed by the first instance and order-2 by the second), as well as the stream sequence numbers.
The dead letter queue processor should continue to work the same way and process the failed messages.
You can find the complete code on GitHub.
Conclusion
In this article, we learned about the basics of job queues and how they enable asynchronous processing. Using NATS JetStream, we implemented a job processing application along with dead letter queue functionality in Go.
Thanks to a fully managed solution like Synadia Cloud, we didn't have to manage NATS-related infrastructure. We were able to easily set up a stream with persistence, fault tolerance, and message replay capabilities using natscli, and we connected the Go application components (producer, consumers, and DLQ processor) to Synadia Cloud using the JetStream-enabled NATS Go client.