Source code for pytan3.auth_methods

# -*- coding: utf-8 -*-
"""Authentication methods for the Tanium API."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals

import abc
import datetime
import json
import six
import re

from . import exceptions
from .. import utils
from .. import api_clients
from .. import auth_store

DEFAULT_NAME = "credentials"
""":obj:`str`: Default :class:`AuthMethod` name to load in :func:`load`."""

[docs]@six.add_metaclass(abc.ABCMeta) class AuthMethod(object): """Abstract base class for all AuthMethods."""
[docs] @classmethod @abc.abstractmethod def get_name(cls): """Get the ref name of this class for use by :func:`load`. Returns: :obj:`str` """ raise NotImplementedError # pragma: no cover
[docs] @classmethod @abc.abstractmethod def get_version_req(cls): """Get the min, max, and eq version requirements of this class. Notes: Dict can specify keys: "vmin", "vmax", "veq". This class method gets called by :func:`pytan3.utils.versions.version_check_obj_req` to perform version checks. Returns: :obj:`dict` """ raise NotImplementedError # pragma: no cover
[docs] @classmethod @abc.abstractmethod def get_args(cls): """Get all arguments used by this class. Returns: :obj:`list` """ raise NotImplementedError # pragma: no cover
[docs] @classmethod @abc.abstractmethod def get_args_required(cls): """Get all arguments used by this class that are required. Returns: :obj:`list` """ raise NotImplementedError # pragma: no cover
[docs] @classmethod @abc.abstractmethod def get_args_secure(cls): """Get all arguments used by this class that need their prompt input hidden. Returns: :obj:`list` """ raise NotImplementedError # pragma: no cover
[docs] @classmethod @abc.abstractmethod def from_store(cls, store, **kwargs): """Create an instance of this class from an :obj:`pytan3.auth_store.AuthStore`. Args: store (:obj:`pytan3.auth_store.AuthStore`): AuthStore to get AuthMethod arguments from. Returns: :obj:`AuthMethod` """ raise NotImplementedError # pragma: no cover
@abc.abstractproperty def http_client(self): """Get the HTTP Client for this object. Returns: :obj:`pytan3.http_client.HttpClient` """ raise NotImplementedError # pragma: no cover @abc.abstractproperty def token(self): """Get or set a token. Notes: If no token received, :meth:`login` to get one. If token expired, :meth:`login` to get a new one. If token has been received, :meth:`validate` to validate token. If token fails validation, :meth:`login` to get a new one. Returns: :obj:`str` """ raise NotImplementedError # pragma: no cover @abc.abstractproperty def token_headers(self): """Get dict with :attr:`token` for use in headers. Returns: :obj:`dict` """ raise NotImplementedError # pragma: no cover @abc.abstractproperty def uid(self): """Get user ID for token if a token has been received. Returns: :obj:`str` """ raise NotImplementedError # pragma: no cover @abc.abstractproperty def logged_in(self): """Check if a token has been received. Returns: :obj:`bool` """ raise NotImplementedError # pragma: no cover
[docs] @abc.abstractmethod def login(self, **kwargs): """Send a login request to receive a token. Raises: :exc:`exceptions.LoginError`: If status code in response is not 200. Returns: :obj:`str` """ raise NotImplementedError # pragma: no cover
[docs] @abc.abstractmethod def logout(self, **kwargs): """Send a logout request to revoke a token. Raises: :exc:`exceptions.NotLoggedInError`: If :attr:`logged_in` is False. :exc:`exceptions.LogoutError`: If status code in response is not 200. Returns: None """ raise NotImplementedError # pragma: no cover
[docs] @abc.abstractmethod def logout_all(self, **kwargs): """Send a logout request to revoke all tokens associated with this token. Raises: :exc:`exceptions.NotLoggedInError`: If :attr:`logged_in` is False. :exc:`exceptions.LogoutError`: If status code in response is not 200. Returns: None """ raise NotImplementedError # pragma: no cover
[docs] @abc.abstractmethod def validate(self, **kwargs): """Send a validate request to check that token is still valid. Raises: :exc:`exceptions.NotLoggedInError`: If :attr:`logged_in` is False. :exc:`exceptions.InvalidToken`: If status code in response is not 200. Returns: :obj:`str` """ raise NotImplementedError # pragma: no cover
[docs] @abc.abstractmethod def create_store(self, **kwargs): """Create an instance of :obj:`pytan3.auth_store.AuthStore` from this object. Returns: :obj:`pytan3.auth_store.AuthStore` """ raise NotImplementedError # pragma: no cover
[docs]class CommonMixin(object): """Shared methods common amongst all :class:`AuthMethod`."""
[docs] def __init__( self, http_client, login_timeout=5, logout_timeout=5, expires_after=295, ver_check=True, lvl="info", ): """Constructor. Args: http_client (:obj:`pytan3.http_client.HttpClient`): Object for sending HTTP requests. login_timeout (:obj:`int`, optional): Timeout for login and validate responses in seconds. Defaults to: 5. logout_timeout (:obj:`int`, optional): Timeout for logout responses in seconds. Defaults to: 5. expires_after (:obj:`int`, optional): Life of received tokens in seconds. Defaults to: 295. ver_check (:obj:`bool`, optional): Perform version checks against the platform version from :func:`pytan3.api_clients.get_version` using :func:`pytan3.utils.versions.version_check_obj_req`. Defaults to: True. lvl (:obj:`str`, optional): Logging level for this object. Defaults to: "info". """ self.log = utils.logs.get_obj_log(obj=self, lvl=lvl) """:obj:`logging.Logger`: Log for this object.""" self.revalidate_after = 5 """:obj:`int`: Revalidate token if :attr:`last_used` is higher than this.""" self._http_client = http_client self._login_timeout = login_timeout self._logout_timeout = logout_timeout self._expires_after = expires_after self._last_used = None self._token = None self.token = None if ver_check and any(self.get_version_req().values()): utils.versions.version_check_obj_req( version=api_clients.get_version(self.http_client), src=self.http_client.url, obj=self, )
[docs] def __str__(self): """Show object info. Returns: :obj:`str` """ last_used = or "NEVER" expires_in = self._expires_after - (self.last_used or 0) expires_in = "EXPIRED" if self.expired else bits = [ "logged_in={!r}".format(self.logged_in), "last_used={!r}".format(last_used), "expires_in={!r}".format(expires_in), ] bits = "({})".format(", ".join(bits)) cls = "{c.__module__}.{c.__name__}".format(c=self.__class__) return "{cls}{bits}".format(cls=cls, bits=bits)
[docs] def __repr__(self): """Show object info. Returns: :obj:`str` """ return self.__str__()
[docs] @classmethod def from_store(cls, store, **kwargs): """Create an instance of this class from an :obj:`pytan3.auth_store.AuthStore`. Args: store (:obj:`pytan3.auth_store.AuthStore`): AuthStore to get AuthMethod arguments from. **kwargs: Rest of kwargs passed to :class:`AuthMethod`. Returns: :obj:`AuthMethod` """ method_cls = load(store.method) args = method_cls.get_args() req_args = method_cls.get_args_required() kwargs.setdefault("http_client", store.http_client) kwargs.update({k: store.get(k, required=k in req_args) for k in args}) return method_cls(**kwargs)
@property def http_client(self): """Get the HTTP Client for this object. Returns: :obj:`pytan3.http_client.HttpClient` """ return self._http_client @property def token(self): """Get a token. Notes: If :attr:`logged_in` is False, :meth:`login` to get one. If :attr:`expired` is True, :meth:`login` to get a new one. If :attr:`logged_in` is True, and :attr:`last_used` is older than :attr:`revalidate_after`, :meth:`validate` to re-validate token. If :meth:`validate` fails validation, :meth:`login` to get a new one. Returns: :obj:`str` """ if not self.logged_in: self.login(cause="Initial login") elif self.expired: self.login(cause="Re-login due to expired token") else: if self.last_used is None or self.last_used >= self.revalidate_after: try: self.validate(cause="Validate existing token") except exceptions.InvalidToken: m = "Token for User ID {uid!r} no longer valid, getting new one" m = m.format(uid=self.uid) self.log.debug(m) self.login(cause="Re-login due to invalid token") return self._token @token.setter def token(self, value): """Set the token to value and :attr:`last_used` to now. Args: value (:obj:`str`): Value to set _token to """ self._token = value if value is None: self._last_used = None else: self._last_used = datetime.datetime.utcnow() @property def token_headers(self): """Get dict with :attr:`token` for use in headers. Returns: :obj:`dict` """ return {"session": self.token} @property def uid(self): """Get user ID for token if a token has been received. Returns: :obj:`str` """ return int((self._token or self.token).split("-")[0]) @property def logged_in(self): """Check if a token has been received. Returns: :obj:`bool` """ return True if self._token else False @property def expired(self): """Check if token has expired. Returns: :obj:`bool` """ return self.last_used is None or self.last_used >= self._expires_after @property def last_used(self): """Get the number of seconds since token was issued or last used. Returns: :obj:`int` """ # is_none = self._last_used is None return ( None if self._last_used is None else )
[docs] def login(self, **kwargs): """Send a login request to receive a token. Args: **kwargs: cause (:obj:`str`): String to explain purpose of request. Defaults to: "Get new token". Raises: :exc:`exceptions.LoginError`: If status code in response is not 200. Returns: :obj:`str` """ r = self.http_client( path="/auth", method="post", headers={k: v for k, v in self._headers.items() if v}, b64_headers=self._b64_headers, timeout=self._login_timeout, cause=kwargs.pop("cause", "Get new token"), ) if r.status_code != 200: raise exceptions.LoginError(auth_method=self, response=r) self.token = r.text m = "Token received for User ID {uid!r} from {url!r}" m = m.format(uid=self.uid, url=r.url) self.log.debug(m) return r.text
[docs] def logout(self, **kwargs): """Send a logout request to revoke a token. Args: **kwargs: cause (:obj:`str`): String to explain purpose of request. Defaults to: "Revoke token". headers (:obj:`dict`): Headers to send in request. Defaults to: {}. Raises: :exc:`exceptions.NotLoggedInError`: If :attr:`logged_in` is False. :exc:`exceptions.LogoutError`: If status code in response is not 200. Returns: None """ if not self.logged_in: raise exceptions.NotLoggedInError(auth_method=self) r = self.http_client( path="/auth", method="post", headers={"session": self._token, "logout": "0"}, b64_headers=self._b64_headers, timeout=self._logout_timeout, cause=kwargs.pop("cause", "Revoke token"), ) if r.status_code != 200: raise exceptions.LogoutError(auth_method=self, response=r) m = "Token revoked for User ID {uid!r} from {url!r}" m = m.format(uid=self.uid, url=r.url) self.log.debug(m) self.token = None return None
[docs] def logout_all(self, **kwargs): """Send a logout request to revoke all tokens associated with this token. Args: **kwargs: cause (:obj:`str`): String to explain purpose of request. Defaults to: "Revoke all tokens". headers (:obj:`dict`): Headers to send in request. Defaults to: {}. Raises: :exc:`exceptions.NotLoggedInError`: If :attr:`logged_in` is False. :exc:`exceptions.LogoutError`: If status code in response is not 200. Returns: None """ if not self.logged_in: raise exceptions.NotLoggedInError(auth_method=self) r = self.http_client( path="/auth", method="post", headers={"session": self._token, "logout": "1"}, b64_headers=self._b64_headers, timeout=self._logout_timeout, cause=kwargs.pop("cause", "Revoke all tokens"), ) if r.status_code != 200: raise exceptions.LogoutError(auth_method=self, response=r) m = "All tokens revoked for User ID {uid!r} from {url!r}" m = m.format(uid=self.uid, url=r.url) self.log.debug(m) self.token = None return None
[docs] def validate(self, **kwargs): """Send a validate request to check that token is still valid. Args: **kwargs: cause (:obj:`str`): String to explain purpose of request. Defaults to: "Validate token". headers (:obj:`dict`): Headers to send in request. Defaults to: {}. Raises: :exc:`exceptions.NotLoggedInError`: If :attr:`logged_in` is False. :exc:`exceptions.InvalidToken`: If status code in response is not 200. Returns: :obj:`str` """ if not self.logged_in: raise exceptions.NotLoggedInError(auth_method=self) r = self.http_client( path="/auth", method="post", headers={"session": self._token}, timeout=self._login_timeout, cause=kwargs.pop("cause", "Validate token"), ) if r.status_code != 200: raise exceptions.InvalidToken(auth_method=self, response=r) m = "Token validated for User ID {uid!r} from {url!r}" m = m.format(uid=self.uid, url=r.url) self.log.debug(m) self.token = r.text return r.text
[docs] def create_store(self, **kwargs): """Create an instance of :obj:`pytan3.auth_store.AuthStore` from this object. Returns: :obj:`pytan3.auth_store.AuthStore` """ kwargs.setdefault("http_client", self.http_client) kwargs.setdefault("src", "method") store = auth_store.AuthStore(**kwargs) store.method = self for key in self.get_args(): store.set(key=key, value=self._headers[key]) return store
[docs]class Credentials(CommonMixin, AuthMethod): """Method that uses credentials to interact with the '/auth' API."""
[docs] def __init__( self, http_client, username, password, domain="", secondary="", login_timeout=5, logout_timeout=5, expires_after=295, ver_check=True, lvl="info", ): """Constructor. Args: http_client (:obj:`pytan3.http_client.HttpClient`): HTTP client. username (:obj:`str`): Header to pass to /auth API. password (:obj:`str`): Header to pass to /auth API. domain (:obj:`str`, optional): Header to pass to /auth API. Defaults to: "" secondary (:obj:`str`, optional): Header to pass to /auth API. Defaults to: "" login_timeout (:obj:`int`, optional): Timeout for login and validate responses in seconds. Defaults to: 5. logout_timeout (:obj:`int`, optional): Timeout for logout responses in seconds. Defaults to: 5. expires_after (:obj:`int`, optional): Life of received tokens in seconds. Defaults to: 295. ver_check (:obj:`bool`, optional): Perform version checks against the platform version from :func:`pytan3.api_clients.get_version` using :func:`pytan3.utils.versions.version_check_obj_req`. Defaults to: True. lvl (:obj:`str`, optional): Logging level for this object. Defaults to: "info". """ self._headers = { "username": username, "password": password, "domain": domain, "secondary": secondary, } self._b64_headers = ["username", "password"] super(Credentials, self).__init__( http_client=http_client, login_timeout=login_timeout, logout_timeout=logout_timeout, expires_after=expires_after, ver_check=ver_check, lvl=lvl, )
[docs] @classmethod def get_name(cls): """Get the ref name of this class for use by :func:`load`. Returns: :obj:`str` """ return "credentials"
[docs] @classmethod def get_version_req(cls): """Get the min, max, and eq version requirements of this class. Notes: Dict can specify keys: "vmin", "vmax", "veq". This class method gets called by :func:`pytan3.utils.versions.version_check_obj_req` to perform version checks. Returns: :obj:`dict` """ return {"vmin": "", "vmax": "", "veq": ""}
[docs] @classmethod def get_args(cls): """Get all arguments used by this class. Returns: :obj:`list` """ return ["username", "password", "secondary", "domain"]
[docs] @classmethod def get_args_required(cls): """Get all arguments used by this class that are required. Returns: :obj:`list` """ return ["username", "password"]
[docs] @classmethod def get_args_secure(cls): """Get all arguments used by this class that need their prompt input hidden. Returns: :obj:`list` """ return ["password"]
[docs]class SessionId(CommonMixin, AuthMethod): """Method that uses session id to interact with the '/auth' API."""
[docs] def __init__( self, http_client, session, login_timeout=5, logout_timeout=5, expires_after=295, ver_check=True, lvl="info", ): """Constructor. Args: http_client (:obj:`pytan3.http_client.HttpClient`): HTTP client. session (:obj:`str`): Header to pass to /auth API. login_timeout (:obj:`int`, optional): Timeout for login and validate responses in seconds. Defaults to: 5. logout_timeout (:obj:`int`, optional): Timeout for logout responses in seconds. Defaults to: 5. expires_after (:obj:`int`, optional): Life of received tokens in seconds. Defaults to: 295. ver_check (:obj:`bool`, optional): Perform version checks against the platform version from :func:`pytan3.api_clients.get_version` using :func:`pytan3.utils.versions.version_check_obj_req`. Defaults to: True. lvl (:obj:`str`, optional): Logging level for this object. Defaults to: "info". """ self._headers = {"session": session} self._b64_headers = [] super(SessionId, self).__init__( http_client=http_client, login_timeout=login_timeout, logout_timeout=logout_timeout, expires_after=expires_after, ver_check=ver_check, lvl=lvl, ) self.token = session
[docs] @classmethod def get_name(cls): """Get the ref name of this class for use by :func:`load`. Returns: :obj:`str` """ name = "session_id" return name
[docs] @classmethod def get_version_req(cls): """Get the min, max, and eq version requirements of this class. Notes: Dict can specify keys: "vmin", "vmax", "veq". This class method gets called by :func:`pytan3.utils.versions.version_check_obj_req` to perform version checks. Returns: :obj:`dict` """ return {"vmin": "", "vmax": "", "veq": ""}
[docs] @classmethod def get_args(cls): """Get all arguments used by this class. Returns: :obj:`list` """ return ["session"]
[docs] @classmethod def get_args_required(cls): """Get all arguments used by this class that are required. Returns: :obj:`list` """ return ["session"]
[docs] @classmethod def get_args_secure(cls): """Get all arguments used by this class that need their prompt input hidden. Returns: :obj:`list` """ return []
@property def token(self): """Get a token. Notes: If :attr:`CommonMixin.logged_in` is False, can not login to get a new one with just a session. If :attr:`CommonMixin.expired` is True, can not login to get a new one with just a session. If :attr:`CommonMixin.logged_in` is True, and :attr:`CommonMixin.last_used` is older than :attr:`CommonMixin.revalidate_after`, :meth:`CommonMixin.validate` to re-validate token. If :meth:`CommonMixin.validate` fails validation, can not login to get a new one with just a session. Returns: :obj:`str` """ if self.last_used is None or self.last_used >= self.revalidate_after: self.validate(cause="Validate existing token") return self._token @token.setter def token(self, value): """Set the token to value and :attr:`CommonMixin.last_used` to now. Args: value (:obj:`str`): Value to set _token to """ self._token = value if value is None: self._last_used = None self._last_validated = None else: self._last_used = datetime.datetime.utcnow() self._last_validated = datetime.datetime.utcnow()
[docs] def login(self, **kwargs): """Send a login request to receive a token. Raises: :exc:`exceptions.LoginError`: If status code in response is not 200. Returns: :obj:`str` """ error = "Unable to perform a login with a session ID, can only validate" raise NotImplementedError(error)
[docs]class RestCredentials(CommonMixin, AuthMethod): """Method that uses credentials to interact with the REST 'session' API."""
[docs] def __init__( self, http_client, username, password, domain="", secondary="", login_timeout=5, logout_timeout=5, expires_after=295, ver_check=True, lvl="info", ): """Constructor. Args: http_client (:obj:`pytan3.http_client.HttpClient`): HTTP client. username (:obj:`str`): Header to pass to /api/v2/session REST API. password (:obj:`str`): Header to pass to /api/v2/session REST API. domain (:obj:`str`, optional): Header to pass to /api/v2/session REST API. Defaults to: "" secondary (:obj:`str`, optional): Header to pass to /api/v2/session REST API. Defaults to: "" login_timeout (:obj:`int`, optional): Timeout for login and validate responses in seconds. Defaults to: 5. logout_timeout (:obj:`int`, optional): Timeout for logout responses in seconds. Defaults to: 5. expires_after (:obj:`int`, optional): Life of received tokens in seconds. Defaults to: 295. ver_check (:obj:`bool`, optional): Perform version checks against the platform version from :func:`pytan3.api_clients.get_version` using :func:`pytan3.utils.versions.version_check_obj_req`. Defaults to: True. lvl (:obj:`str`, optional): Logging level. Defaults to: "info". """ self._headers = { "username": username, "password": password, "domain": domain, "secondary": secondary, } self._b64_headers = ["username", "password"] super(RestCredentials, self).__init__( http_client=http_client, login_timeout=login_timeout, logout_timeout=logout_timeout, expires_after=expires_after, ver_check=ver_check, lvl=lvl, )
[docs] @classmethod def get_name(cls): """Get the ref name of this class for use by :func:`load`. Returns: :obj:`str` """ return "rest_credentials"
[docs] @classmethod def get_version_req(cls): """Get the min, max, and eq version requirements of this class. Notes: Dict can specify keys: "vmin", "vmax", "veq". This class method gets called by :func:`pytan3.utils.versions.version_check_obj_req` to perform version checks. Returns: :obj:`dict` """ return {"vmin": "7.3.314.3409", "vmax": "", "veq": ""}
[docs] @classmethod def get_args(cls): """Get all arguments used by this class. Returns: :obj:`list` """ return ["username", "password", "secondary", "domain"]
[docs] @classmethod def get_args_required(cls): """Get all arguments used by this class that are required. Returns: :obj:`list` """ return ["username", "password"]
[docs] @classmethod def get_args_secure(cls): """Get all arguments used by this class that need their prompt input hidden. Returns: :obj:`list` """ return ["password"]
@property def rest_version(self): """Version of REST API for this method. Returns: :obj:`int` """ return 2
[docs] def login(self, **kwargs): """Send a login request to receive a token. Args: **kwargs: cause (:obj:`str`): String to explain purpose of request. Defaults to: "Get new token". Raises: :exc:`exceptions.LoginError`: If status code in response is not 200. Returns: :obj:`str` """ r = self.http_client( path="/api/v{v}/{e}".format(v=self.rest_version, e="session/login"), method="post", data=json.dumps({k: v for k, v in self._headers.items() if v}, indent=2), timeout=self._login_timeout, cause=kwargs.pop("cause", "Get new token"), ) if r.status_code != 200: raise exceptions.LoginError(auth_method=self, response=r) token = r.json()["data"]["session"] self.token = token m = "Token received for User ID {uid!r} from {url!r}" m = m.format(uid=self.uid, url=r.url) self.log.debug(m) return token
[docs] def logout(self, **kwargs): """Send a logout request to revoke a token. Args: **kwargs: cause (:obj:`str`): String to explain purpose of request. Defaults to: "Revoke token". Raises: :exc:`exceptions.NotLoggedInError`: If :attr:`CommonMixin.logged_in` is False. :exc:`exceptions.LogoutError`: If status code in response is not 200. Returns: None """ if not self.logged_in: raise exceptions.NotLoggedInError(auth_method=self) r = self.http_client( path="/api/v{v}/{e}".format(v=self.rest_version, e="session/logout"), method="post", data=json.dumps({"session": self._token}), timeout=self._login_timeout, cause=kwargs.pop("cause", "Revoke token"), ) if r.status_code != 200: raise exceptions.LogoutError(auth_method=self, response=r) m = "Token revoked for User ID {uid!r} from {url!r}" m = m.format(uid=self.uid, url=r.url) self.log.debug(m) self.token = None return None
[docs] def logout_all(self, **kwargs): """Send a logout request to revoke all tokens associated with this token. Raises: :exc:`NotImplementedError`: REST 'session' endpoint does not have a logout all target. """ error = "REST API 'session' endpoint does not have a target for logout all." raise NotImplementedError(error)
[docs] def validate(self, **kwargs): """Send a validate request to check that token is still valid. Args: **kwargs: cause (:obj:`str`): String to explain purpose of request. Defaults to: "Validate token". Raises: :exc:`exceptions.NotLoggedInError`: If :attr:`CommonMixin.logged_in` is False. :exc:`exceptions.InvalidToken`: If status code in response is not 200. Returns: :obj:`str` """ if not self.logged_in: raise exceptions.NotLoggedInError(auth_method=self) r = self.http_client( path="/api/v{v}/{e}".format(v=self.rest_version, e="session/validate"), method="post", data=json.dumps({"session": self._token}), timeout=self._login_timeout, cause=kwargs.pop("cause", "Validate token"), ) if r.status_code != 200: raise exceptions.InvalidToken(auth_method=self, response=r) token = r.json()["data"]["session"] m = "Token validated for User ID {uid!r} from {url!r}" m = m.format(uid=self.uid, url=r.url) self.log.debug(m) self.token = token return token
[docs]def load(obj=DEFAULT_NAME): """Get an AuthMethod class. Args: obj (:obj:`str` or :obj:`AuthMethod` or :class:`AuthMethod` or :obj:`pytan3.auth_store.AuthStore`, optional): AuthStore object or AuthMethod object, class, or name of AuthMethod. Defaults to: :data:`DEFAULT_NAME`. Raises :exc:`exceptions.ModuleError`: If obj is not a valid type, name, obj, or cls. Returns: :class:`AuthMethod` """ if isinstance(obj, auth_store.AuthStore): return obj.method if isinstance(obj, AuthMethod): return obj.__class__ if callable(obj) and issubclass(obj, AuthMethod): return obj if isinstance(obj, six.string_types): for method_cls in AuthMethod.__subclasses__(): if method_cls.get_name() == obj: return method_cls valids = [x.get_name() for x in AuthMethod.__subclasses__()] error = "{obj!r} is not a valid {cls}, try one of:\n {valids}" error = error.format(obj=obj, cls=AuthMethod, valids=valids) raise exceptions.ModuleError(error)
[docs]def validate_token(token): """Validate that a token is properly formed. Args: token (:obj:`str`): Token to validate Returns: :obj:`bool` """ return re.match(r"(\d+-\d+-)\w+", token) is not None