Source code for pykube.query

import json
from collections import namedtuple
from typing import Union
from urllib.parse import urlencode

from .exceptions import ObjectDoesNotExist
from .http import HTTPClient


all_ = object()
everything = object()
now = object()


[docs]class Table: """ Tabular resource representation See https://kubernetes.io/docs/reference/using-api/api-concepts/#receiving-resources-as-tables """ def __init__(self, api_obj_class, obj: dict): assert obj["kind"] == "Table" self.api_obj_class = api_obj_class self.obj = obj def __repr__(self) -> str: return "<Table of {kind} at {address}>".format( kind=self.api_obj_class.kind, address=hex(id(self)) ) @property def columns(self): return self.obj["columnDefinitions"] @property def rows(self): return self.obj["rows"]
[docs]class BaseQuery: def __init__(self, api: HTTPClient, api_obj_class, namespace: str = None): self.api = api self.api_obj_class = api_obj_class self.namespace = namespace self.selector = everything self.field_selector = everything def __repr__(self) -> str: return "<Query of {kind} at {address}>".format( kind=self.api_obj_class.kind, address=hex(id(self)) )
[docs] def all(self) -> "BaseQuery": return self._clone()
[docs] def filter( self, namespace: str = None, selector: Union[str, dict] = None, field_selector: Union[str, dict] = None, ) -> "BaseQuery": """ Filter objects by namespace, labels, or fields :param namespace: Namespace to filter by (pass pykube.all to get objects in all namespaces) :param selector: Label selector, can be a dictionary of label names/values """ clone = self._clone() if namespace is not None: clone.namespace = namespace if selector is not None: clone.selector = selector if field_selector is not None: clone.field_selector = field_selector return clone
def _clone(self, cls=None): if cls is None: cls = self.__class__ clone = cls(self.api, self.api_obj_class, namespace=self.namespace) clone.selector = self.selector clone.field_selector = self.field_selector return clone def _build_api_url(self, params: dict = None): if params is None: params = {} if self.selector is not everything: params["labelSelector"] = as_selector(self.selector) # type: ignore if self.field_selector is not everything: params["fieldSelector"] = as_selector(self.field_selector) # type: ignore query_string = urlencode(params) return "{}{}".format( self.api_obj_class.endpoint, f"?{query_string}" if query_string else "", )
[docs]class Query(BaseQuery):
[docs] def get_by_name(self, name: str): """ Get object by name, raises ObjectDoesNotExist if not found """ kwargs = { "url": f"{self.api_obj_class.endpoint}/{name}", "namespace": self.namespace, } if self.api_obj_class.base: kwargs["base"] = self.api_obj_class.base if self.api_obj_class.version: kwargs["version"] = self.api_obj_class.version r = self.api.get(**kwargs) if not r.ok: if r.status_code == 404: raise ObjectDoesNotExist(f"{name} does not exist.") self.api.raise_for_status(r) return self.api_obj_class(self.api, r.json())
[docs] def get(self, *args, **kwargs): """ Get a single object by name, namespace, label, .. """ if "name" in kwargs: return self.get_by_name(kwargs["name"]) clone = self.filter(*args, **kwargs) num = len(clone) if num == 1: return clone.query_cache["objects"][0] if not num: raise ObjectDoesNotExist("get() returned zero objects") raise ValueError("get() more than one object; use filter")
[docs] def get_or_none(self, *args, **kwargs): """ Get object by name, return None if not found """ try: return self.get(*args, **kwargs) except ObjectDoesNotExist: return None
[docs] def watch(self, since=None, *, params=None): query = self._clone(WatchQuery) query.params = params if since is now: query.resource_version = self.response["metadata"]["resourceVersion"] elif since is not None: query.resource_version = since return query
[docs] def execute(self, **kwargs): kwargs["url"] = self._build_api_url() if self.api_obj_class.base: kwargs["base"] = self.api_obj_class.base if self.api_obj_class.version: kwargs["version"] = self.api_obj_class.version if self.namespace is not None and self.namespace is not all_: kwargs["namespace"] = self.namespace r = self.api.get(**kwargs) r.raise_for_status() return r
[docs] def as_table(self) -> Table: """ Execute query and return result as Table (similar to what kubectl does) See https://kubernetes.io/docs/reference/using-api/api-concepts/#receiving-resources-as-tables """ response = self.execute( headers={"Accept": "application/json;as=Table;v=v1beta1;g=meta.k8s.io"} ) return Table(self.api_obj_class, response.json())
[docs] def iterator(self): """ Execute the API request and return an iterator over the objects. This method does not use the query cache. """ for obj in self.execute().json().get("items") or []: yield self.api_obj_class(self.api, obj)
@property def query_cache(self): if not hasattr(self, "_query_cache"): cache = {"objects": []} cache["response"] = self.execute().json() for obj in cache["response"].get("items") or []: cache["objects"].append(self.api_obj_class(self.api, obj)) self._query_cache = cache return self._query_cache def __len__(self): return len(self.query_cache["objects"]) def __iter__(self): return iter(self.query_cache["objects"]) @property def response(self): return self.query_cache["response"]
[docs]class WatchQuery(BaseQuery): def __init__(self, *args, **kwargs): self.resource_version = kwargs.pop("resource_version", None) self.params = None super(WatchQuery, self).__init__(*args, **kwargs) self._response = None
[docs] def object_stream(self): params = dict(self.params or {}) # shallow clone for local use params["watch"] = "true" if self.resource_version is not None: params["resourceVersion"] = self.resource_version kwargs = {"url": self._build_api_url(params=params), "stream": True} if self.namespace is not all_: kwargs["namespace"] = self.namespace if self.api_obj_class.version: kwargs["version"] = self.api_obj_class.version r = self.api.get(**kwargs) self.api.raise_for_status(r) self._response = r WatchEvent = namedtuple("WatchEvent", "type object") for line in r.iter_lines(): we = json.loads(line.decode("utf-8")) yield WatchEvent( type=we["type"], object=self.api_obj_class(self.api, we["object"]) )
def __iter__(self): return iter(self.object_stream()) @property def response(self): return self._response
[docs]def as_selector(value: Union[str, dict]) -> str: if isinstance(value, str): return value s = [] for k, v in value.items(): bits = k.split("__") assert len(bits) <= 2, "too many __ in selector" if len(bits) == 1: label = bits[0] op = "eq" else: label = bits[0] op = bits[1] # map operator to selector if op == "eq": s.append(f"{label}={v}") elif op == "neq": s.append(f"{label} != {v}") elif op == "in": s.append("{} in ({})".format(label, ",".join(sorted(v)))) elif op == "notin": s.append("{} notin ({})".format(label, ",".join(sorted(v)))) else: raise ValueError(f"{op} is not a valid comparison operator") return ",".join(s)