Source code for pytan3.http_client

# -*- coding: utf-8 -*-
"""Client for making HTTP requests."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals

import cert_human
import datetime
import json
import re
import requests
import six
import warnings

from . import exceptions
from .. import utils
from .. import version

CERT_FILE = "{http_client.parsed_url.hostname}.pem"
""":obj:`str`: Certificate filename template for :meth:`Certify.__call__`."""

RECORD_FILE = "%Y_%m_%dT%H_%M_%S_%fZ.json"
""":obj:`str`: Record filename template for :meth:`HttpClient.response_hook_disk`."""

SHOW_CERT = "no"
""":obj:`bool`: Default prompt value for "Show full certificate info" in
:meth:`Certify.verify_hook`.

Will use OS Environment variable "PYTAN_SHOW_CERT" if set.
"""

SHOW_CHAIN = "no"
""":obj:`bool`: Default prompt value for "Show certificate chain info" in
:meth:`Certify.verify_hook`.

Will use OS Environment variable "PYTAN_SHOW_CHAIN" if set.
"""

CERT_VALID = "no"
""":obj:`bool`: Default prompt value for "Consider certificate valid" in
:meth:`Certify.verify_hook`.

Will use OS Environment variable "PYTAN_CERT_VALID" if set.
"""

cert_human.enable_urllib3_patch()
# enable urllib3 patch to attach certs to requests response objects


[docs]class HttpClient(object): """Convenience class for requests package.""" CLEAN_HOOKS = { r"/auth": ["clean_hook_auth"], r"/api/v\d?/session": ["clean_hook_auth_rest"], } """:obj:`dict`: URL regex matches for cleaning bodies in cleaning hooks.""" CLEAN_AUTH_KEYS = ["username", "password", "domain", "secondary"] """:obj:`list` of :obj:`str`: Keys to clean in cleaning hooks.""" CLEAN_HIDE = "###HIDDEN###" """:obj:`str`: Text to use to hide values in cleaning hooks."""
[docs] def __init__(self, url, timeout=5, lvl="info"): """Constructor. Args: url (:obj:`str`): URL to use. timeout (:obj:`int`, optional): Connect timeout to use for requests. Defaults to: 5. lvl (:obj:`str`, optional): Logging level for this object. Defaults to: "info". """ self.log = utils.logs.get_obj_log(obj=self, lvl=lvl) """:obj:`logging.Logger`: Log for this object.""" self.timeout = timeout """:obj:`int`: Connect timeout used for all requests.""" self.parsed_url = self.parse_url(url) """:obj:`UrlParser`: Parsed version of URL.""" self.last_request = None """:obj:`requests.PreparedRequest`: Last request sent.""" self.last_response = None """:obj:`requests.Response`: Last response received.""" self.history = [] """:obj:`list`: History of all responses received. Used by :meth:`HttpClient.response_hook_history`. """ self.record_path = None """:obj:`pathlib.Path`: Path to store response recordings to.""" self.session = requests.Session() """:obj:`requests.Session`: Requests session object.""" self.session.hooks = {"response": []} self.session.hooks["response"].append(self.response_hook_log_debug) self.session.hooks["response"].append(self.response_hook_clean) self.certify = Certify(http_client=self) """:obj:`Certify`: Certificate Magic object."""
@property def url(self): """Get the URL string from :attr:`HttpClient.parsed_url`. Returns: :obj:`str` """ return self.parsed_url.url
[docs] def __str__(self): """Show object info. Returns: :obj:`str` """ cert = self.certify.using_custom_cert cert = cert.name if cert else "None" bits = ["url={!r}".format(self.url), "cert={!r}".format(cert)] bits = "({})".format(", ".join(bits)) cls = "{c.__module__}.{c.__name__}".format(c=self.__class__) return "{cls}{bits}".format(cls=cls, bits=bits)
[docs] def __repr__(self): """Show object info. Returns: :obj:`str` """ return self.__str__()
[docs] def __call__( self, method="get", path="", data=None, timeout=5, params=None, headers=None, b64_headers=None, verify=True, **kwargs ): """Create, prepare a request, and then send it. Args: method (:obj:`str`, optional): Method to use. Defaults to: "get". path (:obj:`str`, optional): Path to append to :attr:`HttpClient.url` for this request. Defaults to: "". data (:obj:`str`, optional): Data to send with POST. Defaults to: None. timeout (:obj:`int`, optional): Response timeout. Defaults to: 5. params (:obj:`dict`, optional): URL parameters. Defaults to: None. headers (:obj:`dict`, optional): Headers. Defaults to: None. b64_headers (:obj:`list` of :obj:`str`, optional): Headers to base 64 encode. Defaults to: None. verify (:obj:`bool` or :obj:`str`, optional): Enable/Disable SSL certificate validation using built in CAs, or a path to custom cert. Defaults to: True. **kwargs: cause (:obj:`str`): String to explain purpose of request. Defaults to: "". Notes: If verify is True or None, verification is done using default/built in CA. OS env vars $REQUESTS_CA_BUNDLE, and $CURL_CA_BUNDLE are used if set and trust_env is True, and if trust_env is False session's verify is used. If verify is False, no verification is done. This overrides OS env and session's verify for this request and no verification is done at all. Don't do this. If verify is a str, verification is done with PEM file at path. This overrides OS env and session's verify for this request. Caveat: If previous request made with session and close has not been called on session, the verify of the previous request will be used no matter what is supplied here. Returns: :obj:`requests.Response` """ cause = kwargs.pop("cause", "") b64 = b64_headers or [] h = {} h.update(headers or {}) h.setdefault("User-Agent", self.user_agent) h.update({k: utils.tools.b64_encode(v) for k, v in h.items() if k in b64}) req_args = {} req_args["url"] = requests.compat.urljoin(self.url, path) req_args["method"] = method req_args["data"] = data req_args["headers"] = h req_args["params"] = params send_args = {} send_args["timeout"] = (self.timeout, timeout) send_args.update( self.session.merge_environment_settings( url=self.url, proxies=None, # rely on OS env proxies, then self.session.proxies stream=None, # not using verify=verify, cert=None, # rely on client cert set in self.session.cert ) ) request = requests.Request(**req_args) prequest = self.last_request = self.session.prepare_request(request=request) prequest.datetime = datetime.datetime.utcnow() prequest.cause = cause size = utils.tools.human_size(size=len(prequest.body or "")) m = "sent: url={p.url!r}, method={p.method!r}, size={size}" m = m.format(p=prequest, size=size) self.log.debug(m) r = self.last_response = self.session.send(prequest, **send_args) r.datetime = datetime.datetime.utcnow() r.cause = cause return r
[docs] def control_hook(self, enable, hook): """Add or remove a hook from :attr:`HttpClient.session`. Args: enable (:obj:`bool`): True: Add hook to session. False: Remove hook from session. hook (:obj:`object`): Hook function to add/remove to session. Returns: :obj:`bool`: True/False if hook was actually removed/added. """ ret = False self.session.hooks = getattr(self.session, "hooks", {}) or {} self.session.hooks["response"] = self.session.hooks.get("response", []) if enable: if hook not in self.session.hooks["response"]: self.session.hooks["response"].append(hook) ret = True else: if hook in self.session.hooks["response"]: self.session.hooks["response"].remove(hook) ret = True return ret
[docs] def control_hook_record_disk(self, enable=False, path=None, path_sub="records"): """Add or remove session hook for recording responses to disk. Args: enable (:obj:`bool`, optional): Add/remove :meth:`HttpClient.response_hook_disk` to session hooks. Defaults to: False. path (:obj:`str` or :obj:`pathlib.Path`, optional): Storage directory to use. If empty, resolve path via :func:`pytan3.utils.tools.get_storage_dir`. Defaults to: None. path_sub (:obj:`str`, optional): Sub directory under path to save records to. Defaults to: "records" """ path = utils.tools.get_storage_dir(path=path, path_sub=path_sub, mkdir=True) self.record_path = path if enable: done = self.control_hook(enable=True, hook=self.response_hook_disk) if done: m = "Started recording responses to {path!r}" m = m.format(path=format(path)) self.log.debug(m) else: done = self.control_hook(enable=False, hook=self.response_hook_disk) if done: m = "Stopped recording responses to {path!r}" m = m.format(path=format(path)) self.log.debug(m)
[docs] def control_hook_record_history(self, enable=False): """Add or remove session hook for recording in local class. Args: enable (:obj:`bool`, optional): Add/remove :meth:`HttpClient.response_hook_history` to session hooks. Defaults to: False. """ if enable: done = self.control_hook(enable=True, hook=self.response_hook_history) if done: m = "Started recording responses to {c!r}" m = m.format(c=format(self)) self.log.debug(m) else: done = self.control_hook(enable=False, hook=self.response_hook_history) if done: m = "Stopped recording responses to {c!r}" m = m.format(c=format(self)) self.log.debug(m)
[docs] def response_hook_log_debug(self, response, *args, **kwargs): """Response hook to log info. Args: response (:obj:`requests.Response`): Response to process. *args: Unused, yet supplied by :obj:`requests.Session`. **kwargs: Unused, yet supplied by :obj:`requests.Session`. Returns: :obj:`requests.Response` """ m = ( "rcvd: url={response.url!r}, method={request.method!r}, size={size}, " "status={response.status_code!r}, reason={response.reason!r}, " "elapsed={response.elapsed}, cause={cause!r}" ) m = m.format( response=response, request=response.request, size=utils.tools.human_size(len(response.text or "")), cause=getattr(response, "cause", ""), ) self.log.debug(m) return response
[docs] def response_hook_clean(self, response, *args, **kwargs): """Response hook to add cleaned versions of request/response body and headers. Args: response (:obj:`requests.Response`): Response to process. *args: Unused, yet supplied by :obj:`requests.Session`. **kwargs: Unused, yet supplied by :obj:`requests.Session`. Returns: :obj:`requests.Response` """ response.clean_headers = response.headers or {} response.clean_body = response.text or "" response.request.clean_headers = response.request.headers or {} response.request.clean_body = response.request.body or "" for url, hooks in self.CLEAN_HOOKS.items(): if not re.search(url, response.url): continue for hook in hooks: if isinstance(hook, six.string_types): hook = getattr(self, hook, None) if callable(hook): response = hook(http_client=self, response=response) return response
[docs] @staticmethod def clean_hook_auth_rest(http_client, response): """Clean hook to hide auth keys in a response from REST session route. Args: http_client (:obj:`HttpClient`): Object to get :attr:`HttpClient.CLEAN_AUTH_KEYS` and :attr:`HttpClient.CLEAN_HIDE` from. response (:obj:`requests.Response`): Object to add clean_body attribute to. Returns: :obj:`requests.Response` """ keys = http_client.CLEAN_AUTH_KEYS hidden = http_client.CLEAN_HIDE body = json.loads(response.request.clean_body) body = {k: hidden if k in keys else v for k, v in body.items()} response.request.clean_body = json.dumps(body, indent=2) return response
[docs] @staticmethod def clean_hook_auth(http_client, response): """Clean hook to hide auth headers in a response from /auth API. Args: http_client (:obj:`HttpClient`): Object to get :attr:`HttpClient.CLEAN_AUTH_KEYS` and :attr:`HttpClient.CLEAN_HIDE` from. response (:obj:`requests.Response`): Object to add clean_headers attribute to. Returns: :obj:`requests.Response` """ keys = http_client.CLEAN_AUTH_KEYS hidden = http_client.CLEAN_HIDE headers = response.request.clean_headers headers = {k: hidden if k in keys else v for k, v in headers.items()} response.request.clean_headers = headers return response
[docs] def response_hook_history(self, response, *args, **kwargs): """Response hook to add response to :attr:`HttpClient.history`. Args: response (:obj:`requests.Response`): Response to process. *args: Unused, yet supplied by :obj:`requests.Session`. **kwargs: Unused, yet supplied by :obj:`requests.Session`. """ self.history = getattr(self, "history", []) self.history.append(response)
[docs] def response_hook_disk(self, response, *args, **kwargs): """Response hook to write response to :attr:`HttpClient.record_path`. Args: response (:obj:`requests.Response`): Response to serialize and write to disk. *args: Unused, yet supplied by :obj:`requests.Session`. **kwargs: Unused, yet supplied by :obj:`requests.Session`. Returns: :obj:`requests.Response` """ self.record_path.mkdir(mode=0o700, parents=True, exist_ok=True) now = datetime.datetime.utcnow() record_file = getattr(response, "datetime", now).strftime(RECORD_FILE) record_path = self.record_path / record_file response.record_path = record_path with record_path.open("wb" if six.PY2 else "w") as fh: json.dump(obj=self.serialize_response(response), fp=fh, indent=2) m = "JSON record of response written to {path!r}" m = m.format(path=format(record_path)) self.log.debug(m) return response
[docs] def serialize_response(self, response): """Churn a response into JSON serializeable format. Args: response (:obj:`requests.Response`): Response object to churn. Returns: :obj:`dict` """ request = response.request now = datetime.datetime.utcnow() ret = { "url": response.url, "url_path": request.path_url, "method": request.method, "elapsed": format(response.elapsed), "code": response.status_code, "cause": getattr(response.request, "cause", ""), "request": { "headers": dict(getattr(request, "clean_headers", request.headers)), "body": getattr(request, "clean_body", request.body), "datetime": format(getattr(request, "datetime", now)), }, "response": { "headers": dict(getattr(response, "clean_headers", response.headers)), "body": getattr(response, "clean_body", response.text), "datetime": format(getattr(response, "datetime", now)), }, } return ret
[docs] def parse_url(self, url): """Parse a URL using UrlParser. Args: url (:obj:`str`): URL to parse. Returns: :obj:`UrlParser` """ parsed_url = UrlParser(url=url, default_scheme="https") m = "Parsed url {old!r} into {new!r} using {parsed}" m = m.format(old=url, new=parsed_url.url, parsed=parsed_url) self.log.debug(m) return parsed_url
@property def user_agent(self): """Build a user agent string for use in headers. Returns: :obj:`str` """ return "{pkg}.{name}/{ver}".format( pkg=__name__.split(".")[0], name=self.__class__.__name__, ver=version.__version__, )
[docs]class Certify(object): """Certificate verification magic."""
[docs] def __init__(self, http_client, lvl="info"): """Constructor. Args: http_client (:obj:`HttpClient`): Client to use for getting certificates and configuring the verify attribute on :attr:`HttpClient.session` lvl (:obj:`str`, optional): Logging level for this object. Defaults to: "info". """ self.log = utils.logs.get_obj_log(obj=self, lvl=lvl) """:obj:`logging.Logger`: Logger for this object.""" self.http_client = http_client """:obj:`HttpClient`: Client for this object.""" self.using_custom_cert = False """:obj:`bool`: If we are using a cert from disk or not."""
[docs] def __call__( self, path=None, path_sub="certs", path_file=CERT_FILE, verify_hook=None, overwrite=False, lvl=None, ): """Validate, find, or get certificate for URL. Args: path (:obj:`str` or :obj:`pathlib.Path`, optional): Storage directory to use. If empty, resolve path via :func:`pytan3.utils.tools.get_storage_dir`. Defaults to: None. path_sub (:obj:`str`, optional): Sub directory under path that should contain path_file. Defaults to: "certs" path_file (:obj:`str`, optional): Filename to use for cert file under path / path_sub. Defaults to: :attr:`CERT_FILE` verify_hook (:obj:`callable` or :obj:`False`): A callable used to verify a SSL cert from URL before writing it to disk. Only used if default cert is invalid and path/path_sub/path_file does not exist. If False, cert is written to disk without running any verify hook. If None, uses :meth:`Certify.verify_hook` as verify hook. If callable, called with args: store, store_chain, and parsed_url. Defaults to: None. overwrite (:obj:`bool`, optional): Overwrite cert at path if already exists. Defaults to: False. lvl (:obj:`str`, optional): If not None, change logging level for this object. Defaults to: None. Raises: :obj:`exceptions.CertificateNotFoundWarning`: If cert at URL is not valid using default cert validation, and no cert can be found at path. """ utils.logs.set_level(lvl or self.log.level, self.log) if self.check_default(): self.using_custom_cert = False return path = utils.tools.get_storage_dir(path=path, path_sub=path_sub, mkdir=False) path = path / path_file.format(http_client=self.http_client) path_is_file = path.is_file() if path_is_file: self.check_path(path=path) self.using_custom_cert = path return error = "\n".join( [ "Unable to find certificate file for URL {url!r} at path {path!r}", "Will try to get certificate, prompt for verification, then write it.", ] ) error = error.format(url=self.http_client.url, path=format(path)) warnings.warn(error, exceptions.CertificateNotFoundWarning) if verify_hook is not False: if not verify_hook or not callable(verify_hook): verify_hook = self.verify_hook verify_hook( store=self.store, store_chain=self.store_chain, parsed_url=self.http_client.parsed_url, ) self.write_pem(path=path, overwrite=overwrite) self.using_custom_cert = path
[docs] def __str__(self): """Show object info. Returns: :obj:`str` """ bits = ["custom_cert={!r}".format(format(self.using_custom_cert))] bits = "({})".format(", ".join(bits)) cls = "{c.__module__}.{c.__name__}".format(c=self.__class__) return "{cls}{bits}".format(cls=cls, bits=bits)
[docs] def __repr__(self): """Show object info. Returns: :obj:`str` """ return self.__str__()
[docs] def write_pem(self, path, overwrite=False): """Write a certificate in PEM format to disk. Args: path (:obj:`str` or :obj:`pathlib.Path`): Path to write PEM certificate to. overwrite (:obj:`bool`, optional): Overwrite cert at path if already exists. Defaults to: False. Returns: :obj:`pathlib.Path` """ path = self.store.to_path(path=path, overwrite=overwrite) m = "Wrote certificate for {url!r} to {path!r}" m = m.format(url=self.http_client.url, path=format(path)) self.log.debug(m) self.check_path(path=path) return path
[docs] @staticmethod def verify_hook(store, store_chain, parsed_url): """Verify cert by prompting user, default hook. Args: store (:obj:`cert_human.CertStore`): Store from :attr:`Certify.store`. store_chain (:obj:`cert_human.CertChainStore`): Chain store from :attr:`Certify.store_chain`. parsed_url (:obj:`str`): Parsed URL from :attr:`HttpClient.parsed_url`. Raises: :exc:`exceptions.CertificateInvalidError`: If user replies No to validity prompt. """ promptness = utils.prompts.Promptness() text = "\n{{f.yellow}}Validating certificate from URL: {url!r}{{s.reset}}\n" text = text.format(url=parsed_url.url) promptness.spew(text=promptness.prepare(text=text)) text = "{f.cyan}Brief certificate info:{s.reset}\n" promptness.spew(text=promptness.prepare(text=text)) promptness.spew(text=store.dump_str_info) show_all = promptness.ask_bool( text="Show full certificate info?", default=SHOW_CERT, env_var="PYTAN_SHOW_CERT", ) if show_all: text = "{f.cyan}Full certificate info:{s.reset}\n" promptness.spew(text=promptness.prepare(text=text)) promptness.spew(text=store.dump_str) show_chain = promptness.ask_bool( text="Show certificate chain?", default=SHOW_CHAIN, env_var="PYTAN_SHOW_CHAIN", ) if show_chain: text = "{f.cyan}Certificate chain:{s.reset}\n" promptness.spew(text=promptness.prepare(text=text)) promptness.spew(text=store_chain.dump_str_info) valid = promptness.ask_bool( text="Is this certificate valid?", default=CERT_VALID, env_var="PYTAN_CERT_VALID", ) if valid: return error = "User said certificate from URL {u!r} is invalid! Certificate:\n{c}" error = error.format(u=parsed_url.url, c=store.dump_str_info) raise exceptions.CertificateInvalidError(error)
@property def store(self): """Get CertStore for URL. Returns: :obj:`cert_human.CertStore` """ self._fetch_stores return self._store @property def store_chain(self): """Get CertChainStore for URL. Returns: :obj:`cert_human.CertChainStore` """ self._fetch_stores return self._store_chain @property def _fetch_stores(self): """Get the CertStore and CertChainStore for URL. Returns: (:obj:`cert_human.CertStore`, :obj:`cert_human.CertChainStore`) """ attrs = ["_store", "_store_chain"] if not any(getattr(self, x, None) for x in attrs): with warnings.catch_warnings(): warnings.filterwarnings("ignore") # close all ctx so previous verify not used self.http_client.session.close() cause = "Get certs for certify stores" r = self.http_client(verify=False, cause=cause) # close all ctx so future verify does not use False self.http_client.session.close() self._store = cert_human.CertStore.from_response(response=r) self._store_chain = cert_human.CertChainStore.from_response(response=r) return (self._store, self._store_chain)
[docs] def check_default(self): """Check if cert for URL is valid without setting a specific path. Returns: :obj:`bool` """ # close all ctx so previous verify not used self.http_client.session.close() m = "Checking that default certificate is valid for URL {url!r}." m = m.format(url=self.http_client.url) self.log.debug(m) m = "Certificate Info for {url!r}:\n{info}" m = m.format(url=self.http_client.url, info=self.store.dump_str_info) self.log.debug(m) cause = "Test default cert of peer" try: self.http_client(verify=True, cause=cause) # close all ctx so future verify do not use True self.http_client.session.close() m = "Default certificate for {url!r} is valid, not setting custom cert." m = m.format(url=self.http_client.url) self.log.debug(m) return True except requests.exceptions.SSLError as exc: m = "Default certificate for {url!r} is not valid, will find/get one." m = m.format(url=self.http_client.url) self.log.debug(m) m = "Default certificate error: {exc}".format(exc=exc) self.log.debug(m) return False
[docs] def check_path(self, path): """Check if cert at path is valid for URL. Args: path (:obj:`str` or :obj:`pathlib.Path`): Path to PEM certificate file. Notes: If validation is successful, the verify attribute on :attr:`HttpClient.session` will be set to path. Raises: :exc:`exceptions.CertificateInvalidError`: If path fails verification. """ # close all ctx so previous verify not used self.http_client.session.close() m = "Checking that certificate at {path!r} is valid for URL {url!r}." m = m.format(path=format(path), url=self.http_client.url) self.log.debug(m) cause = "Test custom cert at {!r}".format(format(path)) try: self.http_client(verify=format(path), cause=cause) except requests.exceptions.SSLError as exc: # close all ctx so future ctx dont use test verify self.http_client.session.close() m = "Certificate error: {exc}".format(exc=exc) self.log.debug(m) error = "Certificate at {path!r} is not valid for URL {url!r}" error = error.format(path=format(path), url=self.http_client.url) raise exceptions.CertificateInvalidError(error) m = ( "Certificate at {path!r} is valid for URL {url!r}, " "will use for SSL verification." ) m = m.format(path=format(path), url=self.http_client.url) self.log.debug(m) self.http_client.session.verify = format(path) # close all ctx so future ctx use new verify self.http_client.session.close()
[docs]class UrlParser(object): """Parse a URL and ensure it has the neccessary bits."""
[docs] def __init__(self, url, default_scheme=""): """Constructor. Args: url (:obj:`str`): URL to parse default_scheme (:obj:`str`, optional): If no scheme in URL, use this. Defaults to: "" Raises: :exc:`exceptions.ModuleError`: If parsed URL winds up without a hostname, port, or scheme. """ self._init_url = url """:obj:`str`: Initial URL provided.""" self._init_scheme = default_scheme """:obj:`str`: Default scheme provided.""" self._init_parsed = requests.compat.urlparse(url) """:obj:`urllib.parse.ParseResult`: First pass of parsing URL.""" self.parsed = self.reparse( parsed=self._init_parsed, default_scheme=default_scheme ) """:obj:`urllib.parse.ParseResult`: Second pass of parsing URL.""" for part in ["hostname", "port", "scheme"]: if not getattr(self.parsed, part, None): error = "\n".join( [ "", "Parsed into: {pstr}", "URL format should be like: scheme://hostname:port", "No {part} provided in URL {url!r}", ] ) error = error.format(part=part, url=url, pstr=self.parsed_str) raise exceptions.ModuleError(error)
[docs] def __str__(self): """Show object info. Returns: :obj:`str` """ bits = ["parsed={!r}".format(self.parsed_str)] bits = "({})".format(", ".join(bits)) cls = "{c.__module__}.{c.__name__}".format(c=self.__class__) return "{cls}{bits}".format(cls=cls, bits=bits)
[docs] def __repr__(self): """Show object info. Returns: :obj:`str` """ return self.__str__()
@property def hostname(self): """Hostname part from :attr:`UrlParser.parsed`. Returns: :obj:`str` """ return self.parsed.hostname @property def port(self): """Port part from :attr:`UrlParser.parsed`. Returns: :obj:`int` """ return int(self.parsed.port) @property def scheme(self): """Scheme part from :attr:`UrlParser.parsed`. Returns: :obj:`str` """ return self.parsed.scheme @property def url(self): """Get scheme, hostname, and port from :attr:`UrlParser.parsed`. Returns: :obj:`str` """ return self.unparse_base(p=self.parsed) @property def url_full(self): """Get full URL from :attr:`UrlParser.parsed`. Returns: :obj:`str` """ return self.unparse_all(p=self.parsed) @property def parsed_str(self): """Create string of :attr:`UrlParser.parsed`. Returns: :obj:`str` """ parsed = getattr(self, "parsed", None) attrs = [ "scheme", "netloc", "hostname", "port", "path", "params", "query", "fragment", ] vals = ", ".join( [ "{a}={v!r}".format(a=a, v="{}".format(getattr(parsed, a, "")) or "") for a in attrs ] ) return vals
[docs] def make_netloc(self, host, port): """Create netloc from host and port. Args: host (:obj:`str`): Host part to use in netloc. port (:obj:`str`): Port part to use in netloc. Returns: :obj:`str` """ netloc = ":".join([host, port]) if port else host return netloc
[docs] def reparse(self, parsed, default_scheme=""): """Reparse a parsed URL into a parsed URL with values fixed. Args: parsed (:obj:`urllib.parse.ParseResult`): Parsed URL to reparse. default_scheme (:obj:`str`, optional): If no scheme in URL, use this. Defaults to: "" Returns: :obj:`urllib.parse.ParseResult` """ scheme, netloc, path, params, query, fragment = parsed host = parsed.hostname port = format(parsed.port or "") if not netloc and scheme and path and path.split("/")[0].isdigit(): """For case: >>> urllib.parse.urlparse('host:443/') ParseResult( scheme='host', netloc='', path='443/', params='', query='', fragment='' ) """ host = scheme # switch host from scheme to host port = path.split("/")[0] # remove / from path and assign to port path = "" # empty out path scheme = default_scheme netloc = ":".join([host, port]) if not netloc and path: """For cases: >>> urllib.parse.urlparse('host:443') ParseResult( scheme='', netloc='', path='host:443', params='', query='', fragment='' ) >>> urllib.parse.urlparse('host') ParseResult( scheme='', netloc='', path='host', params='', query='', fragment='' ) """ netloc, path = path, netloc if ":" in netloc: host, port = netloc.split(":", 1) netloc = ":".join([host, port]) if port else host else: host = netloc scheme = scheme or default_scheme if not scheme and port: if format(port) == "443": scheme = "https" elif format(port) == "80": scheme = "http" if not port: if scheme == "https": port = "443" netloc = self.make_netloc(host, port) elif scheme == "http": port = "80" netloc = self.make_netloc(host, port) pass2 = requests.compat.urlunparse( (scheme, netloc, path, params, query, fragment) ) ret = requests.compat.urlparse(pass2) return ret
[docs] def unparse_base(self, p): """Unparse a parsed URL into just the scheme, hostname, and port parts. Args: p (:obj:`urllib.parse.ParseResult`): Parsed URL to unparse. Returns: :obj:`str` """ # only unparse self.parsed into url with scheme and netloc return requests.compat.urlunparse((p.scheme, p.netloc, "", "", "", ""))
[docs] def unparse_all(self, p): """Unparse a parsed URL with all the parts. Args: p (:obj:`urllib.parse.ParseResult`): Parsed URL to unparse. Returns: :obj:`str` """ return requests.compat.urlunparse(p)