
NATS Weekly #30

Week of June 6 - 12, 2022

🗞 Announcements, writings, and projects

A short list of announcements, blog posts, project updates and other news.

⚡ Releases

Official releases from NATS repos and others in the ecosystem.

💬 Discussions

GitHub Discussions from various NATS repositories.

💡 Recently asked questions

Questions sourced from Slack, Twitter, or individuals. Responses and examples are in my own words, unless otherwise noted.

What exactly is a slow consumer?

A slow consumer is a subscription that is unable to handle the throughput of messages being sent to it by the server. Slow consumers are detected and handled to prevent any one subscriber from slowing down the overall system, acting as a kind of circuit breaker.

Client libraries have mechanisms built in to detect a slow consumer early and try to recover, with the server acting as a last resort if the client can't address the issue: it will simply drop the client connection. Of course, the client can reconnect, and this interruption may help recover from intermittent issues. Otherwise, an operator can observe the issue and address it.

One side note: this term predates JetStream and does not refer to JetStream consumers themselves. Rather, it applies to client-side subscriptions, which may be subscribed to a standard subject or bound to a JetStream consumer.

Here is an example of a core NATS subscription and where this error can be checked (other error handling elided for brevity).

sub, _ := nc.SubscribeSync("foo")

// The error returned could be nats.ErrSlowConsumer.
_, err := sub.NextMsg(time.Second)
if errors.Is(err, nats.ErrSlowConsumer) {
    // Get the number of messages still pending to report on the backlog
    // at the time messages were dropped.
    n, _, _ := sub.Pending()
    fmt.Printf("slow consumer detected: %d pending messages\n", n)
}

The default limits are quite high, but may vary from client to client since this is an implementation detail. Per-subscription pending limits can be set using sub.SetPendingLimits(32_000, 16*1024*1024), where the first number is the maximum number of pending messages and the second is the maximum number of pending bytes. Use a value of -1 to remove a limit. Except in unique cases, changing these limits is neither needed nor recommended.
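For context, here is a minimal sketch of applying those limits to the sync subscription above (the values are simply the ones mentioned, not a recommendation):

sub, _ := nc.SubscribeSync("foo")

// Allow up to 32,000 pending messages or 16 MiB of pending data,
// whichever is hit first, before the client reports a slow consumer
// for this subscription.
sub.SetPendingLimits(32_000, 16*1024*1024)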

For a ChanSubscribe, the pending limit is inherent in the buffer size of the channel you define.

ch := make(chan *nats.Msg, 100_000)
sub, _ := nc.ChanSubscribe("foo", ch)

If the client is unable to put a message onto the channel because it is full, it will report a slow consumer.
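Since there is no NextMsg call to return the error in this case, one way to observe it is the connection's asynchronous error handler. A rough sketch (the handler logic here is only illustrative):

nc, _ := nats.Connect(nats.DefaultURL, nats.ErrorHandler(
    func(c *nats.Conn, s *nats.Subscription, err error) {
        if errors.Is(err, nats.ErrSlowConsumer) && s != nil {
            // A subscription fell behind; log it for an operator to investigate.
            log.Printf("slow consumer on subject %q", s.Subject)
        }
    },
))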

How does this apply to a subscription bound to a JetStream consumer?

All of the logic is the same; however, by default, messages are expected to be ack'ed. If messages are dropped, they are not ack'ed, and therefore the consumer will eventually redeliver them.

sub, _ := js.SubscribeSync("bar", nats.Bind("BAR", "c1"))

// The same slow consumer logic applies, except that on successful
// handling of a message, msg.Ack() should be called.
msg, err := sub.NextMsg(time.Second)
if err == nil {
    msg.Ack()
}

The first (core NATS) example and the second (JetStream) example showcase the fundamental difference between at-most-once and at-least-once delivery.

Refer to the slow consumer documentation for more details and for options that can prevent slow consumers from happening in the first place, such as using queue groups (sketched below).
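As a rough sketch (the subject and queue group names here are made up), each member of a queue group receives only a share of the messages, so no single subscription has to keep up with the full publish rate on its own:

// Both subscriptions join the "workers" queue group, so each message
// published to "foo" is delivered to only one member.
sub1, _ := nc.QueueSubscribeSync("foo", "workers")
sub2, _ := nc.QueueSubscribeSync("foo", "workers")

In practice, the members of the group would typically live in separate processes or on separate machines to spread the load.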

How can you update the replicas or location of a stream?

There may be cases where a stream starts out with one replica or in one location and there is a need to increase its durability/availability or move it.

Two new, experimental features are the ability to change the number of replicas of a stream dynamically and, similarly, to change the placement of the replicas to a different set of servers.

Given a stream ORDERS with one replica, the NATS CLI can be used to increase the replica count to three in order to increase durability and availability.

nats stream edit --replicas 3 ORDERS

This will allocate replicas on available nodes in the cluster and the data will be replicated from the first replica (the leader). You can see the status of the stream using nats stream report ORDERS (there is a replicas column). The time it takes to replicate depends on the size of the stream.
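The same change can also be made programmatically. Here is a rough sketch with the Go client, assuming js is a JetStreamContext obtained from nc.JetStream() (error handling elided):

// Fetch the current config for ORDERS, bump the replica count, and
// apply the update. The server handles allocating and catching up the
// new replicas.
info, _ := js.StreamInfo("ORDERS")
cfg := info.Config
cfg.Replicas = 3
if _, err := js.UpdateStream(&cfg); err != nil {
    // handle the error, e.g. retry or surface it to an operator
}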

The second feature is changing the placement of the stream to a new set of nodes, whether it is in the same cluster or in a different cluster altogether. This requires each server to be configured with one or more tags, e.g.

# Cluster 1 config, where each node in the cluster could have a different AZ.
server_tags: ["us-east", "az1"]

# Cluster 2 config.
server_tags: ["us-west", "az1"]

If we have a stream placed in us-east, we can edit the stream and change the tag to us-west, which will move the stream across clusters.

nats stream edit --tag us-west ORDERS

As noted above for changing the replica count, this may take some time and may also require clients to reconnect, since the stream leader moves as well.
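The placement change can likewise be done through the Go client. A rough sketch, again assuming a JetStreamContext js and the tags from the config above:

// Constrain placement of ORDERS to servers tagged us-west. The stream,
// its data, and its leader will move to matching servers.
info, _ := js.StreamInfo("ORDERS")
cfg := info.Config
cfg.Placement = &nats.Placement{Tags: []string{"us-west"}}
if _, err := js.UpdateStream(&cfg); err != nil {
    // handle the error
}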