/dev/urandom

Pseudorandom thoughts generator.

I'm Niccolò Maggioni.
Student, geek and developer.

Kue for Node.JS microservices

Kue is, as defined in its GitHub repository:

A priority job queue backed by Redis, built for Node.JS.

What is a job queue?

A job/message queue is an interesting tool when it comes to communication between microservices. Think of it as a cache box: one process leaves a message in the box for another, and the latter picks it up. The message is read, the corresponding actions are taken, and an answer is put back in the box for the first process to notice.
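To make the box metaphor concrete, here is a toy in-memory version of that exchange. It is only a sketch: ToyQueue, send and receive are names made up for this illustration and are not part of Kue’s API, and everything lives in a single process, whereas Kue keeps the messages in Redis so that separate processes can share them.

```javascript
// Toy illustration of the "box" idea. This is NOT Kue: no Redis, one process.
class ToyQueue {
  constructor() {
    this.boxes = new Map(); // queue name -> array of pending messages
  }

  // A sender leaves a message in the named box.
  send(name, message) {
    if (!this.boxes.has(name)) this.boxes.set(name, []);
    this.boxes.get(name).push(message);
  }

  // A receiver picks up the oldest message, or undefined if the box is empty.
  receive(name) {
    const box = this.boxes.get(name);
    return box && box.length > 0 ? box.shift() : undefined;
  }
}

const toy = new ToyQueue();

// "Process 1" leaves a request in the box...
toy.send('requests', { from: 'process1', msg: 'Hello world!' });

// ...and "process 2" picks it up, acts on it and leaves an answer.
const request = toy.receive('requests');
toy.send('answers', { from: 'process2', msg: `Got: ${request.msg}` });

// "Process 1" eventually notices the answer.
const answer = toy.receive('answers');
console.log(answer.msg);
```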

It is a pretty convenient way to achieve multi-process communication, isn’t it? Add the speed of Redis and its in-memory storage to the efficiency of Kue itself and you have a nearly immediate solution to the problem.

Using Kue isn’t particularly difficult: you only have to pick a queue name to begin sending and receiving messages, and the remaining calls follow the same pattern everywhere.

This message queue also has many advanced features such as message priority and retry attempts, but in this article I’ll only cover the basics; a firm grasp on the core mechanism is first needed to make use of those features, and you can always check the official repo for detailed info on them.


Step one - Redis

For the rest of this article I’ll assume that you already have a Redis instance running. You don’t need anything fancy: installing it from your distribution’s repositories or downloading it from the official downloads page is usually enough. Make sure that the service is running and move on.

Note: from now on I’ll be referring to a Redis instance running on your local machine. I’m sure that using a remote instance is possible as well, but I haven’t tested how much network latency would affect Kue’s speed.

Step two - Node.JS (process 1)

Create a new directory in your favorite temporary folder - some use /tmp, some use ~/tmp, others just don’t care and use ~/Documents. Just make a folder somewhere and cd into it.

Use our beloved NPM to install Kue - creating a package can be skipped for these simple examples:

npm install kue

Now to create the skeleton of the first process - ES6 syntax follows, to be used with Node v6.0 or higher, or transpiled:

process1.js
const kue = require('kue');
const queue = kue.createQueue();

Here the Kue module is being required, and a queue created in Redis via the kue.createQueue() call. If you need to specify any option for the Redis connection, you can do so by passing them in an object to that function.
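As a sketch of what such an options object might look like: the prefix and redis keys below follow the shape shown in Kue’s README, while the host, port and prefix values are placeholders for a local setup, not recommendations. Double-check the README of the version you install before relying on them.

```javascript
// Placeholder connection settings; adjust them to your own Redis instance.
const queueOptions = {
  prefix: 'q',            // prefix Kue puts in front of its Redis keys
  redis: {
    host: '127.0.0.1',    // assumption: Redis on the local machine
    port: 6379            // assumption: the default Redis port
  }
};

// With Kue installed, the object would be passed to createQueue:
// const kue = require('kue');
// const queue = kue.createQueue(queueOptions);
console.log(JSON.stringify(queueOptions));
```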

Now the fun part: actually saving a message to the queue.

process1.js
const kue = require('kue');
const queue = kue.createQueue();

let job = queue.create('myQueue', {
  from: 'process1',
  type: 'testMessage',
  data: {
    msg: 'Hello world!'
  }
}).save((err) => {
  if (err) throw err;
  console.log(`Job ${job.id} saved to the queue.`);
});

Running the above code will produce something along the lines of:

Job 5157 saved to the queue.

In these examples errors are not handled for the sake of brevity; always make sure that you take action when an error bubbles up!

Leave the process running and move on. If an ECONNREFUSED error pops up make sure that Redis is running and that it’s listening on the default 6379 port.

The first parameter of the queue.create call is the name of the queue. It must be the same in the sender and in the receiver for them to read each other’s messages, but you’re not limited in any way in the number of queues that you handle in a single process.

The second parameter is a JSON-serializable object that can hold whatever information you want to send. My very personal take on this is having two constant keys, from and type, and a variable content key, data.

This model allows you, as you’ll see further on, to easily differentiate between different services and purposes by using a single function.
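If you want to enforce that shape, a small helper can build the envelope for you. The function below is a sketch: makeMessage is a hypothetical name introduced here, not something Kue provides, and the validation rules are just one reasonable choice.

```javascript
// Hypothetical helper that builds the { from, type, data } envelope.
function makeMessage(from, type, data = {}) {
  if (typeof from !== 'string' || from === '') {
    throw new TypeError('"from" must be a non-empty string');
  }
  if (typeof type !== 'string' || type === '') {
    throw new TypeError('"type" must be a non-empty string');
  }
  return { from, type, data };
}

const message = makeMessage('process1', 'testMessage', { msg: 'Hello world!' });
console.log(message);

// The result is a plain object, so it could be handed straight to Kue:
// queue.create('myQueue', message).save();
```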

process1.js
const kue = require('kue');
const queue = kue.createQueue();

let job = queue.create('myQueue', {
  from: 'process1',
  type: 'testMessage',
  data: {
    msg: 'Hello world!'
  }
}).save((err) => {
  if (err) throw err;
  console.log(`Job ${job.id} saved to the queue.`);
});

queue.on('job complete', (id, result) => { // fires once a receiver marks a job as done
  kue.Job.get(id, (err, job) => { // load the finished job by its id
    if (err) throw err;
    job.remove((err) => { // delete the job from Redis
      if (err) throw err;
      console.log(`Removed completed job ${job.id}`);
    });
  });
});

The last block of the file contains the job complete event listener, which deletes a message from the database when it is marked as completed by the receiving process.

If you don’t need the data of completed jobs, a simpler alternative is to remove the job complete listener (lines 15-23) and change line 10 to:

}).removeOnComplete(true).save((err) => {

Step three - Node.JS (process 2)

process2.js
const kue = require('kue');
const queue = kue.createQueue();

function processMessage(data, callback) {
  switch (data.from) {
    case 'process1':
      switch (data.type) {
        case 'testMessage':
          handleTestMessage(data.data, callback);
          break;
      }
      break;
  }
}

function handleTestMessage(data, callback) {
  console.log(`Process1 wants me to say: "${data.msg}"`);
  callback();
}

queue.process('myQueue', function(job, done){
  processMessage(job.data, done);
});

As you can see above, receiving a message is actually simpler than sending one: by calling queue.process() you define the behavior of your application for incoming messages on a specific queue.

Please note that this is an event-like handler, and it will be called whenever a new message is available - there’s no need to schedule checking for new items manually. I encourage you to try out this aspect by leaving the second process running and restarting the first one multiple times, or placing the queue.create call of the latter inside a setInterval loop.
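One way to try the setInterval variant is to wrap the sending code in a function and call it on a timer. This is a sketch rather than the definitive way to do it: sendTestMessage is a hypothetical helper, the one-second interval is arbitrary, and the Kue-specific lines are left commented out so that the snippet stands on its own.

```javascript
// Enqueue one numbered test message on a Kue-style queue object.
function sendTestMessage(queue, n) {
  const job = queue.create('myQueue', {
    from: 'process1',
    type: 'testMessage',
    data: { msg: `Hello world #${n}!` }
  });
  job.save((err) => {
    if (err) return console.error(`Could not save job: ${err.message}`);
    console.log(`Job ${job.id} saved to the queue.`);
  });
  return job;
}

// With Kue installed and Redis running, process1.js could then do:
// const kue = require('kue');
// const queue = kue.createQueue();
// let n = 0;
// setInterval(() => sendTestMessage(queue, ++n), 1000);
```

With the second process running, each tick should make it print a new line without any polling on your side.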

The first parameter to queue.process is, once again, the name of the queue to observe. The second parameter is the function that handles the messages. This last function takes two arguments: a Kue job object and a done callback (remember the job complete listener of the first process? It is triggered by this callback). The JSON object that was given to the queue.create call of the first process is stored in the job.data attribute.

In this second process you can clearly see what I meant above with my data scheme: the queue.process handler is kept light and simple by just relaying the message to the processMessage function, which avoids messing with Kue jobs directly.

That function switches over the possible senders and message types, acting as a hub for all messages. In this example you can see that the testMessage type (line 8) coming from the process1 sender (line 6) is routed to the handleTestMessage function (line 9).

When the content has been processed the callback is finally called and, by being bubbled up all the way back to queue.process, it marks the job as completed and triggers its deletion from Redis - the deletion actually happens in the first process, or wherever you placed the job complete listener.


This approach, however, has a downside: poorly formatted or unknown messages are left lingering in the database for undetermined periods of time, potentially stealing precious memory from other processes. This can be avoided by adding a default clause to each of the hub function’s switch statements, so that all unknown senders or types are handled:

process2.js
// ...

function processMessage(data, callback) {
  switch (data.from) {
    case 'process1':
      switch (data.type) {
        case 'testMessage':
          handleTestMessage(data.data, callback);
          break;
        default:
          callback();
      }
      break;
    default:
      callback();
  }
}

// ...

In this case unknown messages are simply marked as completed, and are therefore deleted from the database.
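If the nested switch statements start to grow, the same routing can also be sketched as a lookup table. This is an alternative layout, not something Kue requires: the handlers map and its "from:type" key format are hypothetical choices made for this example.

```javascript
function handleTestMessage(data, callback) {
  console.log(`Process1 wants me to say: "${data.msg}"`);
  callback();
}

// Hypothetical routing table keyed by "<from>:<type>".
const handlers = new Map([
  ['process1:testMessage', handleTestMessage]
]);

function processMessage(message, callback) {
  const handler = handlers.get(`${message.from}:${message.type}`);
  if (!handler) {
    // Unknown sender or type: complete the job so that it does not linger.
    return callback();
  }
  handler(message.data, callback);
}

// Known message: routed to handleTestMessage.
processMessage(
  { from: 'process1', type: 'testMessage', data: { msg: 'Hello world!' } },
  () => console.log('Job completed.')
);
```

Adding a new sender or message type then becomes a matter of adding one entry to the map.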


As an exercise you may want to send a proper confirmation message back to the first process from the second one, or make the second process do some kind of asynchronous task.
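A possible starting point for the asynchronous half of that exercise is sketched below. handleSlowMessage is a hypothetical handler, setTimeout merely stands in for real work, and delayMs is a field invented for this example. The one Kue-specific detail is that passing an error as the first argument of the done callback marks the job as failed instead of completed.

```javascript
// Hypothetical handler: setTimeout stands in for real asynchronous work.
function handleSlowMessage(data, callback) {
  if (!data || typeof data.msg !== 'string') {
    // An error as first argument tells Kue that the job failed.
    return callback(new Error('Missing "msg" field'));
  }
  setTimeout(() => {
    console.log(`Done working on: "${data.msg}"`);
    callback(); // no error: the job is marked as completed
  }, data.delayMs || 100);
}

// Stand-alone usage; inside process2.js the callback would be Kue's done.
handleSlowMessage({ msg: 'Hello world!', delayMs: 10 }, (err) => {
  if (err) console.error(`Job failed: ${err.message}`);
});
```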

Kue is a very performant and complex message queue, so taking a look at the extensive official documentation is highly encouraged. Just remember that, at least at the time of writing this article, it is still at a sub-v1 version (v0.11.1, to be precise), so the API might change at any time.
