2222import logging
2323
2424from base64 import b64decode , b64encode
25- from .validation import is_empty_arg
2625
26+ from openserverless .common .utils import join_host_port
2727from openserverless .config .app_config import AppConfig
2828from openserverless .error .config_exception import ConfigException
2929
3030SERVICE_HOST_ENV_NAME = "KUBERNETES_SERVICE_HOST"
3131SERVICE_PORT_ENV_NAME = "KUBERNETES_SERVICE_PORT"
3232SERVICE_TOKEN_FILENAME = "/var/run/secrets/kubernetes.io/serviceaccount/token"
3333SERVICE_CERT_FILENAME = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt"
34-
35-
36- def _join_host_port (host , port ):
37- template = "%s:%s"
38- host_requires_bracketing = ":" in host or "%" in host
39- if host_requires_bracketing :
40- template = "[%s]:%s"
41- return template % (host , port )
42-
43-
4434class KubeApiClient :
4535
4636 def __init__ (self , environ = os .environ ):
@@ -50,11 +40,6 @@ def __init__(self, environ=os.environ):
5040 self ._load_incluster_config ()
5141
5242 def _parse_b64 (self , encoded_str ):
53- """
54- Decode b64 encoded string
55- param: encoded_str a Base64 encoded string
56- return: decoded string
57- """
5843 try :
5944 return b64decode (encoded_str ).decode ()
6045 except :
@@ -73,7 +58,7 @@ def _load_incluster_config(self):
7358 ):
7459 raise ConfigException ("Service host/port is not set." )
7560
76- self .host = "https://" + _join_host_port (
61+ self .host = "https://" + join_host_port (
7762 self ._environ .get (SERVICE_HOST_ENV_NAME ),
7863 self ._environ .get (SERVICE_PORT_ENV_NAME ),
7964 )
@@ -254,7 +239,13 @@ def get_config_map(self, cm_name, namespace="nuvolaris"):
254239 return None
255240
256241 def post_config_map (self , cm_name , file_or_dir , namespace = "nuvolaris" ):
257-
242+ """
243+ Create a ConfigMap from a file or directory.
244+ :param cm_name: Name of the ConfigMap.
245+ :param file_or_dir: Path to the file or directory containing the data.
246+ :param namespace: Namespace where the ConfigMap will be created.
247+ :return: The created ConfigMap or None if failed.
248+ """
258249 if not os .path .exists (file_or_dir ):
259250 raise ConfigException (f"File or directory { file_or_dir } does not exist." )
260251
@@ -301,6 +292,12 @@ def post_config_map(self, cm_name, file_or_dir, namespace="nuvolaris"):
301292 return None
302293
303294 def delete_config_map (self , cm_name , namespace = "nuvolaris" ):
295+ """
296+ Delete a ConfigMap by name.
297+ :param cm_name: Name of the ConfigMap to delete.
298+ :param namespace: Namespace where the ConfigMap is located.
299+ :return: True if deletion was successful, False otherwise.
300+ """
304301 url = f"{ self .host } /api/v1/namespaces/{ namespace } /configmaps/{ cm_name } "
305302 headers = {"Authorization" : self .token }
306303
@@ -416,8 +413,14 @@ def delete_secret(self, secret_name, namespace="nuvolaris"):
416413 logging .error (f"delete_secret { ex } " )
417414 return False
418415
419- # --- CREA JOB ---
420- def post_job (self , job_name , job_manifest , namespace = "nuvolaris" ):
416+ def post_job (self , job_name , job_manifest , namespace = "nuvolaris" ):
417+ """
418+ Create a Kubernetes job.
419+ :param job_name: Name of the job.
420+ :param job_manifest: Dictionary containing the job manifest.
421+ :param namespace: Namespace where the job will be created.
422+ :return: The created job or None if failed.
423+ """
421424 url = f"{ self .host } /apis/batch/v1/namespaces/{ namespace } /jobs"
422425 headers = {"Authorization" : self .token }
423426 try :
@@ -437,8 +440,13 @@ def post_job(self, job_name, job_manifest, namespace="nuvolaris"):
437440 logging .error (f"post_job { ex } " )
438441 return None
439442
440- # --- OTTIENI POD ---
441443 def get_pod_by_job_name (self , job_name , namespace = "nuvolaris" ):
444+ """
445+ Get the pod name associated with a job by its name.
446+ :param job_name: Name of the job.
447+ :param namespace: Namespace where the job is located.
448+ :return: The pod name if found, None otherwise.
449+ """
442450 url = f"{ self .host } /api/v1/namespaces/{ namespace } /pods"
443451 headers = {"Authorization" : self .token }
444452 try :
@@ -466,17 +474,26 @@ def get_pod_by_job_name(self, job_name, namespace="nuvolaris"):
466474 logging .error (f"get_pod_by_job_name { ex } " )
467475 return None
468476
469- # --- LEGGI LOG POD ---
470477 def stream_pod_logs (self , pod_name , namespace = "nuvolaris" ):
478+ """
479+ Stream logs from a specific pod.
480+ :param pod_name: Name of the pod to stream logs from.
481+ :param namespace: Namespace where the pod is located.
482+ """
471483 url = f"{ self .host } /api/v1/namespaces/{ namespace } /pods/{ pod_name } /log?follow=true"
472484 headers = {"Authorization" : self .token }
473485 with req .get (url , headers = headers , verify = self .ssl_ca_cert , stream = True ) as r :
474486 for line in r .iter_lines ():
475487 if line :
476488 print (line .decode ())
477489
478- # --- CHECK STATUS JOB ---
479490 def check_job_status (self , job_name , namespace = "nuvolaris" ):
491+ """
492+ Check the status of a job by its name.
493+ :param job_name: Name of the job to check.
494+ :param namespace: Namespace where the job is located.
495+ :return: True if the job has succeeded, False otherwise.
496+ """
480497 url = f"{ self .host } /apis/batch/v1/namespaces/{ namespace } /jobs/{ job_name } "
481498 headers = {"Authorization" : self .token }
482499 try :
0 commit comments