Python: Substitute Values Into YAML During Load

It is often useful to apply environment variables, or some other set of replacements, to a YAML-based config or set of instructions as it is loaded. This example uses PyYAML and Python’s built-in string templating (string.Template) to replace tokens like “$NAME” with values from a dictionary. It will fail, as it should, if the name is not in the given dictionary.

import string
import yaml

def load_yaml(f, context):

    def string_constructor(loader, node):

        t = string.Template(node.value)
        value = t.substitute(context)

        return value


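    # Subclass SafeLoader so the constructor and resolver are not registered
    # on the shared yaml.SafeLoader class (which would affect every other
    # caller in the process).
    class _TemplateLoader(yaml.SafeLoader):
        pass

    _TemplateLoader.add_constructor('tag:yaml.org,2002:str', string_constructor)

    token_re = string.Template.pattern
    _TemplateLoader.add_implicit_resolver('tag:yaml.org,2002:str', token_re, None)

    return yaml.load(f, Loader=_TemplateLoader)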

y = """\
aa: bb
cc: dd $EE ff
"""

context = {
    'EE': '123',
}

d = load_yaml(y, context)
print(d)

Output:

{'aa': 'bb', 'cc': 'dd 123 ff'}
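
The failure behavior mentioned above comes from string.Template.substitute, which raises KeyError for an unknown token. A minimal sketch of that behavior using only the standard library (the "$MISSING" token is made up for illustration), alongside safe_substitute, which leaves unknown tokens in place if you would rather not fail:

```python
import string

context = {'EE': '123'}
template = string.Template('dd $EE ff $MISSING')

# substitute() is strict: an unknown token raises KeyError.
missing_name = None
try:
    template.substitute(context)
except KeyError as e:
    missing_name = e.args[0]

# safe_substitute() is lenient: unknown tokens are left untouched.
lenient = template.safe_substitute(context)

print(missing_name)
print(lenient)
```

Swapping substitute for safe_substitute in the constructor would make the loader tolerant of unknown tokens, at the cost of silently passing them through.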

Python: Command-Line Completion for argparse

argcomplete provides tab-completion for argparse-based scripts, and you get it nearly for free with just a couple of steps.

Implementation

Put some markup below the shebang of your frontend script in the form of a comment:

#!/usr/bin/env python
# PYTHON_ARGCOMPLETE_OK

The BASH-completion script installed by argcomplete will identify and scan any script with a Python shebang that is used with BASH-completion. This entails actually running the script. To minimize the time spent loading scripts that don’t actually use argcomplete, the completion script ignores anything that does not have this comment directly following the shebang.

Next, add an import for the argcomplete package and run argcomplete.autocomplete(parser) after you have configured your command-line parameters but before your call to parser.parse_args() (where parser is an instance of argparse.ArgumentParser). This function will produce command-line configuration metadata and then terminate.

That is it. Note that it is not practical to assume that everyone who uses your script will have argcomplete installed. They may not be using BASH (BASH is the only well-supported shell at this time), they may not be using a supported OS, and/or any commercial environments that adopt your tools may be server environments that have no use for command-line completion and refuse to support it. Therefore, you should wrap the import with a try-except for ImportError and then only call argcomplete.autocomplete if you were able to import the package.
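
A minimal sketch of the guarded import described above. The parser, its "--name" argument, and the function names are made up for illustration; only the argcomplete.autocomplete(parser) call comes from argcomplete itself:

```python
#!/usr/bin/env python
# PYTHON_ARGCOMPLETE_OK

import argparse

# argcomplete is optional: fall back gracefully when it is not installed.
try:
    import argcomplete
except ImportError:
    argcomplete = None


def build_parser():
    # Hypothetical parser, just to have something to complete.
    parser = argparse.ArgumentParser(description='Example frontend script')
    parser.add_argument('--name', default='world')
    return parser


def main(argv=None):
    parser = build_parser()

    # Must run after the parser is configured and before parse_args().
    if argcomplete is not None:
        argcomplete.autocomplete(parser)

    args = parser.parse_args(argv)
    return 'hello ' + args.name


if __name__ == '__main__':
    print(main())
```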

Installation

To install argcomplete, the simplest route is to do a “sudo pip install argcomplete” and then call “activate-global-python-argcomplete” (this is a physical script, likely installed to /usr/local/bin). This only has to be done once and will install a non-project-specific script that will work for any script that is equipped to use argcomplete. For other configuration and semantics, see the project page.

Custom String Template Format in Python

This might be necessary if you, for example, want to apply your own set of replacements to a string argument that will be passed to you by another mechanism that applies its own set of replacements.

This example supposes that you might want to use square-brackets instead of the standard curly-brackets.

import string
import re

_FIELD_RE = re.compile(r'\[([a-zA-Z0-9_]+)\]')

class CustomReplacer(string.Formatter):
    def parse(self, s):
        # Yield (literal_text, field_name, format_spec, conversion) tuples,
        # as string.Formatter expects.
        last_stop_index = 0
        for m in _FIELD_RE.finditer(s):
            token_name = m.group(1)
            start_index, stop_index = m.span()

            # Literal text between the previous token and this one.
            prefix_fragment = s[last_stop_index:start_index]
            last_stop_index = stop_index

            yield prefix_fragment, token_name, '', None

        # Keep any literal text that follows the final token.
        if last_stop_index < len(s):
            yield s[last_stop_index:], None, None, None

cr = CustomReplacer()
template = 'aa [name]      bb [name2] cc dd [name3]'

replacements = {
    'name': 'howard',
    'name2': 'mark',
    'name3': 'james',
}

output = cr.format(template, **replacements)
print(output)

Output:

aa howard      bb mark cc dd james

Note that no formatting is supported with our custom replacer (though it could be added, with more work). A token that carries a formatting specifier (for example, “[name:>10]”) will fail the regular-expression match and will not be treated as a token.
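
If all you need is the replacement itself, a plain re.sub with a callback is a simpler alternative to subclassing string.Formatter. This is a different technique from the one above, sketched under the same assumptions (same token pattern, same replacements); the replace_square_tokens name and the extra tokens at the end of the template are made up for illustration:

```python
import re

_FIELD_RE = re.compile(r'\[([a-zA-Z0-9_]+)\]')


def replace_square_tokens(template, replacements):
    # Look up each [token]; an unknown name raises KeyError.
    def lookup(match):
        return str(replacements[match.group(1)])

    return _FIELD_RE.sub(lookup, template)


template = 'aa [name]      bb [name2] cc dd [name3] [name:>10] {untouched}'

replacements = {
    'name': 'howard',
    'name2': 'mark',
    'name3': 'james',
}

output = replace_square_tokens(template, replacements)
print(output)
```

The token with a format specifier does not match the pattern and passes through unchanged, as do the curly-bracket tokens, which remain available for whatever other mechanism processes the string.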

Measure Internet Speed from CLI

Use speedtest-cli:

$ curl -s https://raw.githubusercontent.com/sivel/speedtest-cli/master/speedtest.py | python -
Retrieving speedtest.net configuration...
Testing from Comcast Cable (73.1.128.16)...
Retrieving speedtest.net server list...
Selecting best server based on ping...
Hosted by Broadwave (Fort Lauderdale, FL) [43.78 km]: 22.155 ms
Testing download speed................................................................................
Download: 232.72 Mbit/s
Testing upload speed......................................................................................................
Upload: 10.07 Mbit/s

Python: Parsing XML and Retaining the Comments

By default, Python’s built-in ElementTree module strips comments as it reads them. The solution is just obscure enough to be hard to find.

import xml.etree.ElementTree as ET

class _CommentedTreeBuilder(ET.TreeBuilder):
    def comment(self, data):
        self.start('!comment', {})
        self.data(data)
        self.end('!comment')

def parse(filepath):
    ctb = _CommentedTreeBuilder()
    xp = ET.XMLParser(target=ctb)
    tree = ET.parse(filepath, parser=xp)

    root = tree.getroot()
    # ...

When enumerating the parsed nodes, the comments will have a tag-name of “!comment”.
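
A minimal, self-contained sketch of enumerating those nodes, assuming Python 3 and an in-memory document (the sample XML and the parse_with_comments name are made up for illustration):

```python
import io
import xml.etree.ElementTree as ET


class CommentedTreeBuilder(ET.TreeBuilder):
    # Same idea as above: record each comment as a "!comment" element.
    def comment(self, data):
        self.start('!comment', {})
        self.data(data)
        self.end('!comment')


def parse_with_comments(source):
    parser = ET.XMLParser(target=CommentedTreeBuilder())
    return ET.parse(source, parser=parser).getroot()


SAMPLE = "<root><!-- first --><item>1</item><!-- second --></root>"

root = parse_with_comments(io.StringIO(SAMPLE))

# Comments show up as ordinary child elements, in document order.
tags = [node.tag for node in root]
comments = [node.text for node in root.iter('!comment')]

print(tags)
print(comments)
```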

Repo: How to Parse and Use a Manifest Directly From Python

Repo is a tool from AOSP (Android) that allows you to manage a vast hierarchy of individual Git repositories. It’s basically a small Python tool that adds some abstraction around Git commands. The manifest that controls the project tree is written in XML. It can include submanifests, assign projects to different groups (so you do not have to clone all of them every time), and include additional command primitives to do file copies and tweak how the manifests are loaded. The manifest is written against a basic specification but, still, it is a lot easier to reuse an existing parser than to write your own.

You can access the built-in manifest-parsing functionality directly from the Repo tool. We can also use the version of the tool that’s embedded directly in the Repo tree.

For example, to load a manifest:

/tree/.repo/repo$ python
>>> import manifest_xml
>>> xm = manifest_xml.XmlManifest('/tree/.repo')

Obviously, you’ll be [temporarily] manipulating the sys.path to load this from your integration.
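
One way to keep that sys.path change temporary is a small context manager. This is only a sketch; the temporary_sys_path name is made up, and the path in the usage comment is the example path from above:

```python
import contextlib
import sys


@contextlib.contextmanager
def temporary_sys_path(path):
    # Put the path first so its modules win over same-named ones.
    sys.path.insert(0, path)
    try:
        yield
    finally:
        # Remove the entry we added, even if the import failed.
        try:
            sys.path.remove(path)
        except ValueError:
            pass


# Intended usage (requires a real Repo tree, so it is not run here):
#
#     with temporary_sys_path('/tree/.repo/repo'):
#         import manifest_xml
```

Modules imported inside the block stay cached in sys.modules, so they remain usable after the path has been removed.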

To explore, you can play with the “projects” (list of project objects) and “paths” properties (a dictionary of paths to project objects).

Number of projects:

>>> print(len(xm.projects))
878
>>> print(len(xm.paths))
878

paths is a dictionary.

A project object looks like:

>>> p = xm.projects[0]
>>> p


>>> dir(p)
['AbandonBranch', 'AddAnnotation', 'AddCopyFile', 'AddLinkFile', 'CheckoutBranch', 'CleanPublishedCache', 'CurrentBranch', 'Derived', 'DownloadPatchSet', 'Exists', 'GetBranch', 'GetBranches', 'GetCommitRevisionId', 'GetDerivedSubprojects', 'GetRegisteredSubprojects', 'GetRemote', 'GetRevisionId', 'GetUploadableBranch', 'GetUploadableBranches', 'HasChanges', 'IsDirty', 'IsRebaseInProgress', 'MatchesGroups', 'PostRepoUpgrade', 'PrintWorkTreeDiff', 'PrintWorkTreeStatus', 'PruneHeads', 'StartBranch', 'Sync_LocalHalf', 'Sync_NetworkHalf', 'UncommitedFiles', 'UploadForReview', 'UserEmail', 'UserName', 'WasPublished', '_ApplyCloneBundle', '_CheckDirReference', '_CheckForSha1', '_Checkout', '_CherryPick', '_CopyAndLinkFiles', '_ExtractArchive', '_FastForward', '_FetchArchive', '_FetchBundle', '_GetSubmodules', '_GitGetByExec', '_InitAnyMRef', '_InitGitDir', '_InitHooks', '_InitMRef', '_InitMirrorHead', '_InitRemote', '_InitWorkTree', '_IsValidBundle', '_LoadUserIdentity', '_Rebase', '_ReferenceGitDir', '_RemoteFetch', '_ResetHard', '_Revert', '_UpdateHooks', '__class__', '__delattr__', '__dict__', '__doc__', '__format__', '__getattribute__', '__hash__', '__init__', '__module__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '__weakref__', '_allrefs', '_getLogs', '_gitdir_path', '_revlist', '_userident_email', '_userident_name', 'annotations', 'bare_git', 'bare_objdir', 'bare_ref', 'clone_depth', 'config', 'copyfiles', 'dest_branch', 'enabled_repo_hooks', 'getAddedAndRemovedLogs', 'gitdir', 'groups', 'is_derived', 'linkfiles', 'manifest', 'name', 'objdir', 'old_revision', 'optimized_fetch', 'parent', 'rebase', 'relpath', 'remote', 'revisionExpr', 'revisionId', 'shareable_dirs', 'shareable_files', 'snapshots', 'subprojects', 'sync_c', 'sync_s', 'upstream', 'work_git', 'working_tree_dirs', 'working_tree_files', 'worktree']

The relative path for the project:

>>> path = p.relpath
>>> xm.paths[path]

The revision for the project:

>>> p.revisionExpr
u'master'

The remote for the project:

>>> p.GetRemote('origin').url
u'ssh://gerrit.company.com:2537/android/platform/external/lzma'

You can also get a config object representing the Git config for the bare archive of the project:

>>> p.config


>>> dir(p.config)
['ForRepository', 'ForUser', 'GetBoolean', 'GetBranch', 'GetRemote', 'GetString', 'GetSubSections', 'Global', 'Has', 'HasSection', 'SetString', 'UrlInsteadOf', '_ForUser', '_Global', '_Read', '_ReadGit', '_ReadJson', '_SaveJson', '__class__', '__delattr__', '__dict__', '__doc__', '__format__', '__getattribute__', '__hash__', '__init__', '__module__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '__weakref__', '_branches', '_cache', '_cache_dict', '_do', '_json', '_remotes', '_section_dict', '_sections', 'defaults', 'file']

>>> p.config.file
u'/tree/.repo/projects/external/lzma.git/config'

An example of how to efficiently build a mapping of project names to paths (cached per tree):

import os
import sys

_MAPPING_CACHE = {}

def get_repo_project_to_path_mapping(path):
    try:
        return _MAPPING_CACHE[path]
    except KeyError:
        pass

    repo_meta_path = os.path.join(path, '.repo')
    repo_tool_path = os.path.join(repo_meta_path, 'repo')

    if repo_tool_path not in sys.path:
        sys.path.insert(0, repo_tool_path)

    import manifest_xml

    xm = manifest_xml.XmlManifest(repo_meta_path)
    project_to_path_mapping = {}
    # Use a distinct loop variable so the "path" argument (the cache key)
    # is not overwritten.
    for project_path, p in xm.paths.items():
        project_to_path_mapping[str(p.name)] = str(project_path)

    _MAPPING_CACHE[path] = project_to_path_mapping
    return project_to_path_mapping

Cog: Evaluate Arbitrary Python Fragments in a Template Document

You provide any text document with embedded fragments of Python, and Cog renders it:

using namespace std;

#include <iostream>

int main() {
    cout << "Hello Earth." << endl;


/*[[[cog

print("""\
cout << "Hello Mars." << endl;
printf("Goodbye Pluto.\\n");
""")

]]]*/

//[[[end]]]


  return 0;
}

Notice that the backslash in the C “\n” escape has to be doubled, because the fragment lives inside a Python string literal.
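
The effect of the escaping can be checked in plain Python, without Cog. A small sketch (the variable names are made up for illustration):

```python
# Doubled backslash: Python emits a literal backslash followed by "n",
# which is what the C compiler needs to see.
escaped = """\
cout << "Hello Mars." << endl;
printf("Goodbye Pluto.\\n");
"""

# Single backslash: Python turns it into a real newline, which would
# split the C string literal across two lines.
unescaped = """printf("Goodbye Pluto.\n");"""

print(escaped.splitlines()[1])
print(len(unescaped.splitlines()))
```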

Translate:

$ cog.py hello_world.cpp.cog > hello_world.cpp

Output:

using namespace std;

#include <iostream>

int main() {
    cout << "Hello Earth." << endl;


/*[[[cog

print("""\
cout << "Hello Mars." << endl;
printf("Goodbye Pluto.\\n");
""")

]]]*/
cout << "Hello Mars." << endl;
printf("Goodbye Pluto.\n");

//[[[end]]]


  return 0;
}

General build and run:

$ g++ -o hello_world hello_world.cpp
$ ./hello_world
Hello Earth.
Hello Mars.
Goodbye Pluto.

Using XML-RPC with Magento

Sure, we could use a cushy SOAP library to communicate with Magento, but maybe you’d want to capitalize on the cacheability of XML-RPC, instead. Sure, we could use an XML-RPC library, but that would be less fun and, as engineers, we like knowing how stuff works. Magento is not for the faint of heart and knowing how to communicate with it at the low-level might be useful at some point.

import os
import json

import xml.etree.ElementTree as ET

import requests

# http://xmlrpc.scripting.com/spec.html

_HOSTNAME = os.environ['MAGENTO_HOSTNAME']
_USERNAME = os.environ['MAGENTO_USERNAME']
_PASSWORD = os.environ['MAGENTO_PASSWORD']

_URL = "http://" + _HOSTNAME + "/api/xmlrpc"

_HEADERS = {
    'Content-Type': 'text/xml',
}

def _pretty_print(results):
    print(json.dumps(
            results, 
            sort_keys=True,
            indent=4, 
            separators=(',', ': ')))

def _send_request(payload):
    r = requests.post(_URL, data=payload, headers=_HEADERS)
    r.raise_for_status()

    root = ET.fromstring(r.text)
    return root

def _send_array(session_id, method_name, args):

    data_parts = []
    for (type_name, value) in args:
        data_parts.append('<value><' + type_name + '>' + str(value) + '</' + type_name + '></value>')

    payload = """\
<?xml version='1.0'?>
<methodCall>
    <methodName>call</methodName>
    <params>
        <param>
            <value><string>""" + session_id + """\
</string></value>
        </param>
        <param>
            <value><string>""" + method_name + """\
</string></value>
        </param>
        <param>
            <value>
                <array>
                    <data>
                        """ + ''.join(data_parts) + """
                    </data>
                </array>
            </value>
        </param>
    </params>
</methodCall>
"""

    return _send_request(payload)

def _send_struct(session_id, method_name, args):
    struct_parts = []

    for (type_name, argument_name, argument_value) in args:
        struct_parts.append("<member><name>" + argument_name + "</name><value><" + type_name + ">" + str(argument_value) + "</" + type_name + "></value></member>")

    payload = """\
<?xml version='1.0'?>
<methodCall>
    <methodName>call</methodName>
    <params>
        <param>
            <value><string>""" + session_id + """\
</string></value>
        </param>
        <param>
            <value><string>""" + method_name + """\
</string></value>
        </param>
        <param>
            <value>
                <struct>
                    """ + ''.join(struct_parts) + """
                </struct>
            </value>
        </param>
    </params>
</methodCall>
"""

    return _send_request(payload)

def _send_login(args):
    param_parts = []
    for (type_name, value) in args:
        param_parts.append('<param><value><' + type_name + '>' + value + '</' + type_name + '></value></param>')

    payload = """\
<?xml version="1.0"?>
<methodCall>
    <methodName>login</methodName>
    <params>""" + ''.join(param_parts) + """\
</params>
</methodCall>
"""

    return _send_request(payload)


class XmlRpcFaultError(Exception):
    pass

def _distill(value_node):
    type_node = value_node[0]
    type_name = type_node.tag

    if type_name == 'nil':
        return None
    elif type_name in ('int', 'i4'):
        return int(type_node.text)
    elif type_name == 'boolean':
        # XML-RPC booleans are "0" or "1"; bool("0") would be True.
        return type_node.text.strip() == '1'
    elif type_name == 'double':
        return float(type_node.text)
    elif type_name == 'struct':
        values = {}
        for member_node in type_node:
            key = member_node.find('name').text

            value_node = member_node.find('value')
            value = _distill(value_node)

            values[key] = value

        return values
    elif type_name == 'array':
        flat = []
        for i, child_value_node in enumerate(type_node.findall('data/value')):
            flat.append(_distill(child_value_node))

        return flat
    elif type_name in ('string', 'dateTime.iso8601', 'base64'):
        return type_node.text
    else:
        raise ValueError("Invalid type: [{0}] [{1}]".format(type_name, type_node))

def _parse_response(root):
    if root.find('fault') is not None:
        for e in root.findall('fault/value/struct/member'):
            if e.find('name').text == 'faultString':
                message = e.find('value/string').text
                raise XmlRpcFaultError(message)

        raise ValueError("Malformed fault response")

    value_node = root.find('params/param/value')
    result = _distill(value_node)

    return result

def _main():
    args = [
        ('string', _USERNAME),
        ('string', _PASSWORD),
    ]

    root = _send_login(args)
    session_id = _parse_response(root)

    resource_name = 'catalog_product.info'

    args = [
        ('int', 'productId', '314'),
    ]

    root = _send_struct(session_id, resource_name, args)
    result = _parse_response(root)
    _pretty_print(result)

if __name__ == '__main__':
    _main()

Output:

{
    "apparel_type": "33",
    "categories": [
        "13"
    ],
    "category_ids": [
        "13"
    ],
    "color": "27",
    "country_of_manufacture": null,
    "created_at": "2013-03-05T00:48:15-05:00",
    "custom_design": null,
    "custom_design_from": null,
    "custom_design_to": null,
    "custom_layout_update": null,
    "description": "Two sash, convertible neckline with front ruffle detail. Unhemmed, visisble seams. Hidden side zipper. Unlined. Wool/elastane. Hand wash.",
    "fit": null,
    "gender": "94",
    "gift_message_available": null,
    "gift_wrapping_available": null,
    "gift_wrapping_price": null,
    "group_price": [],
    "has_options": "0",
    "image_label": null,
    "is_recurring": "0",
    "length": "82",
    "meta_description": null,
    "meta_keyword": null,
    "meta_title": null,
    "minimal_price": null,
    "msrp": null,
    "msrp_display_actual_price_type": "4",
    "msrp_enabled": "2",
    "name": "Convertible Dress",
    "news_from_date": "2013-03-01 00:00:00",
    "news_to_date": null,
    "occasion": "29",
    "old_id": null,
    "options_container": "container1",
    "page_layout": "one_column",
    "price": "340.0000",
    "product_id": "314",
    "recurring_profile": null,
    "required_options": "0",
    "set": "13",
    "short_description": "This all day dress has a flattering silhouette and a convertible neckline to suit your mood. Wear tied and tucked in a sailor knot, or reverse it for a high tied feminine bow.",
    "size": "72",
    "sku": "wsd017",
    "sleeve_length": "45",
    "small_image_label": null,
    "special_from_date": null,
    "special_price": null,
    "special_to_date": null,
    "status": "1",
    "tax_class_id": "2",
    "thumbnail_label": null,
    "tier_price": [],
    "type": "simple",
    "type_id": "simple",
    "updated_at": "2014-03-08 08:31:20",
    "url_key": "convertible-dress",
    "url_path": "convertible-dress-418.html",
    "visibility": "1",
    "websites": [
        "1"
    ],
    "weight": "1.0000"
}
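
For cross-checking hand-built payloads like the ones above, Python’s standard xmlrpc.client module can generate and parse the same wire format without touching the network. A minimal sketch, assuming Python 3; the session ID and fault values are placeholders, not real Magento data:

```python
import xml.etree.ElementTree as ET
import xmlrpc.client

# Build a "call" request shaped like the one _send_struct() writes by hand.
request_xml = xmlrpc.client.dumps(
    ('SESSION_ID', 'catalog_product.info', {'productId': 314}),
    methodname='call')

root = ET.fromstring(request_xml)
method_name = root.find('methodName').text

# The first child of each <value> names its XML-RPC type.
param_types = [value[0].tag for value in root.findall('params/param/value')]

# Round-trip the request through the library's own parser.
params, parsed_method = xmlrpc.client.loads(request_xml)

# A fault response has the structure that _parse_response() looks for.
fault_xml = xmlrpc.client.dumps(xmlrpc.client.Fault(2, 'Access denied.'))
fault_root = ET.fromstring(fault_xml)

fault_string = None
for member in fault_root.findall('fault/value/struct/member'):
    if member.find('name').text == 'faultString':
        fault_string = member.find('value/string').text

print(method_name, param_types)
print(params)
print(fault_string)
```

Note that the library escapes special characters such as “&” and “<” in string values, which the string concatenation above does not.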


Converting Infix Expressions to Postfix in Python

A simplified Python implementation for converting infix expressions to postfix expressions using Dijkstra’s “shunting-yard” algorithm. We omit support for functions and their arguments but support parentheses as expected. For the purpose of this example, we support simple mathematical expressions with single-character operands.

OP_LPAREN = '('
OP_RPAREN = ')'
OP_MULTIPLY = '*'
OP_DIVIDE = '/'
OP_ADD = '+'
OP_SUBTRACT = '-'

OPERATORS_S = set([
    OP_MULTIPLY, 
    OP_DIVIDE, 
    OP_ADD, 
    OP_SUBTRACT, 
    OP_LPAREN, 
    OP_RPAREN,
])

PRECEDENCE = {
    OP_MULTIPLY: 7,
    OP_DIVIDE: 7,
    OP_ADD: 5,
    OP_SUBTRACT: 5,
    OP_LPAREN: 1,
    OP_RPAREN: 1,
}

LEFT_ASSOCIATIVE_S = set([
    OP_MULTIPLY,
    OP_DIVIDE,
    OP_ADD, 
    OP_SUBTRACT, 
    OP_LPAREN, 
    OP_RPAREN,
])

def _convert(expression_phrase):
    expression_phrase = expression_phrase.replace(' ', '')

    stack = []
    output = []
    for c in expression_phrase:
        if c not in OPERATORS_S:
            # It's an operand.
            output += [c]
        elif c not in (OP_LPAREN, OP_RPAREN):
            # It's an operator. Pop-and-add all recent operators that win over 
            # the current operator via precedence/associativity.

            current_prec = PRECEDENCE[c]
            is_left_assoc = c in LEFT_ASSOCIATIVE_S
            while len(stack):
                top_value = stack[-1]
                top_prec = PRECEDENCE[top_value]

                if is_left_assoc is True and current_prec <= top_prec or \
                   is_left_assoc is False and current_prec < top_prec:
                    stack.pop()
                    output += [top_value]
                else:
                    break

            stack.append(c)

        elif c == OP_LPAREN:
            # It's a left paren.

            stack.append(c)
        else: #if c == OP_RPAREN:
            # It's a right paren. Pop-and-add everything since the last left 
            # paren.

            found = False
            while len(stack):
                top_value = stack.pop()
                if top_value == OP_LPAREN:
                    found = True
                    break

                output += [top_value]

            if found is False:
                raise ValueError("Mismatched parenthesis (1).")

    # Flush everything left on the stack. A left paren found anywhere in the
    # stack at this point was never closed.
    while stack:
        c = stack.pop()
        if c == OP_LPAREN:
            raise ValueError("Mismatched parenthesis (2).")

        output += [c]

    return ' '.join(output)

def _main():
    infix_phrase = 'a * (b * c) / d * (e + f + g) * h - i'
    print(infix_phrase)

    postfix_phrase = _convert(infix_phrase)
    print(postfix_phrase)

if __name__ == '__main__':
    _main()

Output:

a * (b * c) / d * (e + f + g) * h - i
a b c * * d / e f + g + * h * i -
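
To see why the postfix form is convenient, here is a minimal sketch of a stack-based evaluator for it. It is not part of the converter above; the evaluate_postfix name is made up, and it assumes space-separated tokens with numeric operands:

```python
import operator

_OPERATIONS = {
    '+': operator.add,
    '-': operator.sub,
    '*': operator.mul,
    '/': operator.truediv,
}


def evaluate_postfix(postfix_phrase):
    stack = []
    for token in postfix_phrase.split():
        if token in _OPERATIONS:
            # The right-hand operand was pushed last, so it pops first.
            right = stack.pop()
            left = stack.pop()
            stack.append(_OPERATIONS[token](left, right))
        else:
            stack.append(float(token))

    if len(stack) != 1:
        raise ValueError("Malformed postfix expression.")

    return stack[0]


# Infix "1 + 2 * 3" in postfix form.
print(evaluate_postfix('1 2 3 * +'))

# Infix "8 / 2 - 3" in postfix form.
print(evaluate_postfix('8 2 / 3 -'))
```

No precedence table or parentheses are needed at this stage, because the conversion has already encoded the evaluation order.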
