Subversion from Python

Generally, it’s preferable to bind to libraries rather than executables when given the option. In my case, I needed SVN access from Python and couldn’t, at that time, find a confidence-inspiring library to work with. So, I wrote svn.

It turns out that there is a Subversion-sponsored Python project. It looks to be SWIG-based.

This comes from the python-svn Apt package under Ubuntu.
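
If you’re on Ubuntu, installing it should look something like this (assuming the package name is still python-svn):

$ sudo apt-get install python-svn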

The Programmer’s Guide has the following examples, among others:

cat:

import pysvn
client = pysvn.Client()
file_content = client.cat('file.txt')

ls:

import pysvn
client = pysvn.Client()
entry_list = client.ls('.')

info:

import pysvn
client = pysvn.Client()
entry = client.info('.')
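
A checkout works the same way. This is a minimal sketch; the repository URL is a placeholder:

import pysvn
client = pysvn.Client()
client.checkout('http://svn.example.com/repo/trunk', './working-copy')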

Using inotify to watch for directory changes from Python

An inotify project is now available on PyPI. More documentation is available at the project homepage: PyInotify

Though the inotify functionality is uncomplicated to implement in C, it’s stupidly simple to implement in Python using this library.

To install:

$ sudo pip install inotify

This is the principal logic of the example provided in the project documentation:

import logging

import inotify.adapters

_LOGGER = logging.getLogger(__name__)

i = inotify.adapters.Inotify()

i.add_watch('/tmp')

for event in i.event_gen():
    if event is not None:
        (header, type_names, watch_path, filename) = event

        _LOGGER.info("WD=(%d) MASK=(%d) COOKIE=(%d) LEN=(%d) MASK->NAMES=%s "
                     "WATCH-PATH=[%s] FILENAME=[%s]", 
                     header.wd, header.mask, header.cookie, header.len, type_names, 
                     watch_path, filename)

We ran the following operations on /tmp:

$ touch /tmp/aa
$ rm /tmp/aa
$ mkdir /tmp/dir1
$ rmdir /tmp/dir1

This was the corresponding output of the inotify process:

2015-04-24 05:02:06,667 - __main__ - INFO - WD=(1) MASK=(256) COOKIE=(0) LEN=(16) MASK->NAMES=['IN_CREATE'] FILENAME=[aa]
2015-04-24 05:02:06,667 - __main__ - INFO - WD=(1) MASK=(32) COOKIE=(0) LEN=(16) MASK->NAMES=['IN_OPEN'] FILENAME=[aa]
2015-04-24 05:02:06,667 - __main__ - INFO - WD=(1) MASK=(4) COOKIE=(0) LEN=(16) MASK->NAMES=['IN_ATTRIB'] FILENAME=[aa]
2015-04-24 05:02:06,667 - __main__ - INFO - WD=(1) MASK=(8) COOKIE=(0) LEN=(16) MASK->NAMES=['IN_CLOSE_WRITE'] FILENAME=[aa]
2015-04-24 05:02:17,412 - __main__ - INFO - WD=(1) MASK=(512) COOKIE=(0) LEN=(16) MASK->NAMES=['IN_DELETE'] FILENAME=[aa]
2015-04-24 05:02:22,884 - __main__ - INFO - WD=(1) MASK=(1073742080) COOKIE=(0) LEN=(16) MASK->NAMES=['IN_ISDIR', 'IN_CREATE'] FILENAME=[dir1]
2015-04-24 05:02:25,948 - __main__ - INFO - WD=(1) MASK=(1073742336) COOKIE=(0) LEN=(16) MASK->NAMES=['IN_ISDIR', 'IN_DELETE'] FILENAME=[dir1]

Lastly, this library also provides the ability to recursively watch a given directory. Just use the inotify.adapters.InotifyTree class instead of inotify.adapters.Inotify, and pass a path.
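
A minimal sketch of the recursive version, reusing the same event-loop as above:

import inotify.adapters

i = inotify.adapters.InotifyTree('/tmp')

for event in i.event_gen():
    if event is not None:
        (header, type_names, watch_path, filename) = event
        # Handle the event just as in the non-recursive example.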

World’s Simplest Python epoll Example For Waiting on File/Socket Readiness

Once upon a time, the only way to wait for readability or writability on one or more sockets/descriptors in Linux was the select call, which was later superseded by poll, and then by epoll. epoll is now the most current and popular way to accomplish this. Note that epoll is Linux-only; it isn’t available on the Mac (though select and poll appear to be).

In Python, you can invoke this functionality in the built-in select package. You can use it on any standard system file-descriptor, whether it’s socket-oriented, inotify-related, etc.

import logging
import sys
import socket
import select

_MAX_CONNECTION_BACKLOG = 1
_PORT = 9999
_BINDING = ('0.0.0.0', _PORT)
_EPOLL_BLOCK_DURATION_S = 1

_DEFAULT_LOG_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'

_LOGGER = logging.getLogger(__name__)

_CONNECTIONS = {}

_EVENT_LOOKUP = {
    select.POLLIN: 'POLLIN',
    select.POLLPRI: 'POLLPRI',
    select.POLLOUT: 'POLLOUT',
    select.POLLERR: 'POLLERR',
    select.POLLHUP: 'POLLHUP',
    select.POLLNVAL: 'POLLNVAL',
}

def _configure_logging():
    _LOGGER.setLevel(logging.DEBUG)

    ch = logging.StreamHandler()

    formatter = logging.Formatter(_DEFAULT_LOG_FORMAT)
    ch.setFormatter(formatter)

    _LOGGER.addHandler(ch)

def _get_flag_names(flags):
    names = []
    for bit, name in _EVENT_LOOKUP.items():
        if flags & bit:
            names.append(name)
            flags -= bit

            if flags == 0:
                break

    assert flags == 0, \
           "We couldn't account for all flags: (%d)" % (flags,)

    return names

def _handle_epoll_event(epoll, server, fd, event_type):
    # POLLOUT is raised constantly once a connection is writable, so don't
    # bother logging it.
    if (event_type & select.POLLOUT) == 0:
        flag_list = _get_flag_names(event_type)
        _LOGGER.debug("Received (%d): %s", 
                      fd, flag_list)

    # Activity on the master socket means a new connection.
    if fd == server.fileno():
        _LOGGER.debug("Received connection: (%d)", event_type)

        c, address = server.accept()
        c.setblocking(0)

        child_fd = c.fileno()

        # Start watching the new connection.
        epoll.register(child_fd)

        _CONNECTIONS[child_fd] = c
    else:
        c = _CONNECTIONS[fd]

        # Child connection can read.
        if event_type & select.EPOLLIN:
            b = c.recv(1024)
            sys.stdout.write(b)

def _create_server_socket():
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    s.bind(_BINDING)
    s.listen(_MAX_CONNECTION_BACKLOG)
    s.setblocking(0)

    return s

def _run_server():
    s = _create_server_socket()

    e = select.epoll()

    # If not provided, event-mask defaults to (POLLIN | POLLPRI | POLLOUT). It 
    # can be modified later with modify().
    e.register(s.fileno())

    try:
        while True:
            events = e.poll(_EPOLL_BLOCK_DURATION_S)
            for fd, event_type in events:
                _handle_epoll_event(e, s, fd, event_type)
    finally:
        e.unregister(s.fileno())
        e.close()
        s.close()

if __name__ == '__main__':
    _configure_logging()
    _run_server()

Now, just connect via telnet to port 9999 on localhost. Submitted text in the client will be printed to the screen on the server:

$ python epoll.py 
2015-04-23 08:34:35,104 - __main__ - DEBUG - Received (3): ['POLLIN']
2015-04-23 08:34:35,104 - __main__ - DEBUG - Received connection: (1)
hello
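
If you’d rather drive it from a script than from telnet, a minimal client sketch:

import socket

s = socket.create_connection(('localhost', 9999))
s.sendall("hello from a script\n")
s.close()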

Brew and PyEnv

PyEnv is a solution, like virtualenv, that helps you maintain parallel environments. PyEnv, however, allows you to maintain parallel versions of Python itself. It will also expose the corresponding versions of the Python tools, like pip. As a bonus, all of your pip packages will be installed locally under your user (no more sudo, at all).

Recently, in order to control and debug a series of sudden environmental problems, I upgraded to Yosemite. Unfortunately, Python 2.7.8 came with it.

I manage a number of components that depend on gevent (for the awesome coroutine functionality), and gevent is not Python 3 compatible. Unfortunately, gevent is broken under 2.7.8 (the TypeError: __init__() got an unexpected keyword argument 'server_hostname' error: https://github.com/asciimoo/searx/issues/120), and no solid fix has landed. You can work around it by hacking a no-op parameter into the module on your system, but I’d rather go back to 2.7.6 for all of my local projects, by default, and run the same thing as the servers.

PyEnv worked great for this:

  1. Install PyEnv:
$ brew install pyenv
  2. Add the following to your user’s environment script:
$ eval "$(pyenv init -)"
  3. Run the command in (2) directly, or start a new shell.
  4. Download and build 2.7.6. We installed zlib via Brew, but we had to set the CFLAGS variable to prevent the “The Python zlib extension was not compiled. Missing the zlib?” message:
$ CFLAGS="-I$(xcrun --show-sdk-path)/usr/include" pyenv install 2.7.6
  5. Elect this as the default, system-wide version:
$ pyenv global 2.7.6
  6. Update the current user’s PyEnv configuration to point to the new Python executables:
$ pyenv rehash
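
To confirm that the switch took, check the reported versions (the output should look something like this; the paths will vary):

$ pyenv versions
  system
* 2.7.6 (set by /Users/you/.pyenv/version)
$ python --version
Python 2.7.6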

Retrieving Multiple Result-Sets from SQLAlchemy

SQLAlchemy is a great Python-based database client, but, traditionally, it leaves you stuck when it comes to stored-procedures that return more than one dataset. This means that you’d have to either call separate queries or merge multiple datasets into one large, unnatural one. However, there is a way to read multiple datasets; it just requires dropping down to the raw MySQL layer (which isn’t too bad).

This is the test-routine:

delimiter //

CREATE PROCEDURE `get_sets`()
BEGIN
    SELECT
        'value1' `series1_col1`,
        'value2' `series1_col2`;

    SELECT
        'value3' `series2_col1`,
        'value4' `series2_col2`;

    SELECT
        'value5' `series3_col1`,
        'value6' `series3_col2`;
END//

delimiter ;

The code:

import json

import sqlalchemy.pool

def _run_query(connection, query, parameters=None):
    sets = []

    try:
        cursor = connection.cursor()

        cursor.execute(query, parameters)

        while 1:
            #(column_name, type_, ignore_, ignore_, ignore_, null_ok, column_flags)
            names = [c[0] for c in cursor.description]

            set_ = []
            while 1:
                row_raw = cursor.fetchone()
                if row_raw is None:
                    break

                row = dict(zip(names, row_raw))
                set_.append(row)

            sets.append(list(set_))

            if cursor.nextset() is None:
                break

            # nextset() alone doesn't seem to be sufficient to detect the end.
            if cursor.description is None:
                break
    finally:
        # Return the connection to the pool (won't actually close).
        connection.close()

    return sets

def _pretty_json_dumps(data):
    return json.dumps(
            data,
            sort_keys=True,
            indent=4, 
            separators=(',', ': ')) + "\n"

def _main():
    dsn = 'mysql+mysqldb://root:root@localhost:3306/test_database'

    engine = sqlalchemy.create_engine(
                dsn, 
                pool_recycle=7200,
                poolclass=sqlalchemy.pool.NullPool)

    # Grab a raw connection from the connection-pool.
    connection = engine.raw_connection()

    query = 'CALL get_sets()'
    sets = _run_query(connection, query)

    print(_pretty_json_dumps(sets))

if __name__ == '__main__':
    _main()

The output:

[
    [
        {
            "series1_col1": "value1",
            "series1_col2": "value2"
        }
    ],
    [
        {
            "series2_col1": "value3",
            "series2_col2": "value4"
        }
    ],
    [
        {
            "series3_col1": "value5",
            "series3_col2": "value6"
        }
    ]
]

Things to observe in the example:

  • The query parameters are still escaped by the driver (ours have spaces in them), even though we have to use classic Python string-substitution placeholders with the raw connection-objects (see the sketch after this list).
  • It’s up to us to extract the column-names from the cursor for each dataset.
  • The resulting datasets can’t be captured as generators, as each has to be read entirely before jumping to the next one. Technically, you can yield each dataset, but this has almost no usefulness, since you’d rarely need to read through them sequentially, and you’d only benefit if there were a large number of datasets.
  • The raw_connection() method claims a connection from the pool, and its close() method will return it to the pool without actually closing it.
  • I added pool_recycle for good measure. Connection-recycling is an enormous pain to deal with if you’re new to SA and your connections keep “going away” because MySQL closes them before SA can recycle them.
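
To sketch the escaping point above: the raw MySQLdb cursor takes pyformat-style placeholders, so a parameterized call (the procedure name here is hypothetical) would look like:

query = 'CALL get_sets_filtered(%(label)s)'
sets = _run_query(connection, query, {'label': 'a value with spaces'})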

REFERENCE: Multiple Result Sets

Programmatically-Driven Websites in Python (with HTTPHandler and SO_LINGER)

We’re going to write a website whose requests are handled by subroutines, and use Python’s logging.handlers.HTTPHandler class to send requests to it. Documentation and/or examples for the former are sparse, and I thought that an example of the latter connecting to the former would be useful.

Understanding the Webserver

Using the built-in BaseHTTPServer.BaseHTTPRequestHandler webserver, you can wire methods to individual verbs (GET, POST, PUT, etc.). Requests for verbs that aren’t handled will return a 501. Aside from having to write the headers at the top of the methods yourself and needing to read a specific quantity of data-bytes (or you’ll block forever), this is similar to every other web-framework that you’ve used.

The only things that you really need to know are the following instance variables:

  • headers: A dictionary-like collection of headers.
  • rfile: A file-like object that will contain your data (if you receive any).
  • wfile: A file-like object that will receive your response data (if you send any).

You’ll also need to decide how to handle unsent data when you terminate. Even if you shut down a socket, the system may not close it immediately if data has already moved across it. This relates to why we inherit from SocketServer.TCPServer and change the one class variable. We’ll discuss this more below.

import pprint
import urlparse

import BaseHTTPServer
import SocketServer

_PORT = 8000


class TCPServerReusableSocket(SocketServer.TCPServer):
    allow_reuse_address = True


class HookedHTTPRequestHandler(BaseHTTPServer.BaseHTTPRequestHandler):
    def __send_headers(self):
        self.send_response(200)
        self.send_header("Content-type", 'text/plain')
        self.end_headers()

    def do_GET(self):
        self.__send_headers()

        print("Received GET request for: %s" % (self.path,))

        self.wfile.write("Test from GET!\n")

    def do_POST(self):
        self.__send_headers()

        print("Received POST request for: %s" % (self.path,))

        print('')
        print('Headers')
        print('=======')
        pprint.pprint(self.headers.items())
        print('=======')

        length = int(self.headers['content-length'])
        data_raw = self.rfile.read(length)
        data = urlparse.parse_qs(data_raw)

        print('')
        print('Received')
        print('========')
        pprint.pprint(data)
        print('========')
        print('')

        self.wfile.write("Test from POST!\n")

httpd = TCPServerReusableSocket(
            ('localhost', _PORT), 
            HookedHTTPRequestHandler)

httpd.serve_forever()

We expect that what we’ve done above is fairly obvious and does not need an explanation. You can implement your own log_request(code='-', size='-') method in HookedHTTPRequestHandler to change how the requests are printed, or to remove them.
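
For example, this minimal override silences the default per-request lines entirely:

    def log_request(self, code='-', size='-'):
        # Suppress the default request logging.
        pass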

To continue our remarks about buffered data above, we add special handling so that you don’t encounter the “socket.error: [Errno 48] Address already in use” error if you kill the server and restart it a moment later. You may choose one of the following two strategies:

  1. Force the socket to close immediately.
  2. Allow the socket to already be open.

(1) should be fine for logging/etc., but it might not be a great option if you’re handling actual data. (2) should probably be the preferred strategy, but you’ll also want to implement a PID file in your application so that only one instance runs at a time (assuming that’s desired).

To implement (1), use SocketServer.TCPServer instead of our custom TCPServerReusableSocket, and add the following imports:

import socket
import struct

Then, add the following after we define httpd but before we start the server. This sets the SO_LINGER socket option so that any buffered data is discarded immediately on close:

l_onoff = 1   # Enable lingering.
l_linger = 0  # Linger-time of zero: discard unsent data on close.

httpd.socket.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER, struct.pack('ii', l_onoff, l_linger))

You can test this using cURL, if you can’t wait to set up HTTPHandler:

$ curl -X POST -d abc=def http://localhost:8000
Test from POST!

The webserver process will show:

$ python http_server.py 
127.0.0.1 - - [19/Oct/2014 15:28:47] "POST / HTTP/1.1" 200 -
Received POST request for: /

Headers
=======
[('host', 'localhost:8000'),
 ('content-type', 'application/x-www-form-urlencoded'),
 ('content-length', '7'),
 ('accept', '*/*'),
 ('user-agent', 'curl/7.30.0')]
=======

Received
========
{'abc': ['def']}
========

Understanding logging.handlers.HTTPHandler

My own use-case for this came from a new MapReduce platform (JobX): I wanted to emit messages to another system when certain tasks were accomplished. I used the built-in webserver that we wrote above to watch these messages from the development system.

import logging
import logging.handlers

logger = logging.getLogger(__name__)

_TARGET = 'localhost:8000'
_PATH = '/'
_VERB = 'post'

sh = logging.handlers.HTTPHandler(_TARGET, _PATH, method=_VERB)

logger.addHandler(sh)
logger.setLevel(logging.DEBUG)

logger.debug("Test message.")

This will be shown by the webserver:

127.0.0.1 - - [19/Oct/2014 15:45:02] "POST / HTTP/1.0" 200 -
Received POST request for: /

Headers
=======
[('host', 'localhost'),
 ('content-type', 'application/x-www-form-urlencoded'),
 ('content-length', '368')]
=======

Received
========
{'args': ['()'],
 'created': ['1413747902.18'],
 'exc_info': ['None'],
 'exc_text': ['None'],
 'filename': ['push_socket_log.py'],
 'funcName': ['<module>'],
 'levelname': ['DEBUG'],
 'levelno': ['10'],
 'lineno': ['17'],
 'module': ['push_socket_log'],
 'msecs': ['181.387901306'],
 'msg': ['Test message.'],
 'name': ['__main__'],
 'pathname': ['./push_socket_log.py'],
 'process': ['65486'],
 'processName': ['MainProcess'],
 'relativeCreated': ['12.6709938049'],
 'thread': ['140735262810896'],
 'threadName': ['MainThread']}
========

Note that each field is a list with one item. If you want the output to look a little nicer, alter the above to add the following to the top of the module:

import datetime

_FMT_DATETIME_STD = '%Y-%m-%d %H:%M:%S'

Then, add the __print_entry method:

    def __print_entry(self, entry):
        created_epoch = float(entry['created'][0])
        when_dt = datetime.datetime.fromtimestamp(created_epoch)
        timestamp_phrase = when_dt.strftime(_FMT_DATETIME_STD)
        where_name = entry['name'][0][:40]
        level_name = entry['levelname'][0]

        message = entry['msg'][0]

        print('%s  %40s  %9s  %s' % 
              (timestamp_phrase, where_name, level_name, message))

Then, change the last part of do_POST:

    def do_POST(self):
        self.__send_headers()

        length = int(self.headers['content-length'])
        data_raw = self.rfile.read(length)
        data = urlparse.parse_qs(data_raw)

        self.__print_entry(data)

The output will now look like:

2014-10-19 16:16:00       MR_HANDLER.HTTP.map_obfuscation_one       INFO  Socket message!
2014-10-19 16:16:00                           MR_HANDLER.HTTP      ERROR  Mapper invocation [789b7ca7fcb6cede9ae5557b2121d392469dfc26] under request [85394d5bdb34a09ffa045776cc69d1d4cd17d657] failed. HANDLER=[map_obfuscation_one]

There is one weird thing about HTTPHandler, and it’s this: many/all of the fields will be stringified in order to serialize them. If you call the logger like logging.debug('Received arguments: [%s] [%s]', arg1, arg2), then we’ll receive Received arguments: [%s] [%s] in the msg field (or rather the msg list), and the arguments as a stringified tuple like (u'abc', u'def'). To avoid dealing with this, I send messages into a function that’s in charge of the notifications, and produce the final string before handing it to the logger.

The same thing applies to tracebacks. If you log an exception, you’ll only get this:

 'exc_info': ['(<type 'exceptions.NameError'>, NameError("global name 'client_id' is not defined",), <traceback object at 0x110c92878>)'],
 'exc_text': ['None'],

Again, you’ll have to concatenate this into the log-message in some intermediate function, so that the primary application logic doesn’t have to know about it but you still get the information.
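
This is a minimal sketch of such an intermediate helper; the function names are hypothetical:

import logging
import traceback

_logger = logging.getLogger(__name__)

def notify(message, *args):
    # Render the final string here, so that HTTPHandler serializes a finished
    # msg field rather than a format-string plus a stringified arguments-tuple.
    if args:
        message = message % args

    _logger.debug(message)

def notify_exception(message):
    # Fold the traceback text into the message itself, since exc_info and
    # exc_text don't survive HTTPHandler's serialization in a useful form.
    notify(message + "\n" + traceback.format_exc())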

Dynamically Compiling and Implementing a Function

Python allows you to dynamically compile code at the module-level. The compile() built-in accepts source code and produces a code-object, and exec takes dictionaries of globals and locals to run it against, but neither provides a direct way to call a fragment of dynamic (read: textual) sourcecode with arguments (“xyz(arg1, arg2)”). You also can’t directly invoke compiled code as a generator (where a function that uses “yield” is interpreted as a generator rather than just a function).

However, there’s a loophole, and it’s very elegant and consistent to Python. You simply have to wrap your code in a function definition, and then pull the function from the local scope. You can then call it as desired:

import hashlib

def _compile(arg_names, code):
    name = "(lambda compile)"

    # The generated name needs to start with a letter and be a valid
    # identifier, so we derive it from a hash of the code.
    id_ = 'a' + hashlib.sha1(code).hexdigest()

    # Wrap the fragment in a function-definition, indenting the body.
    code = "def " + id_ + "(" + ', '.join(arg_names) + "):\n" + \
           '\n'.join(('  ' + line) for line in code.replace('\r', '').split('\n')) + '\n'

    c = compile(code, name, 'exec')

    # Execute the definition, and pluck the new function out of the local scope.
    locals_ = {}
    exec(c, globals(), locals_)

    return locals_[id_]

code = """
return a * b * c
"""

c = _compile(['a', 'b', 'c'], code)
print(c(1, 2, 3))
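
Because the fragment is wrapped in a real function-definition, a fragment that uses yield comes back as a generator-function, too:

code = """
for i in range(a):
    yield i * b
"""

g = _compile(['a', 'b'], code)
print(list(g(3, 10)))
# Prints [0, 10, 20].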

Serialize a Generator in Python

Simply inherit from the list class, and override the __iter__ method (and, for robustness, __len__). A great example, from SO:

import json

def gen():
    yield 20
    yield 30
    yield 40

class StreamArray(list):
    def __iter__(self):
        return gen()

    # The pure-Python encoder (used when indent or sort_keys is given) skips
    # any list that tests as empty, so report a non-zero length.
    def __len__(self):
        return 1

a = [1, 2, 3]
b = StreamArray()

print(json.dumps([1, a, b]))
# Prints: [1, [1, 2, 3], [20, 30, 40]]