import json
from collections import namedtuple
from typing import Union
from urllib.parse import urlencode
from .exceptions import ObjectDoesNotExist
from .http import HTTPClient
all_ = object()
everything = object()
now = object()
[docs]class Table:
"""
Tabular resource representation
See https://kubernetes.io/docs/reference/using-api/api-concepts/#receiving-resources-as-tables
"""
def __init__(self, api_obj_class, obj: dict):
assert obj["kind"] == "Table"
self.api_obj_class = api_obj_class
self.obj = obj
def __repr__(self) -> str:
return "<Table of {kind} at {address}>".format(
kind=self.api_obj_class.kind, address=hex(id(self))
)
@property
def columns(self):
return self.obj["columnDefinitions"]
@property
def rows(self):
return self.obj["rows"]
[docs]class BaseQuery:
def __init__(self, api: HTTPClient, api_obj_class, namespace: str = None):
self.api = api
self.api_obj_class = api_obj_class
self.namespace = namespace
self.selector = everything
self.field_selector = everything
def __repr__(self) -> str:
return "<Query of {kind} at {address}>".format(
kind=self.api_obj_class.kind, address=hex(id(self))
)
[docs] def all(self) -> "BaseQuery":
return self._clone()
[docs] def filter(
self,
namespace: str = None,
selector: Union[str, dict] = None,
field_selector: Union[str, dict] = None,
) -> "BaseQuery":
"""
Filter objects by namespace, labels, or fields
:param namespace: Namespace to filter by (pass pykube.all to get objects in all namespaces)
:param selector: Label selector, can be a dictionary of label names/values
"""
clone = self._clone()
if namespace is not None:
clone.namespace = namespace
if selector is not None:
clone.selector = selector
if field_selector is not None:
clone.field_selector = field_selector
return clone
def _clone(self, cls=None):
if cls is None:
cls = self.__class__
clone = cls(self.api, self.api_obj_class, namespace=self.namespace)
clone.selector = self.selector
clone.field_selector = self.field_selector
return clone
def _build_api_url(self, params: dict = None):
if params is None:
params = {}
if self.selector is not everything:
params["labelSelector"] = as_selector(self.selector) # type: ignore
if self.field_selector is not everything:
params["fieldSelector"] = as_selector(self.field_selector) # type: ignore
query_string = urlencode(params)
return "{}{}".format(
self.api_obj_class.endpoint,
f"?{query_string}" if query_string else "",
)
[docs]class Query(BaseQuery):
[docs] def get_by_name(self, name: str):
"""
Get object by name, raises ObjectDoesNotExist if not found
"""
kwargs = {
"url": f"{self.api_obj_class.endpoint}/{name}",
"namespace": self.namespace,
}
if self.api_obj_class.base:
kwargs["base"] = self.api_obj_class.base
if self.api_obj_class.version:
kwargs["version"] = self.api_obj_class.version
r = self.api.get(**kwargs)
if not r.ok:
if r.status_code == 404:
raise ObjectDoesNotExist(f"{name} does not exist.")
self.api.raise_for_status(r)
return self.api_obj_class(self.api, r.json())
[docs] def get(self, *args, **kwargs):
"""
Get a single object by name, namespace, label, ..
"""
if "name" in kwargs:
return self.get_by_name(kwargs["name"])
clone = self.filter(*args, **kwargs)
num = len(clone)
if num == 1:
return clone.query_cache["objects"][0]
if not num:
raise ObjectDoesNotExist("get() returned zero objects")
raise ValueError("get() more than one object; use filter")
[docs] def get_or_none(self, *args, **kwargs):
"""
Get object by name, return None if not found
"""
try:
return self.get(*args, **kwargs)
except ObjectDoesNotExist:
return None
[docs] def watch(self, since=None, *, params=None):
query = self._clone(WatchQuery)
query.params = params
if since is now:
query.resource_version = self.response["metadata"]["resourceVersion"]
elif since is not None:
query.resource_version = since
return query
[docs] def execute(self, **kwargs):
kwargs["url"] = self._build_api_url()
if self.api_obj_class.base:
kwargs["base"] = self.api_obj_class.base
if self.api_obj_class.version:
kwargs["version"] = self.api_obj_class.version
if self.namespace is not None and self.namespace is not all_:
kwargs["namespace"] = self.namespace
r = self.api.get(**kwargs)
r.raise_for_status()
return r
[docs] def as_table(self) -> Table:
"""
Execute query and return result as Table (similar to what kubectl does)
See https://kubernetes.io/docs/reference/using-api/api-concepts/#receiving-resources-as-tables
"""
response = self.execute(
headers={"Accept": "application/json;as=Table;v=v1beta1;g=meta.k8s.io"}
)
return Table(self.api_obj_class, response.json())
[docs] def iterator(self):
"""
Execute the API request and return an iterator over the objects. This
method does not use the query cache.
"""
for obj in self.execute().json().get("items") or []:
yield self.api_obj_class(self.api, obj)
@property
def query_cache(self):
if not hasattr(self, "_query_cache"):
cache = {"objects": []}
cache["response"] = self.execute().json()
for obj in cache["response"].get("items") or []:
cache["objects"].append(self.api_obj_class(self.api, obj))
self._query_cache = cache
return self._query_cache
def __len__(self):
return len(self.query_cache["objects"])
def __iter__(self):
return iter(self.query_cache["objects"])
@property
def response(self):
return self.query_cache["response"]
[docs]class WatchQuery(BaseQuery):
def __init__(self, *args, **kwargs):
self.resource_version = kwargs.pop("resource_version", None)
self.params = None
super(WatchQuery, self).__init__(*args, **kwargs)
self._response = None
[docs] def object_stream(self):
params = dict(self.params or {}) # shallow clone for local use
params["watch"] = "true"
if self.resource_version is not None:
params["resourceVersion"] = self.resource_version
kwargs = {"url": self._build_api_url(params=params), "stream": True}
if self.namespace is not all_:
kwargs["namespace"] = self.namespace
if self.api_obj_class.version:
kwargs["version"] = self.api_obj_class.version
r = self.api.get(**kwargs)
self.api.raise_for_status(r)
self._response = r
WatchEvent = namedtuple("WatchEvent", "type object")
for line in r.iter_lines():
we = json.loads(line.decode("utf-8"))
yield WatchEvent(
type=we["type"], object=self.api_obj_class(self.api, we["object"])
)
def __iter__(self):
return iter(self.object_stream())
@property
def response(self):
return self._response
[docs]def as_selector(value: Union[str, dict]) -> str:
if isinstance(value, str):
return value
s = []
for k, v in value.items():
bits = k.split("__")
assert len(bits) <= 2, "too many __ in selector"
if len(bits) == 1:
label = bits[0]
op = "eq"
else:
label = bits[0]
op = bits[1]
# map operator to selector
if op == "eq":
s.append(f"{label}={v}")
elif op == "neq":
s.append(f"{label} != {v}")
elif op == "in":
s.append("{} in ({})".format(label, ",".join(sorted(v))))
elif op == "notin":
s.append("{} notin ({})".format(label, ",".join(sorted(v))))
else:
raise ValueError(f"{op} is not a valid comparison operator")
return ",".join(s)