Supports returning the full DN for the user as well as a list of groups that they are a member of:
import logging import ldap import collections # Install: # # apt: libsasl2-dev # pip: python-ldap # _USER_QUERY = '(&(objectClass=USER)(sAMAccountName={username}))' _GROUP_QUERY = '(&(objectClass=GROUP)(cn={group_name}))' _LOGGER = logging.getLogger(__name__) _USER = \ collections.namedtuple( '_USER', [ 'dn', 'attributes', 'groups', ]) class NotFoundException(Exception): pass class LdapAdapter(object): def __init__(self, host_and_port, username, password, base_dn): self.__host_and_port = host_and_port self.__username = username self.__password = password self.__base_dn = base_dn self.__raw_resource = None @property def __resource(self): if self.__raw_resource is None: self.__raw_resource = self.__auth() return self.__raw_resource def __auth(self): conn = ldap.initialize('ldap://' + self.__host_and_port) conn.protocol_version = 3 conn.set_option(ldap.OPT_REFERRALS, 0) try: conn.simple_bind_s(self.__username, self.__password) except ldap.INVALID_CREDENTIALS: # Pinned, for the future. raise except ldap.SERVER_DOWN: # Pinned, for the future. raise except ldap.LDAPError: _LOGGER.error("LDAP error content:\n{}".format(e.message)) if issubclass(e.message.__class__, dict) is True and \ 'desc' in e.message: raise Exception("LDAP: {}".format(e.message['desc'])) raise else: return conn def get_dn_by_username(self, username): """Return user information. See _USER.""" uf = _USER_QUERY.format(username=username) results_raw = \ self.__resource.search_s( self.__base_dn, ldap.SCOPE_SUBTREE, uf) if not results_raw: raise NotFoundException(username) results = [] for dn, attributes in results_raw: if dn is None: continue u = _USER( dn=dn, attributes=attributes, groups=attributes['memberOf']) results.append(u) assert \ len(results) == 1, \ "More than one result was found for user [{}], which doesn't " \ "make sense: {}".format(username, results) return results[0] def get_group_members(self, group_name): """Return a list of DNs.""" gf = _GROUP_QUERY.format(group_name=group_name) results_raw = \ self.__resource.search_s( self.__base_dn, ldap.SCOPE_SUBTREE, gf) if not results_raw: raise NotFoundException(group_name) collections = [] for dn, attributes in results_raw: if dn is None: continue collections.append(attributes['member']) if not collections: raise NotFoundException(group_name) assert \ len(collections) == 1, \ "Too many sets of results returned: {}".format(collections) return collections[0]