Kue for Node.JS microservices
Kue is, as defined in its GitHub repository:
A priority job queue backed by Redis, built for Node.JS.
What is a job queue?
A job/message queue is an interesting tool when it comes to communication between microservices: think of it as a shared message box: a process leaves a message in the box for another one, which then picks it up. The message is read, the corresponding actions are taken, and an answer is put back in the box for the first process to notice.
It is a pretty convenient way to achieve multi-process communication, isn't it? Add the speed of Redis and its in-memory storage to the efficiency of Kue itself and you pretty much have an immediate solution to the problem.
Using Kue isn't particularly difficult, as you only have to define a queue name to begin sending and receiving messages; the rest of the control flow looks pretty much the same everywhere.
This message queue also has many advanced features, such as message priority and retry attempts, but in this article I'll only cover the basics; a firm grasp of the core mechanism is needed before making use of those features, and you can always check the official repo for detailed info on them.
Step one - Redis
For the rest of this article I’ll assume that you already have a Redis instance running. You don’t need anything fancy, installing it straight away from your distribution’s repositories or downloading it from the official downloads page is usually enough. Make sure that the service is running and move on.
Note: from now on I'll be referring to a Redis instance running on your local machine; I'm sure that using a remote instance is possible as well, but I haven't had the chance to test how much network latency affects Kue's speed.
Step two - Node.JS (process 1)
Create a new directory in your favorite temporary folder - some use /tmp, some use ~/tmp, others just don't care and use ~/Documents. Just make a folder somewhere and cd into it.
Use our beloved NPM to install Kue - creating a package with npm init can be skipped for these simple examples:
npm install kue
Now to create the skeleton of the first process - ES6 syntax follows, to be used with Node v6.0 or higher or transpiled:
const kue = require('kue');
const queue = kue.createQueue();
Here the Kue module is being required, and a queue is created in Redis via the kue.createQueue() call. If you need to specify any options for the Redis connection, you can do so by passing them as an object to that function.
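For instance, the host and port could be set explicitly like this (the values below are just placeholders for your own setup):

const queue = kue.createQueue({
  redis: {
    host: '127.0.0.1', // placeholder: your Redis host
    port: 6379         // placeholder: your Redis port
  }
});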
Now the fun part: actually saving a message to the queue.
const kue = require('kue');
const queue = kue.createQueue();
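A minimal sketch of the job creation itself follows; the queue name my_queue and the payload contents are just examples, while the from/type/data structure is explained right below:

const job = queue.create('my_queue', {
  from: 'process1',
  type: 'testMessage',
  data: {
    text: 'Hello from the first process!'
  }
}).save((err) => {
  // Log the job id once Kue has stored it in Redis.
  if (!err) {
    console.log(`Job ${job.id} saved to the queue.`);
  }
});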
Running the above code will produce something along the lines of:
Job 5157 saved to the queue.
In these examples errors are not handled for the sake of brevity; always make sure that you take action when an error bubbles up!
Leave the process running and move on. If an ECONNREFUSED error pops up, make sure that Redis is running and that it's listening on the default 6379 port.
The first parameter of the queue.create call is the name of the queue - it must be the same in the sender and in the receiver in order for them to read each other's messages, but be aware that you're not limited in any way in the number of queues that a single process can handle.
The second parameter is a JSON object that can be composed of any kind of information you want to send. My very personal take on this is having two constant keys, from and type, and a variable content key, data.
from holds a String that states which service originated the message, in this case process1.
type holds a String that states the purpose of the message, in this case testMessage.
data, finally, holds a JSON object with everything that you want to send to the other process.
This model allows you, as you’ll see further on, to easily differentiate between different services and purposes by using a single function.
The last block of the file contains the job complete event listener, which deletes a message from the database once the receiving process marks it as completed.
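That listener might look something like the following sketch, which mirrors the pattern shown in Kue's documentation (the log message is just an example):

queue.on('job complete', (id) => {
  kue.Job.get(id, (err, job) => {
    if (err) return;
    // Remove the completed job from Redis so it doesn't linger around.
    job.remove((err) => {
      if (err) throw err;
      console.log(`Removed completed job #${job.id}`);
    });
  });
});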
A simpler alternative, if you don't need the completed jobs' data, is to get rid of that listener entirely and change the save call to:
}).removeOnComplete(true).save((err) => {
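In context, reusing the illustrative queue name and payload from above, the whole call would then read:

queue.create('my_queue', {
  from: 'process1',
  type: 'testMessage',
  data: { text: 'Hello from the first process!' }
}).removeOnComplete(true).save((err) => {
  if (err) console.error(err);
});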
Step three - Node.JS (process 2)
const kue = require('kue');
const queue = kue.createQueue();
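The rest of the second process might look something like this sketch; the queue name my_queue matches the sending example above, and the body of handleTestMessage is just a placeholder:

queue.process('my_queue', (job, done) => {
  // Relay the payload and the completion callback to the hub function.
  processMessage(job.data, done);
});

function processMessage(message, done) {
  // Route the message according to its sender and purpose.
  switch (message.from) {
    case 'process1':
      switch (message.type) {
        case 'testMessage':
          handleTestMessage(message.data, done);
          break;
      }
      break;
  }
}

function handleTestMessage(data, done) {
  // Do something useful with the content, then signal completion.
  console.log('Received:', data);
  done();
}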
As you can see above, receiving a message is actually simpler than sending one: by calling queue.process() you define the behavior of your application for incoming messages on a specific queue.
Please note that this is an event-like handler, and it will be called whenever a new message is available - there's no need to manually schedule checks for new items. I encourage you to try out this aspect by leaving the second process running and restarting the first one multiple times, or by placing the queue.create call of the latter inside a setInterval loop.
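As a sketch, that variation of the first process could be as simple as (the interval and payload are arbitrary):

// Enqueue a new job every two seconds.
setInterval(() => {
  queue.create('my_queue', {
    from: 'process1',
    type: 'testMessage',
    data: { sentAt: Date.now() }
  }).save();
}, 2000);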
The first parameter to queue.process is, once again, the name of the queue to observe. The second parameter is the function that handles the messages. This last function takes two arguments: a Kue job object and a done callback (remember the job complete listener of the first process? It is triggered by this callback). The JSON object that was given to the queue.create call of the first process is stored in the job.data attribute.
In this second process you can clearly see what I meant above with my data scheme: the queue.process handler is kept light and simple by just relaying the message to the processMessage function, which avoids messing with Kue jobs directly.
That function switches over the various possible senders and message types, acting as a hub for all messages. In this example you can see that the testMessage type coming from the process1 sender is routed to the handleTestMessage function.
When the content has been processed, the callback is finally called and, by bubbling all the way back up to queue.process, it marks the job as completed and triggers its deletion from Redis - the deletion actually happens in the first process, or wherever you placed the job complete listener.
This approach, however, has a downside: poorly formatted or unknown messages are left lingering in the database for an indeterminate amount of time, potentially stealing precious memory from other processes. This can be avoided by adding a default clause to the hub function's switch statements, which will handle all unknown senders or types:
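Continuing the receiver sketch from above, the extra clauses might look like this:

function processMessage(message, done) {
  switch (message.from) {
    case 'process1':
      switch (message.type) {
        case 'testMessage':
          handleTestMessage(message.data, done);
          break;
        default:
          // Unknown message type: acknowledge it so it gets cleaned up.
          done();
      }
      break;
    default:
      // Unknown sender: acknowledge it so it gets cleaned up.
      done();
  }
}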
In this case they are simply marked as completed and thus deleted from the database.
As an exercise you may want to send a proper confirmation message back to the first process from the second one, or make the second process do some kind of asynchronous task.
Kue is a very performant and complex message queue, and taking a look at the extensive official documentation is highly encouraged. Just remember that, at least at the time of writing this article, it is still at a pre-1.0 version (v0.11.1, precisely), so the API might be subject to change at any time.