I’ve recently been impressed by Bitly’s NSQ server, written in Go. Aside from the part about Go capturing my attention, the part that most interested me was 1) they claim that it achieves 90,000 messages/second (which is decent), and 2) it’s relatively easy to set-up, and it’s self-managing.
The topology for NSQ is straight forward: N queue servers (nsqd), 0+ lookup servers (nsqlookupd), and an optional admin (dashboard) server (nsqadmin). The lookup servers are optional, but they allow auto-discovery of which hosts are managing which topics. Bitly recommends that a cluster of three are used in production. To start multiple instances, just launch them. You’ll have to pass in a list of nsqlookupd hosts to the consumer client, and a list of nsqd hosts to the producer client.
The message pipeline is intuitive: messages are pushed along with topics/classifiers, and consumers listen for topics and channels. A channel is a named grouping of consumers that work on similar tasks, where the “channel” is presented as a string to the consumer instance. NSQ uses the concepts of topics and channels to drive multicast and distributed delivery.
As far as optimization goes, there are about three dozen parameters for nsqd, but you need not concern yourself with most of them, here.
This example resembles the one from the NSQ website, plus some additional info. All four processes can be run from the same system.
Quick Start
Get and build the primary components. $GOPATH needs to either be set to your Go workspace (mine is ~/.go, below), or an empty directory that will be used for it. $GOPATH/bin needs to be in the path.
go get github.com/kr/godep godep get github.com/bitly/nsq/nsqd godep get github.com/bitly/nsq/nsqlookupd godep get github.com/bitly/nsq/nsqadmin
To start, run each of the following services in a different terminal on the same system.
A lookup server instance:
nsqlookupd
A queue instance:
nsqd --lookupd-tcp-address=127.0.0.1:4160
An admin server instance:
nsqadmin --template-dir=~/.go/src/github.com/bitly/nsq/nsqadmin/templates --lookupd-http-address=127.0.0.1:4161
To push test-items:
curl -d 'hello world 1' 'http://127.0.0.1:4151/put?topic=test' curl -d 'hello world 2' 'http://127.0.0.1:4151/put?topic=test' curl -d 'hello world 3' 'http://127.0.0.1:4151/put?topic=test'
The “apps” aren’t built, apparently, by default. We’ll need these so we can get a message-dumper, for testing:
~/.go/src/github.com/bitly/nsq$ make cd build/apps
To dump data that’s already waiting in the queues:
./nsq_to_file --topic=test --output-dir=/tmp --lookupd-http-address=127.0.0.1:4161
Display queue data:
cat /tmp/test.*.log hello world 1 hello world 2 hello world 3
Python Library
Matt Reiferson wrote pynsq, which is a Python client that employs Tornado for it’s message-loops. The gotcha is that both the consumers -and- producers both require you to use IOLoop, Tornado’s message-loop. This is because pynsq not only allows you to define a “receive” callback, but a post-send callback as well. Though you don’t have to define one, there is an obscure, but real, chance that a send will fail, per Matt, and should always be checked for.
Because of this design, you should be prepared to put all of your core loop logic into the Tornado loop.
To install the client:
sudo pip install pynsq tornado
A producer example from the “pynsq” website:
import nsq import tornado.ioloop import time def pub_message(): writer.pub('test', time.strftime('%H:%M:%S'), finish_pub) def finish_pub(conn, data): print data writer = nsq.Writer(['127.0.0.1:4150']) tornado.ioloop.PeriodicCallback(pub_message, 1000).start() nsq.run()
An asynchronous consumer example from the “pynsq” website (doesn’t correspond to the producer example):
import nsq buf = [] def process_message(message): global buf message.enable_async() # cache the message for later processing buf.append(message) if len(buf) >= 3: for msg in buf: print msg msg.finish() buf = [] else: print 'deferring processing' r = nsq.Reader(message_handler=process_message, lookupd_http_addresses=['http://127.0.0.1:4161'], topic='nsq_reader', channel='async', max_in_flight=9) nsq.run()
Give it a try.
FAQ
(Courtesy of a dialogue with Matt Reiferson)
Q: Most job-queues allow you send messages without imposing a loop. Is the IOLoop required for both receiving -and- sending in pynsq? A: Yes. pynsq supports the notion of completion-callbacks to signal when a send finishes. Even if you don't use it, it's accounted-for in the mechanics. If you want to send synchronous messages without the loop, hit the HTTP endpoint. However, facilitating both the receive and send IOLoops allows for the fasted possible dialogue, especially when the writers and readers are paired to the same hosts. Q: An IOLoop is even required for asynchronous sends? A: Yes. If you want to simply send one-off asynchronous messages, consider opening a worker process that manages delivery. It can apply its own callback to catch failures, and transmit successes, failures, etc.. to an IPC queue (if you need this info). Q: Are there any delivery guarantees (like in ZeroMQ)? A: No. It's considered good-practice by the NSQ guys to always check the results of message-sends in any situation (in any kind of messaging, in general). You'd do this from the callbacks, with pynsq. The reasons that a send would fail are the following: 1: The topic name is not formatted correctly (to character/length restrictions). There is no official documentation of this, however. 2: The message is too large (this can be set via a parameter to nsqd). 3: There is a breakdown related to a race-condition with a publish and a delete happening on a specific topic. This is rare. 4: Client connection-related failures. Q: In scenario (3) of the potential reasons for a send-failure, can I mitigate the publish/delete phenomena if I am either not deleting topics or have orchestrated deletions such that writes eliciting topic creations will never be done until a sufficient amount of time has elapsed since a deletion? A: Largely. Though, if nowhere else, this can also happen internally to NSQ at shutdown. Q: How are new topics announced to the cluster? A: The first writer or reader request for a topic will be applied on the upstream nsqd host, and will then propagate to the nsqlookupd hosts. They will eventually spread to the other readers from there. The same thing applies to a new topic, as well as a previously-deleted one.