Python: Retrieve User Info from LDAP

The adapter below returns the full DN for a user along with the list of groups that the user is a member of, and can also list the members of a group:

import logging
import ldap
import collections

# Prerequisites (install before using this module):
#
#   apt-get install libsasl2-dev
#   pip install python-ldap
#

# LDAP search-filter templates. Each one is completed with str.format():
# _USER_QUERY takes `username`, _GROUP_QUERY takes `group_name`.
_USER_QUERY = (
    '(&'
    '(objectClass=USER)'
    '(sAMAccountName={username})'
    ')'
)
_GROUP_QUERY = (
    '(&'
    '(objectClass=GROUP)'
    '(cn={group_name})'
    ')'
)

# Logger shared by everything in this module.
_LOGGER = logging.getLogger(__name__)

# Record returned for a user lookup:
#   dn         -- the DN of the matching search-result entry
#   attributes -- the attribute mapping of that entry
#   groups     -- the values of the entry's 'memberOf' attribute
_USER = collections.namedtuple('_USER', ('dn', 'attributes', 'groups'))


class NotFoundException(Exception):
    """Raised when a directory search yields no matching entry."""


class LdapAdapter(object):
    """LDAP client for looking up a user and the members of a group.

    The constructor only stores the connection settings. The connection is
    opened and bound the first time a query needs it, and is then reused
    for every later query made through the same instance.
    """

    def __init__(self, host_and_port, username, password, base_dn):
        """Remember the connection settings; no connection is made here.

        host_and_port -- appended to 'ldap://' to form the server URI.
        username      -- identity passed to the simple bind.
        password      -- credential passed to the simple bind.
        base_dn       -- base DN under which every search is run.
        """
        self.__host_and_port = host_and_port
        self.__username = username
        self.__password = password
        self.__base_dn = base_dn

        # Bound connection, created on first use by the __resource property.
        self.__raw_resource = None

    @property
    def __resource(self):
        """Return the bound connection, creating it on first access."""
        if self.__raw_resource is None:
            self.__raw_resource = self.__auth()

        return self.__raw_resource

    def __auth(self):
        """Open a connection and bind with the stored credentials.

        Returns the bound connection object.

        Raises ldap.INVALID_CREDENTIALS and ldap.SERVER_DOWN unchanged. Any
        other ldap.LDAPError is logged; if its payload is a dict with a
        'desc' entry it is re-raised as Exception("LDAP: <desc>"),
        otherwise the original error propagates.
        """
        conn = ldap.initialize('ldap://' + self.__host_and_port)
        conn.protocol_version = 3
        conn.set_option(ldap.OPT_REFERRALS, 0)

        try:
            conn.simple_bind_s(self.__username, self.__password)
        except (ldap.INVALID_CREDENTIALS, ldap.SERVER_DOWN):
            # Named explicitly so these two are never converted into the
            # generic Exception raised by the handler below.
            raise
        except ldap.LDAPError as e:
            # `e.message` only exists on Python 2; the first positional
            # argument carries the same payload on both major versions.
            detail = e.args[0] if e.args else ''
            _LOGGER.error("LDAP error content:\n%s", detail)

            if isinstance(detail, dict) and 'desc' in detail:
                raise Exception("LDAP: {}".format(detail['desc']))

            raise

        return conn

    @staticmethod
    def __escape(value):
        """Escape `value` for safe use as an assertion value in a filter."""
        # Imported here so the module's top-level imports stay untouched.
        from ldap.filter import escape_filter_chars

        return escape_filter_chars(value)

    def __search(self, search_filter):
        """Run a subtree search under the base DN.

        Returns a list of (dn, attributes) pairs. Result rows whose DN is
        None are dropped.
        """
        results_raw = \
            self.__resource.search_s(
                self.__base_dn,
                ldap.SCOPE_SUBTREE,
                search_filter)

        return [
            (dn, attributes)
            for dn, attributes in (results_raw or [])
            if dn is not None
        ]

    def get_dn_by_username(self, username):
        """Return user information as a _USER tuple.

        username -- value matched against sAMAccountName. It is escaped
                    before being placed in the search filter, so filter
                    metacharacters are matched literally.

        The `groups` field holds the entry's 'memberOf' values, or an empty
        list when the entry has no such attribute.

        Raises NotFoundException when no entry matches, and AssertionError
        when more than one entry matches.
        """
        uf = _USER_QUERY.format(username=self.__escape(username))

        users = [
            _USER(
                dn=dn,
                attributes=attributes,
                groups=attributes.get('memberOf', []))
            for dn, attributes in self.__search(uf)
        ]

        if not users:
            raise NotFoundException(username)

        if len(users) != 1:
            # Raised explicitly rather than via `assert` so the check is
            # not stripped under `python -O`; the exception type callers
            # see is the same as before.
            raise AssertionError(
                "More than one result was found for user [{}], which doesn't "
                "make sense: {}".format(username, users))

        return users[0]

    def get_group_members(self, group_name):
        """Return the values of the group's 'member' attribute.

        group_name -- value matched against cn. It is escaped before being
                      placed in the search filter, so filter metacharacters
                      are matched literally.

        Returns an empty list when the group entry has no 'member'
        attribute.

        Raises NotFoundException when no entry matches, and AssertionError
        when more than one entry matches.
        """
        gf = _GROUP_QUERY.format(group_name=self.__escape(group_name))

        member_sets = [
            attributes.get('member', [])
            for _dn, attributes in self.__search(gf)
        ]

        if not member_sets:
            raise NotFoundException(group_name)

        if len(member_sets) != 1:
            # Explicit raise keeps the check active under `python -O`.
            raise AssertionError(
                "Too many sets of results returned: {}".format(member_sets))

        return member_sets[0]
Advertisements

Leave a Reply

Please log in using one of these methods to post your comment:

WordPress.com Logo

You are commenting using your WordPress.com account. ( Log Out / Change )

Google+ photo

You are commenting using your Google+ account. ( Log Out / Change )

Twitter picture

You are commenting using your Twitter account. ( Log Out / Change )

Facebook photo

You are commenting using your Facebook account. ( Log Out / Change )

w

Connecting to %s