Source code for pytan3.auth_store

# -*- coding: utf-8 -*-
"""Secure storage for Authentication methods."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals

import json
import six
import warnings

from . import exceptions
from .. import utils
from .. import auth_methods

DEFAULT_NAME = "credentials"
""":obj:`str`: Default :class:`pytan3.auth_methods.AuthMethod` name to load in
:obj:`AuthStore`.
"""

STORE_FILE = utils.tools.get_env(
    key="PYTAN_STORE_FILE", default="{http_client.parsed_url.hostname}.store"
)
""":obj:`str`: Store filename template filename to use in :class:`AuthStore`.

Will use OS Environment variable "PYTAN_STORE_FILE" if set.
"""

STORE_SECRET = utils.tools.get_env(
    key="PYTAN_STORE_SECRET", default="B####VjNyMWx5ITFAbV9QcjB0M2N0M2QmSDFkZDNu"
)  # Default key to use for :obj:`AuthStore`.


[docs]class AuthStore(object): """Secure storage for :class:`pytan3.auth_methods.AuthMethod`."""
[docs] def __init__( self, http_client, method=DEFAULT_NAME, secret=None, data=None, src="init", lvl="info", ): """Constructor. Args: http_client (:obj:`pytan3.http_client.HttpClient`): HTTP client. method (:obj:`str` or :class:`pytan3.auth_methods.AuthMethod`, optional): AuthMethod to use for this object. Defaults to: :data:`DEFAULT_NAME`. secret (:obj:`str`, optional): Encryption key. Will use STORE_SECRET as default if None. Defaults to: None. data (:obj:`dict`, optional): Initialize data store with dict. Defaults to: None. src (:obj:`str`, optional): Where this store came from. Defaults to: "init". lvl (:obj:`str`, optional): Logging level. Defaults to: "info". """ secret = STORE_SECRET if secret is None else secret self._log = utils.logs.get_obj_log(obj=self, lvl=lvl) self._http_client = http_client self.__data = {} self.__data.update(data or {}) self.src = src self.secret = secret self.method = method
[docs] def __str__(self): """Show object info. Returns: :obj:`str` """ bits = ["from={!r}".format(self.src)] bits = "({})".format(", ".join(bits)) cls = "{c.__module__}.{c.__name__}".format(c=self.__class__) return "{cls}{bits}".format(cls=cls, bits=bits)
[docs] def __repr__(self): """Show object info. Returns: :obj:`str` """ return self.__str__()
@property def http_client(self): """Get the HTTP Client for this object. Returns: :obj:`pytan3.http_client.HttpClient` """ return self._http_client
[docs] def set(self, key, value): """Set key value in data. Args: key (:obj:`str`): Key name to set in data. value (:obj:`str`): Value to set for key in data. """ self.__data[key] = value
[docs] def get(self, key, required=True): """Get key value from data. Args: key (:obj:`str`): Key to get, required (:obj:`bool`, optional): Raise error if key not found in store. Defaults to: True. Raises: :exc:`exceptions.ModuleError`: If key is required and not found. Returns: :obj:`str` """ try: return self.__data[key] except KeyError: if required: error = "No required key named {k!r} set for this AuthStore!" error = error.format(k=key) raise exceptions.ModuleError(error) else: return ""
@property def secret(self): """Get the secret for this store. Returns: :obj:`str` """ return self._build_key(http_client=self.http_client, secret=self.__secret) @secret.setter def secret(self, value): """Set the secret for this store. Args: value (:obj:`str`): secret to set. """ self.__secret = value @property def method(self): """Get the method class for this store. Returns: :class:`pytan3.auth_methods.AuthMethod` """ return auth_methods.load(self.get("METHOD", True)) @method.setter def method(self, method=DEFAULT_NAME): """Set the method name for this store. Args: method (:obj:`str` or :class:`pytan3.auth_methods.AuthMethod`, optional): AuthMethod to use for this object. Defaults to: :data:`DEFAULT_NAME`. """ self.set("METHOD", auth_methods.load(method).get_name())
[docs] def create_method(self, **kwargs): """Create an :obj:`pytan3.auth_methods.AuthMethod` from this store. Args: **kwargs: Rest of kwargs passed to :func:`pytan3.auth_methods.AuthMethod.from_store`. Returns: :obj:`pytan3.auth_methods.AuthMethod` """ return self.method.from_store(store=self, **kwargs)
[docs] def to_stream(self, stream=None): """Write this stores encrypted data to a file like object. Args: stream (:obj:`io.IOBase`, optional): Object to write store to. If None, create and return a :obj:`six.StringIO`. Defaults to: None. Returns: :obj:`io.IOBase` """ if stream is None: stream = six.StringIO() data = self._encrypt( data=self.__data, http_client=self.http_client, secret=self.__secret ) stream.write(data) stream.seek(0) return stream
[docs] def to_path( self, path=None, path_sub="stores", path_file=STORE_FILE, overwrite=False ): """Write this stores encrypted data to a path. Args: path (:obj:`str` or :obj:`pathlib.Path`, optional): Storage directory to use. If empty, resolve path via :func:`pytan3.utils.tools.get_storage_dir`. Defaults to: None. path_sub (:obj:`str`, optional): Sub directory under path that should contain path_file. Defaults to: "stores" path_file (:obj:`str`, optional): Filename to use for store file under path / path_sub. Defaults to: :data:`STORE_FILE` overwrite (:obj:`bool`, optional): If True, if store_file exists, overwrite and throw warning. If False, if store_file exists, do not overwrite and throw exception. If None, if store_file exists, do not overwrite and throw warning. Defaults to: False. Raises: :exc:`exceptions.ModuleError`: If path / path_sub / path_file exists and overwrite is False :exc:`exceptions.ModuleWarning`: If path / path_sub / path_file exists and overwrite is None or True. Returns: :obj:`pathlib.Path`: Absolute full path where store file was written. """ path = utils.tools.get_storage_dir(path=path, path_sub=path_sub, mkdir=True) path = path / path_file.format(http_client=self.http_client) if path.is_file(): pre = "overwrite is {overwrite!r} and store file {path!r} already exists" pre = pre.format(overwrite=overwrite, path=format(path)) if overwrite is False: error = "{}, not overwriting".format(pre) raise exceptions.ModuleError(error) elif overwrite is None: error = "{}, not overwriting".format(pre) warnings.warn(error, exceptions.ModuleWarning) return path else: error = "{}, overwriting".format(pre) warnings.warn(error, exceptions.ModuleWarning) with path.open(mode="wt") as stream: stream = self.to_stream(stream=stream) path.chmod(0o600) return path
[docs] def to_string(self): """Write this stores encrypted data to a string. Returns: :obj:`str` """ stream = six.StringIO() stream = self.to_stream(stream=stream) data = stream.getvalue() stream.close() return data
[docs] @classmethod def from_stream(cls, http_client, stream, secret=None, src="stream", lvl="info"): """Create store from encrypted data in a file like object. Args: http_client (:obj:`pytan3.http_client.HttpClient`): HTTP client. stream (:obj:`io.IOBase`): File like object to read from. secret (:obj:`str`, optional): Decryption key. Will use STORE_SECRET as default if None. Defaults to: None. src (:obj:`str`, optional): Where this store came from. Defaults to: "stream". lvl (:obj:`str`, optional): Logging level for this object. Defaults to: "info". Returns: :obj:`AuthStore` """ secret = STORE_SECRET if secret is None else secret stream.seek(0) data = cls._decrypt(data=stream.read(), http_client=http_client, secret=secret) return cls(http_client=http_client, data=data, secret=secret, src=src, lvl=lvl)
[docs] @classmethod def from_string(cls, http_client, string, secret=None, lvl="info"): """Create store from encrypted data in a string. Args: http_client (:obj:`pytan3.http_client.HttpClient`): HTTP client. string (:obj:`str`): String to read from. secret (:obj:`str`, optional): Decryption key. Will use STORE_SECRET as default if None. Defaults to: None. lvl (:obj:`str`, optional): Logging level for this object. Defaults to: "info". Returns: :obj:`AuthStore` """ secret = STORE_SECRET if secret is None else secret stream = six.StringIO(string) store = cls.from_stream( http_client=http_client, stream=stream, secret=secret, src="string", lvl=lvl ) stream.close() return store
[docs] @classmethod def from_path( cls, http_client, path=None, path_sub="stores", path_file=STORE_FILE, secret=None, lvl="info", ): """Create store from encrypted data in a file. Args: http_client (:obj:`pytan3.http_client.HttpClient`): HTTP client. path (:obj:`str` or :obj:`pathlib.Path`, optional): Storage directory to use. If empty, resolve path via :func:`pytan3.utils.tools.get_storage_dir`. Defaults to: None. path_sub (:obj:`str`, optional): Sub directory under path that should contain path_file. Defaults to: "stores" path_file (:obj:`str`, optional): Filename to use for store file under path / path_sub. Defaults to: :data:`STORE_FILE` secret (:obj:`str`, optional): Decryption key. Will use STORE_SECRET as default if None. Defaults to: None. lvl (:obj:`str`, optional): Logging level for this object. Defaults to: "info". Raises: (:obj:`exceptions.ModuleError`): If path does not exist as a file. Returns: :obj:`AuthStore` """ secret = STORE_SECRET if secret is None else secret path = utils.tools.get_storage_dir(path=path, path_sub=path_sub, mkdir=False) path = path / path_file.format(http_client=http_client) if not path.is_file(): error = "File {path!r} does not exist, unable to read AuthStore data!" error = error.format(path=format(path)) raise exceptions.ModuleError(error) with path.open(mode="rt") as stream: store = cls.from_stream( http_client=http_client, stream=stream, secret=secret, src="path", lvl=lvl, ) return store
# LATER(!) figure out once prompting land entered again # @classmethod # def from_prompts( # cls, # http_client, # method=DEFAULT_NAME, # secret=STORE_SECRET, # prompt_method=True, # lvl="info", # ): # """Natch.""" # promptness = utils.prompts.promptness # if prompt_method: # ask = "Pick an authentication method:" # choices = [x.get_name() for x in AuthMethod.__subclasses__()] # method = promptness.ask_choice(ask, choices=choices, default=method) # ask = "Use Custom Secret? " # ask_secret = promptness.ask_bool(ask, default=False) # if ask_secret: # ask = "Custom secret:" # secret = promptness.ask_str(ask, secure=True) # method_cls = load(method) # method_args = method_cls.get_args() # req_args = method_cls.get_args_required() # sec_args = method_cls.get_args_secure() # store = cls(http_client=http_client, secret=secret, src="prompt", lvl=lvl) # store.method = method_cls # for arg in method_args: # ask = "Provide {!r}".format(arg) # value = promptness.ask_str( # ask, default=None if arg in req_args else "", secure=arg in sec_args # ) # store.set(key=arg, value=value) # return store
[docs] @classmethod def _decrypt(cls, data, http_client, secret=None): """Decrypt data using secret. Args: data (:obj:`str`): Data to decrypt. http_client (:obj:`pytan3.http_client.HttpClient`): HTTP client. secret (:obj:`str`, optional): Decryption key. Will use STORE_SECRET as default if None. Defaults to: None. Returns: :obj:`str` """ secret = STORE_SECRET if secret is None else secret key = cls._build_key(http_client=http_client, secret=secret) data = six.ensure_text(data)[7:] data = utils.crypt.decrypt(data=data, key=key) return json.loads(data)
[docs] @classmethod def _encrypt(cls, data, http_client, secret=None): """Encrypt data using secret. Args: data (:obj:`str`): Data to encrypt. http_client (:obj:`pytan3.http_client.HttpClient`): HTTP client. secret (:obj:`str`, optional): Decryption key. Will use STORE_SECRET as default if None. Defaults to: None. Returns: :obj:`str` """ secret = STORE_SECRET if secret is None else secret key = cls._build_key(http_client=http_client, secret=secret) data = json.dumps(data) data = utils.crypt.encrypt(data=data, key=key) pre = "::CSC::" if secret == STORE_SECRET else "::CCC::" return "{}{}".format(pre, six.ensure_text(data))
[docs] @classmethod def _build_key(cls, http_client, secret=None): """Build an encryption key by combining an SSL cert PEM and a secret. Args: http_client (:obj:`pytan3.http_client.HttpClient`): HTTP client. secret (:obj:`str`, optional): Decryption key. Will use STORE_SECRET as default if None. Defaults to: None. Returns: :obj:`str` """ secret = STORE_SECRET if secret is None else secret if secret.startswith("B####"): secret = utils.tools.b64_decode(secret[5:]) return "{}__{}".format(http_client.certify.store.pem, secret)