Package airflow_rest_api
Expand source code
# -*- coding: utf-8 -*-
__author__ = "Michael R. Kisel"
__license__ = "MIT"
__version__ = "0.0.0"
__maintainer__ = "Michael R. Kisel"
__email__ = "deploy-me@yandex.ru"
__status__ = "Stable"
__all__ = (
"AirflowRestApi",
"TemplateRenderError",
"TemplateSearchError",
)
from airflow_rest_api.api import AirflowRestApi
from airflow_rest_api.exceptions import (
TemplateRenderError,
TemplateSearchError
)
Sub-modules
airflow_rest_api.api
airflow_rest_api.data_classes
airflow_rest_api.exceptions
airflow_rest_api.templates
airflow_rest_api.test_api
airflow_rest_api.test_data_classes
airflow_rest_api.test_templates
airflow_rest_api.tests_fixtures
airflow_rest_api.utils
Classes
class AirflowRestApi (airflow_host, airflow_api='/api/v1')
-
API interface for Airflow REST API
Init params
Parameters
airflow_host
:str
- Airflow host
airflow_api
:str
, optional- Airflow api path, by default "/api/v1"
Expand source code
class AirflowRestApi: """API interface for Airflow REST API """ def __init__(self, airflow_host, airflow_api="/api/v1"): """Init params Parameters ---------- airflow_host : str Airflow host airflow_api : str, optional Airflow api path, by default "/api/v1" """ self.__templates_repository = TemplatesRepository() self.default_kwargs = { "airflow_host": airflow_host, "airflow_api": airflow_api} def __repr__(self): """Return AirflowRestApi representation Returns ------- str AirflowRestApi representation """ return f"{self}(templates={self.templates})" def __str__(self): """Str method Returns ------- str String representation """ return self.__class__.__name__ @property def templates(self): """Return set of templates identifieres Returns ------- set Templates identifieres """ return self.__templates_repository.ids def render_url(self, template_id, **template_vars): """Render url Parameters ---------- template_id : int Template's identifier Returns ------- str Rendered url """ return self.__templates_repository.render_template( template_id, **{**self.default_kwargs, **template_vars} ) def find_template(self, search_pattern): """Find templates by pattern Parameters ---------- search_pattern : str Regex or usual string Returns ------- generator Templates generator """ return self.__templates_repository.find_template_id(search_pattern) def get_template(self, template_id): """Get template by template identifier Parameters ---------- template_id : int Template's identifier Returns ------- Template Template dataclass """ return self.__templates_repository.get_template(template_id) def get_template_default_method(self, template_id): """Get template's HTTP method Parameters ---------- template_id : int Template's identifier Returns ------- str HTTP method """ return self.__templates_repository.get_template(template_id).methods[0] def show_templates(self): """Print markdown table with templates """ print(self.__templates_repository.df_md) async def execute(self, session, url=None, method=None, template_id=None, **kwargs): """Call requested uri Parameters ---------- session : aiohttp.ClientSession Client session url : str, optional Api url, by default None method : str, optional HTTP method, by default None template_id : int, optional Template's identifier, by default None Returns ------- airflow_rest_api.data_classes.Response Dataclass instance """ if url is None: url = self.render_url(template_id=template_id) if method is None: method = self.get_template(template_id).method async with session.request(method, url, **kwargs) as resp: status = resp.status try: raw = await resp.json() except (ContentTypeError, JSONDecodeError): raw = await resp.text() return Response(status=status, raw=raw)
Instance variables
var templates
-
Return set of templates identifieres
Returns
set
- Templates identifieres
Expand source code
@property def templates(self): """Return set of templates identifieres Returns ------- set Templates identifieres """ return self.__templates_repository.ids
Methods
async def execute(self, session, url=None, method=None, template_id=None, **kwargs)
-
Call requested uri
Parameters
session
:aiohttp.ClientSession
- Client session
url
:str
, optional- Api url, by default None
method
:str
, optional- HTTP method, by default None
template_id
:int
, optional- Template's identifier, by default None
Returns
Response
- Dataclass instance
Expand source code
async def execute(self, session, url=None, method=None, template_id=None, **kwargs): """Call requested uri Parameters ---------- session : aiohttp.ClientSession Client session url : str, optional Api url, by default None method : str, optional HTTP method, by default None template_id : int, optional Template's identifier, by default None Returns ------- airflow_rest_api.data_classes.Response Dataclass instance """ if url is None: url = self.render_url(template_id=template_id) if method is None: method = self.get_template(template_id).method async with session.request(method, url, **kwargs) as resp: status = resp.status try: raw = await resp.json() except (ContentTypeError, JSONDecodeError): raw = await resp.text() return Response(status=status, raw=raw)
def find_template(self, search_pattern)
-
Find templates by pattern
Parameters
search_pattern
:str
- Regex or usual string
Returns
generator
- Templates generator
Expand source code
def find_template(self, search_pattern): """Find templates by pattern Parameters ---------- search_pattern : str Regex or usual string Returns ------- generator Templates generator """ return self.__templates_repository.find_template_id(search_pattern)
def get_template(self, template_id)
-
Get template by template identifier
Parameters
template_id
:int
- Template's identifier
Returns
Template
- Template dataclass
Expand source code
def get_template(self, template_id): """Get template by template identifier Parameters ---------- template_id : int Template's identifier Returns ------- Template Template dataclass """ return self.__templates_repository.get_template(template_id)
def get_template_default_method(self, template_id)
-
Get template's HTTP method
Parameters
template_id
:int
- Template's identifier
Returns
str
- HTTP method
Expand source code
def get_template_default_method(self, template_id): """Get template's HTTP method Parameters ---------- template_id : int Template's identifier Returns ------- str HTTP method """ return self.__templates_repository.get_template(template_id).methods[0]
def render_url(self, template_id, **template_vars)
-
Render url
Parameters
template_id
:int
- Template's identifier
Returns
str
- Rendered url
Expand source code
def render_url(self, template_id, **template_vars): """Render url Parameters ---------- template_id : int Template's identifier Returns ------- str Rendered url """ return self.__templates_repository.render_template( template_id, **{**self.default_kwargs, **template_vars} )
def show_templates(self)
-
Print markdown table with templates
Expand source code
def show_templates(self): """Print markdown table with templates """ print(self.__templates_repository.df_md)
class TemplateRenderError (*args)
-
Template Render Error
Unpack args and call parent method
Expand source code
class TemplateRenderError(TemplatesRepositoryError): """Template Render Error """ def __init__(self, *args): """Unpack args and call parent method """ super().__init__(*args) self.diff = args[0] self.path = args[1] def __str__(self): """Str method Returns ------- str String representation """ return f"You forget set up params: {self.diff} for template: {self.path}"
Ancestors
- airflow_rest_api.exceptions.TemplatesRepositoryError
- builtins.Exception
- builtins.BaseException
class TemplateSearchError (*args)
-
Template Search Error
Unpack args and call parent method
Expand source code
class TemplateSearchError(TemplatesRepositoryError): """Template Search Error """ def __init__(self, *args): """Unpack args and call parent method """ super().__init__(*args) self.template_id = args[0] def __str__(self): """Str method Returns ------- str String representation """ return f"Can't find template with id: {self.template_id}"
Ancestors
- airflow_rest_api.exceptions.TemplatesRepositoryError
- builtins.Exception
- builtins.BaseException