NATS Weekly #6.5

Week of December 20 - 26, 2021

⚠️ This is part two for this week. If you haven't already, please check out issue #6 for the news, announcements, and the first two "Recently asked questions".

💡 Recently asked questions

Questions sourced from Slack, Twitter, or individuals. Responses and examples are in my own words, unless otherwise noted.

When designing subject hierarchies, especially ones with a larger number of tokens, is the position of a token (its index in the list of tokens) a relevant performance consideration?

This was a fantastic, nuanced question that @ToddBeats answered. I am largely going to pull out quotes from the Slack thread so it's documented here!

The OP showed two examples, one where the subject puts the message scope/intent first, followed by the location it was published from.

task.submit.new.zone.us

task.submit.cancel.zone.east

task.submit.restart.zone.east.atlanta

task.status.eu.east

task.submit.new.zone.eu.warsaw

and then one in reverse, where the location is specified first.

zone.us.task.submit.new

zone.east.task.submit.cancel

zone.east.atlanta.task.submit.restart

zone.eu.east.task.status

zone.eu.warsaw.task.submit.new

Todd offered some pragmatic insight:

I'll offer a qualitative "answer" and deep experts can jump in if I am very wrong.  Subject matching does have a left-to-right orientation algorithmically.  That said, computers these days are really fast, and I would opine that short-circuit optimization there is unlikely to make appreciable difference in your end-to-end solution.

Instead, I would put the focus in what the natural Subject hierarchy (and ordering) of tokens is in your business domain (i.e. what reads correctly functionally and what makes sense from a business domain taxonomy perspective).   I think that will have the bigger dividend for you long term.

There is some cross-over for above I think.  Think hard about where you might want to filter subscription with > wildcard, especially with JS SOURCE for example.  If I want to send the stream for all zones in the EU somewhere,  zone.eu.> is pretty concise.
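To make the sourcing idea concrete, here is a minimal Go sketch, assuming the location-first hierarchy above and hypothetical stream names, of a stream that sources only the EU traffic from a larger TASKS stream:

    // Assumes js is a nats.JetStreamContext from an established connection.
    // TASKS and TASKS-EU are hypothetical stream names.
    js.AddStream(&nats.StreamConfig{
        Name: "TASKS-EU",
        Sources: []*nats.StreamSource{{
            Name:          "TASKS",
            FilterSubject: "zone.eu.>",
        }},
    })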

In addition to the hierarchy above, Todd suggested adding some kind of version token:

Also, think about whether a token that represents "version" makes sense in your hierarchies, that allow for some managed future growth.  You can always use * for "any version" or you can filter on specific version, etc.
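As a hedged illustration of the version token idea (this subject layout is an assumption, not something prescribed above), a subscriber can pin a specific version or use * to accept any:

    // Assumes nc is a connected *nats.Conn and handleTask is a hypothetical
    // nats.MsgHandler; the trailing version token is only illustrative.
    // Pin to a specific version of the message type...
    nc.Subscribe("zone.eu.warsaw.task.submit.new.v1", handleTask)

    // ...or accept any version with the * wildcard.
    nc.Subscribe("zone.eu.warsaw.task.submit.new.*", handleTask)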

I will add one additional design heuristic for subject hierarchies, which is to order your tokens from smaller to larger cardinality. For example, there will likely be many more task message types (submit, cancel, etc.) than there will be deployment zones, especially over time.

Some tokens act as a prefix/scope/qualifier for the token that follows (e.g. zone or task), so those must be grouped with their subsequent tokens. A version token can also be thought of as an extension that further specializes (increases the cardinality of) the message type.

This heuristic, along with considering your expected subscription filtering strategy (where you are going to put the wildcards), will provide a good foundation for growth. If you do happen to get it wrong, there are strategies for remapping subjects to new ones to facilitate a migration.
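As a rough sketch of that filtering strategy (assuming the location-first layout above), ordering tokens from smaller to larger cardinality means the common subscriptions become simple prefix wildcards:

    // Assumes nc is a connected *nats.Conn.
    // Everything published from anywhere in the EU zone...
    nc.Subscribe("zone.eu.>", func(m *nats.Msg) { /* handle */ })

    // ...or everything from one location within it.
    nc.Subscribe("zone.eu.warsaw.>", func(m *nats.Msg) { /* handle */ })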

What’s happening to the stream data when I perform a rollout restart and NATS nodes get restarted one after another?

This question was asked in the context of a memory-based stream with more than one replica. The OP also indicated that they are using Kubernetes, which has native support for doing a "rolling restart" of a set of nodes.

For memory-based streams, all data is held in memory and is never snapshotted to disk, even if the node serving that replica is gracefully shut down.

Memory-based streams rely on replication to keep the data alive. Since a replicated stream (3 or 5 replicas) uses Raft consensus for writes, we know that a majority of the replicas are consistent copies. This means the minority replicas are allowed to be lagging behind, inconsistent, or have no data at all! The majority replicas will share the correct data with these minority replicas as they self-heal.

What this means in practice is that as long as a majority is up and healthy, writes can still be accepted and replication to the other replicas continues. If only one healthy replica remains, writes have to stop, but that replica can still be used to catch the others up.
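For reference, making a stream memory-based and replicated is just two fields on the stream config. A minimal sketch, assuming a hypothetical EVENTS stream:

    // Assumes js is a nats.JetStreamContext; the name and subjects are hypothetical.
    // An R3 memory-based stream: it tolerates losing a minority of replicas,
    // but no replica ever writes its data to disk.
    js.AddStream(&nats.StreamConfig{
        Name:     "EVENTS",
        Subjects: []string{"events.>"},
        Storage:  nats.MemoryStorage,
        Replicas: 3,
    })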

How does this affect the rolling restart strategy?

If the nodes holding the replicas for a stream are blindly restarted (as Kubernetes does by default) without considering the replication state of the stream (is a majority still healthy, has the restarted node caught up, etc.), then the stream and its data will effectively be lost (along with consumer state).

To do a proper rolling restart, one node needs to be restarted at a time, first putting the node in lame duck mode (LDM), which slowly disconnects clients so they can reconnect to other nodes. Once all client connections are drained, the node can be safely restarted. Upon restart, NATS will catch the node up on any data replication that needs to be done.

If the node needs to be removed altogether, it can be evicted following LDM. This allows NATS to immediately choose a different node in the cluster to start replicating data to. Note, this requires a cluster size of at least N+1 relative to the stream's replica count, since two replicas of the same stream cannot be hosted by one node. In other words, if you have a stream with R=3, you should have four nodes in your cluster prior to evicting a node to ensure the replication gets offloaded to the node without a replica.

I posted this a couple issues back, but here is the general algorithm for doing a proper rolling restart (with node replacement).

Is there any production-ready distributed task system/framework/library for Go supporting NATS as a broker?

A succinct, but subtle, response from NATS creator @derekcollison:

I believe JetStream with WorkQueue retention and pull based scale out consumers would work well there..

The point here is that there is no need for anything but NATS! To elaborate on this response a bit, a JetStream stream supports a retention policy of WorkQueuePolicy, which means it will behave as a queue. Messages are written to the stream, and once a message is consumed and ACK'ed, it is removed from the stream.

A simple configuration example may be:

    js.AddStream(&nats.StreamConfig{
        Name:      "QUEUE",
        Subjects:  []string{"queue.>"},
        Retention: nats.WorkQueuePolicy,
        Storage:   nats.FileStorage,
        Replicas:  3,
    })

The file-based storage and replicas are technically optional, but if you need high availability and durability, they should be used. And of course, the name can be changed and the set of subjects can vary to suit your needs.

On the consumption side, a queue subscription can be used to fan out the workload. Multiple subscribers in the same queue group can be deployed to parallelize consumption of the queue (this assumes the order of processing does not matter).

Depending on how you choose to consume the queue, a pull or push consumer could be used. Most will likely want a pull consumer so the clients can manage flow control. Otherwise a push consumer will result in the server pushing batches of messages to the clients as quickly as possible.
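As a hedged sketch of the pull-based approach (the durable name and the process handler are assumptions), each worker instance can attach to the same durable pull consumer and fetch batches at its own pace:

    // Assumes js is a nats.JetStreamContext bound to the QUEUE stream above and
    // process is a hypothetical application handler.
    sub, _ := js.PullSubscribe("queue.>", "workers")

    for {
        // Fetch a small batch; flow control stays with the client. Fetch blocks
        // up to a default timeout and returns an error if no messages arrive.
        msgs, err := sub.Fetch(10)
        if err != nil {
            continue
        }
        for _, msg := range msgs {
            process(msg)
            msg.Ack() // the ack removes the message from the work queue stream
        }
    }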

As an aside, I find the various ways of setting up consumers a tad overwhelming, but I am writing a post, coming in the next week or two, with a bunch of examples to fully grok consumers!

What does it mean that NATS clustered servers have a forwarding limit of one hop?

This was a question about a section in the NATS documentation describing how clustered servers forward messages.

Note that NATS clustered servers have a forwarding limit of one hop. This means that each nats-server instance will only forward messages that it has received from a client to the immediately adjacent nats-server instances to which it has routes. Messages received from a route will only be distributed to local clients.

This reads like it is a limitation (e.g. "forwarding limit"); however, this is a feature! Since the servers in a NATS cluster form a full mesh via gossip, only one hop is ever necessary to forward a message. In the case of a gateway connection between clusters, two hops are necessary since the message needs to flow from cluster to cluster.

This provides guarantees around latency since excessive server hops won't be performed.