Using Bitly’s NSQ Job Queue

I’ve recently been impressed by Bitly’s NSQ server, written in Go. Aside from Go itself capturing my attention, what most interested me was that 1) Bitly claims it achieves 90,000 messages/second (which is decent), and 2) it’s relatively easy to set up, and it’s self-managing.

The topology for NSQ is straightforward: N queue servers (nsqd), zero or more lookup servers (nsqlookupd), and an optional admin (dashboard) server (nsqadmin). The lookup servers are optional, but they allow auto-discovery of which hosts are managing which topics. Bitly recommends that a cluster of three be used in production. To start multiple instances, just launch them. You’ll have to pass a list of nsqlookupd hosts to the consumer client, and a list of nsqd hosts to the producer client.

The message pipeline is intuitive: messages are pushed along with a topic, and consumers subscribe to a topic and channel. A channel is a named grouping of consumers that work on similar tasks; the channel is simply presented as a string to the consumer instance. NSQ uses topics and channels to drive multicast delivery (every channel receives a copy of each message) and distributed delivery (the consumers on a channel divide its copies among themselves).
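As a rough mental model, the delivery semantics look like this. (This is an illustrative sketch only, not pynsq or the NSQ protocol; Topic, subscribe, and publish are hypothetical names, and the round-robin choice within a channel is a simplification of NSQ's actual distribution.)

```python
class Topic:
    """Toy model of NSQ delivery: every channel gets a copy of each
    message (multicast); the consumers on a channel split those copies
    (distributed delivery, modeled here as round-robin)."""

    def __init__(self):
        self.consumers = {}   # channel name -> list of consumer names
        self.cursor = {}      # channel name -> index of next consumer

    def subscribe(self, channel, consumer):
        self.consumers.setdefault(channel, []).append(consumer)
        self.cursor.setdefault(channel, 0)

    def publish(self, message):
        # Every channel receives a copy; one consumer per channel handles it.
        delivered = {}
        for channel, members in self.consumers.items():
            i = self.cursor[channel] % len(members)
            self.cursor[channel] = i + 1
            delivered[channel] = (members[i], message)
        return delivered


t = Topic()
t.subscribe('emailer', 'worker-a')
t.subscribe('emailer', 'worker-b')
t.subscribe('archiver', 'worker-c')
print(t.publish('hello'))
# -> {'emailer': ('worker-a', 'hello'), 'archiver': ('worker-c', 'hello')}
print(t.publish('again'))
# -> {'emailer': ('worker-b', 'again'), 'archiver': ('worker-c', 'again')}
```

Both channels see every message, but within the “emailer” channel each message goes to only one of the two workers.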

As far as optimization goes, nsqd has about three dozen parameters, but you need not concern yourself with most of them here.

This example resembles the one from the NSQ website, plus some additional information. All four processes can be run on the same system.

Quick Start

Get and build the primary components. $GOPATH needs either to be set to your Go workspace (mine is ~/.go, below) or to an empty directory that will be used as one. $GOPATH/bin needs to be in your path.

go get github.com/kr/godep

godep get github.com/bitly/nsq/nsqd
godep get github.com/bitly/nsq/nsqlookupd
godep get github.com/bitly/nsq/nsqadmin

To start, run each of the following services in a different terminal on the same system.

A lookup server instance:

nsqlookupd

A queue instance:

nsqd --lookupd-tcp-address=127.0.0.1:4160

An admin server instance:

nsqadmin --template-dir=~/.go/src/github.com/bitly/nsq/nsqadmin/templates --lookupd-http-address=127.0.0.1:4161

To push test-items:

curl -d 'hello world 1' 'http://127.0.0.1:4151/put?topic=test'
curl -d 'hello world 2' 'http://127.0.0.1:4151/put?topic=test'
curl -d 'hello world 3' 'http://127.0.0.1:4151/put?topic=test'
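If you’d rather push from Python than curl, a minimal standard-library equivalent of the curl calls above might look like this (put_url and publish are my own helper names; it assumes nsqd’s HTTP endpoint is listening on 127.0.0.1:4151):

```python
import urllib.request

def put_url(host, topic):
    # nsqd's HTTP publish endpoint for a given topic
    return 'http://%s/put?topic=%s' % (host, topic)

def publish(host, topic, data):
    # POST the raw message body, exactly as curl -d does
    req = urllib.request.Request(put_url(host, topic),
                                 data=data.encode('utf-8'))
    with urllib.request.urlopen(req) as resp:
        return resp.read()

# With nsqd running locally:
# for n in (1, 2, 3):
#     publish('127.0.0.1:4151', 'test', 'hello world %d' % n)
```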

The “apps” aren’t built by default, apparently. We’ll need them so we can get a message dumper (nsq_to_file) for testing:

~/.go/src/github.com/bitly/nsq$ make
cd build/apps

To dump data that’s already waiting in the queues:

./nsq_to_file --topic=test --output-dir=/tmp --lookupd-http-address=127.0.0.1:4161

Display queue data:

cat /tmp/test.*.log
hello world 1
hello world 2
hello world 3

Python Library

Matt Reiferson wrote pynsq, a Python client that employs Tornado for its message loops. The gotcha is that consumers -and- producers both require you to use IOLoop, Tornado’s message loop. This is because pynsq not only allows you to define a “receive” callback, but a post-send callback as well. Though you don’t have to define one, there is an obscure, but real, chance that a send will fail, per Matt, and this should always be checked for.

Because of this design, you should be prepared to put all of your core loop logic into the Tornado loop.

To install the client:

sudo pip install pynsq tornado

A producer example from the “pynsq” website:

import nsq
import tornado.ioloop
import time

def pub_message():
    writer.pub('test', time.strftime('%H:%M:%S'), finish_pub)

def finish_pub(conn, data):
    print data

writer = nsq.Writer(['127.0.0.1:4150'])
tornado.ioloop.PeriodicCallback(pub_message, 1000).start()
nsq.run()

An asynchronous consumer example from the “pynsq” website (doesn’t correspond to the producer example):

import nsq

buf = []

def process_message(message):
    global buf
    message.enable_async()
    # cache the message for later processing
    buf.append(message)
    if len(buf) >= 3:
        for msg in buf:
            print msg
            msg.finish()
        buf = []
    else:
        print 'deferring processing'

r = nsq.Reader(message_handler=process_message,
        lookupd_http_addresses=['http://127.0.0.1:4161'],
        topic='nsq_reader', channel='async', max_in_flight=9)
nsq.run()

Give it a try.

FAQ

(Courtesy of a dialogue with Matt Reiferson)

Q: Most job-queues allow you to send messages without imposing a loop. Is the 
   IOLoop required for both receiving -and- sending in pynsq?
A: Yes. pynsq supports the notion of completion-callbacks to signal when a send 
   finishes. Even if you don't use one, it's accounted for in the mechanics. If 
   you want to send synchronous messages without the loop, hit the HTTP 
   endpoint. However, running both the receive and send sides under the IOLoop 
   allows for the fastest possible dialogue, especially when the writers and 
   readers are paired to the same hosts.

Q: An IOLoop is even required for asynchronous sends?
A: Yes. If you want to simply send one-off asynchronous messages, 
   consider opening a worker process that manages delivery. It can apply its 
   own callback to catch failures, and transmit successes, failures, etc. to 
   an IPC queue (if you need this info).
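A sketch of that worker pattern, modeled with a thread and a stubbed send function for brevity (delivery_worker and send_via_worker are hypothetical names of my own; a real version would run a separate process hosting a pynsq Writer inside its own IOLoop):

```python
import queue
import threading

def delivery_worker(inbox, results, send_fn):
    # Pull (topic, message) pairs, attempt delivery, report each outcome.
    while True:
        item = inbox.get()
        if item is None:              # sentinel: shut down
            break
        topic, message = item
        try:
            send_fn(topic, message)
            results.put((topic, message, 'ok'))
        except Exception as exc:
            results.put((topic, message, 'failed: %s' % exc))

def send_via_worker(messages, send_fn):
    # Feed messages to the worker and collect one outcome per message.
    inbox, results = queue.Queue(), queue.Queue()
    t = threading.Thread(target=delivery_worker,
                         args=(inbox, results, send_fn))
    t.start()
    for m in messages:
        inbox.put(m)
    inbox.put(None)
    t.join()
    return [results.get() for _ in messages]

# With a no-op send standing in for a real publish:
print(send_via_worker([('test', 'hello world 4')], lambda t, m: None))
# -> [('test', 'hello world 4', 'ok')]
```

The main process only ever touches the inbox and results queues; all delivery (and all of the IOLoop machinery, in the real version) stays inside the worker.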

Q: Are there any delivery guarantees (like in ZeroMQ)?
A: No. It's considered good practice by the NSQ guys to always check the 
   results of message-sends in any situation (in any kind of messaging, in 
   general). With pynsq, you'd do this from the callbacks.

    The reasons that a send would fail are the following:

    1: The topic name is not formatted correctly (there are character/length 
       restrictions, though there is no official documentation of them).
    2: The message is too large (this can be set via a parameter to nsqd).
    3: There is a breakdown related to a race-condition with a publish and a 
       delete happening on a specific topic. This is rare.
    4: Client connection-related failures.

Q: In scenario (3) of the potential reasons for a send-failure, can I mitigate 
   the publish/delete race if I either never delete topics, or orchestrate 
   deletions such that writes eliciting topic creations are never done until a 
   sufficient amount of time has elapsed since a deletion?
A: Largely. Though, if nowhere else, this can also happen internally to NSQ at 
   shutdown.

Q: How are new topics announced to the cluster?
A: The first writer or reader request for a topic is applied on the upstream 
   nsqd host, and then propagates to the nsqlookupd hosts. From there, it 
   eventually spreads to the other readers. The same applies to a brand-new 
   topic as well as a previously-deleted one.
