# -*- coding: utf-8 -*-
"""PyTan prompting module."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
import colorama
import getpass
import logging
import os
import six
import sys
import re
colorama.init()
[docs]class ColorWrap(object):
"""Wrapper for colorama."""
MODE_MAP = {"FORE": colorama.Fore, "BACK": colorama.Back, "STYLE": colorama.Style}
""":obj:`dict`: Map of modes this wrapper can expose."""
MODE = ""
""":obj:`str`: Mode to wrap around colorama."""
[docs] def __init__(self, mode=""):
"""Constructor.
Args:
mode (:obj:`str`, optional):
One of keys in :attr:`MODE_MAP`.
Defaults to: "".
"""
self.MODE = mode
""":obj:`str`: Mode to wrap around colorama."""
[docs] def __getattr__(self, attr):
"""Get an attribute from colorama.
Args:
attr (:obj:`str`):
If :attr:`MODE` is in `MODE_MAP`, get uppercased attr from MODE.
Returns an empty string if MODE is not in MODE_MAP or attr is not
available in MODE_CLS from MODE_MAP[MODE].
Returns:
:obj:`str`
"""
mode = super(ColorWrap, self).__getattribute__("MODE")
mode_map = super(ColorWrap, self).__getattribute__("MODE_MAP")
mode_cls = mode_map.get(mode.upper(), None)
attr = "RESET_ALL" if attr.upper() == "RESET" else attr.upper()
color = getattr(mode_cls, attr, "")
return color
[docs]class Promptness(object):
"""Prompt utility class."""
YES_VALUES = ["y.*", "true", "1"]
""":obj:`list` of :obj:`str`: Valid regex values for truthy-ness."""
NO_VALUES = ["n.*", "false", "0"]
""":obj:`list` of :obj:`str`: Valid regex values for falsey-ness."""
USE_COLOR = True
""":obj:`bool`: Use color in :meth:`prepare`."""
OVERRIDES = None
""":obj:`dict`: Default overrides for :meth:`Promptness.ask_dict`."""
TMPLS = {
"prompt": ("\n{{f.cyan}}{text}{{s.reset}} [default: {default}]: "),
"value": "{{s.reset}}{{f.green}}'{value}'{{s.reset}}",
"secure_value": "{{s.reset}}{{f.red}}..HIDDEN..{{s.reset}}",
"env_value": (
"\n{{f.green}}OS Environment Variable {env!r} has value {env_value}, "
"{{f.green}}{action} override default {default}{{s.reset}}."
),
"warn_notty": (
"\n\n{{f.red}}No TTY on stream {stream}, using "
"default {default!r} for {text!r}{{s.reset}}\n"
),
"warn_option_invalid": (
"\n-- {{f.red}}Value must be one of: {options!r}{{s.reset}}\n"
),
"warn_nodefault": (
"\n-- {{f.red}}No default defined, value required!{{s.reset}}"
),
"warn_bool_invalid": (
"\n-- {{f.red}}Value must be one of {yes_values!r} or {no_values!r}"
"{{s.reset}}"
),
"warn_int_invalid": ("\n-- {{f.red}}Value must be a valid number{{s.reset}}"),
"warn_text_invalid": (
"\n-- {{f.red}}Value did not match {validate!r}{{s.reset}}"
),
"options": ("\nOptions:{options}\n"),
"option": ("\n {{f.BLUE}}{opt}{{s.reset}}"),
}
""":obj:`dict`: String templates used throughout."""
[docs] def __init__(self, input_stream=sys.stdin, output_stream=sys.stderr, lvl="info"):
"""Constructor.
Args:
input_stream (:obj:`io.IOBase`, optional):
Stream to get input from.
Defaults to: :obj:`sys.stdin`.
output_stream (:obj:`io.IOBase`, optional):
Stream to send output to.
Defaults to: :obj:`sys.stderr`.
lvl (:obj:`str`, optional):
Logging level for this object.
Defaults to: "info".
"""
self.input_stream = input_stream
""":obj:`io.IOBase`: Stream to get input from."""
self.output_stream = output_stream
""":obj:`io.IOBase`: Stream to send output to."""
self.log = logging.getLogger(__name__)
""":obj:`logging.Logger`: Log for this object."""
self.log.setLevel(getattr(logging, lvl.upper()))
[docs] def __str__(self):
"""Show object info.
Returns:
:obj:`str`
"""
bits = [
"input={!r}".format(stream_name(self.input_stream)),
"output={!r}".format(stream_name(self.output_stream)),
]
bits = "({})".format(", ".join(bits))
cls = "{c.__module__}.{c.__name__}".format(c=self.__class__)
return "{cls}{bits}".format(cls=cls, bits=bits)
[docs] def __repr__(self):
"""Show object info.
Returns:
:obj:`str`
"""
return self.__str__()
[docs] def spew(self, text, **kwargs):
"""Print output to stream.
Args:
text (:obj:`str`):
String to print.
**kwargs:
stream (:obj:`io.IOBase`):
Stream print to.
Defaults to: :attr:`output_stream`.
"""
stream = kwargs.get("stream", self.output_stream)
args = {"end": "", "file": stream}
if six.PY3:
args["flush"] = True
print(text, **args)
[docs] def prepare(self, text, **kwargs):
"""Format text with colors.
Args:
text (:obj:`str`):
String to format.
**kwargs:
use_color (:obj:`bool`):
True: Replace fore, back, and style with color codes.
False: Replace fore, back, and style with empty vals.
Defaults to: :attr:`USE_COLOR`.
"""
use_color = kwargs.get("use_color", self.USE_COLOR)
fore = ColorWrap(mode="fore" if use_color else "")
back = ColorWrap(mode="back" if use_color else "")
style = ColorWrap(mode="style" if use_color else "")
return text.format(f=fore, b=back, s=style)
[docs] def get_prompter(self, secure=False):
"""Get a prompt function.
Args:
secure (:obj:`bool`, optional):
Return getpass.getpass instead of input.
Defaults to: False.
Returns:
:obj:`object`
"""
return getpass.getpass if secure else six.moves.input
[docs] def prompt(self, text, default=None, **kwargs):
"""Prompt for a value.
Args:
text (:obj:`str`):
Prompt text from parent for error/warning messages.
default (:obj:`object`, optional):
Default value to use if no value supplied.
Defaults to: None.
**kwargs:
secure (:obj:`bool`, optional):
If value should have its input hidden or not.
Defaults to: False.
check_tty (:obj:`bool`):
Skip TTY checks for input/output stream.
Defaults to: False.
env_var (:obj:`str`):
Replace default with contents of this OS Env.
Defaults to: "".
rest of kwargs:
Passed to :meth:`spew` and :meth:`prepare`.
Raises:
:exc:`NoTtyError`:
If the input or output stream is not attached to a console
and check_tty is False.
Returns:
:obj:`str`
"""
secure = kwargs.pop("secure", False)
check_tty = kwargs.pop("check_tty", True)
env_var = kwargs.pop("env_var", "")
def_tmpl = "secure_value" if secure and default is not None else "value"
def_tmpl = self.TMPLS[def_tmpl]
def_text = def_tmpl.format(value="" if default is None else default)
if env_var:
env_val = os.environ.get(env_var, "")
action = "will" if env_val else "will not"
env_val_tmpl = "secure_value" if secure and env_val else "value"
env_val_tmpl = self.TMPLS[env_val_tmpl]
env_val_text = env_val_tmpl.format(value=env_val)
prompt_env_tmpl = self.TMPLS["env_value"]
prompt_env_text = prompt_env_tmpl.format(
env=env_var, env_value=env_val_text, default=def_text, action=action
)
prompt_env_text = self.prepare(text=prompt_env_text, **kwargs)
self.spew(text=prompt_env_text, **kwargs)
default = env_val if env_val else default
streams = [self.input_stream, self.output_stream]
for stream in streams:
if check_tty and not isatty(stream=stream):
if default is None:
raise NoTtyError(stream=stream, text=text)
w = self.TMPLS["warn_notty"].format(
stream=stream_name(stream), default=default, text=text
)
w = self.prepare(text=w, **kwargs)
self.spew(text=w, **kwargs)
return default
prompt_tmpl = self.TMPLS["prompt"]
prompt_text = prompt_tmpl.format(text=text, default=def_text)
prompt_text = self.prepare(text=prompt_text, **kwargs)
self.spew(text=prompt_text, **kwargs)
v = self.get_prompter(secure=secure)("").strip()
return format(default) if not v and default is not None else v
[docs] def ask_choice(self, text, choices, default=None, attempts=5, **kwargs):
"""Prompt user to select from a list of choices.
Args:
text (:obj:`str`):
Text to use when prompting.
choices (:obj:`list` of :obj:`str`):
List of choices for user to pick from.
default (:obj:`str`, optional):
Default value to use if no value supplied.
Defaults to: None.
attempts (:obj:`int`, optional):
Number of attempts to allow empty/invalid input.
Defaults to: 5.
**kwargs:
Passed to :meth`prepare` and :meth:`spew`.
Raises:
:exc:`InvalidValueError`:
If value supplied is not one of choices.
:exc:`EmptyValueError`:
If no value supplied and no default.
Returns:
:obj:`str`
"""
options = [format(x).lower() for x in choices]
options_cr = "".join(self.TMPLS["option"].format(opt=x) for x in options)
options_csv = ", ".join(options)
pre_text = self.TMPLS["options"].format(options=options_cr)
pre_text = self.prepare(text=pre_text, **kwargs)
invalid_text = self.TMPLS["warn_option_invalid"].format(options=options_csv)
invalid_text = self.prepare(text=invalid_text, **kwargs)
nodefault_text = self.TMPLS["warn_nodefault"].format()
nodefault_text = self.prepare(text=nodefault_text, **kwargs)
invalid = False
for i in range(attempts):
self.spew(text=pre_text, **kwargs)
value = self.prompt(text=text, default=default, **kwargs)
if value:
if value.lower() in options:
return choices[options.index(value.lower())]
self.spew(text=invalid_text, **kwargs)
invalid = True
elif default is None:
self.spew(text=nodefault_text, **kwargs)
if invalid:
raise InvalidValueError(text=text, attempts=attempts)
else:
raise EmptyValueError(text=text, attempts=attempts)
[docs] def ask_bool(self, text, default=None, attempts=5, **kwargs):
"""Prompt user to provide yes or no.
Args:
text (:obj:`str`):
Text to use when prompting.
default (:obj:`bool` or :obj:`str`, optional):
Default value to use if no value supplied.
Defaults to: None.
attempts (:obj:`int`, optional):
Number of attempts to allow empty/invalid input.
Defaults to: 5.
**kwargs:
rest of kwargs:
Passed to :meth:`prepare` and :meth:`spew`.
Raises:
:exc:`InvalidValueError`:
If value supplied is not a valid yes/no string.
:exc:`EmptyValueError`:
If no value supplied and no default.
Returns:
:obj:`bool`
"""
text += " (boolean)"
invalid_text = self.TMPLS["warn_bool_invalid"].format(
yes_values=", ".join(self.YES_VALUES), no_values=", ".join(self.NO_VALUES)
)
invalid_text = self.prepare(text=invalid_text, **kwargs)
nodefault_text = self.TMPLS["warn_nodefault"].format()
nodefault_text = self.prepare(text=nodefault_text, **kwargs)
invalid = False
for i in range(attempts):
value = self.prompt(text=text, default=default, **kwargs)
if value:
if any(re.match(p, value, re.IGNORECASE) for p in self.YES_VALUES):
return True
if any(re.match(p, value, re.IGNORECASE) for p in self.NO_VALUES):
return False
self.spew(text=invalid_text, **kwargs)
invalid = True
elif default is None:
self.spew(text=nodefault_text, **kwargs)
if invalid:
raise InvalidValueError(text=text, attempts=attempts)
else:
raise EmptyValueError(text=text, attempts=attempts)
[docs] def ask_int(self, text, default=None, attempts=5, **kwargs):
"""Prompt user to provide yes or no.
Args:
text (:obj:`str`):
Text to use when prompting.
default (:obj:`int`, optional):
Default value to use if no value supplied.
Defaults to: None.
attempts (:obj:`int`, optional):
Number of attempts to allow empty/invalid input.
Defaults to: 5.
**kwargs:
rest of kwargs:
Passed to :meth:`prompt` and :meth:`prepare`.
Raises:
:exc:`InvalidValueError`:
If value supplied is not an int.
:exc:`EmptyValueError`:
If no value supplied and no default.
Returns:
:obj:`int`
"""
text += " (integer)"
invalid_text = self.TMPLS["warn_int_invalid"].format()
invalid_text = self.prepare(text=invalid_text, **kwargs)
nodefault_text = self.TMPLS["warn_nodefault"].format()
nodefault_text = self.prepare(text=nodefault_text, **kwargs)
invalid = False
for i in range(attempts):
value = self.prompt(text=text, default=default, **kwargs)
if value:
if format(value).isdigit():
return int(value)
self.spew(text=invalid_text, **kwargs)
invalid = True
elif default is None:
self.spew(text=nodefault_text, **kwargs)
if invalid:
raise InvalidValueError(text=text, attempts=attempts)
else:
raise EmptyValueError(text=text, attempts=attempts)
[docs] def ask_str(self, text, default=None, attempts=5, **kwargs):
"""Prompt user to provide yes or no.
Args:
text (:obj:`str`):
Text to use when prompting.
default (:obj:`str`, optional):
Default value to use if no value supplied.
Defaults to: "".
attempts (:obj:`int`, optional):
Number of attempts to allow empty/invalid input.
Defaults to: 5.
**kwargs:
empty_ok (:obj:`bool`, optional):
Empty input is allowed.
Defaults to: False.
validate (:obj:`str`, optional):
Regex string to validate value.
Defaults to: "".
rest of kwargs:
Passed to :meth:`prompt` and :meth:`prepare`.
Raises:
:exc:`EmptyValueError`:
If no value supplied and no default and not empty_ok.
Returns:
:obj:`str`
"""
empty_ok = kwargs.pop("empty_ok", False)
validate = kwargs.pop("validate", "")
text += " (string)"
invalid_text = self.TMPLS["warn_text_invalid"].format(validate=validate)
invalid_text = self.prepare(text=invalid_text, **kwargs)
nodefault_text = self.TMPLS["warn_nodefault"].format()
nodefault_text = self.prepare(text=nodefault_text, **kwargs)
invalid = False
for i in range(attempts):
value = self.prompt(text=text, default=default, **kwargs)
if value:
if validate:
match = re.search(validate, value, re.IGNORECASE)
if match:
return value
self.spew(text=invalid_text, **kwargs)
invalid = True
else:
return value
elif empty_ok:
return value
elif default is None:
self.spew(text=nodefault_text, **kwargs)
continue
if invalid:
raise InvalidValueError(text=text, attempts=attempts)
else:
raise EmptyValueError(text=text, attempts=attempts)
# LATER(!) automagic ask method for method
[docs] def ask_dict(self, asks, overrides=None):
"""Prompt for input from a list of dict.
Args:
asks (:obj:`list` of :obj:`dict`):
List of dicts with valid kwargs for the various ask methods.
overrides: (:obj:`dict`, optional):
Dictionary of key / value pairs to over ride the defaults of asks.
Defaults to: None.
Notes:
Keys for ask dict in asks:
"method", required:
Ask method to use for this ask dict.
"key", required:
Key to store value returned from ask method in return dict.
"text", required:
String to use for prompting.
"choices", required for :meth:`ask_choice`:
List of valid choices for user to pick.
"secure", optional for :meth:`ask_str`:
Use prompt that hides user input while typing.
"default", optional:
Default value to use if user does not provide one.
"attempts", optional:
Number of attempts to allow empty/invalid input.
"check_tty", optional:
Skip TTY checks for input/output stream.
Returns:
:obj:`dict`
"""
over = {}
over.update(self.OVERRIDES or {})
over.update(overrides or {})
values = {}
# need_keys = ["key", "method", "text"]
for ask in asks:
# if not any(key in ask for key in need_keys):
# error = "Must provide keys {keys!r} in {ask!r}"
# error = error.format(keys=need_keys, ask=ask)
# raise PromptError(text=error, attempts=0)
default = ask.get("default", None)
ask["default"] = over.get(ask["key"], default)
values[ask["key"]] = getattr(self, ask["method"])(**ask)
return values
[docs]class PromptError(Exception):
"""Parent exception for any errors when prompting.
Thrown by:
:meth:`Promptness.ask_dict`
"""
msg = "{text}"
[docs] def __init__(self, text, attempts):
"""Constructor.
Args:
text (:obj:`str`):
Text to insert into exception message
attempts (:obj:`int`, optional):
Number of attempts tried before this exception
"""
msg = self.msg.format(text=text, attempts=attempts)
super(PromptError, self).__init__(msg)
[docs]class InvalidValueError(PromptError):
"""Thrown when a user provides an invalid value.
Thrown by:
:meth:`Promptness.ask_choice`
:meth:`Promptness.ask_bool`
:meth:`Promptness.ask_int`
"""
msg = "Invalid value supplied for prompt {text!r} after {attempts} attempts"
[docs]class EmptyValueError(PromptError):
"""Thrown when no default defined and user provides no value.
Thrown by:
:meth:`Promptness.ask_choice`
:meth:`Promptness.ask_bool`
:meth:`Promptness.ask_int`
:meth:`Promptness.ask_str`
"""
msg = "No value supplied for prompt {text!r} after {attempts} attempts"
[docs]class NoTtyError(PromptError):
"""Thrown when a TTY is not attached to a console.
Thrown by:
:meth:`Promptness.prompt`
"""
[docs] def __init__(self, stream, text):
"""Constructor.
Args:
stream (:obj:`io.IOBase`):
Stream that was not attached to a console.
text (:obj:`str`):
Text to insert into exception message
"""
stmpl = "{stream}, {name}, {istty}".format
streams = [sys.stdin, sys.stdout, sys.stderr]
streams = [
stmpl(stream=s, name=stream_name(s), istty=isatty(s)) for s in streams
]
msg = "\n".join(
[
"No TTY on stream {s} and no default value for {text!r}, all streams:",
" " + "\n ".join(streams),
]
)
msg = msg.format(s=stream_name(stream), text=text)
super(PromptError, self).__init__(msg)
[docs]def stream_name(stream):
"""Get the name of a stream.
Args:
stream (:obj:`io.IOBase`):
Stream to get name of.
Returns:
:obj:`str`
"""
try:
return stream.name
except Exception:
return stream.__class__.__name__
[docs]def isatty(stream):
"""Check if a stream is attached to a console.
Args:
stream (:obj:`io.IOBase`):
Stream to check.
Returns:
:obj:`bool`
"""
return stream.isatty() if hasattr(stream, "isatty") else False
promptness = Promptness()
""":obj:`Promptness`: Pre-established object for easy usage."""