Cog: Evaluate Arbitrary Python Fragments in a Template Document

You provide any text document with embedded fragments of Python, and Cog renders it:

using namespace std;

#include <iostream>

int main() {
    cout << "Hello Earth." << endl;


/*[[[cog

print("""\
cout << "Hello Mars." << endl;
printf("Goodbye Pluto.\\n");
""")

]]]*/

//[[[end]]]


  return 0;
}

Notice that newlines have to be escaped.

Translate:

$ cog.py hello_world.cpp.cog > hello_world.cpp

Output:

using namespace std;

#include <iostream>

int main() {
    cout << "Hello Earth." << endl;


/*[[[cog

print("""\
cout << "Hello Mars." << endl;
printf("Goodbye Pluto.\\n");
""")

]]]*/
cout << "Hello Mars." << endl;
printf("Goodbye Pluto.\n");

//[[[end]]]


  return 0;
}

General build and run:

$ g++ -o hello_world hello_world.cpp
$ ./hello_world
Hello Earth.
Hello Mars.
Goodbye Pluto.
Advertisements

Using XML-RPC with Magento

Sure, we could use a cushy SOAP library to communicate with Magento, but maybe you’d want to capitalize on the cacheability of XML-RPC, instead. Sure, we could use an XML-RPC library, but that would be less fun and, as engineers, we like knowing how stuff works. Magento is not for the faint of heart and knowing how to communicate with it at the low-level might be useful at some point.

import os
import json

import xml.etree.ElementTree as ET

import requests

# http://xmlrpc.scripting.com/spec.html

_HOSTNAME = os.environ['MAGENTO_HOSTNAME']
_USERNAME = os.environ['MAGENTO_USERNAME']
_PASSWORD = os.environ['MAGENTO_PASSWORD']

_URL = "http://" + _HOSTNAME + "/api/xmlrpc"

_HEADERS = {
    'Content-Type': 'text/xml',
}

def _pretty_print(results):
    print(json.dumps(
            results, 
            sort_keys=True,
            indent=4, 
            separators=(',', ': ')))

def _send_request(payload):
    r = requests.post(_URL, data=payload, headers=_HEADERS)
    r.raise_for_status()

    root = ET.fromstring(r.text)
    return root

def _send_array(session_id, method_name, args):

    data_parts = []
    for (type_name, value) in args:
        data_parts.append('<value><' + type_name + '>' + str(value) + '</' + type_name + '></value>')

    payload = """\
<?xml version='1.0'?>
<methodCall>
    <methodName>call</methodName>
    <params>
        <param>
            <value><string>""" + session_id + """\
</string></value>
        </param>
        <param>
            <value><string>""" + method_name + """\
</string></value>
        </param>
        <param>
            <value>
                <array>
                    <data>
                        """ + ''.join(data_parts) + """
                    </data>
                </array>
            </value>
        </param>
    </params>
</methodCall>
"""

    return _send_request(payload)

def _send_struct(session_id, method_name, args):
    struct_parts = []

    for (type_name, argument_name, argument_value) in args:
        struct_parts.append("<member><name>" + argument_name + "</name><value><" + type_name + ">" + str(argument_value) + "</" + type_name + "></value></member>")

    payload = """\
<?xml version='1.0'?>
<methodCall>
    <methodName>call</methodName>
    <params>
        <param>
            <value><string>""" + session_id + """\
</string></value>
        </param>
        <param>
            <value><string>""" + method_name + """\
</string></value>
        </param>
        <param>
            <value>
                <struct>
                    """ + ''.join(struct_parts) + """
                </struct>
            </value>
        </param>
    </params>
</methodCall>
"""

    return _send_request(payload)

def _send_login(args):
    param_parts = []
    for (type_name, value) in args:
        param_parts.append('<param><value><' + type_name + '>' + value + '</' + type_name + '></value></param>')

    payload = """\
<?xml version="1.0"?>
<methodCall>
    <methodName>login</methodName>
    <params>""" + ''.join(param_parts) + """\
</params>
</methodCall>
"""

    return _send_request(payload)


class XmlRpcFaultError(Exception):
    pass

def _distill(value_node):
    type_node = value_node[0]
    type_name = type_node.tag

    if type_name == 'nil':
        return None
    elif type_name in ('int', 'i4'):
        return int(type_node.text)
    elif type_name == 'boolean':
        return bool(type_node.text)
    elif type_name == 'double':
        return float(type_node.text)
    elif type_name == 'struct':
        values = {}
        for member_node in type_node:
            key = member_node.find('name').text

            value_node = member_node.find('value')
            value = _distill(value_node)

            values[key] = value

        return values
    elif type_name == 'array':
        flat = []
        for i, child_value_node in enumerate(type_node.findall('data/value')):
            flat.append(_distill(child_value_node))

        return flat
    elif type_name in ('string', 'dateTime.iso8601', 'base64'):
        return type_node.text
    else:
        raise ValueError("Invalid type: [{0}] [{1}]".format(type_name, type_node))

def _parse_response(root):
    if root.find('fault') is not None:
        for e in root.findall('fault/value/struct/member'):
            if e.find('name').text == 'faultString':
                message = e.find('value/string').text
                raise XmlRpcFaultError(message)

        raise ValueError("Malformed fault response")

    value_node = root.find('params/param/value')
    result = _distill(value_node)

    return result

def _main():
    args = [
        ('string', _USERNAME),
        ('string', _PASSWORD),
    ]

    root = _send_login(args)
    session_id = _parse_response(root)

    resource_name = 'catalog_product.info'

    args = [
        ('int', 'productId', '314'),
    ]

    root = _send_struct(session_id, resource_name, args)
    result = _parse_response(root)
    _pretty_print(result)

if __name__ == '__main__':
    _main()

Output:

{
    "apparel_type": "33",
    "categories": [
        "13"
    ],
    "category_ids": [
        "13"
    ],
    "color": "27",
    "country_of_manufacture": null,
    "created_at": "2013-03-05T00:48:15-05:00",
    "custom_design": null,
    "custom_design_from": null,
    "custom_design_to": null,
    "custom_layout_update": null,
    "description": "Two sash, convertible neckline with front ruffle detail. Unhemmed, visisble seams. Hidden side zipper. Unlined. Wool/elastane. Hand wash.",
    "fit": null,
    "gender": "94",
    "gift_message_available": null,
    "gift_wrapping_available": null,
    "gift_wrapping_price": null,
    "group_price": [],
    "has_options": "0",
    "image_label": null,
    "is_recurring": "0",
    "length": "82",
    "meta_description": null,
    "meta_keyword": null,
    "meta_title": null,
    "minimal_price": null,
    "msrp": null,
    "msrp_display_actual_price_type": "4",
    "msrp_enabled": "2",
    "name": "Convertible Dress",
    "news_from_date": "2013-03-01 00:00:00",
    "news_to_date": null,
    "occasion": "29",
    "old_id": null,
    "options_container": "container1",
    "page_layout": "one_column",
    "price": "340.0000",
    "product_id": "314",
    "recurring_profile": null,
    "required_options": "0",
    "set": "13",
    "short_description": "This all day dress has a flattering silhouette and a convertible neckline to suit your mood. Wear tied and tucked in a sailor knot, or reverse it for a high tied feminine bow.",
    "size": "72",
    "sku": "wsd017",
    "sleeve_length": "45",
    "small_image_label": null,
    "special_from_date": null,
    "special_price": null,
    "special_to_date": null,
    "status": "1",
    "tax_class_id": "2",
    "thumbnail_label": null,
    "tier_price": [],
    "type": "simple",
    "type_id": "simple",
    "updated_at": "2014-03-08 08:31:20",
    "url_key": "convertible-dress",
    "url_path": "convertible-dress-418.html",
    "visibility": "1",
    "websites": [
        "1"
    ],
    "weight": "1.0000"
}

You may download the code here.

Converting Infix Expressions to Postfix in Python

A simplified Python algorithm for converting infix expressions to postfix expressions using Dijkstra’s “shunting-yard” algorithm. We omit support for functions and their arguments but support parenthesis as expected. For the purpose of this example, we support simple mathematical expressions.

OP_LPAREN = '('
OP_RPAREN = ')'
OP_MULTIPLY = '*'
OP_DIVIDE = '/'
OP_ADD = '+'
OP_SUBTRACT = '-'

OPERATORS_S = set([
    OP_MULTIPLY, 
    OP_DIVIDE, 
    OP_ADD, 
    OP_SUBTRACT, 
    OP_LPAREN, 
    OP_RPAREN,
])

PRECEDENCE = {
    OP_MULTIPLY: 7,
    OP_DIVIDE: 7,
    OP_ADD: 5,
    OP_SUBTRACT: 5,
    OP_LPAREN: 1,
    OP_RPAREN: 1,
}

LEFT_ASSOCIATIVE_S = set([
    OP_MULTIPLY,
    OP_DIVIDE,
    OP_ADD, 
    OP_SUBTRACT, 
    OP_LPAREN, 
    OP_RPAREN,
])

def _convert(expression_phrase):
    expression_phrase = expression_phrase.replace(' ', '')

    stack = []
    output = []
    for c in expression_phrase:
        if c not in OPERATORS_S:
            # It's an operand.
            output += [c]
        elif c not in (OP_LPAREN, OP_RPAREN):
            # It's an operator. Pop-and-add all recent operators that win over 
            # the current operator via precendence/associativity.

            current_prec = PRECEDENCE[c]
            is_left_assoc = c in LEFT_ASSOCIATIVE_S
            while len(stack):
                top_value = stack[-1]
                top_prec = PRECEDENCE[top_value]

                if is_left_assoc is True and current_prec <= top_prec or \
                   is_left_assoc is False and current_prec < top_prec:
                    stack.pop()
                    output += [top_value]
                else:
                    break

            stack.append(c)

        elif c == OP_LPAREN:
            # It's a left paren.

            stack.append(c)
        else: #if c == OP_RPAREN:
            # It's a right paren. Pop-and-add everything since the last left 
            # paren.

            found = False
            while len(stack):
                top_value = stack.pop()
                if top_value == OP_LPAREN:
                    found = True
                    break

                output += [top_value]

            if found is False:
                raise ValueError("Mismatched parenthesis (1).")

    if stack and stack[-1] in (OP_LPAREN, OP_RPAREN):
        raise ValueError("Mismatched parenthesis (2).")

    # Flush everything left on the stack.
    while stack:
        c = stack.pop()
        output += [c]

    return ' '.join(output)

def _main():
    infix_phrase = 'a * (b * c) / d * (e + f + g) * h - i'
    print(infix_phrase)

    postfix_phrase = _convert(infix_phrase)
    print(postfix_phrase)

if __name__ == '__main__':
    _main()

Output:

a * (b * c) / d * (e + f + g) * h - i
a b c * * d / e f + g + * h * i -

You may download the code here.

Accessing Sales History in Magento From Python With XML-RPC

Install the python-magento package and simply pass in database table columns with some criteria. I had difficulty figuring out the structure of the filters because there’s so little documentation/examples anywhere online and, where there are, they all differ format/version.

So, here’s an example:

import magento
import json

_USERNAME = "apiuser"
_PASSWORD = "password"
_HOSTNAME = "dev.bugatchi.com"
_PORT = 80

m = magento.MagentoAPI(_HOSTNAME, _PORT, _USERNAME, _PASSWORD)

filters = {
    'created_at': {
        'from': '2013-05-29 12:38:43',
        'to': '2013-05-29 12:55:33',
    },
}

l = m.sales_order_invoice.list(filters)
print(json.dumps(l))

Output:

[
    {
        "created_at": "2013-05-29 12:38:43",
        "grand_total": "447.1400",
        "increment_id": "100000036",
        "invoice_id": "38",
        "order_currency_code": "USD",
        "order_id": "186",
        "state": "2"
    },
    {
        "created_at": "2013-05-29 12:52:44",
        "grand_total": "333.2100",
        "increment_id": "100000037",
        "invoice_id": "39",
        "order_currency_code": "USD",
        "order_id": "185",
        "state": "2"
    },
    {
        "created_at": "2013-05-29 12:55:33",
        "grand_total": "432.2600",
        "increment_id": "100000038",
        "invoice_id": "40",
        "order_currency_code": "USD",
        "order_id": "184",
        "state": "2"
    }
]

Note that debugging is fairly simple since a failure will return the failed query (unless the server is configured not to). So, you can use that to feel out many of the column names and comparison operators.

Split a Media File by a List of Time Offsets

We’ll split a single audio file containing the whole Quake soundtrack using SplitMedia.

The list file:

0:00:00 Quake Theme
0:05:08 Aftermath
0:07:34 The Hall of Souls
...
1:08:21 Scourge of Armagon 4
1:11:34 Scourge of Armagon 5
...
1:39:57 Dissolution of Eternity 6 
1:43:01 Dissolution of Eternity 7
1:46:07 Dissolution of Eternity 8

The command:

$ splitmedia Quake\ Soundtrack.m4a list_file.quake quake_output
OFF 000:00:00.000 DUR 000308.000 01_QuakeTheme.m4a
OFF 000:05:08.000 DUR 000146.000 02_Aftermath.m4a
OFF 000:07:34.000 DUR 000500.000 03_TheHallofSouls.m4a
...
OFF 001:08:21.000 DUR 000193.000 14_ScourgeofArmagon4.m4a
OFF 001:11:34.000 DUR 000193.000 15_ScourgeofArmagon5.m4a
...
OFF 001:39:57.000 DUR 000184.000 24_DissolutionofEternity6.m4a
OFF 001:43:01.000 DUR 000186.000 25_DissolutionofEternity7.m4a
OFF 001:46:07.000 DUR 000000.000 26_DissolutionofEternity8.m4a

Create a Video From Your Processing Sketch (Using the IDE)

A quick example to show how to create a video from Processing 2.0 . Here, I’m using the Python mode. We write each frame out to a file and then use the built-in Movie Maker to create a QuickTime video (you can also attach audio if you desire).

Example code:

def setup():
    size(500, 500)
    fill(0)

def draw():
    background(255, 255, 255)

    if mousePressed:
        ellipse(mouseX, mouseY, 80, 80)

    saveFrame("frames/frame-#####.png")

For those that aren’t familiar, this just configures the canvas and then constantly clears the canvas with each redraw. If you’re pressing the mouse-button, it’ll draw a circle whereever the cursor is. After the redraw, it’ll capture one PNG image. It’ll implicitly create the “frames” folder if it doesn’t already exist.

Now, we open Movie Maker:

Open Move Maker

Click on the top “Choose…” button to elect your “frames” directory (or whatever you called it):

Dialog

Click “Create Movie…”, select your video-file name/path, and watch it go:

Make Movie

The final result (in my case):

Final Result

For a library-based approach, look into GSVideo.

Drawing to a Video Using OpenCV and Python

I ran into a considerable amount of difficulty writing a video-file using OpenCV (under Python). Almost every video-writing example on the Internet is only concerned with capturing from a webcam, and, even for the relevant examples, I kept getting an empty/insubstantial file.

In order to write a video-file, you need to declare the FOURCC code that you require. I prefer H.264, so I [unsuccessfully] gave it “H264”. I also heard somewhere that since H.264 is actually the standard, I needed to use “X264” to refer to the codec. This didn’t work either. I also tried “XVID” and “DIVX”. I eventually resorted to trying to pass (-1), as this will allegedly prompt you to make a choice (thereby showing you what options are available). Naturally, no prompt was given and yet it still seemed to execute to the end. There doesn’t appear to be a way to show the available codecs. I was out of options.

It turns out that you still have one or more raw-format codecs available. For example, “8BPS” and “IYUV” are available. MJPEG (“MJPG”) also ended-up working, too. This is the best option (so that we can get compression).

It’s important to note that the nicer codecs might’ve not been available simply due to dependencies. At one point, I reinstalled OpenCV (using Brew) with the “–with-ffmpeg” option. This seemed to pull-down XVID and other codecs. However, I still had the same problems. Note that, since this was installed at the time that I tested “MJPG”, the latter may actually require the former.

Code, using MJPEG:

import cv2
import cv
import numpy as np

_CANVAS_WIDTH = 500
_CANVAS_HEIGHT = 500
_COLOR_DEPTH = 3
_CIRCLE_RADIUS = 40
_STROKE_THICKNESS = -1
_VIDEO_FPS = 1

def _make_image(x, y, b, g, r):
    img = np.zeros((_CANVAS_WIDTH, _CANVAS_HEIGHT, _COLOR_DEPTH), np.uint8)
    position = (x, y)
    color = (b, g, r)
    cv2.circle(img, position, _CIRCLE_RADIUS, color, _STROKE_THICKNESS)

    return img

def _make_video(filepath):
    # Works without FFMPEG.
    #fourcc = cv.FOURCC('8', 'B', 'P', 'S')

    # Works, but we don't have a viewer for it.
    #fourcc = cv.CV_FOURCC('i','Y','U', 'V')

    # Works (but might require FFMPEG).
    fourcc = cv.CV_FOURCC('M', 'J', 'P', 'G')

    # Prompt. This never works, though (the prompt never shows).
    #fourcc = -1

    w = cv2.VideoWriter(
            filepath,
            fourcc,
            _VIDEO_FPS,
            (_CANVAS_WIDTH, _CANVAS_HEIGHT))

    img = _make_image(100, 100, 0, 0, 255)
    w.write(img)

    img = _make_image(200, 200, 0, 255, 0)
    w.write(img)

    img = _make_image(300, 300, 255, 0, 0)
    w.write(img)

    w.release()

if __name__ == '__main__':
    _make_video('video.avi')