Source code for pytan3.results

# -*- coding: utf-8 -*-
"""Result objects that deserialize responses from Tanium API."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals

import abc

import json
import re
import six

from six.moves import urllib
import xmltodict

from . import exceptions
from .. import utils


[docs]@six.add_metaclass(abc.ABCMeta) class Result(object): """Deserialize API responses from Tanium API."""
[docs] @classmethod @abc.abstractmethod def from_response(cls, api_objects, response, lvl="info"): """Create :obj:`Result` from :obj:`requests.Response`. Args: api_objects (:obj:`pytan3.api_objects.ApiObjects`): API Objects Container to use. response (:obj:`requests.Response`): Response object received from Tanium API request. lvl (:obj:`str`, optional): Logging level. Defaults to: "info". Returns: :obj:`Result` """ raise NotImplementedError # pragma: no cover
[docs] @abc.abstractmethod def error_check(self): """Check for errors in :attr:`response_body_str`. Raises: :exc:`exceptions.ObjectExistsError` :exc:`exceptions.ObjectNotFoundError` :exc:`exceptions.ResponseError` """ raise NotImplementedError # pragma: no cover
@abc.abstractproperty def error_text(self): """Get the error text from :attr:`response_body_str`. Returns: :obj:`str` """ raise NotImplementedError # pragma: no cover
[docs] @abc.abstractmethod def api_objects(self): """Get the API objects container. Returns: :obj:`pytan3.api_objects.ApiObjects` """ raise NotImplementedError # pragma: no cover
[docs] @abc.abstractmethod def url(self): """Get the URL used in request. Returns: :obj:`str` """ raise NotImplementedError # pragma: no cover
@abc.abstractproperty def status_code(self): """Get the status code received in response. Returns: :obj:`int` """ raise NotImplementedError # pragma: no cover @abc.abstractproperty def method(self): """Get the method used in request. Returns: :obj:`str` """ raise NotImplementedError # pragma: no cover @abc.abstractproperty def request_body_str(self): """Get the full request body. Returns: :obj:`str` """ raise NotImplementedError # pragma: no cover @abc.abstractproperty def request_body_obj(self): """Get :attr:`request_body_str` deserialized into a python object. Returns: :obj:`object` """ raise NotImplementedError # pragma: no cover @abc.abstractproperty def request_object_obj(self): """Get the request objects from :attr:`request_body_obj`. Returns: :obj:`dict` """ raise NotImplementedError # pragma: no cover @abc.abstractproperty def response_body_str(self): """Get the full response body. Returns: :obj:`str` """ raise NotImplementedError # pragma: no cover @abc.abstractproperty def response_body_obj(self): """Get :attr:`response_body_str` deserialized into a python object. Returns: :obj:`object` """ raise NotImplementedError # pragma: no cover @abc.abstractproperty def data_obj(self): """Get the response data from :attr:`response_body_obj`. Returns: :obj:`object` """ raise NotImplementedError # pragma: no cover
[docs] @abc.abstractmethod def data_api(self, **kwargs): """Get :attr:`data_obj` deserialized into an ApiModel object. Returns: :obj:`pytan3.api_models.ApiModel` """ raise NotImplementedError # pragma: no cover
@abc.abstractproperty def object_obj(self): """Get the response objects from :attr:`response_body_obj`. Returns: :obj:`dict` """ raise NotImplementedError # pragma: no cover
[docs] @abc.abstractmethod def object_api(self, **kwargs): """Get :attr:`object_obj` deserialized into an ApiModel object. Returns: :obj:`pytan3.api_models.ApiModel` """ raise NotImplementedError # pragma: no cover
[docs] @abc.abstractmethod def str_to_obj(self, text, src, **kwargs): """Deserialize string into a python object. Args: text (:obj:`str`): String to decode into python object src (:obj:`str`): Where text came from, used in error text. Raises: :exc:`exceptions.TextDeserializeError`: Returns: :obj:`object` """ raise NotImplementedError # pragma: no cover
[docs] @abc.abstractmethod def obj_to_api(self, obj, src): """Deserialize a python object into :obj:`pytan3.api_models.ApiModel`. Args: obj (:obj:`object`): Python object to deserialize into a PyTan API object. src (:obj:`str`): Where obj came from, used in error text. Returns: :obj:`pytan3.api_models.ApiModel` """ raise NotImplementedError # pragma: no cover
[docs]class CommonMixin(object): """Shared methods common amongst all :class:`Result`."""
[docs] def __init__( self, api_objects, response_body, request_body, method, url, status_code, origin=None, lvl="info", ): """Constructor. Args: api_objects (:obj:`pytan3.api_objects.ApiObjects`): API Objects Container to use. response_body (:obj:`str`): Response body received from Tanium API. request_body (:obj:`str`): Request body sent to Tanium API. method (:obj:`str`): HTTP method used in request to Tanium API. url (:obj:`str`): URL used in request to Tanium API. status_code (:obj:`int`): Status code in response from Tanium API. origin (:obj:`requests.Response`, optional): Original response object received from Tanium API request. Defaults to: None. lvl (:obj:`str`, optional): Logging level. Defaults to: "info". """ self.log = utils.logs.get_obj_log(obj=self, lvl=lvl) """:obj:`logging.Logger`: Log for this object.""" self._api_objects = api_objects self._response_body_str = response_body self._request_body_str = request_body self._method = method self._url = url self._status_code = status_code self._cache = {} self.origin = origin """:obj:`requests.Response`: Original response object."""
[docs] def __str__(self): """Show object info. Returns: :obj:`str` """ bits = [ "api_objects={!r}".format(self.api_objects), "url={!r}".format(self.url), "method={!r}".format(self.method), "code={!r}".format(self.status_code), ] bits = "(\n {},\n)".format(",\n ".join(bits)) cls = "{c.__module__}.{c.__name__}".format(c=self.__class__) return "{cls}{bits}".format(cls=cls, bits=bits)
[docs] def __repr__(self): """Show object info. Returns: :obj:`str` """ return self.__str__()
[docs] @classmethod def from_response(cls, api_objects, response, lvl="info"): """Create Result from a requests response object. Args: api_objects (:obj:`pytan3.api_objects.ApiObjects`): API Objects Container to use. response (:obj:`requests.Response`): Response object received from Tanium API request. lvl (:obj:`str`, optional): Logging level. Defaults to: "info". Returns: :obj:`Result` """ return cls( api_objects=api_objects, response_body=response.text, request_body=response.request.body, method=response.request.method, url=response.url, status_code=response.status_code, origin=response, lvl=lvl, )
@property def api_objects(self): """Get the API objects container. Returns: :obj:`pytan3.api_objects.ApiObjects` """ return self._api_objects @property def url(self): """Get the URL used in request. Returns: :obj:`str` """ return self._url @property def status_code(self): """Get the status code received in response. Returns: :obj:`int` """ return int(self._status_code) @property def method(self): """Get the method used in request. Returns: :obj:`str` """ return self._method @property def request_body_str(self): """Get the full request body. Returns: :obj:`str` """ return self._request_body_str or "" @property def response_body_str(self): """Get the full response body. Returns: :obj:`str` """ return self._response_body_str or ""
[docs] def obj_to_api(self, api_name, obj, src): """Deserialize a python object into :obj:`pytan3.api_models.ApiModel`. Args: api_name (:obj:`str`): API name of obj. obj (:obj:`object`): Python object to deserialize into a PyTan API object. src (:obj:`str`): Where obj came from, used in error text. Returns: :obj:`pytan3.api_models.ApiModel` """ cls = self.api_objects.cls_by_name(name=api_name) with utils.tools.Timer() as t: if cls.__name__ == "ClientCount": ret = cls(count=obj) elif isinstance(obj, (list, tuple)): ret = cls(*obj) elif isinstance(obj, dict): ret = cls(**obj) # LATER(!) raise exc m = "Deserialized API name {a} obj type {t} from API into {cls!r}, took {e}" m = m.format(a=api_name, t=type(obj), cls=ret.__class__, e=t.elapsed) self.log.debug(m) return ret
[docs] def get_dict_path(self, obj, path, src): """Traverse a dict using a / seperated string. Args: obj (:obj:`dict`): Dictionary to traverse using path. path (:obj:`str`): Nested dictionary keys seperated by / to traverse in obj. src (:obj:`str`): Where obj came from, used in error text. Raises: :exc:`exceptions.DictionaryPathError`: If any error occurs while traversing path in obj. Returns: :obj:`object` """ try: ret = utils.tools.get_dict_path(obj=obj, path=path) except Exception as exc: raise exceptions.DictionaryPathError( result=self, obj=obj, path=path, src=src, exc=exc ) return ret
[docs]class Soap(CommonMixin, Result): """Deserialize SOAP API responses from Tanium API.""" DATA_ROUTES = ["GetResultInfo", "GetResultData", "GetMergedResultData"]
[docs] def __call__(self, raw=False, **kwargs): """Get :attr:`data_api` or :attr:`object_api`. Args: raw (:obj:`bool`, optional): Return the serialized object instead of the object deserialized into an ApiModel. Defaults to: False. **kwargs: rest of kwargs: passed to :meth:`object_api` or :meth:`data_api`. Notes: Will return :meth:`object_api` or :meth:`data_api` by determining if :attr:`command_request` is an object or data request. Returns: :obj:`pytan3.api_models.ApiModel` or :obj:`object` """ self.error_check() is_result_data = self.command_request in self.DATA_ROUTES if is_result_data: _raw = self.data_obj _api = self.data_api else: _raw = self.object_obj _api = self.object_api return _raw if raw else _api(**kwargs)
@property def error_map(self): """Map of status codes to regex patterns and exceptions. Notes: Keys should be :obj:`int` of a status code to check patterns of. Values should be :obj:`list` of [pattern, exc], where pattern is a regex pattern to search response body, and exc is exception to throw if pattern match is found. Raises: :exc:`exceptions.ObjectExistsError` :exc:`exceptions.ObjectNotFoundError` :exc:`exceptions.ResponseError` Returns: :obj:`dict` """ ret = {} ret[200] = [ [r"400.*already.*exists", exceptions.ObjectExistsError], [r"NotUnique", exceptions.ObjectExistsError], [r"400.*not.*found", exceptions.ObjectNotFoundError], [r"404.*not.*found", exceptions.ObjectNotFoundError], ] return ret
[docs] def error_check(self): """Check for errors in :attr:`CommonMixin.response_body_str`. Raises: :exc:`exceptions.ObjectExistsError` :exc:`exceptions.ObjectNotFoundError` :exc:`exceptions.ResponseError` """ self.error_check_response() self.error_check_code()
[docs] def error_check_response(self): """Check :attr:`Soap.command_response` for errors. Notes: For each key value pair in :attr:`error_map`, if key matches :attr:`CommonMixin.status_code` or key is None, the patterns in value are checked against response body, and if matched the patterns associated exception is thrown. """ for code, checks in self.error_map.items(): if code is None or self.status_code == code: for check in checks: pattern, error_exc = check pattern_re = re.compile(pattern, re.IGNORECASE | re.DOTALL) found = pattern_re.search(self.command_response) if found: raise error_exc(result=self, error=self.error_text) if self.command_request != self.command_response: error = "Request {req!r} does not match response:\n{resp}" error = error.format(resp=self.error_text, req=self.command_request) raise exceptions.ResponseError(self, error)
[docs] def error_check_code(self): """Check if :attr:`CommonMixin.status_code` is one of :attr:`valid_codes`. Raises: :exc:`exceptions.ResponseError` """ if self.status_code not in self.valid_codes: error = "Response status code {c} is not one of {v}, error text:\n{e}" error = error.format( c=self.status_code, v=self.valid_codes, e=self.error_text ) raise exceptions.ResponseError(result=self, error=error)
@property def valid_codes(self): """List of valid status codes. Returns: :obj:`list` of :obj:`int` """ return [200] @property def error_text(self): """Get the error text from :attr:`CommonMixin.response_body_str`. Returns: :obj:`str` """ return ", ".join([x for x in self.command_response.splitlines() if x]) @property def command_request(self): """Get the "command" element from :attr:`request_body_obj`. Returns: :obj:`str` """ obj = self.request_body_obj path = "soap:Envelope/soap:Body/t:tanium_soap_request/command" src = "SOAP API deserialized request body" return self.get_dict_path(obj=obj, path=path, src=src) @property def command_response(self): """Get the "command" element from :attr:`response_body_obj`. Returns: :obj:`str` """ obj = self.response_body_obj path = "soap:Envelope/soap:Body/t:return/command" src = "SOAP API deserialized response body" return self.get_dict_path(obj=obj, path=path, src=src) @property def request_body_obj(self): """Deserialize :attr:`CommonMixin.request_body_str` into a python object. Returns: :obj:`dict` """ key = "request_body_obj" if key not in self._cache: text = self.request_body_str src = "SOAP API request body" self._cache[key] = self.str_to_obj( text=text, src=src, try_int=True, use_dict=True, flat_attrs=True ) return self._cache[key] @property def request_object_obj(self): """Get the request objects from :attr:`request_body_obj`. Returns: :obj:`dict` """ obj = self.request_body_obj path = "soap:Envelope/soap:Body/t:tanium_soap_request/object_list" src = "SOAP API deserialized request body" return self.get_dict_path(obj=obj, path=path, src=src) @property def response_body_obj(self): """Get :attr:`CommonMixin.response_body_str` deserialized into a python object. Returns: :obj:`dict` """ key = "response_body_obj" if key not in self._cache: text = self.response_body_str src = "SOAP API response body" self._cache[key] = self.str_to_obj( text=text, src=src, try_int=True, use_dict=True, flat_attrs=True ) return self._cache[key] @property def data_xml(self): """Get the "ResultXML" element from :attr:`response_body_obj`. Returns: :obj:`str` """ obj = self.response_body_obj path = "soap:Envelope/soap:Body/t:return/ResultXML" src = "SOAP API deserialized response body" data = self.get_dict_path(obj=obj, path=path, src=src) return data or "" @property def data_obj(self): """Get the response data from :attr:`data_xml`. Returns: :obj:`object` """ key = "data_obj" if key not in self._cache: text = self.data_xml src = "'ResultXML' element from SOAP API deserialized response body" self._cache[key] = self.str_to_obj( text=text, src=src, try_int=False, use_dict=True, flat_attrs=True ) return self._cache[key]
[docs] def data_api(self, **kwargs): """Get :attr:`data_obj` deserialized into an ApiModel object. Args: **kwargs: rest of kwargs: passed to :meth:`CommonMixin.obj_to_api` Returns: :obj:`pytan3.api_models.ApiModel` """ if self.command_request not in self.DATA_ROUTES: error = "Route {req!r} is not one of {data}, not a data request?" error = error.format(req=self.command_request, data=self.DATA_ROUTES) raise exceptions.ApiWrongRequestType(result=self, error=error) kwargs["src"] = "Result data from 'ResultXML' element in SOAP response" kwargs["api_name"] = list(self.data_obj.keys())[0] kwargs["obj"] = list(self.data_obj.values())[0] return self.obj_to_api(**kwargs)
@property def object_obj(self): """Get the response objects from :attr:`response_body_obj`. Returns: :obj:`dict` """ obj = self.response_body_obj path = "soap:Envelope/soap:Body/t:return/result_object" src = "'result_object' element from SOAP API deserialized response body" data = self.get_dict_path(obj=obj, path=path, src=src) return data or {}
[docs] def object_api(self, **kwargs): """Get :attr:`object_obj` deserialized into an ApiModel object. Args: **kwargs: rest of kwargs: passed to :meth:`CommonMixin.obj_to_api` Returns: :obj:`pytan3.api_models.ApiModel` """ if self.command_request in self.DATA_ROUTES: error = "Route {route!r} is not one of {data}, not a data request?" error = error.format(route=self.command_request, data=self.DATA_ROUTES) raise exceptions.ApiWrongRequestType(result=self, error=error) kwargs["src"] = "Result object from 'result_object' element in SOAP response" kwargs["api_name"] = list(self.object_obj.keys())[0] kwargs["obj"] = list(self.object_obj.values())[0] return self.obj_to_api(**kwargs)
[docs] def str_to_obj( self, text, src, use_dict=True, try_int=True, flat_attrs=True, **kwargs ): """Deserialize string into a python object. Args: text (:obj:`str`): String to decode into python object src (:obj:`str`): Where text came from, used in error text. use_dict (:obj:`bool`): Use :obj:`dict` instead of :obj:`collections.OrderedDict` when deserializing. Defaults to: True. try_int (:obj:`bool`): Try to convert str into int when deserializing. Defaults to: True. flat_attrs (:obj:`bool`): Make attr_prefix "" and cdata_key "text". Defaults to: True. **kwargs: rest of kwargs: Passed to xmltodict.parse. Raises: :exc:`exceptions.TextDeserializeError`: If an exception is thrown from xmltodict.parse. Returns: :obj:`dict` """ if use_dict: kwargs.setdefault("dict_constructor", dict) if try_int: kwargs.setdefault("postprocessor", try_int_xml) if flat_attrs: kwargs.setdefault("attr_prefix", "") kwargs.setdefault("cdata_key", "text") with utils.tools.Timer() as t: if text: try: ret = xmltodict.parse(text, **kwargs) except Exception as exc: raise exceptions.TextDeserializeError( result=self, text=text, src=src, exc=exc ) else: ret = {} size = utils.tools.human_size(size=len(text)) m = "Finished deserializing {src} into {t}, {size} took {e}" m = m.format(size=size, src=src, t=type(ret), e=t.elapsed) self.log.debug(m) return ret
[docs]class Rest(CommonMixin, Result): """Deserialize REST API responses from Tanium API.""" MANUAL_OBJECT_ROUTES = { "audit_logs": "audit_logs", # audit_logs needs to be an AuditLogList object # without this, it will be resolved to an AuditLog object "parse_question": "parse_question_results", # Use manual API object ParseQuestionResultList } """:obj:`dict`: Manual route defs for :meth:`object_api`.""" DATA_ROUTES = { "result_info": "result_infos", "result_data": "result_sets", "merged_result_data": "merged_result_set", } """:obj:`dict`: Manual route defs for :meth:`data_api`."""
[docs] def __call__(self, raw=False, **kwargs): """Get :attr:`data_api` or :attr:`object_api`. Args: raw (:obj:`bool`, optional): Return the serialized object instead of the object deserialized into an ApiModel. Defaults to: False. **kwargs: rest of kwargs: passed to :meth:`object_api` or :meth:`data_api`. Notes: Will return :meth:`object_api` or :meth:`data_api` by determining if :attr:`RestUrlPath.route` is an object or data request. Returns: :obj:`pytan3.api_models.ApiModel` or :obj:`object` """ self.error_check() is_result_data = self.url_path.route in self.DATA_ROUTES if is_result_data: _raw = self.data_obj _api = self.data_api else: _raw = self.object_obj _api = self.object_api return _raw if raw else _api(**kwargs)
@property def error_map(self): """Map of status codes to regex patterns and exceptions. Notes: Keys should be :obj:`int` of a status code to check patterns of. Values should be :obj:`list` of [pattern, exc], where pattern is a regex pattern to search response body, and exc is exception to throw if pattern match is found. Raises: :exc:`exceptions.ObjectExistsError` :exc:`exceptions.ObjectNotFoundError` :exc:`exceptions.ResponseError` Returns: :obj:`dict` """ ret = {} ret[400] = [ [r"400.*already.*exists", exceptions.ObjectExistsError], [r"NotUnique", exceptions.ObjectExistsError], ] ret[404] = [[r".*", exceptions.ObjectNotFoundError]] return ret
[docs] def error_check(self): """Check for errors in :attr:`CommonMixin.response_body_str`. Raises: :exc:`exceptions.ObjectExistsError` :exc:`exceptions.ObjectNotFoundError` :exc:`exceptions.ResponseError` """ self.error_check_response() self.error_check_code()
[docs] def error_check_response(self): """Check :attr:`CommonMixin.response_body_str` for errors. Notes: For each key value pair in :attr:`error_map`, if key matches :attr:`CommonMixin.status_code` or key is None, the patterns in value are checked against response body, and if matched the patterns associated exception is thrown. """ for code, checks in self.error_map.items(): if code is None or self.status_code == code: for check in checks: pattern, error_exc = check pattern_re = re.compile(pattern, re.IGNORECASE | re.DOTALL) found = pattern_re.search(self.response_body_str) if found: raise error_exc(result=self, error=self.error_text)
[docs] def error_check_code(self): """Check if :attr:`CommonMixin.status_code` is one of :attr:`valid_codes`. Raises: :exc:`exceptions.ResponseError` """ if self.status_code not in self.valid_codes: error = "Response status code {c} is not one of {v}, error text:\n{e}" error = error.format( c=self.status_code, v=self.valid_codes, e=self.error_text ) raise exceptions.ResponseError(result=self, error=error)
@property def valid_codes(self): """List of valid status codes. Returns: :obj:`list` of :obj:`int` """ return [200] @property def error_text(self): """Get the error text from :attr:`CommonMixin.response_body_str`. Returns: :obj:`str` """ try: text = self.response_body_obj["text"] return ", ".join([x for x in text.splitlines() if x]) except Exception: return self.response_body_obj @property def url_path(self): """Parse :attr:`CommonMixin.url` using :class:`RestUrlPath`. Returns: (:obj:`RestUrlPath`) """ return RestUrlPath(url=self.url) @property def request_body_obj(self): """Deserialize :attr:`CommonMixin.request_body_str` into a python object. Returns: :obj:`dict` """ key = "request_body_obj" if key not in self._cache: text = self.request_body_str or "{}" src = "REST API request body" self._cache[key] = self.str_to_obj(text=text, src=src) return self._cache[key] @property def request_object_obj(self): """Get the request objects from :attr:`request_body_obj`. Returns: :obj:`dict` """ return self.request_body_obj @property def response_body_obj(self): """Get :attr:`CommonMixin.response_body_str` deserialized into a python object. Returns: :obj:`dict` """ key = "response_body_obj" if key not in self._cache: text = self.response_body_str or "{}" src = "REST API response body" self._cache[key] = self.str_to_obj(text=text, src=src) return self._cache[key] @property def data_obj(self): """Get the response data from :attr:`response_body_obj`. Returns: :obj:`dict` """ obj = self.response_body_obj path = "data" src = "'data' key from REST API deserialized response body" data = self.get_dict_path(obj=obj, path=path, src=src) return data or {}
[docs] def data_api(self, **kwargs): """Get :attr:`data_obj` deserialized into an ApiModel object. Args: **kwargs: rest of kwargs: passed to :meth:`CommonMixin.obj_to_api` Returns: :obj:`pytan3.api_models.ApiModel` """ route = self.url_path.route if route not in self.DATA_ROUTES: error = "Route {route!r} is not one of {data}, not a data request?" error = error.format(route=route, data=self.DATA_ROUTES) raise exceptions.ApiWrongRequestType(result=self, error=error) override = self.DATA_ROUTES[route] cls = self.api_objects.cls_by_name(name=override) kwargs["src"] = "Result object from 'data' key in REST response" kwargs["api_name"] = cls.API_NAME kwargs["obj"] = self.data_obj return self.obj_to_api(**kwargs)
@property def object_obj(self): """Get the response objects from :attr:`response_body_obj`. Returns: :obj:`dict` """ obj = self.response_body_obj path = "data" src = "'data' key from REST API deserialized response body" data = self.get_dict_path(obj=obj, path=path, src=src) return data or {}
[docs] def object_api(self, **kwargs): """Get :attr:`object_obj` deserialized into an ApiModel object. Args: **kwargs: rest of kwargs: passed to :meth:`CommonMixin.obj_to_api` Returns: :obj:`pytan3.api_models.ApiModel` """ """ REST notes: - given path: /api/v2/actions - if GET and no target, returns `[{}, {}]` == ActionList() - if GET and target, returns `{}` == Action() - if POST, PATCH, DELETE, returns `{}` == Action() - given path: /api/v2/audit_logs - requires GET with target, returns `{"entries": [{}, {}]}` == AuditLogList() - action_stops has no GET without selector, so can't get all """ route = self.url_path.route target = self.url_path.target if route in self.DATA_ROUTES: error = "Route {route!r} is one of {data}, not an object request?" error = error.format(route=route, data=self.DATA_ROUTES) raise exceptions.ApiWrongRequestType(result=self, error=error) if route in self.MANUAL_OBJECT_ROUTES: override = self.MANUAL_OBJECT_ROUTES[route] cls = self.api_objects.cls_by_name(name=override) kwargs["api_name"] = cls.API_NAME elif self.method == "GET" and target: # GET against target # a single item of type 'route' returned cls = self.api_objects.cls_by_name(name=route) kwargs["api_name"] = cls.API_ITEM_CLS.API_NAME elif self.method in ["POST", "PATCH", "DELETE"]: # POST/PATCH/DELETE # a single item of type 'route' returned cls = self.api_objects.cls_by_name(name=route) kwargs["api_name"] = cls.API_ITEM_CLS.API_NAME else: # no target and not POST/PATCH/DELETE # a list item type of 'route' returned cls = self.api_objects.cls_by_name(name=route) kwargs["api_name"] = cls.API_NAME src = "Result object from 'data' key in REST route {r}" src = src.format(r=route) kwargs["src"] = src kwargs["obj"] = self.object_obj return self.obj_to_api(**kwargs)
[docs] def str_to_obj(self, text, src, **kwargs): """Deserialize string into a python object. Args: text (:obj:`str`): String to decode into python object src (:obj:`str`): Where text came from, used in error text. **kwargs: rest of kwargs: Passed to :func:`json.loads`. Raises: :exc:`exceptions.TextDeserializeError`: If an exception is thrown from :func:`json.loads`. Returns: :obj:`object` """ with utils.tools.Timer() as t: try: ret = json.loads(text, **kwargs) except Exception as exc: raise exceptions.TextDeserializeError( result=self, text=text, src=src, exc=exc ) size = utils.tools.human_size(size=len(text)) m = "Finished deserializing {src} into {t}, {size} took {e}" m = m.format(size=size, src=src, t=type(ret), e=t.elapsed) self.log.debug(m) return ret
[docs]class RestUrlPath(object): """Parser to get path parts from a REST API URL."""
[docs] def __init__(self, url): """Constructor. Args: url (:obj:`str`): URL to parse. Raises: :exc:`exceptions.ModuleError`: If :attr:`version` and :attr:`route` can not be parsed from :attr:`path_parts`. """ self.url_original = url """:obj:`str`: Original URL supplied.""" self.url = urllib.parse.unquote(url) """:obj:`str`: Percent decoded version of :attr:`url_original`.""" self.url_path = urllib.parse.urlparse(self.url) """:obj:`urllib.parse.ParseResult`: Parsed version of :attr:`url`.""" self.path_parts = self.url_path.path.lstrip("/").split("/") """:obj:`list` of :obj:`str`: Parts of URL split on /.""" self.version = 0 """:obj:`int`: REST API version from :attr:`path_parts`.""" self.route = "" """:obj:`str`: REST API route from :attr:`path_parts`.""" self.target = "" """:obj:`str`: If found, target supplied for :attr:`route` in :attr:`url`.""" self.by_name = False """:obj:`bool`: If :attr:`target` is doing targeting by name or by id.""" try: # first part should be "/api" self.path_parts.pop(0) # second part should be REST API version, ala "/v2" self.version = int(self.path_parts.pop(0).lstrip("v")) # third part should be route, ala "/users" self.route = self.path_parts.pop(0) except Exception as exc: error = "Failed to parse REST URL path as {p!r} from url {u!r}, error: {e}" error = error.format(u=self.url, e=exc, p="/api/v2/route") raise exceptions.ModuleError(error) self.by_name = any(x == "by-name" for x in self.path_parts) for idx, x in enumerate(self.path_parts[:]): if x == "by-name": value = self.path_parts.pop(idx + 1) self.path_parts.pop(idx) self.target = value break elif x.isdigit(): self.target = self.path_parts.pop(idx) break
[docs] def __str__(self): """Show object info. Returns: :obj:`str` """ bits = [ "version={!r}".format(self.version), "route={!r}".format(self.route), "target={!r}".format(self.target), "by-name={!r}".format(self.by_name), "leftover_parts={!r}".format(self.path_parts), ] bits = "({})".format(", ".join(bits)) cls = "{c.__module__}.{c.__name__}".format(c=self.__class__) return "{cls}{bits}".format(cls=cls, bits=bits)
[docs] def __repr__(self): """Show object info. Returns: :obj:`str` """ return self.__str__()
[docs]def try_int_xml(path, key, value): """Parser hook for xmltodict.parse to try to convert str to int. Args: path (:obj:`list`): Path from root of document to key. key (:obj:`str`): Element name being parsed. value (:obj:`object`): Element value of key being parsed. Returns: (:obj:`str`, :obj:`object`): original key supplied and value as int if successfully converted. """ if isinstance(value, six.string_types): try: return key, int(value) except (ValueError, TypeError): return key, value return key, value