Kue for Node.JS microservices
Kue is, as defined in its GitHub repository:
A priority job queue backed by Redis, built for Node.JS.
What is a job queue?
A job/message queue is an interesting tool when it comes to communication between microservices. Think of it as a cache box: a process leaves a message in the box for another, and the latter picks it up. The message is read, the corresponding actions are taken, and an answer is put back in the box for the first process to notice.
It is a pretty convenient way to achieve multi-process communication, isn’t it? Add the speed of Redis and its in-memory storage to the efficiency of Kue itself and you pretty much have an immediate solution to the problem.
Using Kue isn’t particularly difficult, as you only have to define a queue name to begin sending and receiving messages; the rest of the control structures are pretty much the same all over the place.
This message queue also has many advanced features such as message priority and retry attempts, but in this article I’ll only cover the basics; a firm grasp on the core mechanism is first needed to make use of those features, and you can always check the official repo for detailed info on them.
Step one - Redis
For the rest of this article I’ll assume that you already have a Redis instance running. You don’t need anything fancy: installing it straight from your distribution’s repositories or downloading it from the official downloads page is usually enough. Make sure that the service is running and move on.
Note: from now on I’ll be referring to a Redis instance running on your local machine; I’m sure that using a remote instance is possible as well, but I haven’t tested how much network latency could affect Kue’s speed.
Step two - Node.JS (process 1)
Create a new directory in your favorite temporary folder - some use /tmp, some use ~/tmp, some others just don’t care and use ~/Documents. Just make a folder somewhere and cd into it.
Use our beloved NPM to install Kue by running npm install kue - creating a package can be skipped for these simple examples.
Now to create the skeleton of the first process - ES6 syntax follows, to be used with Node v6.0 or higher or transpiled:
Here the Kue module is being required, and a queue created in Redis via the kue.createQueue() call. If you need to specify any option for the Redis connection, you can do so by passing them in an object to that function.
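As a rough sketch of passing connection options, the snippet below builds the options object from environment variables and hands it to kue.createQueue(). The prefix and redis.host/redis.port keys follow the layout described in Kue's README; the REDIS_HOST and REDIS_PORT variable names and the buildQueueOptions and connect helpers are assumptions made up for this example.

```javascript
// Build the options object for kue.createQueue().
// REDIS_HOST / REDIS_PORT are hypothetical variable names chosen for this sketch.
function buildQueueOptions(env) {
  return {
    prefix: 'q', // Kue's default key prefix in Redis
    redis: {
      host: env.REDIS_HOST || '127.0.0.1',
      port: Number(env.REDIS_PORT) || 6379 // 6379 is the default Redis port
    }
  };
}

// Kue is required lazily, so buildQueueOptions() can be used on its own.
function connect(env) {
  const kue = require('kue');
  return kue.createQueue(buildQueueOptions(env));
}

// Usage, assuming Kue is installed and Redis is reachable:
//   const queue = connect(process.env);
```

Keeping the options in a small pure function makes it easy to check what will be sent to Kue before any connection is attempted.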
Now the fun part: actually saving a message to the queue.
Running the above code will produce something along the lines of:
In these examples errors are not handled for the sake of brevity; always make sure that you take action when an error bubbles up!
Leave the process running and move on. If an ECONNREFUSED error pops up, make sure that Redis is running and that it’s listening on the default port (6379).
The first parameter of the queue.create call is the name of the queue - it must be the same in the sender and in the receiver in order for them to read each other’s messages, but be aware that you’re not limited in any way in the number of queues that you handle in a single process.
The second parameter is a JSON object that can be composed of every kind of information that you want to send. My very personal take on this is having two constant keys, from and type, and a variable content key, data:
from holds a String that states which service originated the message, in this case process1.
type holds a String that states the purpose of the message, in this case testMessage.
data, finally, holds a JSON object with everything that you want to send to the other process.
This model allows you, as you’ll see further on, to easily differentiate between different services and purposes by using a single function.
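To make the scheme concrete, here is a minimal sketch of a helper that builds such a message and enqueues it. The buildMessage and sendMessage names, the queue_example queue name and the sample payload are all made up for illustration; the queue argument is expected to be whatever kue.createQueue() returned.

```javascript
// Build a message following the from/type/data scheme described above.
function buildMessage(from, type, data) {
  return { from: from, type: type, data: data };
}

// Enqueue a message on an already-created Kue queue.
// 'queue_example' is a hypothetical queue name used only in this sketch.
function sendMessage(queue, message, callback) {
  return queue.create('queue_example', message).save(callback);
}

const message = buildMessage('process1', 'testMessage', { hello: 'world' });
console.log(JSON.stringify(message));
// prints {"from":"process1","type":"testMessage","data":{"hello":"world"}}

// Usage, assuming Kue is installed and Redis is reachable:
//   const queue = require('kue').createQueue();
//   sendMessage(queue, message, (err) => {
//     if (err) console.error('could not save the job', err);
//   });
```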
The last block of the file contains the job complete event listener, which deletes a message from the database when it is marked as completed by the receiving process.
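A listener of this kind could look roughly like the sketch below, which follows the pattern shown in Kue's README (a job complete event on the queue, then kue.Job.get and job.remove). The removeCompletedJobs name and its onRemoved callback are assumptions of mine; the queue and kue.Job are passed in as arguments so the function doesn't depend on a live connection.

```javascript
// Remove every job from Redis as soon as it is reported as completed.
// `queue` is the object returned by kue.createQueue(); `Job` is kue.Job.
function removeCompletedJobs(queue, Job, onRemoved) {
  queue.on('job complete', (id, result) => {
    // Load the completed job by id, then delete it.
    Job.get(id, (err, job) => {
      if (err) return onRemoved(err);
      job.remove((removeErr) => onRemoved(removeErr, id, result));
    });
  });
}

// Usage, assuming Kue is installed and Redis is reachable:
//   const kue = require('kue');
//   const queue = kue.createQueue();
//   removeCompletedJobs(queue, kue.Job, (err, id) => {
//     if (err) return console.error('cleanup failed', err);
//     console.log('removed job', id);
//   });
```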
A simpler alternative if you don’t need completed jobs’ data might be getting rid of lines 15-23 and changing line 10 to:
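One way to get that behavior, assuming the simplification relies on Kue's removeOnComplete job option, is sketched below. The sendSelfCleaning name and the queue_example queue name are hypothetical.

```javascript
// Ask Kue to delete the job automatically once it completes,
// instead of listening for 'job complete' and removing it by hand.
// 'queue_example' is a hypothetical queue name used only in this sketch.
function sendSelfCleaning(queue, message, callback) {
  return queue
    .create('queue_example', message)
    .removeOnComplete(true)
    .save(callback);
}

// Usage, assuming Kue is installed and Redis is reachable:
//   const queue = require('kue').createQueue();
//   sendSelfCleaning(queue, { from: 'process1', type: 'testMessage', data: {} });
```

The trade-off is that the sender no longer gets a chance to inspect the completed job before it disappears.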
Step three - Node.JS (process 2)
As you can see above, receiving a message is actually simpler than sending one: by calling queue.process() you define the behavior of your application for incoming messages on a specific queue.
Please note that this is an event-like handler, and it will be called whenever a new message is available - there’s no need to schedule checking for new items manually. I encourage you to try out this aspect by leaving the second process running and restarting the first one multiple times, or placing the queue.create call of the latter inside a
The first parameter to queue.process is, once again, the name of the queue to observe. The second parameter is the function that handles the messages. This last function takes two arguments: a Kue job object and a done callback (remember the job complete listener of the first process? It is triggered by this callback). The JSON object that was given to the queue.create call of the first process is stored in the job.data property.
In this second process you can clearly see what I meant above with my data scheme: the queue.process handler is kept light & simple by just relaying the message to the processMessage function, which avoids messing with Kue jobs directly.
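A minimal sketch of such a light handler follows. The listen helper and the queue_example queue name are hypothetical; queue.process and the job.data property are the Kue pieces described above.

```javascript
// Register a handler for incoming jobs on one queue.
// Kue calls the inner function with the job and a `done` callback;
// only the payload (job.data) and `done` are passed on to the hub.
function listen(queue, queueName, handleMessage) {
  queue.process(queueName, (job, done) => {
    handleMessage(job.data, done);
  });
}

// Usage, assuming Kue is installed, Redis is reachable
// and a processMessage(message, done) hub function exists:
//   const queue = require('kue').createQueue();
//   listen(queue, 'queue_example', processMessage);
```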
That function switches over the various eventual senders and message types, acting as a hub for all messages. In this example you can see that the testMessage type (line 8) coming from the process1 sender (line 6) is routed to the handleTestMessage function (line 9).
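A hub of this shape could be sketched as follows. The processMessage and handleTestMessage names and the process1/testMessage values come from the text; the body of handleTestMessage is an assumption, and the line numbers mentioned above do not apply to this sketch.

```javascript
// Hypothetical handler for 'testMessage' messages sent by 'process1'.
function handleTestMessage(data, done) {
  console.log('received:', JSON.stringify(data));
  done(); // tells Kue the job is completed
}

// Hub: route a message by sender first, then by type.
// Unknown senders or types fall through without calling done().
function processMessage(message, done) {
  switch (message.from) {
    case 'process1':
      switch (message.type) {
        case 'testMessage':
          handleTestMessage(message.data, done);
          break;
      }
      break;
  }
}
```

Adding a new service or message type then only means adding a case and a handler function.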
When the content has been processed the callback is finally called and, by being bubbled up all the way back to queue.process, it marks the job as completed and triggers its deletion from Redis - the deletion actually happens in the first process, or wherever you placed the job complete listener.
This approach, however, has a downside: poorly formatted or unknown messages are left lingering in the database indefinitely, potentially stealing precious memory from other processes. This can be avoided by adding a default clause to the hub function’s switch statements, which will handle all unknown senders or types:
In this case they are just getting marked as completed and thus getting deleted from the database.
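A minimal sketch of a hub with default clauses, under the same assumptions as before (the handleTestMessage body and the warning messages are made up for illustration):

```javascript
// Hypothetical handler for 'testMessage' messages sent by 'process1'.
function handleTestMessage(data, done) {
  console.log('received:', JSON.stringify(data));
  done();
}

// Hub with default clauses: anything unrecognized is logged and
// marked as completed, so it does not linger in Redis.
function processMessage(message, done) {
  switch (message.from) {
    case 'process1':
      switch (message.type) {
        case 'testMessage':
          handleTestMessage(message.data, done);
          break;
        default:
          console.warn('unknown message type:', message.type);
          done();
      }
      break;
    default:
      console.warn('unknown sender:', message.from);
      done();
  }
}
```

If you would rather keep track of bad messages than silently drop them, the default clauses are also the natural place to log them or forward them elsewhere before calling done.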
As an exercise you may want to send a proper confirmation message back to the first process from the second one, or make the second process do some kind of asynchronous task.
Kue is a very performant and complex message queue; taking a look at the extensive official documentation is highly encouraged. Just remember that, at least at the time of writing this article, it is still at a sub-v1 version (v0.11.1 precisely), so the API might be subject to change at any time.