Using etcd as a Clusterized, Immediately-Consistent Key-Value Storage

The etcd project was one of the first popular, public platforms built on the Raft algorithm (a relatively simple consensus algorithm, used to allow several nodes to remain in sync). Raft represents a shift away from its predecessor, Paxos, which is considerably more difficult to understand, and usually requires shortcuts to implement. As an added bonus, etcd is also implemented in Go.

etcd looks and smells like every other KV store, with three especially-notable differences:

  • You can maintain a heirarchy of keys.
  • You can long-poll for changes on keys.
  • Distributed-locks are built-in.

We’re going to use Python’s etcd package (project is here). This package presents a very intuitive interface that completely manages responses from the server and is built in such a way that future API changes should be backward-compatible (to within reason). These things are important, as other clients have historically allowed the application too much direct access to the actual server requests, and left too much of the interpretation of the responses to the application as well.

To connect the client (assuming the same machine with the default port):

from etcd import Client

c = Client()

To set a value:

c.node.set('/test/key', 5)

To get a value:

r = c.node.get('/test/key')
print(r.node.value)

Which outputs:

5

To wait on a value to change, run this from another terminal:

r = c.node.wait('/test/key')

Try setting the node to something else using a command similar to before. The wait call will return with the same result as the instance of the client that actually made the request.

To work with distributed locks, just wrap the code that needs to be synchronized in a with statements:

with c.module.lock.get_lock('test_lock_1', ttl=10):
    print("In lock 1.")

It’s worth mentioning that the response objects have a consistent and informative interface no matter what the operation. You can see a number properties just by printing it. This is from the set operation above:

<RESPONSE: <NODE(ResponseV2AliveNode) [set] [/test/key] IS_HID=[False] IS_DEL=[False] IS_DIR=[False] IS_COLL=[False] TTL=[None] CI=(2) MI=(2)>>

This is from the get operation:

<RESPONSE: <NODE(ResponseV2AliveNode) [get] [/test/key] IS_HID=[False] IS_DEL=[False] IS_DIR=[False] IS_COLL=[False] TTL=[None] CI=(2) MI=(2)>>

I’ll omit the examples of working with heirarchical keys because the functionality is every bit as intuitive as it should be.

There’s a lot of functionality in the Python etcd package, but it’s built to be lightweight and obvious. The GitHub page is extremely thorough, and the API is also completely documented at ReadTheDocs.

Advertisements

Using etcd as a Highly Available and Innovative Key-Value Storage

etcd was created as the primary building-block on which CoreOS is built. It uses the Raft algorithm to keep changes consistent throughout a cluster by electing a leader and distributing a log of operations (“commands”) from the leader to the other systems. Due to these features and others, etcd to be used for robust service-discovery and cluster configuration, replacing ZooKeeper. Entries are referred-to as “nodes”.

 

Distributed Locks

Every update automatically increments the “index”, which is a global, monotonically-increasing value, incremented for every operation:

c.set('/a/b/c', 5).index
# 66
c.set('/a/b/c', 5).index
# 67

The index increases for every operation, not just those with side-effects. Per the mailing list (2013-11-29), the reason for this is:

That’s a side effect of how Raft works. When new commands come in they get sent to Raft immediately which increments the index. We’re not able to check the current value of the key before insert because Raft batches commands so there may be uncommitted changes between the current state and the state at the time when the command is being committed. That’s also why changes that cause errors can increment the index even though no change was made.

etcd also gives us a “CAS” (“compare and swap”) call (“test_and_set” in the Python client). This allows us to assign a value to a key, but only when the existing value meets one or more conditions:

  1. The existing value is set to something specific (a “previous value” condition).
  2. The existing index is set to something specific (a “previous index” condition).
  3. The key either currently exists or doesn’t (a “previously exists” condition).

The existence of a monotonic, atomic counter and a CAS function happen to be the exact dependencies required to establish distributed locking. The process might be the following:

  1. Initialize a node for the specific lock (“lock node”). Use CAS with a “prevExists” of “false” and a value of “0”.
  2. Assign some value to some dummy key used for the purpose of incrementing and grabbing the index. This index will be used as a unique ID for the current thread/instance (“instance ID”).
  3. Do a CAS on the lock node with a “prevValue” of “0”, a value of the instance-ID, and a TTL of whatever maximum lock time we should allow.
    • If error, watch the lock node. Give the HTTP client a timeout. Try again after long-polling returns or timeout hits.
    • If no error, do whatever logic is required, and, to release, use a CAS to set the lock-node to “0” with a “prevValue” of the instance-ID. If this fails (ValueError), then the lock has been reowned by another instance after having timed-out.

It’s important to mention that the “test_and_set” operation in the Python client only currently supports the “prevValue” condition. With the “prevValue” condition, you’ll get a KeyError if the key doesn’t exist. If the real existing value does not match the stated existing value, you’ll get a ValueError (which is a standard consideration when using this call).

 

Additional Features

Aside from being so consistent and having easy access to the operations via REST, there are two non-traditional operations that you’ll see with etcd but not with [most] other KV solutions:

  1. Entries can be stored in a hierarchy
  2. Long-polling to wait on a change to a key or folder (“watch”)

With (2), you can monitor a key that doesn’t yet exist, or even a folder (in which case, it’ll block until any value inside the folder changes, recursively). You can use this to achieve event-driven scripts (a neat usage mentioned on the mailing list).

Lastly, before moving on to the example, the cluster should be kept small:

Every command the client sends to the master is broadcast to all of the 
followers. The command is not committed until the majority of the cluster peers 
receive that command.

Because of this majority voting property, the ideal cluster should be kept 
small to keep speed up and be made up of an odd number of peers.

(what size cluster should I use)

etcd is based on Google’s Chubby (which uses Paxos rather than Raft).

 

Quick Start

For this example, we’re going to establish and interact with etcd using three different terminals on the same system. etcd requires Go 1.1+. You’ll probably have to build it (via a “Git” clone call, and a build), as it’s not yet available via many package managers (Ubuntu, specifically).

Run etcd:

$ etcd
[etcd] Nov 28 13:02:20.849 INFO      | Wrote node configuration to 'info'
[etcd] Nov 28 13:02:20.849 INFO      | etcd server [name default-name, listen on 127.0.0.1:4001, advertised url http://127.0.0.1:4001]
[etcd] Nov 28 13:02:20.850 INFO      | raft server [name default-name, listen on 127.0.0.1:7001, advertised url http://127.0.0.1:7001]

Creating a cluster is as easy as simply launching additional instances of the daemon on new hosts. Now, install Python’s python-etcd:

sudo pip install python-etcd

Connect the client:

from etcd import Client
c = Client(host='127.0.0.1')

Set a value (notice that we have to specify a folder, even if it’s only the root):

c.set('/test_entry', 'abc')

EtcdResult(action=u'SET', index=9, key=u'/test_entry', prevValue=None, value=u'abc', expiration=None, ttl=None, newKey=True)
# Actions available on EtcdResult: action, count, expiration, index, key, newKey, prevValue, ttl, value

Get the value:

r = c.get('/test_entry')
print(r.value)
# Prints "abc"

In a second terminal, connect the client and run the following to block for a change to the given folder (it doesn’t currently exist):

r = c.watch('/test_folder')

Back in the first terminal, run:

c.set('/test_folder/test_inner_folder/deep_test', 'abc')

The command waiting in the second terminal has now returned. Examine “r”:

print(r)
EtcdResult(action=u'SET', index=15, key=u'/test_folder/test_inner_folder/deep_test', prevValue=None, value=u'abc', expiration=None, ttl=None, newKey=True)

Get a listing of children. This may or may not work on “/”, depending on your python-etcd version:

from pprint import pprint
c.set('/test_folder/entry_1', 'test_value_1')
c.set('/test_folder/entry_2', 'test_value_2')
list_ = c.get('/test_folder')
pprint(list_)
#[EtcdResult(action=u'GET', index=4, key=u'/test_folder/entry_1', prevValue=None, value=u'test_value_1', expiration=None, ttl=None, newKey=None),
# EtcdResult(action=u'GET', index=4, key=u'/test_folder/entry_2', prevValue=None, value=u'test_value_2', expiration=None, ttl=None, newKey=None)]

etcd also allows for TTLs (in seconds) on “put” operations:

from time import sleep
c.set('/disappearing_entry', 'inconsequential_value', ttl=5)
sleep(5)
c.get('/disappearing_entry')

You’ll get the following error (a proper KeyError):

Traceback (most recent call last):
  File "", line 1, in 
  File "/Library/Python/2.7/site-packages/etcd/client.py", line 284, in get
    response = self.api_execute(self.key_endpoint + key, self._MGET)
  File "/Library/Python/2.7/site-packages/etcd/client.py", line 357, in api_execute
    raise error_exception(message)
KeyError: u'Key Not Found : get: /disappearing_entry'

Miscellaneous functions:

c.machines
# ['http://127.0.0.1:4001']
c.leader
# 'http://127.0.0.1:7001'

As a final note, you don’t have to choose between cURL requests and the API. Rather, there’s also etcdctl for command-line control:

$ etcdctl set /foo/bar "Hello world"
Hello world

 

FAQ

Leaders are elected using elections. However, there’s a chance that a leader won’t be elected, and the elections will have to be reattempted. From the mailing list (2013-11-29):

Q: What would cause a leader candidate to not receive a majority of votes from nodes, during elections?
A: The common case election failure would be due to either a network partition causing less than a quorum to vote, or another candidate being elected first.

Q: Is there any decision-making involved during elections, such as the consideration of the CPU utilizations of individual machines?
A: Not at this time. It might make sense to add some sort of fitness to the leader proposal decision later.