Source code for pykube.http

"""
HTTP request related code.
"""
import base64
import datetime
import json
import logging
import os
import shlex
import subprocess
from typing import Optional

try:
    import google.auth
    from google.auth.transport.requests import Request as GoogleAuthRequest

    google_auth_installed = True
except ImportError:
    google_auth_installed = False
try:
    from requests_oauthlib import OAuth2Session

    oidc_auth_installed = True
except ImportError:
    oidc_auth_installed = False

import requests.adapters

from http import HTTPStatus
from urllib.parse import urlparse

from .exceptions import HTTPError, PyKubeError
from .utils import jsonpath_installed, jsonpath_parse, join_url_path
from .config import KubeConfig

from . import __version__

DEFAULT_HTTP_TIMEOUT = 10  # seconds
EXPIRY_SKEW_PREVENTION_DELAY = datetime.timedelta(minutes=5)
UTC = datetime.timezone.utc
LOG = logging.getLogger(__name__)


[docs]class KubernetesHTTPAdapter(requests.adapters.HTTPAdapter): # _do_send: the actual send method of HTTPAdapter # it can be overwritten in unit tests to mock the actual HTTP calls _do_send = requests.adapters.HTTPAdapter.send def __init__(self, kube_config: KubeConfig, **kwargs): self.kube_config = kube_config super().__init__(**kwargs) def _persist_credentials(self, config, opts): user_name = config.contexts[config.current_context]["user"] user = [u["user"] for u in config.doc["users"] if u["name"] == user_name][0] auth_config = user["auth-provider"].setdefault("config", {}) auth_config.update(opts) config.persist_doc() config.reload() def _auth_gcp(self, request, token, expiry, config): original_request = request.copy() credentials = google.auth.default( scopes=[ "https://www.googleapis.com/auth/cloud-platform", "https://www.googleapis.com/auth/userinfo.email", ] )[0] credentials.token = token credentials.expiry = expiry should_persist = not credentials.valid auth_request = GoogleAuthRequest() credentials.before_request( auth_request, request.method, request.url, request.headers ) if should_persist and config: auth_opts = { "access-token": credentials.token, "expiry": credentials.expiry, } self._persist_credentials(config, auth_opts) def retry(send_kwargs): credentials.refresh(auth_request) response = self.send(original_request, **send_kwargs) if response.ok and config: auth_opts = { "access-token": credentials.token, "expiry": credentials.expiry, } self._persist_credentials(config, auth_opts) return response return retry def _is_valid_jwt(self, token): """Validate JWT token for correctness and near expiration""" if not token: return False reserved_characters = frozenset(["=", "+", "/"]) if any(char in token for char in reserved_characters): # Invalid jwt, as it contains url-unsafe chars return False parts = token.split(".") if len(parts) != 3: # Not a valid JWT return False padding = (4 - len(parts[1]) % 4) * "=" if len(padding) == 3: # According to spec, 3 padding characters cannot occur # in a valid jwt # https://tools.ietf.org/html/rfc7515#appendix-C return False jwt_attributes = json.loads( base64.b64decode(parts[1] + padding).decode("utf-8") ) expire = jwt_attributes.get("exp") # allow missing exp, but deny tokens that are about to expire soon return expire is None or ( datetime.datetime.fromtimestamp(expire, tz=UTC) - EXPIRY_SKEW_PREVENTION_DELAY ) > datetime.datetime.utcnow().replace(tzinfo=UTC) def _refresh_oidc_token(self, config): if not oidc_auth_installed: raise ImportError( "missing dependencies for OIDC token refresh support " "(try pip install pykube-ng[oidc]" ) auth_config = config.user["auth-provider"]["config"] if "idp-certificate-authority" in auth_config: verify = auth_config["idp-certificate-authority"].filename() else: verify = None oauth = OAuth2Session() discovery = oauth.get( f"{auth_config['idp-issuer-url']}/.well-known/openid-configuration", verify=verify, timeout=DEFAULT_HTTP_TIMEOUT, withhold_token=True, ) if discovery.status_code != HTTPStatus.OK: raise PyKubeError( f"Failed to discover OpenID token endpoint - " f"HTTP {discovery.status_code}: {discovery.text}" ) discovery = discovery.json() refresh = oauth.refresh_token( token_url=discovery["token_endpoint"], refresh_token=auth_config["refresh-token"], client_id=auth_config["client-id"], client_secret=auth_config.get("client-secret"), verify=verify, timeout=DEFAULT_HTTP_TIMEOUT, ) auth_opts = { "id-token": refresh["id_token"], "refresh-token": refresh["refresh_token"], } self._persist_credentials(config, auth_opts)
[docs] def send(self, request, **kwargs): if "kube_config" in kwargs: config = kwargs.pop("kube_config") else: config = self.kube_config _retry_attempt = kwargs.pop("_retry_attempt", 0) retry_func = self._setup_request_auth(config, request, kwargs) self._setup_request_certificates(config, request, kwargs) response = self._do_send(request, **kwargs) _retry_status_codes = {HTTPStatus.UNAUTHORIZED} if ( response.status_code in _retry_status_codes and retry_func and _retry_attempt < 2 ): send_kwargs = {"_retry_attempt": _retry_attempt + 1, "kube_config": config} send_kwargs.update(kwargs) return retry_func(send_kwargs=send_kwargs) return response
def _setup_request_auth(self, config, request, kwargs): """ Set up authorization for the request. Return an optional function to use as a retry manager if the initial request fails with an unauthorized error. """ if "Authorization" in request.headers: # request already has some auth header (e.g. Bearer token) # don't modify/overwrite it return None if config.user.get("token"): request.headers["Authorization"] = "Bearer {}".format(config.user["token"]) return None if "exec" in config.user: exec_conf = config.user["exec"] api_version = exec_conf["apiVersion"] if api_version == "client.authentication.k8s.io/v1alpha1": cmd_env_vars = dict(os.environ) for env_var in exec_conf.get("env") or []: cmd_env_vars[env_var["name"]] = env_var["value"] output = subprocess.check_output( [exec_conf["command"]] + exec_conf["args"], env=cmd_env_vars ) parsed_out = json.loads(output) token = parsed_out["status"]["token"] else: raise NotImplementedError( f"auth exec api version {api_version} not implemented" ) request.headers["Authorization"] = "Bearer {}".format(token) return None if config.user.get("username") and config.user.get("password"): request.prepare_auth((config.user["username"], config.user["password"])) return None if "auth-provider" in config.user: auth_provider = config.user["auth-provider"] if auth_provider.get("name") == "gcp": dependencies = [google_auth_installed, jsonpath_installed] if not all(dependencies): raise ImportError( "missing dependencies for GCP support (try pip install pykube-ng[gcp]" ) auth_config = auth_provider.get("config", {}) if "cmd-path" in auth_config: output = subprocess.check_output( [auth_config["cmd-path"]] + shlex.split(auth_config["cmd-args"]) ) parsed = json.loads(output) token = jsonpath_parse(auth_config["token-key"], parsed) expiry = datetime.datetime.strptime( jsonpath_parse(auth_config["expiry-key"], parsed), "%Y-%m-%dT%H:%M:%SZ", ) retry_func = self._auth_gcp(request, token, expiry, None) else: retry_func = self._auth_gcp( request, auth_config.get("access-token"), auth_config.get("expiry"), config, ) return retry_func elif auth_provider.get("name") == "oidc": auth_config = auth_provider.get("config", {}) if not self._is_valid_jwt(auth_config.get("id-token")): try: self._refresh_oidc_token(config) # ignoring all exceptions, rely on retries except Exception as oidc_exc: LOG.warning(f"Failed to refresh OpenID token: {oidc_exc}") # not using auth_config handle here as the config might have # been reloaded during token refresh request.headers["Authorization"] = "Bearer {}".format( config.user["auth-provider"]["config"]["id-token"] ) return None def _setup_request_certificates(self, config, request, kwargs): if "client-certificate" in config.user: kwargs["cert"] = ( config.user["client-certificate"].filename(), config.user["client-key"].filename(), ) # setup certificate verification if "certificate-authority" in config.cluster: kwargs["verify"] = config.cluster["certificate-authority"].filename() elif "insecure-skip-tls-verify" in config.cluster: kwargs["verify"] = not config.cluster["insecure-skip-tls-verify"]
[docs]class HTTPClient: """ Client for interfacing with the Kubernetes API. """ http_adapter_cls = KubernetesHTTPAdapter def __init__( self, config: KubeConfig, timeout: float = DEFAULT_HTTP_TIMEOUT, dry_run: bool = False, verify: bool = True, http_adapter: Optional[requests.adapters.HTTPAdapter] = None, ): """ Creates a new instance of the HTTPClient. :Parameters: - `config`: The configuration instance """ self.config = config self.timeout = timeout self.url = self.config.cluster["server"] self.dry_run = dry_run session = requests.Session() session.headers["User-Agent"] = f"pykube-ng/{__version__}" if not http_adapter: http_adapter = self.http_adapter_cls(self.config) session.mount("https://", http_adapter) session.mount("http://", http_adapter) self.session = session self.session.verify = verify @property def url(self): return self._url @url.setter def url(self, value): pr = urlparse(value) self._url = pr.geturl() @property def version(self): """ Get Kubernetes API version """ response = self.get(version="", base="/version") response.raise_for_status() data = response.json() return (data["major"], data["minor"])
[docs] def resource_list(self, api_version): cached_attr = f"_cached_resource_list_{api_version}" if not hasattr(self, cached_attr): r = self.get(version=api_version) r.raise_for_status() setattr(self, cached_attr, r.json()) return getattr(self, cached_attr)
[docs] def get_kwargs(self, **kwargs) -> dict: """ Creates a full URL to request based on arguments. :Parametes: - `kwargs`: All keyword arguments to build a kubernetes API endpoint """ version = kwargs.pop("version", "v1") if version == "v1": base = kwargs.pop("base", "/api") elif "/" in version: base = kwargs.pop("base", "/apis") else: if "base" not in kwargs: raise TypeError("unknown API version; base kwarg must be specified.") base = kwargs.pop("base") if version.startswith("/"): # for compatibility with pykube-ng 20.1.0 when calling api.get(version="/apis"): # posixpath.join() was throwing away everything before the first "absolute" path (i.e. starting with a slash) bits = [version] else: bits = [base, version] # Overwrite (default) namespace from context if it was set if "namespace" in kwargs: n = kwargs.pop("namespace") if n is not None: if n: namespace = n else: namespace = self.config.namespace if namespace: bits.extend(["namespaces", namespace]) url = kwargs.get("url", "") bits.append(url) kwargs["url"] = self.url + join_url_path(*bits, join_empty=True) if "timeout" not in kwargs: # apply default HTTP timeout kwargs["timeout"] = self.timeout if self.dry_run: # Add http query param for dryRun params = kwargs.get("params", {}) params["dryRun"] = "All" kwargs["params"] = params return kwargs
[docs] def raise_for_status(self, resp): try: resp.raise_for_status() except Exception: # attempt to provide a more specific exception based around what # Kubernetes returned as the error. if resp.headers["content-type"] == "application/json": payload = resp.json() if payload["kind"] == "Status": raise HTTPError(resp.status_code, payload["message"]) raise
[docs] def request(self, *args, **kwargs): """ Makes an API request based on arguments. :Parameters: - `args`: Non-keyword arguments - `kwargs`: Keyword arguments """ return self.session.request(*args, **self.get_kwargs(**kwargs))
[docs] def get(self, *args, **kwargs): """ Executes an HTTP GET. :Parameters: - `args`: Non-keyword arguments - `kwargs`: Keyword arguments """ return self.session.get(*args, **self.get_kwargs(**kwargs))
[docs] def options(self, *args, **kwargs): """ Executes an HTTP OPTIONS. :Parameters: - `args`: Non-keyword arguments - `kwargs`: Keyword arguments """ return self.session.options(*args, **self.get_kwargs(**kwargs))
[docs] def head(self, *args, **kwargs): """ Executes an HTTP HEAD. :Parameters: - `args`: Non-keyword arguments - `kwargs`: Keyword arguments """ return self.session.head(*args, **self.get_kwargs(**kwargs))
[docs] def post(self, *args, **kwargs): """ Executes an HTTP POST. :Parameters: - `args`: Non-keyword arguments - `kwargs`: Keyword arguments """ return self.session.post(*args, **self.get_kwargs(**kwargs))
[docs] def put(self, *args, **kwargs): """ Executes an HTTP PUT. :Parameters: - `args`: Non-keyword arguments - `kwargs`: Keyword arguments """ return self.session.put(*args, **self.get_kwargs(**kwargs))
[docs] def patch(self, *args, **kwargs): """ Executes an HTTP PATCH. :Parameters: - `args`: Non-keyword arguments - `kwargs`: Keyword arguments """ return self.session.patch(*args, **self.get_kwargs(**kwargs))
[docs] def delete(self, *args, **kwargs): """ Executes an HTTP DELETE. :Parameters: - `args`: Non-keyword arguments - `kwargs`: Keyword arguments """ return self.session.delete(*args, **self.get_kwargs(**kwargs))