Module airflow_rest_api.data_classes
Expand source code
# -*- coding: utf-8 -*-
import re
from dataclasses import dataclass, asdict, field
from typing import FrozenSet, Dict, List
from pandas import DataFrame
__all__ = ("Template", "Response")
expr_path_variables = re.compile(r"\{([A-Za-z0-9_]+)\}")
@dataclass
class Template:
"""Data class for template
"""
id: int
path: str
method: str
description: str
tag: str
filter_key: str
query_params: List
path_variables: FrozenSet = field(init=False)
def __post_init__(self):
"""Post init attrs
"""
self.path_variables = {*expr_path_variables.findall(self.path)}
def dict(self):
data = asdict(self)
del data["tag"], data["filter_key"], data["query_params"], data["path_variables"]
return data
@dataclass
class Response:
"""Data class wrapper for json response
"""
status: int
raw: Dict
def to_df(self, filter_key=None):
"""Convert dict to dataframe
Parameters
----------
filter_key : str, optional
Key from dict for select, by default None
Returns
-------
pandas.core.frame.DataFrame
Converted dataframe
"""
if self.raw is None:
return
try:
return DataFrame(self.raw[filter_key] if filter_key else self.raw)
except (ValueError, KeyError):
return
Classes
class Response (status: int, raw: Dict[~KT, ~VT])
-
Data class wrapper for json response
Expand source code
class Response: """Data class wrapper for json response """ status: int raw: Dict def to_df(self, filter_key=None): """Convert dict to dataframe Parameters ---------- filter_key : str, optional Key from dict for select, by default None Returns ------- pandas.core.frame.DataFrame Converted dataframe """ if self.raw is None: return try: return DataFrame(self.raw[filter_key] if filter_key else self.raw) except (ValueError, KeyError): return
Class variables
var raw : Dict[~KT, ~VT]
var status : int
Methods
def to_df(self, filter_key=None)
-
Convert dict to dataframe
Parameters
filter_key
:str
, optional- Key from dict for select, by default None
Returns
pandas.core.frame.DataFrame
- Converted dataframe
Expand source code
def to_df(self, filter_key=None): """Convert dict to dataframe Parameters ---------- filter_key : str, optional Key from dict for select, by default None Returns ------- pandas.core.frame.DataFrame Converted dataframe """ if self.raw is None: return try: return DataFrame(self.raw[filter_key] if filter_key else self.raw) except (ValueError, KeyError): return
class Template (id: int, path: str, method: str, description: str, tag: str, filter_key: str, query_params: List[~T])
-
Data class for template
Expand source code
class Template: """Data class for template """ id: int path: str method: str description: str tag: str filter_key: str query_params: List path_variables: FrozenSet = field(init=False) def __post_init__(self): """Post init attrs """ self.path_variables = {*expr_path_variables.findall(self.path)} def dict(self): data = asdict(self) del data["tag"], data["filter_key"], data["query_params"], data["path_variables"] return data
Class variables
var description : str
var filter_key : str
var id : int
var method : str
var path : str
var path_variables : FrozenSet[+T_co]
var query_params : List[~T]
var tag : str
Methods
def dict(self)
-
Expand source code
def dict(self): data = asdict(self) del data["tag"], data["filter_key"], data["query_params"], data["path_variables"] return data