Source code for pytan3.adapters

# -*- coding: utf-8 -*-
"""Adapter objects that serialize requests to the Tanium API."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals

import abc
import json
import re
import six
import warnings
import xmltodict

from . import exceptions
from .. import api_models
from .. import results
from .. import utils

DEFAULT_NAME = "soap"
""":obj:`str`: Default :class:`Adapter` name to load in :func:`load`."""

DEFAULT_TYPE = "soap"
""":obj:`str`: Default :class:`Adapter` type to load in :func:`load_type`."""


[docs]@six.add_metaclass(abc.ABCMeta) class Adapter(object): """Abstract base class for all Adapters.""" @abc.abstractproperty def api_objects(self): """Get the API objects container. Returns: :obj:`pytan3.api_objects.ApiObjects` """ raise NotImplementedError # pragma: no cover @abc.abstractproperty def api_client(self): """Get the API client. Returns: :obj:`pytan3.api_clients.ApiClient` """ raise NotImplementedError # pragma: no cover @abc.abstractproperty def http_client(self): """Get the HTTP client. Returns: :obj:`pytan3.http_client.HttpClient` """ raise NotImplementedError # pragma: no cover @abc.abstractproperty def auth_method(self): """Get the Auth Method. Returns: :obj:`pytan3.auth_methods.AuthMethod` """ raise NotImplementedError # pragma: no cover
[docs] @classmethod @abc.abstractmethod def get_name(cls): """Get the ref name of this class for use by :func:`load`. Returns: :obj:`str` """ raise NotImplementedError # pragma: no cover
[docs] @classmethod @abc.abstractmethod def get_type(cls): """Get the ref type of this class for use by :func:`load_type`. Returns: :obj:`str` """ raise NotImplementedError # pragma: no cover
[docs] @classmethod @abc.abstractmethod def get_version_req(cls): """Get the min, max, and eq version requirements of this class. Notes: Dict can specify keys: "vmin", "vmax", "veq". This class method gets called by :func:`pytan3.utils.versions.version_check_obj_req` to perform version checks. Returns: :obj:`dict` """ raise NotImplementedError # pragma: no cover
@abc.abstractproperty def result_cls(cls): """Get the result deserializer class. Returns: :class:`pytan3.results.Result` """ raise NotImplementedError # pragma: no cover
[docs] @abc.abstractmethod def api_get(self, obj, **kwargs): """Send an API request to get an object. Args: obj (:obj:`pytan3.api_models.ApiModel`): API Object to use for request. Returns: :obj:`pytan3.results.Result` """ raise NotImplementedError # pragma: no cover
[docs] @abc.abstractmethod def api_add(self, obj, **kwargs): """Send an API request to add an object. Args: obj (:obj:`pytan3.api_models.ApiModel`): API Object to use for request. Returns: :obj:`pytan3.results.Result` """ raise NotImplementedError # pragma: no cover
[docs] @abc.abstractmethod def api_delete(self, obj, **kwargs): """Send an API request to delete an object. Args: obj (:obj:`pytan3.api_models.ApiModel`): API Object to use for request. Returns: :obj:`pytan3.results.Result` """ raise NotImplementedError # pragma: no cover
[docs] @abc.abstractmethod def api_update(self, obj, **kwargs): """Send an API request to update an object. Args: obj (:obj:`pytan3.api_models.ApiModel`): API Object to use for request. Returns: :obj:`pytan3.results.Result` """ raise NotImplementedError # pragma: no cover
[docs] @abc.abstractmethod def api_get_audit_logs(self, type, target, **kwargs): """Send an API request to get audit logs for an object. Args: type (:obj:`str`): Type of object to get audit logs of. target (:obj:`int`): ID of object type to get audit logs for. Returns: :obj:`pytan3.results.Result` """ raise NotImplementedError # pragma: no cover
[docs] @abc.abstractmethod def api_get_client_count(self, **kwargs): """Send an API request to get the client count. Returns: :obj:`pytan3.results.Result` """ raise NotImplementedError # pragma: no cover
[docs] @abc.abstractmethod def api_parse_question(self, text, **kwargs): """Send an API request to parse text. Args: text (:obj:`str`): Text to parse into question objects. Returns: :obj:`pytan3.results.Result` """ raise NotImplementedError # pragma: no cover
[docs] @abc.abstractmethod def api_add_parsed_question(self, obj, **kwargs): """Send an API request to add a parsed question object. Args: obj (:obj:`pytan3.api_models.ApiModel`): API Object to use for request. Returns: :obj:`pytan3.results.Result` """ raise NotImplementedError # pragma: no cover
[docs] @abc.abstractmethod def api_get_result_info(self, obj, **kwargs): """Send an API request to get result info for an object. Args: obj (:obj:`pytan3.api_models.ApiModel`): API Object to use for request. Returns: :obj:`pytan3.results.Result` """ raise NotImplementedError # pragma: no cover
[docs] @abc.abstractmethod def api_get_result_data(self, obj, **kwargs): """Send an API request to get result data for an object. Args: obj (:obj:`pytan3.api_models.ApiModel`): API Object to use for request. Returns: :obj:`pytan3.results.Result` """ raise NotImplementedError # pragma: no cover
[docs] @abc.abstractmethod def api_get_merged_result_data(self, objlist, **kwargs): """Send an API request to get merged result data for a list of objects. Args: objlist (:obj:`list`): List of API Objects to use for request. Returns: :obj:`pytan3.results.Result` """ raise NotImplementedError # pragma: no cover
[docs]class Soap(Adapter): """Tanium SOAP request adapter.""" DEFAULT_OPTIONS = {"json_pretty_print": True, "include_hashes_flag": True} """:obj:`dict`: Default options to use in :meth:`build_options_from_kwargs`.""" AUDIT_LOG_TYPES = [ "authentication", "content_set", "content_set_role", "content_set_role_privilege", "dashboard", "dashboard_group", "group", "package_spec", "plugin_schedule", "saved_action", "saved_question", "sensor", "system_setting", "user", "user_group", "white_listed_url", ] """:obj:`list` of :obj:`str`: Valid types for :meth:`api_get_audit_logs`."""
[docs] def __init__(self, api_client, api_objects, ver_check=True, lvl="info"): """Constructor. Args: api_client (:obj:`pytan3.api_clients.ApiClient`): Client to use for sending API requests. api_objects (:obj:`pytan3.api_objects.ApiObjects`): API objects container to use for this adapter. ver_check (:obj:`bool`, optional): Perform version checks against :func:`pytan3.api_clients.get_version`. Defaults to: True. lvl (:obj:`str`, optional): Logging level. Defaults to: "info". """ self.log = utils.logs.get_obj_log(obj=self, lvl=lvl) """:obj:`logging.Logger`: Log.""" self._api_objects = api_objects self._api_client = api_client check_adapter_types(self) if ver_check and any(self.get_version_req().values()): check_adapter_version(self)
[docs] def __str__(self): """Show object info. Returns: :obj:`str` """ bits = [ "type={!r}".format(self.get_type()), "api_objects={!r}".format(self.api_objects), "api_client={!r}".format(self.api_client), "http_client={!r}".format(self.http_client), "auth_method={!r}".format(self.auth_method), ] bits = "(\n {},\n)".format(",\n ".join(bits)) cls = "{c.__module__}.{c.__name__}".format(c=self.__class__) return "{cls}{bits}".format(cls=cls, bits=bits)
[docs] def __repr__(self): """Show object info. Returns: :obj:`str` """ return self.__str__()
@property def api_objects(self): """Get the API objects container. Returns: :obj:`pytan3.api_objects.ApiObjects` """ return self._api_objects @property def api_client(self): """Get the API client. Returns: :obj:`pytan3.api_clients.ApiClient` """ return self._api_client @property def http_client(self): """Get the HTTP client. Returns: :obj:`pytan3.http_client.HttpClient` """ return self.api_client.http_client @property def auth_method(self): """Get the Auth Method. Returns: :obj:`pytan3.auth_methods.AuthMethod` """ return self.api_client.auth_method
[docs] @classmethod def get_name(cls): """Get the ref name of this class for use by :func:`load`. Returns: :obj:`str` """ return "soap"
[docs] @classmethod def get_type(cls): """Get the ref type of this class for use by :func:`load_type`. Returns: :obj:`str` """ return "soap"
[docs] @classmethod def get_version_req(cls): """Get the min, max, and eq version requirements of this class. Notes: Dict can specify keys: "vmin", "vmax", "veq". This class method gets called by :func:`pytan3.utils.versions.version_check_obj_req` to perform version checks. Returns: :obj:`dict` """ return {"vmin": "", "vmax": "", "veq": ""}
@property def result_cls(cls): """Get the result deserializer class. Returns: :class:`pytan3.results.Result` """ return results.Soap
[docs] def build_options_from_kwargs(self, **kwargs): """Build an Options API object from kwargs and return the serialized form. Args: **kwargs: options_obj (:obj:`pytan3.api_models.ApiItem`): A pre-established Options object. Defaults to: new Options object from :attr:`api_objects`. rest of kwargs: Set on Options object if key is an attr on object and attrs value is None. Notes: Will set :attr:`DEFAULT_OPTIONS` as defaults to kwargs before applying values to Options object attributes. Returns: :obj:`dict` """ default_options = getattr(self, "DEFAULT_OPTIONS", {}) or {} for k, v in default_options.items(): kwargs.setdefault(k, v) opts = kwargs.pop("options_obj", self.api_objects.Options()) check_object_type(obj=opts, types=(self.api_objects.Options,)) for k in list(kwargs): if hasattr(opts, k) and getattr(opts, k, None) is None: setattr(opts, k, kwargs[k]) return opts.serialize(wrap_name=False)
[docs] def send(self, obj, cmd, **kwargs): """Build and send a SOAP API request. Args: obj (:obj:`pytan3.api_models.ApiModel`): ApiModel to serialize and send as part of request. cmd (:obj:`str`): SOAP Command to use in request. **kwargs: body_re_limit (:obj:`int`): Value to limit regex search of response body for <session> tag. Defaults to: 4000. empty (:obj:`bool`): Include attributes that have a value of None when serializing. Defaults to: False list_attrs (:obj:`bool`): Include simple attributes of :obj:`pytan3.api_models.ApiList` when serializing. Defaults to: False. exclude_attrs (:obj:`list` of :obj:`str`): Exclude these attributes when serializing. Defaults to: []. only_attrs (:obj:`list` of :obj:`str`): Include only these attributes when serializing. Defaults to: []. wrap_name (:obj:`bool`): Wrap the return in another dict whose key is set to the API name. Defaults to: True. wrap_item_attr (:obj:`bool`): Wrap list items in dict whose key is set to the API list item attribute. Defaults to: True. rest of kwargs: Passed to :meth:`build_options_from_kwargs`. Raises: :obj:`exceptions.SessionNotFoundWarning`: If the <session> tag can not be found in the response body. Returns: :obj:`pytan3.results.Result` """ limit = kwargs.pop("body_re_limit", 4000) sargs = { "only_attrs": kwargs.pop("only_attrs", []), "exclude_attrs": kwargs.pop("exclude_attrs", []), "empty": kwargs.pop("empty", False), "list_attrs": kwargs.pop("list_attrs", False), "wrap_name": kwargs.pop("wrap_name", True), "wrap_item_attr": kwargs.pop("wrap_item_attr", True), } obj = obj.serialize(**sargs) if isinstance(obj, api_models.ApiModel) else obj opts = self.build_options_from_kwargs(**kwargs) request_dict = soap_envelope(cmd=cmd, obj=obj, opts=opts) request_body = serialize_xml(obj=request_dict) response = self.api_client(data=request_body) try: auth_token = re_soap_tag(text=response.text, tag="session", limit=limit) except Exception: auth_token = "" if auth_token: self.api_client.auth_method.token = auth_token else: error = "XML tag 'session' not in {limit} characters of SOAP response body" error = error.format(limit="the first {}".format(limit) or "ALL") warnings.warn(error, exceptions.SessionNotFoundWarning) return self.result_cls.from_response( api_objects=self.api_objects, response=response, lvl=self.log.level )
[docs] def api_get(self, obj, **kwargs): """Send an API request to get an object. Args: obj (:obj:`pytan3.api_models.ApiModel`): API Object to use for request. **kwargs: rest of kwargs: Passed to :meth:`send`. Returns: :obj:`pytan3.results.Result` """ check_object_type( obj=obj, types=(self.api_objects.ApiItem, self.api_objects.ApiList) ) kwargs["cmd"] = "GetObject" return self.send(obj=obj, **kwargs)
[docs] def api_add(self, obj, **kwargs): """Send an API request to add an object. Args: obj (:obj:`pytan3.api_models.ApiModel`): API Object to use for request. **kwargs: rest of kwargs: Passed to :meth:`send`. Returns: :obj:`pytan3.results.Result` """ check_object_type(obj=obj, types=(self.api_objects.ApiItem,)) kwargs["cmd"] = "AddObject" kwargs["exclude_attrs"] = ["id"] return self.send(obj=obj, **kwargs)
[docs] def api_delete(self, obj, **kwargs): """Send an API request to delete an object. Args: obj (:obj:`pytan3.api_models.ApiModel`): API Object to use for request. **kwargs: rest of kwargs: Passed to :meth:`send`. Returns: :obj:`pytan3.results.Result` """ check_object_type(obj=obj, types=(self.api_objects.ApiItem,)) check_object_attrs(obj=obj, attrs=["id", "name"]) kwargs["cmd"] = "DeleteObject" return self.send(obj=obj, **kwargs)
[docs] def api_update(self, obj, **kwargs): """Send an API request to update an object. Args: obj (:obj:`pytan3.api_models.ApiModel`): API Object to use for request. **kwargs: rest of kwargs: Passed to :meth:`send`. Returns: :obj:`pytan3.results.Result` """ check_object_type(obj=obj, types=(self.api_objects.ApiItem,)) check_object_attrs(obj=obj, attrs=["id", "name"]) kwargs["cmd"] = "UpdateObject" return self.send(obj=obj, **kwargs)
[docs] def api_get_audit_logs(self, type, target, **kwargs): """Send an API request to get audit logs for an object. Args: type (:obj:`str`): Type of object to get audit logs of. target (:obj:`int`): ID of object type to get audit logs for. SOAP allows target of 'None' to get all objects of `type`. **kwargs: count (:obj:`int`): Limit number of audit logs returned to this. Defaults to: 1. rest of kwargs: Passed to :meth:`send`. Raises: :exc:`exceptions.InvalidTypeError`: If type is not one of :attr:`AUDIT_LOG_TYPES`. Returns: :obj:`pytan3.results.Result` """ if type not in self.AUDIT_LOG_TYPES: error = "Invalid object type {ot} - MUST be one of {at}" error = error.format(at=self.AUDIT_LOG_TYPES, ot=type) raise exceptions.InvalidTypeError(error) obj = self.api_objects.AuditLog(type="{t}_audit".format(t=type), id=target) kwargs.setdefault("audit_history_size", kwargs.pop("count", 1)) kwargs["cmd"] = "GetObject" return self.send(obj=obj, **kwargs)
[docs] def api_get_client_count(self, **kwargs): """Send an API request to get the client count. Args: **kwargs: count (:obj:`int`): Number of days to get client count for. Defaults to: 30. rest of kwargs: Passed to :meth:`send`. Returns: :obj:`pytan3.results.Result` """ obj = {"client_count": kwargs.pop("count", 30)} kwargs["cmd"] = "GetObject" return self.send(obj=obj, **kwargs)
[docs] def api_parse_question(self, text, **kwargs): """Send an API request to parse text. Args: text (:obj:`str`): Text to parse into question objects. **kwargs: rest of kwargs: Passed to :meth:`send`. Returns: :obj:`pytan3.results.Result` """ check_object_type(obj=text, types=six.string_types) obj = self.api_objects.ParseJob(question_text=text) kwargs["cmd"] = "AddObject" return self.send(obj=obj, **kwargs)
[docs] def api_add_parsed_question(self, obj, **kwargs): """Send an API request to add a parsed question object. Args: obj (:obj:`pytan3.api_models.ApiModel`): API Object to use for request. **kwargs: for_merge (:obj:`bool`): Value for force_computer_id_flag attr on question object. Defaults to: True. rest of kwargs: Passed to :meth:`send`. Returns: :obj:`pytan3.results.Result` """ check_object_type(obj=obj, types=(self.api_objects.ParseResultGroup,)) obj = obj.question obj.force_computer_id_flag = int(kwargs.pop("for_merge", True)) kwargs["cmd"] = "AddObject" return self.send(obj=obj, **kwargs)
[docs] def api_get_result_info(self, obj, **kwargs): """Send an API request to get result info for an object. Args: obj (:obj:`pytan3.api_models.ApiModel`): API Object to use for request. **kwargs: rest of kwargs: Passed to :meth:`send`. Returns: :obj:`pytan3.results.Result` """ types = ( self.api_objects.Question, self.api_objects.SavedQuestion, self.api_objects.Action, self.api_objects.SavedAction, ) check_object_type(obj=obj, types=types) check_object_attrs(obj=obj, attrs=["id", "name"]) kwargs["cmd"] = "GetResultInfo" kwargs["only_attrs"] = ["id", "name"] return self.send(obj=obj, **kwargs)
[docs] def api_get_result_data(self, obj, **kwargs): """Send an API request to get result data for an object. Args: obj (:obj:`pytan3.api_models.ApiModel`): API Object to use for request. **kwargs: rest of kwargs: Passed to :meth:`send`. Returns: :obj:`pytan3.results.Result` """ types = ( self.api_objects.Question, self.api_objects.SavedQuestion, self.api_objects.Action, self.api_objects.SavedAction, ) check_object_type(obj=obj, types=types) check_object_attrs(obj=obj, attrs=["id", "name"]) kwargs["cmd"] = "GetResultData" kwargs["only_attrs"] = ["id", "name"] return self.send(obj=obj, **kwargs)
[docs] def api_get_merged_result_data(self, objlist, **kwargs): """Send an API request to get merged result data for a list of objects. Args: objlist (:obj:`list`): List of API Objects to use for request. **kwargs: rest of kwargs: Passed to :meth:`send`. Returns: :obj:`pytan3.results.Result` """ list_types = ( list, tuple, self.api_objects.SavedQuestionList, self.api_objects.QuestionList, ) check_object_type(obj=objlist, types=list_types) item_types = (self.api_objects.Question, self.api_objects.SavedQuestion) pobjlist = [objlist] if isinstance(objlist, item_types) else objlist objs = {"question": [], "saved_question": []} only_attrs = ["id", "name", "index", "cache_row_id"] sargs = {"wrap_name": False, "wrap_item_attr": False, "only_attrs": only_attrs} idx = 0 for pobj in pobjlist: sobjlist = [pobj] if isinstance(pobj, item_types) else pobj for sobj in sobjlist: check_object_type(obj=sobj, types=item_types) check_object_attrs(obj=sobj, attrs=["id", "name"]) sobj.index = idx sobj.cache_row_id = getattr(sobj, "cache_row_id", None) or 0 sobj_dict = sobj.serialize(**sargs) objs_target = sobj.API_NAME objs[objs_target].append(sobj_dict) idx += 1 kwargs["cmd"] = "GetMergedResultData" return self.send(obj=objs, **kwargs)
[docs]class Rest(Adapter): """Tanium REST API request adapter.""" DEFAULT_OPTIONS = {"json_pretty_print": True, "include_hashes_flag": True} """:obj:`dict`: Default options to use in :meth:`build_options_from_kwargs`.""" AUDIT_LOG_TYPES = [ # REST routes all need to be pluralized "content_set", "content_set_role", "dashboard", "dashboard_group", "group", "package", # REST docs say "package_spec", but only "packages" route works "plugin_schedule", "saved_action", "saved_question", "sensor", "system_setting", "user", "user_group", "white_listed_url", ] """:obj:`list` of :obj:`str`: Valid types for :meth:`api_get_audit_logs`."""
[docs] def __init__(self, api_client, api_objects, ver_check=True, lvl="info"): """Constructor. Args: api_client (:obj:`pytan3.api_clients.ApiClient`): Client to use for sending API requests. api_objects (:obj:`pytan3.api_objects.ApiObjects`): API objects container to use for this adapter. ver_check (:obj:`bool`, optional): Perform version checks against :func:`pytan3.api_clients.get_version`. Defaults to: True. lvl (:obj:`str`, optional): Logging level. Defaults to: "info". """ self.log = utils.logs.get_obj_log(obj=self, lvl=lvl) """:obj:`logging.Logger`: Log.""" self._api_objects = api_objects self._api_client = api_client check_adapter_types(self) if ver_check and any(self.get_version_req().values()): check_adapter_version(self)
[docs] def __str__(self): """Show object info. Returns: :obj:`str` """ bits = [ "type={!r}".format(self.get_type()), "api_objects={!r}".format(self.api_objects), "api_client={!r}".format(self.api_client), "http_client={!r}".format(self.http_client), "auth_method={!r}".format(self.auth_method), ] bits = "(\n {},\n)".format(",\n ".join(bits)) cls = "{c.__module__}.{c.__name__}".format(c=self.__class__) return "{cls}{bits}".format(cls=cls, bits=bits)
[docs] def __repr__(self): """Show object info. Returns: :obj:`str` """ return self.__str__()
@property def api_objects(self): """Get the API objects container. Returns: :obj:`pytan3.api_objects.ApiObjects` """ return self._api_objects @property def api_client(self): """Get the API client. Returns: :obj:`pytan3.api_clients.ApiClient` """ return self._api_client @property def http_client(self): """Get the HTTP client. Returns: :obj:`pytan3.http_client.HttpClient` """ return self.api_client.http_client @property def auth_method(self): """Get the Auth Method. Returns: :obj:`pytan3.auth_methods.AuthMethod` """ return self.api_client.auth_method
[docs] @classmethod def get_name(cls): """Get the ref name of this class for use by :func:`load`. Returns: :obj:`str` """ return "rest"
[docs] @classmethod def get_type(cls): """Get the ref type of this class for use by :func:`load_type`. Returns: :obj:`str` """ return "rest"
[docs] @classmethod def get_version_req(cls): """Get the min, max, and eq version requirements of this class. Notes: Dict can specify keys: "vmin", "vmax", "veq". This class method gets called by :func:`pytan3.utils.versions.version_check_obj_req` to perform version checks. Returns: :obj:`dict` """ return {"vmin": "7.3.314.3409", "vmax": "", "veq": ""}
@property def result_cls(cls): """Get the result deserializer class. Returns: :class:`pytan3.results.Result` """ return results.Rest
[docs] def build_options_from_kwargs(self, **kwargs): """Build an Options API object from kwargs and return the serialized form. Args: **kwargs: options_obj (:obj:`pytan3.api_models.ApiItem`): A pre-established Options object. Defaults to: new Options object from :attr:`api_objects`. rest of kwargs: Set on Options object if key is an attr on object and attrs value is None. Notes: Will set :attr:`DEFAULT_OPTIONS` as defaults to kwargs before applying values to Options object attributes. Returns: :obj:`dict` """ default_options = getattr(self, "DEFAULT_OPTIONS", {}) or {} for k, v in default_options.items(): kwargs.setdefault(k, v) opts = kwargs.pop("options_obj", self.api_objects.Options()) check_object_type(obj=opts, types=(self.api_objects.Options,)) for k in list(kwargs): if hasattr(opts, k) and getattr(opts, k, None) is None: setattr(opts, k, kwargs[k]) return opts.serialize(wrap_name=False)
[docs] def api_get(self, obj, **kwargs): """Send an API request to get an object. Args: obj (:obj:`pytan3.api_models.ApiModel`): API Object to use for request. **kwargs: headers (:obj:`dict`): Headers to supply to request. Defaults to: {}. params (:obj:`dict`): Parameters to supply to request. Defaults to: {}. rest of kwargs: Passed to :meth:`build_options_from_kwargs`. Returns: :obj:`pytan3.results.Result` """ check_object_type( obj=obj, types=(self.api_objects.ApiItem, self.api_objects.ApiList) ) check_object_attrs(obj=obj, attrs=["id", "name"]) headers = kwargs.pop("headers", {}) params = kwargs.pop("params", {}) opts = self.build_options_from_kwargs(**kwargs) headers.setdefault("tanium-options", serialize_json(obj=opts, indent=None)) response = self.api_client( method="get", endpoint=magic_endpoint(obj=obj, auto_target=True, needs_target=False), params=params, headers=headers, ) return self.result_cls.from_response( api_objects=self.api_objects, response=response, lvl=self.log.level )
[docs] def api_add(self, obj, **kwargs): """Send an API request to add an object. Args: obj (:obj:`pytan3.api_models.ApiModel`): API Object to use for request. **kwargs: headers (:obj:`dict`): Headers to supply to request. Defaults to: {}. params (:obj:`dict`): Parameters to supply to request. Defaults to: {}. rest of kwargs: Passed to :meth:`build_options_from_kwargs`. Returns: :obj:`pytan3.results.Result` """ check_object_type(obj=obj, types=(self.api_objects.ApiItem,)) headers = kwargs.pop("headers", {}) params = kwargs.pop("params", {}) opts = self.build_options_from_kwargs(**kwargs) headers.setdefault("tanium-options", serialize_json(obj=opts, indent=None)) obj_dict = obj.serialize( exclude_attrs=["id"], wrap_name=False, wrap_item_attr=False ) response = self.api_client( method="post", data=serialize_json(obj=obj_dict), endpoint=magic_endpoint(obj=obj, auto_target=False, needs_target=False), params=params, headers=headers, ) return self.result_cls.from_response( api_objects=self.api_objects, response=response, lvl=self.log.level )
[docs] def api_delete(self, obj, **kwargs): """Send an API request to delete an object. Args: obj (:obj:`pytan3.api_models.ApiModel`): API Object to use for request. **kwargs: headers (:obj:`dict`): Headers to supply to request. Defaults to: {}. params (:obj:`dict`): Parameters to supply to request. Defaults to: {}. rest of kwargs: Passed to :meth:`build_options_from_kwargs`. Returns: :obj:`pytan3.results.Result` """ check_object_type(obj=obj, types=(self.api_objects.ApiItem,)) check_object_attrs(obj=obj, attrs=["id"]) headers = kwargs.pop("headers", {}) params = kwargs.pop("params", {}) opts = self.build_options_from_kwargs(**kwargs) headers.setdefault("tanium-options", serialize_json(obj=opts, indent=None)) response = self.api_client( method="delete", endpoint=magic_endpoint(obj=obj, auto_target=True, needs_target=True), params=params, headers=headers, ) return self.result_cls.from_response( api_objects=self.api_objects, response=response, lvl=self.log.level )
[docs] def api_update(self, obj, **kwargs): """Send an API request to update an object. Args: obj (:obj:`pytan3.api_models.ApiModel`): API Object to use for request. **kwargs: headers (:obj:`dict`): Headers to supply to request. Defaults to: {}. params (:obj:`dict`): Parameters to supply to request. Defaults to: {}. rest of kwargs: Passed to :meth:`build_options_from_kwargs`. Returns: :obj:`pytan3.results.Result` """ check_object_type(obj=obj, types=(self.api_objects.ApiItem,)) check_object_attrs(obj=obj, attrs=["id", "name"]) headers = kwargs.pop("headers", {}) params = kwargs.pop("params", {}) opts = self.build_options_from_kwargs(**kwargs) headers.setdefault("tanium-options", serialize_json(obj=opts, indent=None)) # LATER(!) no id if id in target, but no name if name in target, rite? obj_dict = obj.serialize( exclude_attrs=["name", "id"], wrap_name=False, wrap_item_attr=False ) response = self.api_client( method="patch", data=serialize_json(obj=obj_dict), endpoint=magic_endpoint(obj=obj, auto_target=True, needs_target=True), params=params, headers=headers, ) return self.result_cls.from_response( api_objects=self.api_objects, response=response, lvl=self.log.level )
[docs] def api_get_audit_logs(self, type, target, **kwargs): """Send an API request to get audit logs for an object. Args: type (:obj:`str`): Type of object to get audit logs of. target (:obj:`int`): ID of object type to get audit logs for. **kwargs: count (:obj:`int`): Limit number of audit logs returned to this. Defaults to: 1. headers (:obj:`dict`): Headers to supply to request. Defaults to: {}. params (:obj:`dict`): Parameters to supply to request. Defaults to: {}. rest of kwargs: Passed to :meth:`build_options_from_kwargs`. Raises: :exc:`exceptions.InvalidTypeError`: If type is not one of :attr:`AUDIT_LOG_TYPES`. Returns: :obj:`pytan3.results.Result` """ if type not in self.AUDIT_LOG_TYPES: error = "Invalid object type {ot} - MUST be one of {at}" error = error.format(at=self.AUDIT_LOG_TYPES, ot=type) raise exceptions.InvalidTypeError(error) headers = kwargs.pop("headers", {}) params = kwargs.pop("params", {}) kwargs.setdefault("audit_history_size", kwargs.pop("count", 1)) opts = self.build_options_from_kwargs(**kwargs) headers.setdefault("tanium-options", serialize_json(obj=opts, indent=None)) route = "audit_logs/{type}s".format(type=type) response = self.api_client( method="get", endpoint=build_endpoint(route=route, target=target), params=params, headers=headers, ) return self.result_cls.from_response( api_objects=self.api_objects, response=response, lvl=self.log.level )
[docs] def api_parse_question(self, text, **kwargs): """Send an API request to parse text. Args: text (:obj:`str`): Text to parse into question objects. **kwargs: headers (:obj:`dict`): Headers to supply to request. Defaults to: {}. params (:obj:`dict`): Parameters to supply to request. Defaults to: {}. rest of kwargs: Passed to :meth:`build_options_from_kwargs`. Returns: :obj:`pytan3.results.Result` """ check_object_type(obj=text, types=(six.string_types,)) headers = kwargs.pop("headers", {}) params = kwargs.pop("params", {}) opts = self.build_options_from_kwargs(**kwargs) headers.setdefault("tanium-options", serialize_json(obj=opts, indent=None)) obj_dict = {"text": text} route = "parse_question" response = self.api_client( method="post", data=serialize_json(obj=obj_dict), endpoint=build_endpoint(route=route), params=params, headers=headers, ) return self.result_cls.from_response( api_objects=self.api_objects, response=response, lvl=self.log.level )
[docs] def api_get_client_count(self, **kwargs): """Send an API request to get the client count. Args: **kwargs: headers (:obj:`dict`): Headers to supply to request. Defaults to: {}. params (:obj:`dict`): Parameters to supply to request. Defaults to: {}. rest of kwargs: Passed to :meth:`build_options_from_kwargs`. Notes: Unlike the SOAP API, "count" does not seem to be used by REST API. Returns: :obj:`pytan3.results.Result` """ headers = kwargs.pop("headers", {}) params = kwargs.pop("params", {}) opts = self.build_options_from_kwargs(**kwargs) headers.setdefault("tanium-options", serialize_json(obj=opts, indent=None)) route = "client_count" response = self.api_client( method="get", endpoint=build_endpoint(route=route), params=params, headers=headers, ) return self.result_cls.from_response( api_objects=self.api_objects, response=response, lvl=self.log.level )
[docs] def api_add_parsed_question(self, obj, **kwargs): """Send an API request to add a parsed question object. Args: obj (:obj:`pytan3.api_models.ApiModel`): API Object to use for request. **kwargs: for_merge (:obj:`bool`): Set option "force_computer_id_flag". Defaults to: True. headers (:obj:`dict`): Headers to supply to request. Defaults to: {}. params (:obj:`dict`): Parameters to supply to request. Defaults to: {}. rest of kwargs: Passed to :meth:`build_options_from_kwargs`. Returns: :obj:`pytan3.results.Result` """ check_object_type(obj=obj, types=(self.api_objects.ParseQuestionResult,)) headers = kwargs.pop("headers", {}) params = kwargs.pop("params", {}) obj.force_computer_id_flag = int(kwargs.pop("for_merge", True)) opts = self.build_options_from_kwargs(**kwargs) headers.setdefault("tanium-options", serialize_json(obj=opts, indent=None)) obj_dict = obj.serialize( exclude_attrs=["id"], wrap_name=False, wrap_item_attr=False ) route = "questions" response = self.api_client( method="post", data=serialize_json(obj=obj_dict), endpoint=build_endpoint(route=route), params=params, headers=headers, ) return self.result_cls.from_response( api_objects=self.api_objects, response=response, lvl=self.log.level )
[docs] def api_get_result_info(self, obj, **kwargs): """Send an API request to get result info for an object. Args: obj (:obj:`pytan3.api_models.ApiModel`): API Object to use for request. **kwargs: headers (:obj:`dict`): Headers to supply to request. Defaults to: {}. params (:obj:`dict`): Parameters to supply to request. Defaults to: {}. rest of kwargs: Passed to :meth:`build_options_from_kwargs`. Returns: :obj:`pytan3.results.Result` """ # LATER(!) Action and SavedAction not in REST docs! types = (self.api_objects.Question, self.api_objects.SavedQuestion) check_object_type(obj=obj, types=types) check_object_attrs(obj=obj, attrs=["id"]) headers = kwargs.pop("headers", {}) params = kwargs.pop("params", {}) opts = self.build_options_from_kwargs(**kwargs) headers.setdefault("tanium-options", serialize_json(obj=opts, indent=None)) route = "result_info/{}".format(obj.API_NAME) response = self.api_client( method="get", endpoint=build_endpoint(route=route, target=obj.id), params=params, headers=headers, ) return self.result_cls.from_response( api_objects=self.api_objects, response=response, lvl=self.log.level )
[docs] def api_get_result_data(self, obj, **kwargs): """Send an API request to get result data for an object. Args: obj (:obj:`pytan3.api_models.ApiModel`): API Object to use for request. **kwargs: headers (:obj:`dict`): Headers to supply to request. Defaults to: {}. params (:obj:`dict`): Parameters to supply to request. Defaults to: {}. rest of kwargs: Passed to :meth:`build_options_from_kwargs`. Returns: :obj:`pytan3.results.Result` """ # LATER(!) Action and SavedAction not in REST docs! types = (self.api_objects.Question, self.api_objects.SavedQuestion) check_object_type(obj=obj, types=types) check_object_attrs(obj=obj, attrs=["id"]) headers = kwargs.pop("headers", {}) params = kwargs.pop("params", {}) opts = self.build_options_from_kwargs(**kwargs) headers.setdefault("tanium-options", serialize_json(obj=opts, indent=None)) route = "result_data/{}".format(obj.API_NAME) response = self.api_client( method="get", endpoint=build_endpoint(route=route, target=obj.id), params=params, headers=headers, ) return self.result_cls.from_response( api_objects=self.api_objects, response=response, lvl=self.log.level )
[docs] def api_get_merged_result_data(self, objlist, **kwargs): """Send an API request to get merged result data for a list of objects. Args: objlist (:obj:`list`): List of API Objects to use for request. **kwargs: headers (:obj:`dict`): Headers to supply to request. Defaults to: {}. params (:obj:`dict`): Parameters to supply to request. Defaults to: {}. rest of kwargs: Passed to :meth:`build_options_from_kwargs`. Returns: :obj:`pytan3.results.Result` """ list_types = ( list, tuple, self.api_objects.SavedQuestionList, self.api_objects.QuestionList, ) check_object_type(obj=objlist, types=list_types) item_types = (self.api_objects.Question, self.api_objects.SavedQuestion) pobjlist = [objlist] if isinstance(objlist, item_types) else objlist targets = [] for pobj in pobjlist: sobjlist = [pobj] if isinstance(pobj, item_types) else pobj for sobj in sobjlist: check_object_type(obj=sobj, types=item_types) check_object_attrs(obj=sobj, attrs=["id"]) targets.append("{}/{}".format(sobj.API_NAME, sobj.id)) headers = kwargs.pop("headers", {}) params = kwargs.pop("params", {}) opts = self.build_options_from_kwargs(**kwargs) headers.setdefault("tanium-options", serialize_json(obj=opts, indent=None)) target = "/".join(targets) route = "merged_result_data" response = self.api_client( method="get", endpoint=build_endpoint(route=route, target=target), params=params, headers=headers, ) return self.result_cls.from_response( api_objects=self.api_objects, response=response, lvl=self.log.level )
[docs]def soap_envelope(cmd, obj, opts=None): """Construct a SOAP envelope with the request command, obj, and options. Args: cmd (:obj:`str`): Command to use for request. obj (:obj:`dict`): Object(s) to use for request. options (:obj:`dict`, optional): Options to use for request. Defaults to: None. Returns: :obj:`dict` """ request = {"command": cmd, "object_list": obj, "options": opts} body = { "@xmlns:t": "urn:TaniumSOAP", "@xmlns:xsi": "http://www.w3.org/2001/XMLSchema-instance", "t:tanium_soap_request": request, } env = { "@soap:encodingStyle": "http://schemas.xmlsoap.org/soap/encoding/", "@xmlns:soap": "http://schemas.xmlsoap.org/soap/envelope/", "@xmlns:xsd": "http://www.w3.org/2001/XMLSchema", "soap:Body": body, } ret = {"soap:Envelope": env} return ret
[docs]def re_soap_tag(text, tag, limit=4000, pattern=r"<{t}>(.*?)</{t}>"): """Search for tag in text[:limit] using pattern. Args: text (:obj:`str`): Text to search for pattern. tag (:obj:`str`): Tag name to use in pattern as 't'. limit (:obj:`int`, optional): Length to limit text to when searching for pattern. Defaults to: 4000. pattern (:obj:`str`, optional): Pattern to use when searching for tag. Defaults to: r'<{e}>(.*?)</{e}>' Notes: Given text is 4 GB and pattern is expected at top of text: * if head is None and pattern not found: 131 seconds * if head is None and pattern found: 0 seconds * if head is 4000 and pattern not found: 0 seconds * if head is 4000 and pattern found: 0 seconds Returns: :obj:`str` """ pattern_txt = pattern.format(t=tag) pattern_re = re.compile(pattern_txt, re.IGNORECASE | re.DOTALL) text_limit = text[:limit] match = pattern_re.search(text_limit) return match.group(1) if match else ""
[docs]def magic_endpoint(obj, target=None, auto_target=True, needs_target=False): """Build a REST API endpoint from an API object. Args: obj (:obj:`pytan3.api_models.ApiModel`): API object to get route and target from in order to build endpoint. target (:obj:`str`, optional): Manually provided target. Defaults to: None. auto_target (:obj:`bool`, optional): Try to identify the target from obj by getting "id" or "name" attributes if their values are not None. Defaults to: True. needs_target (:obj:`bool`, optional): Throw an exception if a target is not automatically determined from obj. Defaults to: False. Raises: :exc:`exceptions.ModuleError`: If needs_target is True and resolved target is None. Returns: :obj:`str` """ route = obj.API_NAME # if it's an ApiItem, all REST routes need the plural route, not the singular if isinstance(obj, api_models.ApiItem): route = obj.API_LIST_CLS.API_NAME if auto_target and target is None: if getattr(obj, "id", None) is not None: target = "{t}".format(t=obj.id) elif getattr(obj, "name", None) is not None: target = "by-name/{t}".format(t=obj.name) if needs_target and target is None: error = "A target is required for {o} and neither 'id' nor 'name' is set!" error = error.format(o=obj) raise exceptions.ModuleError(error) return build_endpoint(route, target)
[docs]def build_endpoint(route, target=None): """Build a REST endpoint string by joining route and target. Args: route (:obj:`str`): Route part of endpoint. target (:obj:`str`, optional): Target part of endpoint. Defaults to None. Returns: :obj:`str` """ target = "" if target is None else "/{}".format(target) return "{}{}".format(route, target)
[docs]def check_adapter_types(adapter): """Check :meth:`Adapter.get_type` against api_client and objects type. Args: adapter (:obj:`Adapter`): Adapter to perform type checking on. Raises: :exc:`exceptions.TypeMismatchError`: If :meth:`Adapter.get_type` is not equal to :attr:`pytan3.api_objects.ApiObjects.module_type` or :meth:`pytan3.api_clients.ApiClient.get_type`. """ if adapter.get_type() != adapter.api_objects.module_type: error = "{objects} does not match type of {adapter}" error = error.format(objects=adapter.api_objects, adapter=adapter) raise exceptions.TypeMismatchError(error) if adapter.get_type() != adapter.api_client.get_type(): error = "{client} does not match type of {adapter}" error = error.format(client=adapter.api_client, adapter=adapter) raise exceptions.TypeMismatchError(error)
[docs]def check_adapter_version(adapter): """Check :meth:`Adapter.get_version_req` against api_client and objects version. Args: adapter (:obj:`Adapter`): Adapter to perform version checking on. Raises: :exc:`pytan3.utils.exceptions.VersionMismatchError`: If the version requirements from :meth:`Adapter.get_version_req` fail to match :attr:`pytan3.api_clients.ApiClient.version` or :attr:`pytan3.api_objects.ApiObjects.module_version`. """ utils.versions.version_check_obj_req( version=adapter.api_client.version, src=adapter.api_client.url, obj=adapter ) utils.versions.version_check_obj_req( version=adapter.api_objects.module_version, src=adapter.api_objects, obj=adapter )
[docs]def check_object_type(obj, types): """Check if an obj is an instance of types. Args: obj (:obj:`object`): Object to check against types. types (:obj:`tuple` of :obj:`type`): Types to check against obj. Raises: :exc:`exceptions.InvalidTypeError`: If type of obj is not on of types. """ if not isinstance(obj, types): error = "Invalid object type {ot} - MUST be one of {at}" error = error.format(at=types, ot=type(obj)) raise exceptions.InvalidTypeError(error)
[docs]def check_object_attrs(obj, attrs): """Check if any attributes of an obj are set. Args: obj (:obj:`pytan3.api_models.ApiModel`): API object to check. attrs (:obj:`list` of :obj:`str`): Attributes to check on obj. Raises: :exc:`exceptions.EmptyAttributeError`: If none of the attributes in attrs on obj are not set to None. """ if isinstance(obj, api_models.ApiItem): if not any(getattr(obj, x, None) is not None for x in attrs): error = "No attributes in {a} defined on {o}" error = error.format(a=attrs, o=obj) raise exceptions.EmptyAttributeError(error)
[docs]def serialize_xml(obj, **kwargs): """Encode python object into an XML string. Args: obj (:obj:`object`): Python object to encode into a string. **kwargs: full_document (:obj:`bool`): Include xml stanza at top. Defaults to: True. pretty (:obj:`bool`): Indent the output doc. Defaults to: True. rest of kwargs: Passed to xmltodict.unparse. Returns: :obj:`str` """ kwargs.setdefault("full_document", True) kwargs.setdefault("pretty", True) return xmltodict.unparse(obj, **kwargs)
[docs]def serialize_json(obj, **kwargs): """Encode python object into a JSON string. Args: obj (:obj:`object`): Python object to encode into a string. **kwargs: indent (:obj:`int`): Indent spacing for prettifying. Defaults to: 2. rest of kwargs: Passed to :func:`json.dumps`. Returns: :obj:`str` """ kwargs.setdefault("indent", 2) return json.dumps(obj, **kwargs)
[docs]def load_type(obj=DEFAULT_TYPE): """Get a :class:`Adapter` by type from :meth:`Adapter.get_type`. Args: obj (:obj:`str`, optional): Type of Adapter. Defaults to: :data:`DEFAULT_TYPE`. Raises: :exc:`exceptions.ModuleError`: Unable to find a valid :class:`Adapter` with the supplied type. Returns: :class:`Adapter` """ exp_cls = Adapter classes = exp_cls.__subclasses__() for cls in classes: if cls.get_type() == obj: return cls valids = list({x.get_type() for x in classes}) error = "\n ".join( ["", "{obj!r} is not a valid type of {cls}, try one of:", "types: {valids}"] ) error = error.format(obj=obj, cls=exp_cls, valids=valids) raise exceptions.ModuleError(error)
[docs]def load(obj=DEFAULT_NAME): """Get a :class:`Adapter` by name from :meth:`Adapter.get_name`. Args: obj (:obj:`str` or :obj:`Adapter` or :class:`Adapter`, optional): Adapter object, class, or name Adapter. Defaults to: :data:`DEFAULT_NAME`. Raises: :exc:`exceptions.ModuleError`: Unable to find a valid :class:`Adapter` with the supplied name. Returns: :class:`Adapter` """ exp_cls = Adapter classes = exp_cls.__subclasses__() if isinstance(obj, exp_cls): return obj.__class__ if callable(obj) and issubclass(obj, exp_cls): return obj if isinstance(obj, six.string_types): for cls in classes: if cls.get_name() == obj: return cls vnames = [x.get_name() for x in classes] error = "\n ".join( ["", "{obj!r} is not a valid {cls}, try one of:", "names: {vn}"] ) error = error.format(obj=obj, cls=exp_cls, vn=vnames) raise exceptions.ModuleError(error)