Module keboola.component.dao

Functions

def build_dataclass_from_dict(data_class, dict_value)
Expand source code
def build_dataclass_from_dict(data_class, dict_value):
    """
    Convenience method building specified dataclass from a dictionary

    Args:
        data_class:
        dict_value:

    Returns: dataclass of specified type

    """
    field_names = set(f.name for f in dataclasses.fields(data_class))
    return data_class(**{k: v for k, v in dict_value.items() if k in field_names})

Convenience method building specified dataclass from a dictionary

Args

data_class: dict_value: Returns: dataclass of specified type

Classes

class BaseType (dtype: SupportedDataTypes = SupportedDataTypes.STRING,
length: Optional[str] = None,
default: Optional[str] = None)
Expand source code
class BaseType(dict):
    def __init__(self, dtype: SupportedDataTypes = SupportedDataTypes.STRING, length: Optional[str] = None,
                 default: Optional[str] = None):
        super().__init__(base=DataType(dtype=dtype, length=length, default=default))

    @classmethod
    def string(cls, length: Optional[str] = None, default: Optional[str] = None) -> 'BaseType':
        return BaseType(dtype=SupportedDataTypes.STRING, length=length, default=default)

    @classmethod
    def integer(cls, length: Optional[str] = None,
                default: Optional[str] = None) -> 'BaseType':
        return BaseType(dtype=SupportedDataTypes.INTEGER, length=length, default=default)

    @classmethod
    def numeric(cls, length: Optional[str] = None,
                default: Optional[str] = None) -> 'BaseType':
        return BaseType(dtype=SupportedDataTypes.NUMERIC, length=length, default=default)

    @classmethod
    def float(cls, length: Optional[str] = None,
              default: Optional[str] = None) -> 'BaseType':
        return BaseType(dtype=SupportedDataTypes.FLOAT, length=length, default=default)

    @classmethod
    def boolean(cls, default: Optional[str] = None) -> 'BaseType':
        return BaseType(dtype=SupportedDataTypes.BOOLEAN, default=default)

    @classmethod
    def date(cls, default: Optional[str] = None) -> 'BaseType':
        return BaseType(dtype=SupportedDataTypes.DATE, default=default)

    @classmethod
    def timestamp(cls, default: Optional[str] = None) -> 'BaseType':
        return BaseType(dtype=SupportedDataTypes.TIMESTAMP, default=default)

dict() -> new empty dictionary dict(mapping) -> new dictionary initialized from a mapping object's (key, value) pairs dict(iterable) -> new dictionary initialized as if via: d = {} for k, v in iterable: d[k] = v dict(**kwargs) -> new dictionary initialized with the name=value pairs in the keyword argument list. For example: dict(one=1, two=2)

Ancestors

  • builtins.dict

Static methods

def boolean(default: Optional[str] = None) ‑> BaseType
def date(default: Optional[str] = None) ‑> BaseType
def float(length: Optional[str] = None, default: Optional[str] = None) ‑> BaseType
def integer(length: Optional[str] = None, default: Optional[str] = None) ‑> BaseType
def numeric(length: Optional[str] = None, default: Optional[str] = None) ‑> BaseType
def string(length: Optional[str] = None, default: Optional[str] = None) ‑> BaseType
def timestamp(default: Optional[str] = None) ‑> BaseType
class ColumnDefinition (data_types: Optional[Union[Dict[str, DataType], BaseType]] = <factory>,
nullable: Optional[bool] = True,
primary_key: Optional[bool] = False,
description: Optional[str] = None,
metadata: Optional[Dict[str, str]] = None)
Expand source code
@dataclass
class ColumnDefinition:
    """
    Represents the definition of a column within a table schema.

    Attributes:
        data_types (Optional[Union[Dict[str, DataType], BaseType]]): Data types of the column for specified backend.
        This can be a specific `DataType` or a `BaseType`, or a dictionary mapping from a string to one of these types.
        Defaults to BaseType.String.
        nullable (Optional[bool]): A flag indicating if the column can contain NULL values. Defaults to True.
        primary_key (Optional[bool]): Indicating if the column is part of the table's primary key. Defaults to False.
        description (Optional[str]): A description of the column's purpose or contents. Defaults to None.
        metadata (Optional[Dict[str, str]]): Additional metadata associated with the column. Defaults to None.
    """
    data_types: Optional[Union[Dict[str, DataType], BaseType]] = field(default_factory=lambda: BaseType())
    nullable: Optional[bool] = True
    primary_key: Optional[bool] = False
    description: Optional[str] = None
    metadata: Optional[Dict[str, str]] = None

    def update_properties(self, **kwargs):
        for key, value in kwargs.items():
            if hasattr(self, key):
                setattr(self, key, value)
            else:
                raise AttributeError(f"{key} is not a valid attribute of {self.__class__.__name__}")

    def from_dict(self, col: dict):
        return ColumnDefinition(
            data_types={key: DataType(dtype=v.get('type'), default=v.get('default'), length=v.get('length'))
                        for key, v in col.get('data_type', {}).items()},
            nullable=col.get('nullable'),
            primary_key=col.get('primary_key'),
            description=col.get('description'),
            metadata=col.get('metadata'))

    def add_datatype(self, backend: str, data_type: DataType):
        if backend in self.data_types:
            raise ValueError(f"Data type for backend {backend} already exists, use update_datatype instead")
        self.data_types[backend] = data_type

    def update_datatype(self, backend: str, data_type: DataType):
        if backend not in self.data_types:
            raise ValueError(f"Data type for backend {backend} does not exist, use add_datatype instead")
        self.data_types[backend] = data_type

    def to_dict(self, name: str):
        # convert datatypes to dict
        datatypes_dict = {}
        for key, value in self.data_types.items():
            datatypes_dict[key] = dataclasses.asdict(value)

        datatypes_dict = {key: {k.replace('dtype', 'type'): v for k, v in value.items()}
                          for key, value in datatypes_dict.items()}

        result = {
            'name': name,
            'data_type': datatypes_dict,
            'nullable': self.nullable,
            'primary_key': self.primary_key,
            'description': self.description,
            'metadata': self.metadata
        }
        # TODO: tohle bych delal az pri zapisu manifestu celkove, chceme vyhodit None values, false nechat
        filtered = {k: v for k, v in result.items() if v not in [False]}

        return filtered

Represents the definition of a column within a table schema.

Attributes

data_types : Optional[Union[Dict[str, DataType], BaseType]]
Data types of the column for specified backend.
This can be a specific DataType or a BaseType, or a dictionary mapping from a string to one of these types.
Defaults to BaseType.String.
nullable : Optional[bool]
A flag indicating if the column can contain NULL values. Defaults to True.
primary_key : Optional[bool]
Indicating if the column is part of the table's primary key. Defaults to False.
description : Optional[str]
A description of the column's purpose or contents. Defaults to None.
metadata : Optional[Dict[str, str]]
Additional metadata associated with the column. Defaults to None.

Class variables

var data_types : Dict[str, DataType] | BaseType | None
var description : str | None
var metadata : Dict[str, str] | None
var nullable : bool | None
var primary_key : bool | None

Methods

def add_datatype(self,
backend: str,
data_type: DataType)
Expand source code
def add_datatype(self, backend: str, data_type: DataType):
    if backend in self.data_types:
        raise ValueError(f"Data type for backend {backend} already exists, use update_datatype instead")
    self.data_types[backend] = data_type
def from_dict(self, col: dict)
Expand source code
def from_dict(self, col: dict):
    return ColumnDefinition(
        data_types={key: DataType(dtype=v.get('type'), default=v.get('default'), length=v.get('length'))
                    for key, v in col.get('data_type', {}).items()},
        nullable=col.get('nullable'),
        primary_key=col.get('primary_key'),
        description=col.get('description'),
        metadata=col.get('metadata'))
def to_dict(self, name: str)
Expand source code
def to_dict(self, name: str):
    # convert datatypes to dict
    datatypes_dict = {}
    for key, value in self.data_types.items():
        datatypes_dict[key] = dataclasses.asdict(value)

    datatypes_dict = {key: {k.replace('dtype', 'type'): v for k, v in value.items()}
                      for key, value in datatypes_dict.items()}

    result = {
        'name': name,
        'data_type': datatypes_dict,
        'nullable': self.nullable,
        'primary_key': self.primary_key,
        'description': self.description,
        'metadata': self.metadata
    }
    # TODO: tohle bych delal az pri zapisu manifestu celkove, chceme vyhodit None values, false nechat
    filtered = {k: v for k, v in result.items() if v not in [False]}

    return filtered
def update_datatype(self,
backend: str,
data_type: DataType)
Expand source code
def update_datatype(self, backend: str, data_type: DataType):
    if backend not in self.data_types:
        raise ValueError(f"Data type for backend {backend} does not exist, use add_datatype instead")
    self.data_types[backend] = data_type
def update_properties(self, **kwargs)
Expand source code
def update_properties(self, **kwargs):
    for key, value in kwargs.items():
        if hasattr(self, key):
            setattr(self, key, value)
        else:
            raise AttributeError(f"{key} is not a valid attribute of {self.__class__.__name__}")
class DataType (dtype: str, length: Optional[str] = None, default: Optional[str] = None)
Expand source code
@dataclass
class DataType:
    dtype: str
    length: Optional[str] = None
    default: Optional[str] = None

    def __post_init__(self):
        if isinstance(self.dtype, SupportedDataTypes):
            self.dtype = self.dtype.value

DataType(dtype: 'str', length: 'Optional[str]' = None, default: 'Optional[str]' = None)

Class variables

var default : str | None
var dtype : str
var length : str | None
class EnvironmentVariables (data_dir: str,
run_id: str,
project_id: str,
stack_id: str,
config_id: str,
component_id: str,
config_row_id: str,
branch_id: str,
staging_file_provider: str,
project_name: str,
token_id: str,
token_desc: str,
token: str,
url: str,
real_user: str,
logger_addr: str,
logger_port: str,
data_type_support: str,
project_features: str)
Expand source code
@dataclass
class EnvironmentVariables:
    """
    Dataclass for variables available in the docker environment
    https://developers.keboola.com/extend/common-interface/environment/#environment-variables
    """
    data_dir: str
    run_id: str
    project_id: str
    stack_id: str
    config_id: str
    component_id: str
    config_row_id: str
    branch_id: str
    staging_file_provider: str

    project_name: str
    token_id: str
    token_desc: str
    token: str
    url: str
    real_user: str

    logger_addr: str
    logger_port: str

    data_type_support: str
    project_features: str

Dataclass for variables available in the docker environment https://developers.keboola.com/extend/common-interface/environment/#environment-variables

Class variables

var branch_id : str
var component_id : str
var config_id : str
var config_row_id : str
var data_dir : str
var data_type_support : str
var logger_addr : str
var logger_port : str
var project_features : str
var project_id : str
var project_name : str
var real_user : str
var run_id : str
var stack_id : str
var staging_file_provider : str
var token : str
var token_desc : str
var token_id : str
var url : str
class FileDefinition (full_path: str,
stage: Optional[str] = 'out',
tags: Optional[List[str]] = None,
is_public: Optional[bool] = False,
is_permanent: Optional[bool] = False,
is_encrypted: Optional[bool] = False,
notify: Optional[bool] = False,
id: Optional[str] = None,
s3: Optional[dict] = None,
abs: Optional[dict] = None,
created: Optional[str] = None,
size_bytes: Optional[int] = None,
max_age_days: Optional[int] = None)
Expand source code
class FileDefinition(IODefinition):
    """
    File definition class. It is used as a container for `{in/out}/files/` files.
    It is a representation of input/output [manifest objects](
    https://developers.keboola.com/extend/common-interface/manifest-files/#files).

    Also, it is useful when collecting results and building export configs.



    To create the FileDefinition directly from the manifest there is a factory build method:

    ```python
    from keboola.component import CommonInterface
    from keboola.component import dao

    table_def = dao.FileDefinition.build_from_manifest('in/files/file.jpg.manifest')


    ```


    Attributes:
        name: File name.
        full_path (str): (optional) Full path of the file.
        tags (list):
                List of tags that are assigned to this file
        is_public: When true, the file URL will be permanent and publicly accessible.
        is_permanent: Keeps a file forever. If false, the file will be deleted after default period of time (e.g.
            15 days)
        is_encrypted: If true, the file content will be encrypted in the storage.
        notify: Notifies project administrators that a file was uploaded.

    """
    SYSTEM_TAG_PREFIXES = ['componentId:',
                           'configurationId:',
                           'configurationRowId:',
                           'runId:',
                           'branchId:']

    OUTPUT_MANIFEST_KEYS = ["tags",
                            "is_public",
                            "is_permanent",
                            "is_encrypted",
                            "notify"]

    def __init__(self, full_path: str,
                 stage: Optional[str] = 'out',
                 tags: Optional[List[str]] = None,
                 is_public: Optional[bool] = False,
                 is_permanent: Optional[bool] = False,
                 is_encrypted: Optional[bool] = False,
                 notify: Optional[bool] = False,
                 id: Optional[str] = None,
                 s3: Optional[dict] = None,
                 abs: Optional[dict] = None,
                 created: Optional[str] = None,
                 size_bytes: Optional[int] = None,
                 max_age_days: Optional[int] = None
                 ):
        """

        Args:
            full_path (str): Full path of the file.
            stage (str): Storage Stage 'in' or 'out' default out
            tags (list):
                List of tags that are assigned to this file
            is_public: When true, the file URL will be permanent and publicly accessible.
            is_permanent: Keeps a file forever. If false, the file will be deleted after default period of time (e.g.
            15 days)
            is_encrypted: If true, the file content will be encrypted in the storage.
            notify: Notifies project administrators that a file was uploaded.
        """
        super().__init__(full_path)

        self.stage = stage

        self.tags = tags
        self.is_public = is_public
        self.is_permanent = is_permanent
        self.is_encrypted = is_encrypted
        self.notify = notify

        # input
        self._id = id
        self._s3 = s3
        self._abs = abs
        self._created = created
        self._size_bytes = size_bytes
        self._max_age_days = max_age_days

    @classmethod
    def build_output_definition(cls,
                                full_path: str,
                                tags: Optional[List[str]] = None,
                                is_public: Optional[bool] = False,
                                is_permanent: Optional[bool] = False,
                                is_encrypted: Optional[bool] = False,
                                notify: Optional[bool] = False):
        """
        Factory method to create an instance of FileDefinition for output files.

        This method initializes a FileDefinition object with properties specific to output files,
        including file path, tags, and various flags indicating the file's accessibility, permanence, encryption status,
        and whether project administrators should be notified upon file upload.

        Args:
            full_path (str): The full path where the file is or will be stored.
            tags (Optional[List[str]]): A list of tags associated with the file. Defaults to None.
            is_public (Optional[bool]): Flag indicating if the file URL will be permanent and publicly accessible. Defaults to False.  # noqa
            is_permanent (Optional[bool]): Flag indicating if the file should be kept forever. Defaults to False.
            is_encrypted (Optional[bool]): Flag indicating if the file content will be encrypted in storage. Defaults to False.  # noqa
            notify (Optional[bool]): Flag indicating if project administrators should be notified that a file was uploaded. Defaults to False.  # noqa

        Returns:
            An instance of FileDefinition configured for output files.
        """
        return cls(full_path=full_path, stage="out", tags=tags, is_public=is_public, is_permanent=is_permanent,
                   is_encrypted=is_encrypted, notify=notify)

    @classmethod
    def build_input_definition(cls, full_path: str,
                               id: Optional[str] = None,
                               s3: Optional[dict] = None,
                               abs: Optional[dict] = None,
                               created: Optional[str] = None,
                               size_bytes: Optional[int] = None,
                               max_age_days: Optional[int] = None):
        """
        Factory method to create an instance of FileDefinition for input files.

        This method initializes a FileDefinition object with properties specific to input files,
        including the file path, optional metadata such as the file's ID, S3 and ABS storage details,
        creation date, size in bytes, and the maximum age in days before the file is considered expired.

        Args:
            full_path (str): The full path where the file is or will be stored.
            id (Optional[str]): The unique identifier of the file. Defaults to None.
            s3 (Optional[dict]): A dictionary containing Amazon S3 storage details. Defaults to None.
            abs (Optional[dict]): A dictionary containing Azure Blob Storage details. Defaults to None.
            created (Optional[str]): The creation date of the file. Defaults to None.
            size_bytes (Optional[int]): The size of the file in bytes. Defaults to None.
            max_age_days (Optional[int]): The maximum age of the file in days. Defaults to None.

        Returns:
            An instance of FileDefinition configured for input files.
        """
        return cls(full_path=full_path, stage="in", id=id, s3=s3, abs=abs, created=created, size_bytes=size_bytes,
                   max_age_days=max_age_days)

    @classmethod
    def build_from_manifest(cls,
                            manifest_file_path: str
                            ):
        """
        Factory method for FileDefinition from the raw "manifest" path.

        The FileDefinition then validates presence of the manifest counterpart.
        E.g. file.jpg if `file.jpg.manifest` is provided.

        If the counterpart file does not exist a ValueError is raised.


        Args:
            manifest_file_path (str):
                (optional) Full path of the file [manifest](
                https://developers.keboola.com/extend/common-interface/manifest-files/#files)


        """
        manifest = dict()
        if Path(manifest_file_path).exists():
            with open(manifest_file_path) as in_file:
                manifest = json.load(in_file)

        file_path = Path(manifest_file_path.replace('.manifest', ''))

        if not file_path.exists():
            raise ValueError(f'The corresponding file {file_path} does not exist!')

        full_path = str(file_path)

        if manifest.get('id'):
            stage = 'in'
        else:
            stage = 'out'

        file_def = cls(full_path=full_path,
                       stage=stage,
                       tags=manifest.get('tags', []),
                       is_public=manifest.get('is_public', False),
                       is_permanent=manifest.get('is_permanent', False),
                       is_encrypted=manifest.get('is_encrypted', False),
                       id=manifest.get('id', ''),
                       s3=manifest.get('s3'),
                       abs=manifest.get('abs'),
                       created=manifest.get('created'),
                       size_bytes=manifest.get('size_bytes', 0),
                       max_age_days=manifest.get('max_age_days', 0)
                       )

        return file_def

    @classmethod
    def is_system_tag(cls, tag: str) -> bool:
        for prefix in cls.SYSTEM_TAG_PREFIXES:
            if tag.startswith(prefix):
                return True
        return False

    def get_manifest_dictionary(self, manifest_type: Optional[str] = None, legacy_queue: bool = False,
                                legacy_manifest: Optional[bool] = None) -> dict:
        """
        Returns manifest dictionary in appropriate manifest_type: either 'in' or 'out'.
        By default, returns output manifest.
             The result keeps only values that are applicable for
             the selected type of the Manifest file. Because although input and output manifests share most of
             the attributes, some are not shared.

             See [manifest files](https://developers.keboola.com/extend/common-interface/manifest-files)
             for more information.

        Args:
            manifest_type (str): either 'in' or 'out'.
             See [manifest files](https://developers.keboola.com/extend/common-interface/manifest-files)
             for more information.
            legacy_queue (bool): optional flag marking project on legacy queue.(some options are not allowed on queue2)
            legacy_manifest (bool): If True, creates a legacy manifest; otherwise, uses the new format if permitted.

        Returns:
            dict representation of the manifest file in a format expected / produced by the Keboola Connection

        """
        if not manifest_type:
            manifest_type = self.stage

        dictionary = self._filter_attributes_by_manifest_type(manifest_type, legacy_queue, legacy_manifest)

        filtered_dictionary = {k: v for k, v in dictionary.items() if v not in [None, [], {}, ""]}

        return filtered_dictionary

    def _filter_attributes_by_manifest_type(self, manifest_type: Literal["in", "out"], legacy_queue: bool = False,
                                            legacy_manifest: bool = False):
        """
        Filter manifest to contain only supported fields
        Args:
            manifest_type:

        Returns:

        """

        if manifest_type == 'in':

            manifest_dictionary = {
                'id': self.id,
                'created': self.created.strftime('%Y-%m-%dT%H:%M:%S%z') if self.created else None,
                'is_public': self.is_public,
                'is_encrypted': self.is_encrypted,
                'name': self.name,
                'size_bytes': self.size_bytes,
                'tags': self.tags,
                'notify': self.notify,
                'max_age_days': self.max_age_days,
                'is_permanent': self.is_permanent,
            }

        else:
            manifest_dictionary = {
                'is_public': self.is_public,
                'is_permanent': self.is_permanent,
                'is_encrypted': self.is_encrypted,
                'tags': self.tags,
                'notify': self.notify,
            }

        return manifest_dictionary

    @property
    def name(self) -> str:
        """
        File name - excluding the KBC ID if present (`str`, read-only)
        """
        # separate id from name
        file_name = Path(self.full_path).name
        if self._id:
            fsplit = file_name.split('_', 1)
            if len(fsplit) > 1:
                self._id = fsplit[0]
                file_name = fsplit[1]
        return file_name

    @property
    def full_name(self):
        """
        File name - full file name, directly from the path. Includes the KBC generated ID. (`str`, read-only)
        """
        return Path(self.full_path).name

    @property
    def _manifest_attributes(self) -> SupportedManifestAttributes:
        return SupportedManifestAttributes(self.OUTPUT_MANIFEST_KEYS, [])

    # ########### Output manifest properties - R/W

    @property
    def user_tags(self) -> List[str]:
        """
        User defined tags excluding the system tags
        """
        # filter system tags
        tags: List[str] = [tag for tag in self._tags if not self.is_system_tag(tag)]
        return tags

    @property
    def tags(self) -> List[str]:
        """
        All tags specified on the file
        """
        return self._tags

    @tags.setter
    def tags(self, tags: List[str]):
        if tags is None:
            tags = list()
        self._tags = tags

    @property
    def is_public(self) -> bool:
        return self._is_public

    @is_public.setter
    def is_public(self, is_public: bool):
        self._is_public = is_public

    @property
    def is_permanent(self) -> bool:
        return self._is_permanent

    @is_permanent.setter
    def is_permanent(self, is_permanent: bool):
        self._is_permanent = is_permanent

    @property
    def is_encrypted(self) -> bool:
        return self._is_encrypted

    @is_encrypted.setter
    def is_encrypted(self, is_encrypted: bool):
        self._is_encrypted = is_encrypted

    @property
    def notify(self) -> bool:
        return self._notify

    @notify.setter
    def notify(self, notify: bool):
        self._notify = notify

    # ########### Input manifest properties - Read ONLY
    @property
    def id(self) -> str:  # File ID in the KBC Storage (read only input attribute)
        return self._id

    @property
    def created(self) -> Union[datetime, None]:  # Created timestamp  in the KBC Storage (read only input attribute)
        if self._created:
            return datetime.strptime(self._created, KBC_DEFAULT_TIME_FORMAT)
        else:
            return None

    @property
    def size_bytes(self) -> int:  # File size in the KBC Storage (read only input attribute)
        return self._size_bytes

    @property
    def max_age_days(self) -> int:  # File max age (read only input attribute)
        return self._max_age_days

File definition class. It is used as a container for {in/out}/files/ files. It is a representation of input/output manifest objects.

Also, it is useful when collecting results and building export configs.

To create the FileDefinition directly from the manifest there is a factory build method:

from keboola.component import CommonInterface
from keboola.component import dao

table_def = dao.FileDefinition.build_from_manifest('in/files/file.jpg.manifest')


Attributes

name
File name.
full_path : str
(optional) Full path of the file.
tags (list):
List of tags that are assigned to this file
is_public
When true, the file URL will be permanent and publicly accessible.
is_permanent
Keeps a file forever. If false, the file will be deleted after default period of time (e.g. 15 days)
is_encrypted
If true, the file content will be encrypted in the storage.
notify
Notifies project administrators that a file was uploaded.

Args

full_path : str
Full path of the file.
stage : str
Storage Stage 'in' or 'out' default out
tags (list):
List of tags that are assigned to this file
is_public
When true, the file URL will be permanent and publicly accessible.
is_permanent
Keeps a file forever. If false, the file will be deleted after default period of time (e.g.
15 days)
is_encrypted
If true, the file content will be encrypted in the storage.
notify
Notifies project administrators that a file was uploaded.

Ancestors

Class variables

var OUTPUT_MANIFEST_KEYS
var SYSTEM_TAG_PREFIXES

Static methods

def build_from_manifest(manifest_file_path: str)

Factory method for FileDefinition from the raw "manifest" path.

The FileDefinition then validates presence of the manifest counterpart. E.g. file.jpg if file.jpg.manifest is provided.

If the counterpart file does not exist a ValueError is raised.

Args

manifest_file_path (str): (optional) Full path of the file manifest

def build_input_definition(full_path: str,
id: Optional[str] = None,
s3: Optional[dict] = None,
abs: Optional[dict] = None,
created: Optional[str] = None,
size_bytes: Optional[int] = None,
max_age_days: Optional[int] = None)

Factory method to create an instance of FileDefinition for input files.

This method initializes a FileDefinition object with properties specific to input files, including the file path, optional metadata such as the file's ID, S3 and ABS storage details, creation date, size in bytes, and the maximum age in days before the file is considered expired.

Args

full_path : str
The full path where the file is or will be stored.
id : Optional[str]
The unique identifier of the file. Defaults to None.
s3 : Optional[dict]
A dictionary containing Amazon S3 storage details. Defaults to None.
abs : Optional[dict]
A dictionary containing Azure Blob Storage details. Defaults to None.
created : Optional[str]
The creation date of the file. Defaults to None.
size_bytes : Optional[int]
The size of the file in bytes. Defaults to None.
max_age_days : Optional[int]
The maximum age of the file in days. Defaults to None.

Returns

An instance of FileDefinition configured for input files.

def build_output_definition(full_path: str,
tags: Optional[List[str]] = None,
is_public: Optional[bool] = False,
is_permanent: Optional[bool] = False,
is_encrypted: Optional[bool] = False,
notify: Optional[bool] = False)

Factory method to create an instance of FileDefinition for output files.

This method initializes a FileDefinition object with properties specific to output files, including file path, tags, and various flags indicating the file's accessibility, permanence, encryption status, and whether project administrators should be notified upon file upload.

Args

full_path : str
The full path where the file is or will be stored.
tags : Optional[List[str]]
A list of tags associated with the file. Defaults to None.
is_public : Optional[bool]
Flag indicating if the file URL will be permanent and publicly accessible. Defaults to False. # noqa
is_permanent : Optional[bool]
Flag indicating if the file should be kept forever. Defaults to False.
is_encrypted : Optional[bool]
Flag indicating if the file content will be encrypted in storage. Defaults to False. # noqa
notify : Optional[bool]
Flag indicating if project administrators should be notified that a file was uploaded. Defaults to False. # noqa

Returns

An instance of FileDefinition configured for output files.

def is_system_tag(tag: str) ‑> bool

Instance variables

prop created : Union[datetime, None]
Expand source code
@property
def created(self) -> Union[datetime, None]:  # Created timestamp  in the KBC Storage (read only input attribute)
    if self._created:
        return datetime.strptime(self._created, KBC_DEFAULT_TIME_FORMAT)
    else:
        return None
prop full_name
Expand source code
@property
def full_name(self):
    """
    File name - full file name, directly from the path. Includes the KBC generated ID. (`str`, read-only)
    """
    return Path(self.full_path).name

File name - full file name, directly from the path. Includes the KBC generated ID. (str, read-only)

prop id : str
Expand source code
@property
def id(self) -> str:  # File ID in the KBC Storage (read only input attribute)
    return self._id
prop is_encrypted : bool
Expand source code
@property
def is_encrypted(self) -> bool:
    return self._is_encrypted
prop is_permanent : bool
Expand source code
@property
def is_permanent(self) -> bool:
    return self._is_permanent
prop is_public : bool
Expand source code
@property
def is_public(self) -> bool:
    return self._is_public
prop max_age_days : int
Expand source code
@property
def max_age_days(self) -> int:  # File max age (read only input attribute)
    return self._max_age_days
prop notify : bool
Expand source code
@property
def notify(self) -> bool:
    return self._notify
prop size_bytes : int
Expand source code
@property
def size_bytes(self) -> int:  # File size in the KBC Storage (read only input attribute)
    return self._size_bytes
prop tags : List[str]
Expand source code
@property
def tags(self) -> List[str]:
    """
    All tags specified on the file
    """
    return self._tags

All tags specified on the file

prop user_tags : List[str]
Expand source code
@property
def user_tags(self) -> List[str]:
    """
    User defined tags excluding the system tags
    """
    # filter system tags
    tags: List[str] = [tag for tag in self._tags if not self.is_system_tag(tag)]
    return tags

User defined tags excluding the system tags

Methods

def get_manifest_dictionary(self,
manifest_type: Optional[str] = None,
legacy_queue: bool = False,
legacy_manifest: Optional[bool] = None) ‑> dict
Expand source code
def get_manifest_dictionary(self, manifest_type: Optional[str] = None, legacy_queue: bool = False,
                            legacy_manifest: Optional[bool] = None) -> dict:
    """
    Returns manifest dictionary in appropriate manifest_type: either 'in' or 'out'.
    By default, returns output manifest.
         The result keeps only values that are applicable for
         the selected type of the Manifest file. Because although input and output manifests share most of
         the attributes, some are not shared.

         See [manifest files](https://developers.keboola.com/extend/common-interface/manifest-files)
         for more information.

    Args:
        manifest_type (str): either 'in' or 'out'.
         See [manifest files](https://developers.keboola.com/extend/common-interface/manifest-files)
         for more information.
        legacy_queue (bool): optional flag marking project on legacy queue.(some options are not allowed on queue2)
        legacy_manifest (bool): If True, creates a legacy manifest; otherwise, uses the new format if permitted.

    Returns:
        dict representation of the manifest file in a format expected / produced by the Keboola Connection

    """
    if not manifest_type:
        manifest_type = self.stage

    dictionary = self._filter_attributes_by_manifest_type(manifest_type, legacy_queue, legacy_manifest)

    filtered_dictionary = {k: v for k, v in dictionary.items() if v not in [None, [], {}, ""]}

    return filtered_dictionary

Returns manifest dictionary in appropriate manifest_type: either 'in' or 'out'. By default, returns output manifest. The result keeps only values that are applicable for the selected type of the Manifest file. Because although input and output manifests share most of the attributes, some are not shared.

 See [manifest files](https://developers.keboola.com/extend/common-interface/manifest-files)
 for more information.

Args

manifest_type : str
either 'in' or 'out'.
See manifest files
for more information.
legacy_queue : bool
optional flag marking project on legacy queue.(some options are not allowed on queue2)
legacy_manifest : bool
If True, creates a legacy manifest; otherwise, uses the new format if permitted.

Returns

dict representation of the manifest file in a format expected / produced by the Keboola Connection

Inherited members

class FileInputMapping (tags: List[str], query: str = '', filter_by_run_id: bool = False)
Expand source code
@dataclass
class FileInputMapping(SubscriptableDataclass):
    """
    Abstraction of [output mapping definition](
    https://developers.keboola.com/extend/common-interface/config-file/#files) in the config file
    """
    tags: List[str]
    query: str = ''
    filter_by_run_id: bool = False

Abstraction of output mapping definition in the config file

Ancestors

Class variables

var filter_by_run_id : bool
var query : str
var tags : List[str]
class FileOutputMapping (source: str,
is_public: bool = False,
is_permanent: bool = False,
tags: List[str] = <factory>)
Expand source code
@dataclass
class FileOutputMapping(SubscriptableDataclass):
    """
    Abstraction of [output mapping definition](
    https://developers.keboola.com/extend/common-interface/config-file/#files) in the config file
    """
    source: str
    is_public: bool = False
    is_permanent: bool = False
    tags: List[str] = dataclasses.field(default_factory=lambda: [])

Abstraction of output mapping definition in the config file

Ancestors

Class variables

var is_permanent : bool
var is_public : bool
var source : str
var tags : List[str]
class IODefinition (full_path)
Expand source code
class IODefinition(ABC):

    def __init__(self, full_path):
        self.full_path = full_path

    @classmethod
    def build_from_manifest(cls,
                            manifest_file_path: str
                            ):
        raise NotImplementedError

    def _filter_attributes_by_manifest_type(self, manifest_type: Literal["in", "out"], legacy_queue: bool = False,
                                            native_types: bool = False):
        raise NotImplementedError

    def get_manifest_dictionary(self, manifest_type: Optional[str] = None, legacy_queue: bool = False,
                                legacy_manifest: Optional[bool] = None) -> dict:
        raise NotImplementedError

    @property
    def stage(self) -> str:
        """
        Helper property marking the stage of the file. (str)
        """
        return self._stage

    @stage.setter
    def stage(self, stage: str):
        if stage not in ['in', 'out']:
            raise ValueError(f'Invalid stage "{stage}", supported values are: "in", "out"')
        self._stage = stage

    @property
    @abstractmethod
    def _manifest_attributes(self) -> SupportedManifestAttributes:
        """
        Manifest attributes
        """
        return SupportedManifestAttributes([], [])

    @property
    @abstractmethod
    def name(self) -> str:
        """
        File name - excluding the KBC ID if present (`str`, read-only)
        """
        raise NotImplementedError

    # ############ Staging parameters

    @dataclass
    class S3Staging:
        is_sliced: bool
        region: str
        bucket: str
        key: str
        credentials_access_key_id: str
        credentials_secret_access_key: str
        credentials_session_token: str

    @dataclass
    class ABSStaging:
        is_sliced: bool
        region: str
        container: str
        name: str
        credentials_sas_connection_string: str
        credentials_expiration: str

    @property
    def s3_staging(self) -> Union[S3Staging, None]:
        s3 = self._s3
        if s3:
            return IODefinition.S3Staging(is_sliced=s3['isSliced'],
                                          region=s3['region'],
                                          bucket=s3['bucket'],
                                          key=s3['key'],
                                          credentials_access_key_id=s3['credentials']['access_key_id'],
                                          credentials_secret_access_key=s3['credentials']['secret_access_key'],
                                          credentials_session_token=s3['credentials']['session_token']
                                          )
        else:
            return None

    @property
    def abs_staging(self) -> Union[ABSStaging, None]:
        _abs = self._abs
        if _abs:
            return IODefinition.ABSStaging(is_sliced=_abs['is_sliced'],
                                           region=_abs['region'],
                                           container=_abs['container'],
                                           name=_abs['name'],
                                           credentials_sas_connection_string=_abs['credentials'][
                                               'sas_connection_string'],
                                           credentials_expiration=_abs['credentials']['expiration']
                                           )
        else:
            return None

Helper class that provides a standard way to create an ABC using inheritance.

Ancestors

  • abc.ABC

Subclasses

Class variables

var ABSStaging
var S3Staging

Static methods

def build_from_manifest(manifest_file_path: str)

Instance variables

prop abs_staging : Union[ABSStaging, None]
Expand source code
@property
def abs_staging(self) -> Union[ABSStaging, None]:
    _abs = self._abs
    if _abs:
        return IODefinition.ABSStaging(is_sliced=_abs['is_sliced'],
                                       region=_abs['region'],
                                       container=_abs['container'],
                                       name=_abs['name'],
                                       credentials_sas_connection_string=_abs['credentials'][
                                           'sas_connection_string'],
                                       credentials_expiration=_abs['credentials']['expiration']
                                       )
    else:
        return None
prop name : str
Expand source code
@property
@abstractmethod
def name(self) -> str:
    """
    File name - excluding the KBC ID if present (`str`, read-only)
    """
    raise NotImplementedError

File name - excluding the KBC ID if present (str, read-only)

prop s3_staging : Union[S3Staging, None]
Expand source code
@property
def s3_staging(self) -> Union[S3Staging, None]:
    s3 = self._s3
    if s3:
        return IODefinition.S3Staging(is_sliced=s3['isSliced'],
                                      region=s3['region'],
                                      bucket=s3['bucket'],
                                      key=s3['key'],
                                      credentials_access_key_id=s3['credentials']['access_key_id'],
                                      credentials_secret_access_key=s3['credentials']['secret_access_key'],
                                      credentials_session_token=s3['credentials']['session_token']
                                      )
    else:
        return None
prop stage : str
Expand source code
@property
def stage(self) -> str:
    """
    Helper property marking the stage of the file. (str)
    """
    return self._stage

Helper property marking the stage of the file. (str)

Methods

def get_manifest_dictionary(self,
manifest_type: Optional[str] = None,
legacy_queue: bool = False,
legacy_manifest: Optional[bool] = None) ‑> dict
Expand source code
def get_manifest_dictionary(self, manifest_type: Optional[str] = None, legacy_queue: bool = False,
                            legacy_manifest: Optional[bool] = None) -> dict:
    raise NotImplementedError
class KBCMetadataKeys (*args, **kwds)
Expand source code
class KBCMetadataKeys(Enum):
    base_data_type = 'KBC.datatype.basetype'  # base type of a column as defined in php-datatypes
    source_data_type = 'KBC.datatype.type'  # data type of a column - extracted value from the source
    data_type_nullable = 'KBC.datatype.nullable'
    data_type_length = 'KBC.datatype.length'  # data type length (e.g., VARCHAR(255) - this is the 255
    data_type_default = 'KBC.datatype.default'
    description = 'KBC.description'
    created_by_component = 'KBC.createdBy.component.id'
    last_updated_by_component = 'KBC.lastUpdatedBy.component.id'
    createdBy_configuration_id = 'KBC.createdBy.configuration.id'
    createdBy_branch_id = 'KBC.createdBy.branch.id'  # ID of the branch whose job created the table/bucket
    lastUpdatedBy_configuration_id = 'KBC.lastUpdatedBy.configuration.id'
    lastUpdatedBy_branch_id = 'KBC.lastUpdatedBy.branch.id'  # ID of the branch whose job last touched the bucket/table
    shared_description = 'KBC.sharedDescription'  # description of the bucket;
    # it will be used when the bucket is shared

Create a collection of name/value pairs.

Example enumeration:

>>> class Color(Enum):
...     RED = 1
...     BLUE = 2
...     GREEN = 3

Access them by:

  • attribute access::
>>> Color.RED
<Color.RED: 1>
  • value lookup:
>>> Color(1)
<Color.RED: 1>
  • name lookup:
>>> Color['RED']
<Color.RED: 1>

Enumerations can be iterated over, and know how many members they have:

>>> len(Color)
3
>>> list(Color)
[<Color.RED: 1>, <Color.BLUE: 2>, <Color.GREEN: 3>]

Methods can be added to enumerations, and members can have their own attributes – see the documentation for details.

Ancestors

  • enum.Enum

Class variables

var base_data_type
var createdBy_branch_id
var createdBy_configuration_id
var created_by_component
var data_type_default
var data_type_length
var data_type_nullable
var description
var lastUpdatedBy_branch_id
var lastUpdatedBy_configuration_id
var last_updated_by_component
var shared_description
var source_data_type
class OauthCredentials (id: str, created: str, data: dict, oauthVersion: str, appKey: str, appSecret: str)
Expand source code
@dataclass
class OauthCredentials(SubscriptableDataclass):
    id: str
    created: str
    data: dict
    oauthVersion: str
    appKey: str
    appSecret: str

OauthCredentials(id: 'str', created: 'str', data: 'dict', oauthVersion: 'str', appKey: 'str', appSecret: 'str')

Ancestors

Class variables

var appKey : str
var appSecret : str
var created : str
var data : dict
var id : str
var oauthVersion : str
class SubscriptableDataclass
Expand source code
@dataclass
class SubscriptableDataclass:
    """
    Helper class to make dataclasses subscriptable
    """

    def __getitem__(self, index):
        return getattr(self, index)

Helper class to make dataclasses subscriptable

Subclasses

class SupportedDataTypes (*args, **kwds)
Expand source code
class SupportedDataTypes(str, Enum):
    """
    Enum of [supported datatypes](https://help.keboola.com/storage/tables/data-types/)
    """
    STRING = 'STRING'
    INTEGER = 'INTEGER'
    NUMERIC = 'NUMERIC'
    FLOAT = 'FLOAT'
    BOOLEAN = 'BOOLEAN'
    DATE = 'DATE'
    TIMESTAMP = 'TIMESTAMP'

    @classmethod
    def list(cls):
        return list(map(lambda c: c.value, cls))

    @classmethod
    def is_valid_type(cls, data_type: str):
        return data_type in cls.list()

Ancestors

  • builtins.str
  • enum.Enum

Class variables

var BOOLEAN
var DATE
var FLOAT
var INTEGER
var NUMERIC
var STRING
var TIMESTAMP

Static methods

def is_valid_type(data_type: str)
def list()
class SupportedManifestAttributes (out_attributes: List[str],
in_attributes: List[str],
out_legacy_exclude: List[str] = <factory>,
in_legacy_exclude: List[str] = <factory>)
Expand source code
@dataclass
class SupportedManifestAttributes(SubscriptableDataclass):
    out_attributes: List[str]
    in_attributes: List[str]
    out_legacy_exclude: List[str] = dataclasses.field(default_factory=lambda: [])
    in_legacy_exclude: List[str] = dataclasses.field(default_factory=lambda: [])

    def get_attributes_by_stage(self, stage: Literal['in', 'out'], legacy_queue: bool = False,
                                legacy_manifest: bool = False) -> List[str]:
        if stage == 'out':
            attributes = self.out_attributes
            exclude = self.out_legacy_exclude

            if not legacy_manifest:
                to_remove = ['primary_key', 'columns', 'distribution_key', 'column_metadata', 'metadata']
                attributes = list(set(attributes).difference(to_remove))

                to_add = ['manifest_type', 'has_header', 'table_metadata', 'schema']
                attributes.extend(to_add)

        elif stage == 'in':
            attributes = self.in_attributes
            exclude = self.in_legacy_exclude

        else:
            raise ValueError(f'Unsupported stage {stage}')

        if legacy_queue:
            logging.warning(f'Running on legacy queue some manifest properties will be ignored: {exclude}')
            attributes = list(set(attributes).difference(exclude))

        return attributes

SupportedManifestAttributes(out_attributes: 'List[str]', in_attributes: 'List[str]', out_legacy_exclude: 'List[str]' = , in_legacy_exclude: 'List[str]' = )

Ancestors

Class variables

var in_attributes : List[str]
var in_legacy_exclude : List[str]
var out_attributes : List[str]
var out_legacy_exclude : List[str]

Methods

def get_attributes_by_stage(self,
stage: "Literal['in', 'out']",
legacy_queue: bool = False,
legacy_manifest: bool = False) ‑> List[str]
Expand source code
def get_attributes_by_stage(self, stage: Literal['in', 'out'], legacy_queue: bool = False,
                            legacy_manifest: bool = False) -> List[str]:
    if stage == 'out':
        attributes = self.out_attributes
        exclude = self.out_legacy_exclude

        if not legacy_manifest:
            to_remove = ['primary_key', 'columns', 'distribution_key', 'column_metadata', 'metadata']
            attributes = list(set(attributes).difference(to_remove))

            to_add = ['manifest_type', 'has_header', 'table_metadata', 'schema']
            attributes.extend(to_add)

    elif stage == 'in':
        attributes = self.in_attributes
        exclude = self.in_legacy_exclude

    else:
        raise ValueError(f'Unsupported stage {stage}')

    if legacy_queue:
        logging.warning(f'Running on legacy queue some manifest properties will be ignored: {exclude}')
        attributes = list(set(attributes).difference(exclude))

    return attributes
class TableColumnTypes (source: str,
type: str,
destination: str,
length: int,
nullable: bool,
convert_empty_values_to_null: bool)
Expand source code
@dataclass
class TableColumnTypes(SubscriptableDataclass):
    """
    Abstraction of [column types](https://developers.keboola.com/extend/common-interface/config-file/#input-mapping
    --column-types) in the config file.
    Applicable only for workspace.
    """

    source: str
    type: str
    destination: str
    length: int
    nullable: bool
    convert_empty_values_to_null: bool

Abstraction of column types in the config file. Applicable only for workspace.

Ancestors

Class variables

var convert_empty_values_to_null : bool
var destination : str
var length : int
var nullable : bool
var source : str
var type : str
class TableDefinition (name: str,
full_path: Optional[Union[str, None]] = None,
is_sliced: Optional[bool] = False,
destination: Optional[str] = '',
primary_key: Optional[List[str]] = None,
schema: SCHEMA_TYPE = None,
incremental: Optional[bool] = None,
table_metadata: Optional[TableMetadata] = None,
enclosure: Optional[str] = '"',
delimiter: Optional[str] = ',',
delete_where: Optional[dict] = None,
stage: Optional[str] = 'out',
write_always: Optional[bool] = False,
has_header: Optional[bool] = None,
description: Optional[str] = None,
**kwargs)
Expand source code
class TableDefinition(IODefinition):
    """
    Table definition class. It is used as a container for `in/tables/` files.
    It is a representation of input/output manifest objects with additional attributes containing information
    about related file full path and whether it is a sliced table.

    Also, it is useful when collecting results and building export configs.

    To create the TableDefinition directly from the manifest there is a factory build method:

    ```python
    from keboola.component import CommonInterface
    from keboola.component import dao

    table_def = dao.TableDefinition.build_from_manifest(manifest_dict,
                                            'table name',
                                            full_path='optional full path',
                                            is_sliced=False)


    ```


    Attributes:
        name: Table / file name.
        full_path (str): (optional) Full path of the file. May be empty in case it represents only orphaned manifest.
            May also be a folder path - in this case it is a [sliced tables](
            https://developers.keboola.com/extend/common-interface/folders/#sliced-tables) folder.
            The full_path is None when dealing with [workspaces](
            https://developers.keboola.com/extend/common-interface/folders/#exchanging-data-via-workspace)
        is_sliced: True if the full_path points to a folder with sliced tables
        has_header: True if the file has a header
        destination: String name of the table in Storage.
        primary_key: List with names of columns used for primary key.
        columns: List of columns for headless CSV files
        incremental: Set to true to enable incremental loading
        table_metadata: <.dao.TableMetadata> object containing column and table metadata
        delete_where: Dict with settings for deleting rows
    """

    INPUT_MANIFEST_ATTRIBUTES = [
        "id",
        "uri",
        "name",
        "primary_key",
        "created",
        "last_change_date",
        "last_import_date",
        "columns",
        "metadata",
        "column_metadata",
        "rows_count",
        "data_size_bytes",
        "is_alias",
        "attributes",
        "indexed_columns"
    ]

    OUTPUT_MANIFEST_ATTRIBUTES = [
        "destination",
        "columns",
        "incremental",
        "primary_key",
        "write_always",
        "delimiter",
        "enclosure",
        "metadata",
        "column_metadata",
        "delete_where_column",
        "delete_where_values",
        "delete_where_operator",
    ]

    OUTPUT_MANIFEST_LEGACY_EXCLUDES = [
        "write_always"
    ]

    MANIFEST_ATTRIBUTES = {'in': INPUT_MANIFEST_ATTRIBUTES,
                           'out': OUTPUT_MANIFEST_ATTRIBUTES}
    SCHEMA_TYPE = Union[Dict[str, ColumnDefinition], TypeOrderedDict[str, ColumnDefinition], List[str]]

    def __init__(self, name: str,
                 full_path: Optional[Union[str, None]] = None,
                 is_sliced: Optional[bool] = False,
                 destination: Optional[str] = '',
                 primary_key: Optional[List[str]] = None,
                 schema: SCHEMA_TYPE = None,
                 incremental: Optional[bool] = None,
                 table_metadata: Optional[TableMetadata] = None,
                 enclosure: Optional[str] = '"',
                 delimiter: Optional[str] = ',',
                 delete_where: Optional[dict] = None,
                 stage: Optional[str] = 'out',
                 write_always: Optional[bool] = False,
                 has_header: Optional[bool] = None,
                 description: Optional[str] = None,
                 # input
                 **kwargs
                 ):
        """

        Args:
            name: Table / file name.
            full_path (str):
                (optional) Full path of the file. May be empty in case it represents only orphaned
                manifest.
                May also be a folder path - in this case it is a [sliced tables](
                https://developers.keboola.com/extend/common-interface/folders/#sliced-tables) folder.
                The full_path is None when dealing with [workspaces](
                https://developers.keboola.com/extend/common-interface/folders/#exchanging-data-via-workspace)
            is_sliced: True if the full_path points to a folder with sliced tables
            has_header: True if the file has a header, if emtpy inferred.
            destination: String name of the table in Storage.
            primary_key: List with names of columns used for primary key.
            incremental: Set to true to enable incremental loading
            table_metadata: <.dao.TableMetadata> object containing column and table metadata (deprecated)
            enclosure: str: CSV enclosure, by default "
            delimiter: str: CSV delimiter, by default ,
            delete_where (dict): Dict with settings for deleting rows
            stage: str: Storage Stage 'in' or 'out'
            write_always: Bool: If true, the table will be saved to Storage even when the job execution
                           fails.
            schema: (dict|lis[str]) Mapping of column names andColumnDefinition objects, or a list of names
            description: str: Table description

        """
        super().__init__(full_path)
        self._name = name
        self.is_sliced = is_sliced

        # initialize manifest properties
        self._destination = None
        self.destination = destination
        self._schema: Dict[str, ColumnDefinition] = dict()

        if schema:
            self.schema = schema
        # deprecated argument for backward compatibility
        self._legacy_mode = False
        if kwargs.get('force_legacy_mode'):
            self._legacy_mode = True
        if kwargs.get('columns'):
            self.columns = kwargs['columns']

        self._legacy_primary_key = list()
        self.primary_key = primary_key
        self._incremental = incremental

        self.enclosure = enclosure
        self.delimiter = delimiter

        if not table_metadata:
            table_metadata = TableMetadata()
        self.table_metadata = table_metadata
        if description:
            self.table_metadata.add_table_description(description)

        self.delete_where_values = None
        self.delete_where_column = None
        self.delete_where_operator = None

        if kwargs.get('delete_where_values'):
            self.delete_where_values = kwargs['delete_where_values']

        if kwargs.get('delete_where_column'):
            self.delete_where_column = kwargs['delete_where_column']

        if kwargs.get('delete_where_operator'):
            self.delete_where_operator = kwargs['delete_where_operator']

        self.set_delete_where_from_dict(delete_where)
        self.write_always = write_always

        # input manifest properties
        self._id = kwargs.get('id')
        self._uri = kwargs.get('uri')
        self._created = kwargs.get('created')
        self._last_change_date = kwargs.get('last_change_date')
        self._last_import_date = kwargs.get('last_import_date')
        self._rows_count = kwargs.get('rows_count')
        self._data_size_bytes = kwargs.get('data_size_bytes')
        self._is_alias = kwargs.get('is_alias')
        self._indexed_columns = kwargs.get('indexed_columns')
        self._attributes = kwargs.get('attributes')

        self.stage = stage
        self.has_header = has_header or self._has_header_in_file()

    def __get_stage_inferred(self):
        if self._uri:
            return 'in'
        return 'out'

    @classmethod
    def build_output_definition(cls, name: str,
                                destination: Optional[str] = '',
                                columns: Optional[List[str]] = None,
                                primary_key: Optional[List[str]] = None,
                                incremental: Optional[bool] = False,
                                table_metadata: Optional[TableMetadata] = None,
                                enclosure: Optional[str] = '"',
                                delimiter: Optional[str] = ',',
                                delete_where: Optional[dict] = None,
                                write_always: Optional[bool] = False,
                                schema: SCHEMA_TYPE = None,
                                description: Optional[str] = None,
                                **kwargs
                                ):
        """
        Factory method for creating a TableDefinition instance for output tables.

        This method initializes a TableDefinition object with properties specific to output tables,
        including metadata and schema definitions.

        Args:
            name (str): The name of the table.
            destination (Optional[str]): The destination table name in the storage. Defaults to an empty string.
            columns (Optional[List[str]]): A list of column names for the table. Defaults to None.
            primary_key (Optional[List[str]]): A list of column names that form the primary key. Defaults to None.
            incremental (Optional[bool]): Indicates if the loading should be incremental. Defaults to False.
            table_metadata (Optional[TableMetadata]): An object containing table and column metadata. Defaults to None.
            enclosure (Optional[str]): The character used as a text qualifier in the CSV file. Defaults to '"'.
            delimiter (Optional[str]): The character used to separate columns in the CSV file. Defaults to ','.
            delete_where (Optional[dict]): Criteria for row deletion in incremental loads. Defaults to None.
            write_always (Optional[bool]): If True, the table will be saved to storage even if the job fails.
            schema (Optional[List[ColumnDefinition]]): Dictionary of ColumnDefinition objects.
            description (Optional[str]): The description of the table. Defaults to None.

        Returns:
            TableDefinition: An instance of TableDefinition configured for output tables.
        """
        return cls(name=name,
                   destination=destination,
                   columns=columns,
                   primary_key=primary_key,
                   incremental=incremental,
                   table_metadata=table_metadata,
                   enclosure=enclosure,
                   delimiter=delimiter,
                   delete_where=delete_where,
                   write_always=write_always,
                   schema=schema,
                   description=description,
                   **kwargs
                   )

    @classmethod
    def build_input_definition(cls, name: str,
                               full_path: Optional[Union[str, None]] = None,
                               is_sliced: Optional[bool] = False,

                               destination: Optional[str] = '',
                               primary_key: Optional[List[str]] = None,
                               columns: Optional[List[str]] = None,
                               incremental: Optional[bool] = None,
                               table_metadata: Optional[TableMetadata] = None,
                               enclosure: Optional[str] = '"',
                               delimiter: Optional[str] = ',',
                               delete_where: Optional[dict] = None,
                               stage: Optional[str] = 'in',
                               write_always: Optional[bool] = False,
                               schema: Optional[Union[TypeOrderedDict[str, ColumnDefinition], list[str]]] = None,
                               rows_count: Optional[int] = None,
                               data_size_bytes: Optional[int] = None,
                               is_alias: Optional[bool] = False,

                               # input
                               uri: Optional[str] = None,
                               id: Optional[str] = '',
                               created: Optional[str] = None,
                               last_change_date: Optional[str] = None,
                               last_import_date: Optional[str] = None,
                               **kwargs
                               ):
        """
        Factory method for creating a TableDefinition instance for input tables.

        This method initializes a TableDefinition object with properties specific to input tables,
        including metadata and schema definitions.

        Args:
            name (str): The name of the table.
            full_path (Optional[Union[str, None]]): The full path to the table file or folder (for sliced tables).
            is_sliced (Optional[bool]): Indicates if the table is sliced (stored in multiple files).
            destination (Optional[str]): The destination table name in the storage. Defaults to an empty string.
            primary_key (Optional[List[str]]): A list of column names that form the primary key. Defaults to None.
            columns (Optional[List[str]]): A list of column names for the table. Defaults to None.
            incremental (Optional[bool]): Indicates if the loading should be incremental. Defaults to None.
            table_metadata (Optional[TableMetadata]): An object containing table and column metadata. Defaults to None.
            enclosure (Optional[str]): The character used as a text qualifier in the CSV file. Defaults to '"'.
            delimiter (Optional[str]): The character used to separate columns in the CSV file. Defaults to ','.
            delete_where (Optional[dict]): Criteria for row deletion in incremental loads. Defaults to None.
            stage (Optional[str]): Indicates the stage ('in' for input tables). Defaults to 'in'.
            write_always (Optional[bool]): If True, the table will be saved to storage even if the job fails. Defaults to False.  # noqa
            schema (Optional[List[ColumnDefinition]]): A list of ColumnDefinition objects defining the table schema. Defaults to None.  # noqa
            rows_count (Optional[int]): The number of rows in the table. Defaults to None.
            data_size_bytes (Optional[int]): The size of the table data in bytes. Defaults to None.
            is_alias (Optional[bool]): Indicates if the table is an alias. Defaults to False.
            uri (Optional[str]): The URI of the table. Defaults to None.
            id (Optional[str]): The ID of the table. Defaults to an empty string.
            created (Optional[str]): The creation timestamp of the table. Defaults to None.
            last_change_date (Optional[str]): The last modification timestamp of the table. Defaults to None.
            last_import_date (Optional[str]): The last import timestamp of the table. Defaults to None.

        Returns:
            TableDefinition: An instance of TableDefinition configured for input tables.
        """
        return cls(name=name,
                   full_path=full_path,
                   is_sliced=is_sliced,
                   destination=destination,
                   primary_key=primary_key,
                   columns=columns,
                   incremental=incremental,
                   table_metadata=table_metadata,
                   enclosure=enclosure,
                   delimiter=delimiter,
                   delete_where=delete_where,
                   stage=stage,
                   write_always=write_always,
                   schema=schema,
                   rows_count=rows_count,
                   data_size_bytes=data_size_bytes,
                   is_alias=is_alias,
                   uri=uri,
                   id=id,
                   created=created,
                   last_change_date=last_change_date,
                   last_import_date=last_import_date,
                   **kwargs
                   )

    @classmethod
    def convert_to_column_definition(cls, column_name, column_metadata, primary_key=False):
        data_type = {'base': DataType(dtype='STRING')}
        nullable = True
        for item in column_metadata:
            if item['key'] == 'KBC.datatype.basetype':
                data_type = {'base': DataType(dtype=item['value'])}
            elif item['key'] == 'KBC.datatype.nullable':
                nullable = item['value']
        return ColumnDefinition(data_types=data_type, nullable=nullable, primary_key=primary_key)

    @classmethod
    def return_schema_from_manifest(cls, json_data):
        if TableDefinition.is_new_manifest(json_data):
            schema = OrderedDict()
            for col in json_data.get('schema'):
                schema[col.get("name")] = ColumnDefinition().from_dict(col)

        else:
            # legacy support
            columns_metadata = json_data.get('column_metadata', {})
            primary_key = json_data.get('primary_key', [])
            columns = json_data.get('columns', [])

            all_columns = columns
            schema = OrderedDict()

            for col in all_columns:
                pk = col in primary_key
                if col in columns_metadata:
                    schema[col] = cls.convert_to_column_definition(col, columns_metadata[col], primary_key=pk)
                else:
                    schema[col] = ColumnDefinition(data_types={"base": DataType(dtype="STRING")}, primary_key=pk)

        return schema

    @classmethod
    def is_new_manifest(cls, json_data):
        return json_data.get('schema')

    @classmethod
    def build_from_manifest(cls,
                            manifest_file_path: str
                            ):
        """
        Factory method for TableDefinition from the raw "manifest" path.

        The TableDefinition then validates presence of the manifest counterpart.
        E.g. table.csv if `table.csv.manifest` is provided.

        The manifest file does not need to exist, in such case a ValueError is raised
        if the counterpart table is not found.

        The counterpart table file does not need to exist, in such case, the manifest represents an orphaned manifest.

        Args:
            manifest_file_path (str):
                (optional) Full path of the manifest file. May be empty in case it represents only expected
                 table with no input manifest.


        """
        is_sliced = False
        full_path = None
        manifest = dict()
        if Path(manifest_file_path).exists():
            with open(manifest_file_path) as in_file:
                manifest = json.load(in_file)

        file_path = Path(manifest_file_path.replace('.manifest', ''))

        if file_path.is_dir() and manifest:
            is_sliced = True
        elif file_path.is_dir() and not manifest:
            # skip folders that do not have matching manifest
            raise ValueError(f'The manifest {manifest_file_path} does not exist '
                             f'and it'f's matching file {file_path} is folder!')
        elif not file_path.exists() and not manifest:
            raise ValueError(f'Nor the manifest file or the corresponding file {file_path} exist!')

        if file_path.exists():
            full_path = str(file_path)
            name = file_path.name
        else:
            name = Path(manifest_file_path).stem

        if manifest.get('name'):
            name = manifest.get('name')

        # test if the manifest is output and incompatible
        force_legacy_mode = False
        if not manifest.get('columns') and manifest.get('primary_key'):
            warnings.warn('Primary key is set but columns are not. Forcing legacy mode for CSV file.',
                          DeprecationWarning)
            force_legacy_mode = True

        if manifest.get('id'):
            stage = 'in'
            table_def = cls.build_input_definition(
                # helper parameters
                stage=stage,
                force_legacy_mode=force_legacy_mode,
                is_sliced=is_sliced,
                full_path=full_path,

                # basic in manifest parameters
                id=manifest.get('id'),
                uri=manifest.get('uri'),
                name=name,
                primary_key=manifest.get('primary_key'),
                created=manifest.get('created'),
                last_change_date=manifest.get('last_change_date'),
                last_import_date=manifest.get('last_import_date'),
                schema=cls.return_schema_from_manifest(manifest),
                table_metadata=TableMetadata(manifest),

                # additional in manifest parameters
                rows_count=manifest.get('rows_count'),
                data_size_bytes=manifest.get('data_size_bytes'),
                is_alias=manifest.get('is_alias'),
                attributes=manifest.get('attributes'),
                indexed_columns=manifest.get('indexed_columns'),
            )

        else:
            stage = 'out'
            table_def = cls.build_output_definition(
                # helper parameters
                stage=stage,
                force_legacy_mode=force_legacy_mode,
                is_sliced=is_sliced,
                full_path=full_path,

                # basic out manifest parameters
                name=name,
                destination=manifest.get('destination'),
                schema=cls.return_schema_from_manifest(manifest),
                incremental=manifest.get('incremental', False),
                primary_key=manifest.get('primary_key'),
                write_always=manifest.get('write_always', False),
                delimiter=manifest.get('delimiter', ','),
                enclosure=manifest.get('enclosure', '"'),
                table_metadata=TableMetadata(manifest),

                # additional in manifest parameters
                delete_where_values=manifest.get('delete_where_values'),
                delete_where_column=manifest.get('delete_where_column'),
                delete_where_operator=manifest.get('delete_where_operator')
            )

        return table_def

    def get_manifest_dictionary(self, manifest_type: Optional[str] = None, legacy_queue: bool = False,
                                legacy_manifest: Optional[bool] = None) -> dict:
        """
        Returns manifest dictionary in appropriate manifest_type: either 'in' or 'out'.
        By default, returns output manifest.
             The result keeps only values that are applicable for
             the selected type of the Manifest file. Because although input and output manifests share most of
             the attributes, some are not shared.

             See [manifest files](https://developers.keboola.com/extend/common-interface/manifest-files)
             for more information.

        Args:
            manifest_type (str): either 'in' or 'out'.
             See [manifest files](https://developers.keboola.com/extend/common-interface/manifest-files)
             for more information.
            legacy_queue (bool): optional flag marking project on legacy queue.(some options are not allowed on queue2)
            legacy_manifest (bool): If True, creates a legacy manifest; otherwise, uses the new format if permitted.

        Returns:
            dict representation of the manifest file in a format expected / produced by the Keboola Connection

        """

        if not manifest_type:
            manifest_type = self.stage

        if self._legacy_mode:
            legacy_manifest = True
        dictionary = self._filter_attributes_by_manifest_type(manifest_type, legacy_queue, legacy_manifest)

        filtered_dictionary = self._filter_dictionary(dictionary)

        return filtered_dictionary

    def _filter_dictionary(self, data):
        if isinstance(data, dict):
            return {
                k: self._filter_dictionary(v)
                for k, v in data.items()
                if v not in (None, [], {}, "")
            }
        elif isinstance(data, list):
            return [self._filter_dictionary(item) for item in data if item not in (None, [], {}, "")]
        else:
            return data

    # Usage

    def _filter_attributes_by_manifest_type(self, manifest_type: Literal["in", "out"], legacy_queue: bool = False,
                                            legacy_manifest: bool = False):
        """
        Filter manifest to contain only supported fields
        Args:
            manifest_type:

        Returns:

        """

        supported_fields = self._manifest_attributes.get_attributes_by_stage(manifest_type, legacy_queue,
                                                                             legacy_manifest)
        fields = {
            'id': self.id,
            'uri': self._uri,
            'name': self.name,
            'created': self._created,
            'last_change_date': self._last_change_date,
            'last_import_date': self._last_import_date,
            'rows_count': self._rows_count,
            'data_size_bytes': self._data_size_bytes,
            'is_alias': self._is_alias,
            'indexed_columns': self._indexed_columns,
            'attributes': self._attributes,

            'destination': self.destination,
            'incremental': self.incremental,
            'primary_key': self.primary_key,
            'write_always': self.write_always,
            'delimiter': self.delimiter,
            'enclosure': self.enclosure,
            'metadata': self.table_metadata.get_table_metadata_for_manifest(legacy_manifest=True),
            'column_metadata': self.table_metadata._get_legacy_column_metadata_for_manifest(),
            'manifest_type': manifest_type,
            'has_header': self.has_header,
            'table_metadata': self.table_metadata.get_table_metadata_for_manifest(),
            'delete_where_column': self.delete_where_column,
            'delete_where_values': self.delete_where_values,
            'delete_where_operator': self.delete_where_operator,
            'schema': [col.to_dict(name)
                       for name, col in self.schema.items()] if isinstance(self.schema, (OrderedDict, dict)) else []
        }

        if (legacy_manifest and not self.has_header) or self.stage == 'in':
            fields['columns'] = self.column_names

        new_dict = fields.copy()

        if supported_fields:
            for attr in fields:
                if attr not in supported_fields:
                    new_dict.pop(attr, None)
        return new_dict

    def _has_header_in_file(self):
        if self.is_sliced:
            has_header = False
        elif self.column_names and not self.stage == 'in':
            has_header = False
        else:
            has_header = True
        return has_header

    @property
    def schema(self) -> TypeOrderedDict[str, ColumnDefinition]:
        return self._schema

    @schema.setter
    def schema(self, value: Union[TypeOrderedDict[str, ColumnDefinition], list[str]]):
        if value:
            if not isinstance(value, (list, dict, OrderedDict)):
                raise TypeError("Columns must be a list or a mapping of column names and ColumnDefinition objects")

            if isinstance(value, list):
                self._schema = OrderedDict()
                for col in value:
                    self._schema[col] = ColumnDefinition()
            else:
                self._schema = value

    @property
    def _manifest_attributes(self) -> SupportedManifestAttributes:
        return SupportedManifestAttributes(self.MANIFEST_ATTRIBUTES['out'], self.MANIFEST_ATTRIBUTES['in'],
                                           self.OUTPUT_MANIFEST_LEGACY_EXCLUDES)

    # #### Manifest properties
    @property
    def destination(self) -> str:
        return self._destination

    @destination.setter
    def destination(self, val: str):
        if val:
            if isinstance(val, str):
                self._destination = val
            else:
                raise TypeError("Destination must be a string")

    @property
    def id(self) -> str:
        """
        str: id property used in input manifest. Contains Keboola Storage ID, e.g. in.c-bucket.table

        """
        return self._id

    @property
    def name(self) -> str:
        """
        File name - excluding the KBC ID if present (`str`, read-only)
        """
        return self._name

    @property
    def rows_count(self) -> int:
        """
                int: rows_count property used in input manifest.

        """
        return self._rows_count

    @property
    def data_size_bytes(self) -> int:
        """
                int: data_size_bytes property used in input manifest.

        """
        return self._data_size_bytes

    @property
    @deprecated(version='1.5.1', reason="Please use new column_names method instead of columns property")
    def columns(self) -> List[str]:
        if isinstance(self.schema, (OrderedDict, dict)):
            return list(self.schema.keys())
        else:
            return []

    @columns.setter
    @deprecated(version='1.5.1', reason="Please use new column_names method instead of schema property")
    def columns(self, val: List[str]):
        """
        Set columns for the table.
        If list of names provided, the columns will be created with default settings Basetype.String.
        Args:
            val:

        Returns:

        """
        if not isinstance(val, list):
            raise TypeError("Columns must be a list")

        self.schema = val

    @property
    def column_names(self) -> List[str]:
        if self.schema:
            return list(self.schema.keys())
        else:
            return []

    @property
    def incremental(self) -> bool:
        return self._incremental

    @incremental.setter
    def incremental(self, incremental: bool):
        if incremental is not None:
            self._incremental = incremental

    @property
    def write_always(self) -> bool:
        return self._write_always

    @write_always.setter
    def write_always(self, write_always: bool):
        self._write_always = write_always

    @property
    def primary_key(self) -> List[str]:
        if not self._legacy_mode:
            return [column_name for column_name, column_def in self.schema.items() if column_def.primary_key]
        else:
            return self._legacy_primary_key

    @primary_key.setter
    def primary_key(self, primary_key: List[str]):
        if not primary_key:
            return

        if not isinstance(primary_key, list):
            raise TypeError("Primary key must be a list")
        if not self._legacy_mode:
            for col in primary_key:
                if col in self.schema:
                    self.schema[col].primary_key = True
                else:
                    raise UserException(f"Primary key column {col} not found in schema. "
                                        f"Please specify all columns / schema")
        else:
            self._legacy_primary_key = primary_key

    @property
    def delimiter(self) -> str:
        return self._delimiter

    @delimiter.setter
    def delimiter(self, delimiter: str):
        self._delimiter = delimiter

    @property
    def enclosure(self) -> str:
        return self._enclosure

    @enclosure.setter
    def enclosure(self, enclosure: str):
        self._enclosure = enclosure

    @property
    def table_metadata(self) -> TableMetadata:
        return self._table_metadata

    @table_metadata.setter
    def table_metadata(self, table_metadata: TableMetadata):
        self._table_metadata = table_metadata
        # backward compatibility legacy support
        for col, val in table_metadata._get_legacy_column_metadata_for_manifest().items():
            if not self.schema.get(col):
                self.schema[col] = ColumnDefinition()
            self.schema[col].metadata = {item['key']: item['value'] for item in val}

    @property
    def created(self) -> Union[datetime, None]:  # Created timestamp  in the KBC Storage (read only input attribute)
        if self._created:
            return datetime.strptime(self._created, KBC_DEFAULT_TIME_FORMAT)
        else:
            return None

    @property
    def uri(self) -> str:
        return self._uri

    @property
    def last_change_date(self) -> str:
        return self._last_change_date

    @property
    def last_import_date(self) -> str:
        return self._last_import_date

    @property
    def is_alias(self) -> bool:
        return self._is_alias

    def add_column(self, name: str, definition: ColumnDefinition = ColumnDefinition()):
        """
        Add column definition, accepts either ColumnDefinition or a string
        (in which case the base type STRING will be used).
        """

        if name in self._schema:
            raise ValueError(f"Column with name '{name}' already exists")

        self._schema[name] = definition

    def update_column(self, name: str, column_definition: ColumnDefinition):
        if not isinstance(column_definition, ColumnDefinition):
            raise ValueError("New column must be an instance of ColumnDefinition")

        if name in self.schema:
            self.schema[name] = column_definition
        else:
            raise ValueError(f'Column with name: "{name}" not found')

    def delete_column(self, column_name: str):

        if column_name not in self.schema:
            raise ValueError(f"Column with name {column_name} not found")
        del self.schema[column_name]

    def add_columns(self, columns: Union[List[str], Dict[str, ColumnDefinition]]):
        if isinstance(columns, list):
            for name in columns:
                self.add_column(name)
        else:
            for name, column in columns.items():
                self.add_column(name, column)

    def update_columns(self, columns: Dict[str, ColumnDefinition]):
        for name, column in columns:
            self.update_column(name, column)

    def delete_columns(self, column_names: List[str]):
        for name in column_names:
            self.delete_column(name)

    def set_delete_where_from_dict(self, delete_where):
        """
        Process metadata as dictionary and returns modified manifest

        Args:
            delete_where: Dictionary of where condition specification

        Returns:
            Manifest dict
        """
        if delete_where:
            if 'column' in delete_where and 'values' in delete_where:
                if not isinstance(delete_where['column'], str):
                    raise TypeError("Delete column must be a string")
                if not isinstance(delete_where['values'], list):
                    raise TypeError("Delete values must be a list")
                op = delete_where['operator'] or 'eq'
                if (not op == 'eq') and (not op == 'ne'):
                    raise ValueError("Delete operator must be 'eq' or 'ne'")
                self.delete_where_values = delete_where['values']
                self.delete_where_column = delete_where['column']
                self.delete_where_operator = op
            else:
                raise ValueError("Delete where specification must contain "
                                 "keys 'column' and 'values'")

Table definition class. It is used as a container for in/tables/ files. It is a representation of input/output manifest objects with additional attributes containing information about related file full path and whether it is a sliced table.

Also, it is useful when collecting results and building export configs.

To create the TableDefinition directly from the manifest there is a factory build method:

from keboola.component import CommonInterface
from keboola.component import dao

table_def = dao.TableDefinition.build_from_manifest(manifest_dict,
                                        'table name',
                                        full_path='optional full path',
                                        is_sliced=False)


Attributes

name
Table / file name.
full_path : str
(optional) Full path of the file. May be empty in case it represents only orphaned manifest. May also be a folder path - in this case it is a sliced tables folder. The full_path is None when dealing with workspaces
is_sliced
True if the full_path points to a folder with sliced tables
has_header
True if the file has a header
destination
String name of the table in Storage.
primary_key
List with names of columns used for primary key.
columns
List of columns for headless CSV files
incremental
Set to true to enable incremental loading
table_metadata
<.dao.TableMetadata> object containing column and table metadata
delete_where
Dict with settings for deleting rows

Args

name
Table / file name.
full_path (str):
(optional) Full path of the file. May be empty in case it represents only orphaned
manifest.
May also be a folder path - in this case it is a [sliced tables](
https://developers.keboola.com/extend/common-interface/folders/#sliced-tables) folder.
The full_path is None when dealing with [workspaces](
https://developers.keboola.com/extend/common-interface/folders/#exchanging-data-via-workspace)
is_sliced
True if the full_path points to a folder with sliced tables
has_header
True if the file has a header, if emtpy inferred.
destination
String name of the table in Storage.
primary_key
List with names of columns used for primary key.
incremental
Set to true to enable incremental loading
table_metadata
<.dao.TableMetadata> object containing column and table metadata (deprecated)
enclosure
str: CSV enclosure, by default "
delimiter
str: CSV delimiter, by default ,
delete_where : dict
Dict with settings for deleting rows
stage
str: Storage Stage 'in' or 'out'
write_always
Bool: If true, the table will be saved to Storage even when the job execution fails.
schema
(dict|lis[str]) Mapping of column names andColumnDefinition objects, or a list of names
description
str: Table description

Ancestors

Class variables

var INPUT_MANIFEST_ATTRIBUTES
var MANIFEST_ATTRIBUTES
var OUTPUT_MANIFEST_ATTRIBUTES
var OUTPUT_MANIFEST_LEGACY_EXCLUDES
var SCHEMA_TYPE

Static methods

def build_from_manifest(manifest_file_path: str)

Factory method for TableDefinition from the raw "manifest" path.

The TableDefinition then validates presence of the manifest counterpart. E.g. table.csv if table.csv.manifest is provided.

The manifest file does not need to exist, in such case a ValueError is raised if the counterpart table is not found.

The counterpart table file does not need to exist, in such case, the manifest represents an orphaned manifest.

Args

manifest_file_path (str): (optional) Full path of the manifest file. May be empty in case it represents only expected table with no input manifest.

def build_input_definition(name: str,
full_path: Optional[Union[str, None]] = None,
is_sliced: Optional[bool] = False,
destination: Optional[str] = '',
primary_key: Optional[List[str]] = None,
columns: Optional[List[str]] = None,
incremental: Optional[bool] = None,
table_metadata: Optional[TableMetadata] = None,
enclosure: Optional[str] = '"',
delimiter: Optional[str] = ',',
delete_where: Optional[dict] = None,
stage: Optional[str] = 'in',
write_always: Optional[bool] = False,
schema: Optional[Union[TypeOrderedDict[str, ColumnDefinition], list[str]]] = None,
rows_count: Optional[int] = None,
data_size_bytes: Optional[int] = None,
is_alias: Optional[bool] = False,
uri: Optional[str] = None,
id: Optional[str] = '',
created: Optional[str] = None,
last_change_date: Optional[str] = None,
last_import_date: Optional[str] = None,
**kwargs)

Factory method for creating a TableDefinition instance for input tables.

This method initializes a TableDefinition object with properties specific to input tables, including metadata and schema definitions.

Args

name : str
The name of the table.
full_path : Optional[Union[str, None]]
The full path to the table file or folder (for sliced tables).
is_sliced : Optional[bool]
Indicates if the table is sliced (stored in multiple files).
destination : Optional[str]
The destination table name in the storage. Defaults to an empty string.
primary_key : Optional[List[str]]
A list of column names that form the primary key. Defaults to None.
columns : Optional[List[str]]
A list of column names for the table. Defaults to None.
incremental : Optional[bool]
Indicates if the loading should be incremental. Defaults to None.
table_metadata : Optional[TableMetadata]
An object containing table and column metadata. Defaults to None.
enclosure : Optional[str]
The character used as a text qualifier in the CSV file. Defaults to '"'.
delimiter : Optional[str]
The character used to separate columns in the CSV file. Defaults to ','.
delete_where : Optional[dict]
Criteria for row deletion in incremental loads. Defaults to None.
stage : Optional[str]
Indicates the stage ('in' for input tables). Defaults to 'in'.
write_always : Optional[bool]
If True, the table will be saved to storage even if the job fails. Defaults to False. # noqa
schema : Optional[List[ColumnDefinition]]
A list of ColumnDefinition objects defining the table schema. Defaults to None. # noqa
rows_count : Optional[int]
The number of rows in the table. Defaults to None.
data_size_bytes : Optional[int]
The size of the table data in bytes. Defaults to None.
is_alias : Optional[bool]
Indicates if the table is an alias. Defaults to False.
uri : Optional[str]
The URI of the table. Defaults to None.
id : Optional[str]
The ID of the table. Defaults to an empty string.
created : Optional[str]
The creation timestamp of the table. Defaults to None.
last_change_date : Optional[str]
The last modification timestamp of the table. Defaults to None.
last_import_date : Optional[str]
The last import timestamp of the table. Defaults to None.

Returns

TableDefinition
An instance of TableDefinition configured for input tables.
def build_output_definition(name: str,
destination: Optional[str] = '',
columns: Optional[List[str]] = None,
primary_key: Optional[List[str]] = None,
incremental: Optional[bool] = False,
table_metadata: Optional[TableMetadata] = None,
enclosure: Optional[str] = '"',
delimiter: Optional[str] = ',',
delete_where: Optional[dict] = None,
write_always: Optional[bool] = False,
schema: SCHEMA_TYPE = None,
description: Optional[str] = None,
**kwargs)

Factory method for creating a TableDefinition instance for output tables.

This method initializes a TableDefinition object with properties specific to output tables, including metadata and schema definitions.

Args

name : str
The name of the table.
destination : Optional[str]
The destination table name in the storage. Defaults to an empty string.
columns : Optional[List[str]]
A list of column names for the table. Defaults to None.
primary_key : Optional[List[str]]
A list of column names that form the primary key. Defaults to None.
incremental : Optional[bool]
Indicates if the loading should be incremental. Defaults to False.
table_metadata : Optional[TableMetadata]
An object containing table and column metadata. Defaults to None.
enclosure : Optional[str]
The character used as a text qualifier in the CSV file. Defaults to '"'.
delimiter : Optional[str]
The character used to separate columns in the CSV file. Defaults to ','.
delete_where : Optional[dict]
Criteria for row deletion in incremental loads. Defaults to None.
write_always : Optional[bool]
If True, the table will be saved to storage even if the job fails.
schema : Optional[List[ColumnDefinition]]
Dictionary of ColumnDefinition objects.
description : Optional[str]
The description of the table. Defaults to None.

Returns

TableDefinition
An instance of TableDefinition configured for output tables.
def convert_to_column_definition(column_name, column_metadata, primary_key=False)
def is_new_manifest(json_data)
def return_schema_from_manifest(json_data)

Instance variables

prop column_names : List[str]
Expand source code
@property
def column_names(self) -> List[str]:
    if self.schema:
        return list(self.schema.keys())
    else:
        return []
prop columns : List[str]
Expand source code
@property
@deprecated(version='1.5.1', reason="Please use new column_names method instead of columns property")
def columns(self) -> List[str]:
    if isinstance(self.schema, (OrderedDict, dict)):
        return list(self.schema.keys())
    else:
        return []
prop created : Union[datetime, None]
Expand source code
@property
def created(self) -> Union[datetime, None]:  # Created timestamp  in the KBC Storage (read only input attribute)
    if self._created:
        return datetime.strptime(self._created, KBC_DEFAULT_TIME_FORMAT)
    else:
        return None
prop data_size_bytes : int
Expand source code
@property
def data_size_bytes(self) -> int:
    """
            int: data_size_bytes property used in input manifest.

    """
    return self._data_size_bytes

int: data_size_bytes property used in input manifest.

prop delimiter : str
Expand source code
@property
def delimiter(self) -> str:
    return self._delimiter
prop destination : str
Expand source code
@property
def destination(self) -> str:
    return self._destination
prop enclosure : str
Expand source code
@property
def enclosure(self) -> str:
    return self._enclosure
prop id : str
Expand source code
@property
def id(self) -> str:
    """
    str: id property used in input manifest. Contains Keboola Storage ID, e.g. in.c-bucket.table

    """
    return self._id

str: id property used in input manifest. Contains Keboola Storage ID, e.g. in.c-bucket.table

prop incremental : bool
Expand source code
@property
def incremental(self) -> bool:
    return self._incremental
prop is_alias : bool
Expand source code
@property
def is_alias(self) -> bool:
    return self._is_alias
prop last_change_date : str
Expand source code
@property
def last_change_date(self) -> str:
    return self._last_change_date
prop last_import_date : str
Expand source code
@property
def last_import_date(self) -> str:
    return self._last_import_date
prop primary_key : List[str]
Expand source code
@property
def primary_key(self) -> List[str]:
    if not self._legacy_mode:
        return [column_name for column_name, column_def in self.schema.items() if column_def.primary_key]
    else:
        return self._legacy_primary_key
prop rows_count : int
Expand source code
@property
def rows_count(self) -> int:
    """
            int: rows_count property used in input manifest.

    """
    return self._rows_count

int: rows_count property used in input manifest.

prop schema : TypeOrderedDict[str, ColumnDefinition]
Expand source code
@property
def schema(self) -> TypeOrderedDict[str, ColumnDefinition]:
    return self._schema
prop table_metadataTableMetadata
Expand source code
@property
def table_metadata(self) -> TableMetadata:
    return self._table_metadata
prop uri : str
Expand source code
@property
def uri(self) -> str:
    return self._uri
prop write_always : bool
Expand source code
@property
def write_always(self) -> bool:
    return self._write_always

Methods

def add_column(self,
name: str,
definition: ColumnDefinition = ColumnDefinition(data_types={'base': DataType(dtype='STRING', length=None, default=None)}, nullable=True, primary_key=False, description=None, metadata=None))
Expand source code
def add_column(self, name: str, definition: ColumnDefinition = ColumnDefinition()):
    """
    Add column definition, accepts either ColumnDefinition or a string
    (in which case the base type STRING will be used).
    """

    if name in self._schema:
        raise ValueError(f"Column with name '{name}' already exists")

    self._schema[name] = definition

Add column definition, accepts either ColumnDefinition or a string (in which case the base type STRING will be used).

def add_columns(self,
columns: Union[List[str], Dict[str, ColumnDefinition]])
Expand source code
def add_columns(self, columns: Union[List[str], Dict[str, ColumnDefinition]]):
    if isinstance(columns, list):
        for name in columns:
            self.add_column(name)
    else:
        for name, column in columns.items():
            self.add_column(name, column)
def delete_column(self, column_name: str)
Expand source code
def delete_column(self, column_name: str):

    if column_name not in self.schema:
        raise ValueError(f"Column with name {column_name} not found")
    del self.schema[column_name]
def delete_columns(self, column_names: List[str])
Expand source code
def delete_columns(self, column_names: List[str]):
    for name in column_names:
        self.delete_column(name)
def get_manifest_dictionary(self,
manifest_type: Optional[str] = None,
legacy_queue: bool = False,
legacy_manifest: Optional[bool] = None) ‑> dict
Expand source code
def get_manifest_dictionary(self, manifest_type: Optional[str] = None, legacy_queue: bool = False,
                            legacy_manifest: Optional[bool] = None) -> dict:
    """
    Returns manifest dictionary in appropriate manifest_type: either 'in' or 'out'.
    By default, returns output manifest.
         The result keeps only values that are applicable for
         the selected type of the Manifest file. Because although input and output manifests share most of
         the attributes, some are not shared.

         See [manifest files](https://developers.keboola.com/extend/common-interface/manifest-files)
         for more information.

    Args:
        manifest_type (str): either 'in' or 'out'.
         See [manifest files](https://developers.keboola.com/extend/common-interface/manifest-files)
         for more information.
        legacy_queue (bool): optional flag marking project on legacy queue.(some options are not allowed on queue2)
        legacy_manifest (bool): If True, creates a legacy manifest; otherwise, uses the new format if permitted.

    Returns:
        dict representation of the manifest file in a format expected / produced by the Keboola Connection

    """

    if not manifest_type:
        manifest_type = self.stage

    if self._legacy_mode:
        legacy_manifest = True
    dictionary = self._filter_attributes_by_manifest_type(manifest_type, legacy_queue, legacy_manifest)

    filtered_dictionary = self._filter_dictionary(dictionary)

    return filtered_dictionary

Returns manifest dictionary in appropriate manifest_type: either 'in' or 'out'. By default, returns output manifest. The result keeps only values that are applicable for the selected type of the Manifest file. Because although input and output manifests share most of the attributes, some are not shared.

 See [manifest files](https://developers.keboola.com/extend/common-interface/manifest-files)
 for more information.

Args

manifest_type : str
either 'in' or 'out'.
See manifest files
for more information.
legacy_queue : bool
optional flag marking project on legacy queue.(some options are not allowed on queue2)
legacy_manifest : bool
If True, creates a legacy manifest; otherwise, uses the new format if permitted.

Returns

dict representation of the manifest file in a format expected / produced by the Keboola Connection

def set_delete_where_from_dict(self, delete_where)
Expand source code
def set_delete_where_from_dict(self, delete_where):
    """
    Process metadata as dictionary and returns modified manifest

    Args:
        delete_where: Dictionary of where condition specification

    Returns:
        Manifest dict
    """
    if delete_where:
        if 'column' in delete_where and 'values' in delete_where:
            if not isinstance(delete_where['column'], str):
                raise TypeError("Delete column must be a string")
            if not isinstance(delete_where['values'], list):
                raise TypeError("Delete values must be a list")
            op = delete_where['operator'] or 'eq'
            if (not op == 'eq') and (not op == 'ne'):
                raise ValueError("Delete operator must be 'eq' or 'ne'")
            self.delete_where_values = delete_where['values']
            self.delete_where_column = delete_where['column']
            self.delete_where_operator = op
        else:
            raise ValueError("Delete where specification must contain "
                             "keys 'column' and 'values'")

Process metadata as dictionary and returns modified manifest

Args

delete_where
Dictionary of where condition specification

Returns

Manifest dict

def update_column(self,
name: str,
column_definition: ColumnDefinition)
Expand source code
def update_column(self, name: str, column_definition: ColumnDefinition):
    if not isinstance(column_definition, ColumnDefinition):
        raise ValueError("New column must be an instance of ColumnDefinition")

    if name in self.schema:
        self.schema[name] = column_definition
    else:
        raise ValueError(f'Column with name: "{name}" not found')
def update_columns(self,
columns: Dict[str, ColumnDefinition])
Expand source code
def update_columns(self, columns: Dict[str, ColumnDefinition]):
    for name, column in columns:
        self.update_column(name, column)

Inherited members

class TableInputMapping (source: str = '',
destination: str = None,
limit: int = None,
columns: List[str] = <factory>,
where_values: List[str] = None,
full_path: str = None,
where_operator: str = '',
days: int = 0,
column_types: List[TableColumnTypes] = None)
Expand source code
@dataclass
class TableInputMapping(SubscriptableDataclass):
    """
    Abstraction of [input mapping definition](
    https://developers.keboola.com/extend/common-interface/config-file/#tables) in the config file
    """
    source: str = ''
    destination: str = None
    limit: int = None
    columns: List[str] = dataclasses.field(default_factory=lambda: [])
    where_values: List[str] = None
    full_path: str = None
    where_operator: str = ''
    days: int = 0
    column_types: List[TableColumnTypes] = None

Abstraction of input mapping definition in the config file

Ancestors

Class variables

var column_types : List[TableColumnTypes]
var columns : List[str]
var days : int
var destination : str
var full_path : str
var limit : int
var source : str
var where_operator : str
var where_values : List[str]
class TableMetadata (manifest: dict = None)
Expand source code
class TableMetadata:
    """
    Abstraction of metadata and table_metadata than can be provided within the manifest file. This is useful for
    creation
    of table/column descriptions, assigning column base types etc. without knowing the complexity
    of the json object and the internal KBC metadata keys.

    Example:

        ```python

        tm = TableMetadata()

        # or alternatively load from existing manifest
        # tm = TableMetadata(manifest_dict)

        # add column types
        tm.add_column_types({"column_a":"INTEGER", "column_b":SupportedDataTypes.BOOLEAN.value})

        # add table description
        tm.add_table_description("desc")

        # add column description
        tm.add_column_descriptions({"column_a":"Integer columns", "column_b":"my boolean test"})

        # add arbitrary table metadata
        tm.add_table_metadata("my_arbitrary_key","some value")

        # update manifest
        manifest = {}
        manifest['metadata'] = tm.get_table_metadata_for_manifest()
        manifest['column_metadata'] = tm.get_column_metadata_for_manifest()
        ```


    """

    def __init__(self, manifest: dict = None):
        """

        Args:
            manifest (dict): Existing manifest file
        """
        self.table_metadata = dict()
        self.column_metadata = dict()
        if manifest:
            self.load_table_metadata_from_manifest(manifest)

    def load_table_metadata_from_manifest(self, manifest: dict):
        """
        Load metadata from manifest file.

        Args:
            manifest:

        Returns:TableMetadata

        """

        if manifest.get('schema') and (
                manifest.get('metadata') or manifest.get('column_metadata') or manifest.get('columns')):  # noqa
            raise UserException("Manifest can't contain new 'schema' and old 'metadata'/'column_metadata'/'columns'")

        if not manifest.get('schema'):

            # column metadata
            for column, metadata_list in manifest.get('column_metadata', {}).items():
                for metadata in metadata_list:
                    if not metadata.get('key') and metadata.get('value'):
                        continue
                    key = metadata['key']
                    value = metadata['value']
                    self.add_column_metadata(column, key, value)

        # table metadata
        for metadata in manifest.get('metadata', []):
            if not metadata.get('key') and metadata.get('value'):
                continue
            key = metadata['key']
            value = metadata['value']
            self.add_table_metadata(key, value)

    def get_table_metadata_for_manifest(self, legacy_manifest: bool = False) -> List[dict]:
        """
        Returns table metadata list as required by the
        [manifest format]
        (https://developers.keboola.com/extend/common-interface/manifest-files/#dataintables-manifests)

        e.g.
        tm = TableMetadata()
        manifest['metadata'] = tm.table_metadata

        Returns: List[dict]

        """
        if legacy_manifest:
            final_metadata_list = [{'key': key,
                                    'value': value}
                                   for key, value in self.table_metadata.items() if value not in [None, '']]
        else:
            final_metadata_list = {key: value
                                   for key, value in self.table_metadata.items() if value not in [None, '']}

        return final_metadata_list

    @deprecated(version='1.5.1', reason="Please use schema instead of Column Metadata")
    def get_column_metadata_for_manifest(self) -> dict:
        """
                Returns column metadata dict as required by the
                [manifest format](https://developers.keboola.com/extend/common-interface/manifest-files/#dataintables
                -manifests)

                e.g.
                tm = TableMetadata()
                manifest['column_metadata'] = tm.column_metadata

                Returns: dict

        """

        return self._get_legacy_column_metadata_for_manifest()

    def _get_legacy_column_metadata_for_manifest(self) -> dict:
        """
                Returns column metadata dict as required by the
                [manifest format](https://developers.keboola.com/extend/common-interface/manifest-files/#dataintables
                -manifests)

                e.g.
                tm = TableMetadata()
                manifest['column_metadata'] = tm.column_metadata

                Returns: dict

        """
        final_column_metadata = dict()

        # collect unique metadata keys
        for column in self.column_metadata:
            column_metadata_dicts = self.column_metadata[column]
            if not final_column_metadata.get(column):
                final_column_metadata[column] = list()

            column_metadata = [{'key': key,
                                'value': value} for key, value in
                               column_metadata_dicts.items() if value not in [None, '']]
            final_column_metadata[column].extend(column_metadata)

        return final_column_metadata

    @property
    @deprecated(version='1.5.1', reason="Please use TableDefinition.description instead of TableMetadata")
    def table_description(self) -> str:
        """
        Returns table description (KBC.description)

        Returns: str

        """
        return self.table_metadata.get(KBCMetadataKeys.description.value)

    @property
    @deprecated(version='1.5.1', reason="Column datatypes were moved to dao.TableDefinition.schema property."
                                        "Please use the dao.ColumnDefinition objects")
    def column_datatypes(self) -> dict:
        """
        Return dictionary of column base datatypes
        e.g. {"col1name":"basetype"}

        Returns: dict e.g. {"col1name":"basetype"}

        """

        return self.get_columns_metadata_by_key(KBCMetadataKeys.base_data_type.value)

    @property
    @deprecated(version='1.5.1', reason="Column datatypes were moved to dao.TableDefinition.schema property."
                                        " Please use the dao.ColumnDefinition objects")
    def column_descriptions(self) -> dict:
        """
        Return dictionary of column descriptions
        e.g. {"col1name":"desc"}

        Returns: dict e.g. {"col1name":"desc"}

        """

        return self.get_columns_metadata_by_key(KBCMetadataKeys.description.value)

    @deprecated(version='1.5.1', reason="Please use schema instead of Table Metadata")
    def get_columns_metadata_by_key(self, metadata_key) -> dict:
        """
        Returns all columns with specified metadata_key as dictionary of column:metadata_key pairs
        e.g. {"col1name":"value_of_metadata_with_the_key"}

        Returns: dict e.g. {"col1name":"value_of_metadata_with_the_key"}

        """
        column_types = dict()
        for col in self.column_metadata:
            if col.get(metadata_key):
                column_types[col] = col[metadata_key]

        return column_types

    def add_column_descriptions(self, column_descriptions: dict):
        """
                Add column description metadata. It will be shown in the KBC Storage UI.

                Args:
                    column_descriptions: dict -> {"colname":"description"}

                """
        for col in column_descriptions:
            self.add_column_metadata(col, KBCMetadataKeys.description.value, column_descriptions[col])

    @deprecated(version='1.5.1', reason="Column datatypes were moved to dao.TableDefinition.schema property."
                                        "Please use the dao.ColumnDefinition objects and associated"
                                        "dao.TableDefinition methods to define columns. e.g."
                                        "dao.TableDefinition.add_columns()")
    def add_column_data_types(self, column_types: Dict[str, Union[SupportedDataTypes, str]]):
        """
        Add column types metadata. Note that only supported datatypes
        (<keboola.component.dao.ColumnDataTypes>) may be provided. The value accepts either instance of ColumnDataTypes
        or a valid string.

        Args:
            column_types (Dict[str, Union[SupportedDataTypes, str]]): dict -> {"colname":"datatype"}

        Raises:
            ValueError when the provided data type value is not recognized
        """

        for col in column_types:
            self.add_column_data_type(col, column_types[col])

    @deprecated(version='1.5.1', reason="Column datatypes were moved to dao.TableDefinition.schema property."
                                        "Please use the dao.ColumnDefinition objects and associated"
                                        "dao.TableDefinition methods to define columns. e.g."
                                        "dao.TableDefinition.add_column()")
    def add_column_data_type(self, column: str, data_type: Union[SupportedDataTypes, str],
                             source_data_type: str = None,
                             nullable: bool = False,
                             length: str = None, default=None):
        """
        Add single column data type
        Args:
            column (str): name of the column
            data_type (Union[SupportedDataTypes, str]):
                Either instance of ColumnDataTypes enum or a valid string. Basetype supported by KBC.
                base type of a column as defined in
                [php-datatypes](https://github.com/keboola/php-datatypes#base-types);
                see getBaseType implementations (e.g., [mysql](https://github.com/keboola/
                php-datatypes/blob/325fe4eff3e3dfae986ebbdb769eaefd18be6086/src/Definition/MySQL.php#L225))
                for mapping between KBC.datatype.type and KBC.datatype.basetype
            source_data_type (str):
                Optional. Data type of a column - extracted value from the source.
            nullable (bool): Is column nullable? KBC input mapping converts empty values to NULL
            length (str): Column length when applicable e.g. 39,8; 4000
            default: Default value

        Raises:
            ValueError when the provided data_type is not recognized

        """
        if isinstance(data_type, SupportedDataTypes):
            base_type = data_type.value
        else:
            self._validate_data_types({column: data_type})
            base_type = data_type

        self.add_column_metadata(column, KBCMetadataKeys.base_data_type.value, base_type)
        self.add_column_metadata(column, KBCMetadataKeys.data_type_nullable.value, nullable)

        if source_data_type is not None:
            self.add_column_metadata(column, KBCMetadataKeys.source_data_type.value, source_data_type)

        if length is not None:
            self.add_column_metadata(column, KBCMetadataKeys.data_type_length.value, length)
        if default is not None:
            self.add_column_metadata(column, KBCMetadataKeys.data_type_default.value, default)

    def add_table_description(self, description: str):
        """
        Adds/Updates table description that is displayed in the Storage UI
        Args:
            description: str
        """
        self.add_table_metadata(KBCMetadataKeys.description.value, description)

    def add_table_metadata(self, key: str, value: str):
        """
                Add/Updates table metadata and ensures the Key is unique.
                Args:

        """
        if value is None:
            return
        self.table_metadata = {**self.table_metadata, **{key: value}}

    @deprecated(version='1.5.1', reason="Column metadata ere moved to dao.TableDefinition.schema property."
                                        "Please use the dao.ColumnDefinition.metadata")
    def add_column_metadata(self, column: str, key: str, value: Union[str, bool, int], backend="base"):
        """
        Add/Updates column metadata and ensures the Key is unique.
        Args:

        """
        if value is None:
            return
        if not self.column_metadata.get(column):
            self.column_metadata[column] = dict()

        self.column_metadata[column][key] = value

        # self.schema = [ColumnDefinition(name=column, data_type={backend: DataType(type=value)})]

    @deprecated(version='1.5.1', reason="Column metadata ere moved to dao.TableDefinition.schema property."
                                        "Please use the dao.ColumnDefinition.metadata")
    def add_multiple_column_metadata(self, column_metadata: Dict[str, List[dict]]):
        """
        Add key-value pairs to column metadata.

        **NOTE:** Ensures uniqueness
        Args:
            column_metadata: dict {"column_name":[{"some_key":"some_value"}]}
        """
        for column, metadata_list in column_metadata:
            for metadata in metadata_list:
                key = metadata.items()[0]
                value = metadata[key]
                self.add_column_metadata(column, key, value)

    @staticmethod
    def _validate_data_types(column_types: dict):
        errors = []
        for col in column_types:
            dtype = column_types[col]
            if not SupportedDataTypes.is_valid_type(dtype):
                errors.append(f'Datatype "{dtype}" is not valid KBC Basetype!')
        if errors:
            raise ValueError(', '.join(errors) + f'\n Supported base types are: [{SupportedDataTypes.list()}]')

Abstraction of metadata and table_metadata than can be provided within the manifest file. This is useful for creation of table/column descriptions, assigning column base types etc. without knowing the complexity of the json object and the internal KBC metadata keys.

Example


tm = TableMetadata()

# or alternatively load from existing manifest
# tm = TableMetadata(manifest_dict)

# add column types
tm.add_column_types({"column_a":"INTEGER", "column_b":SupportedDataTypes.BOOLEAN.value})

# add table description
tm.add_table_description("desc")

# add column description
tm.add_column_descriptions({"column_a":"Integer columns", "column_b":"my boolean test"})

# add arbitrary table metadata
tm.add_table_metadata("my_arbitrary_key","some value")

# update manifest
manifest = {}
manifest['metadata'] = tm.get_table_metadata_for_manifest()
manifest['column_metadata'] = tm.get_column_metadata_for_manifest()

Args

manifest : dict
Existing manifest file

Instance variables

prop column_datatypes : dict
Expand source code
@property
@deprecated(version='1.5.1', reason="Column datatypes were moved to dao.TableDefinition.schema property."
                                    "Please use the dao.ColumnDefinition objects")
def column_datatypes(self) -> dict:
    """
    Return dictionary of column base datatypes
    e.g. {"col1name":"basetype"}

    Returns: dict e.g. {"col1name":"basetype"}

    """

    return self.get_columns_metadata_by_key(KBCMetadataKeys.base_data_type.value)

Return dictionary of column base datatypes e.g. {"col1name":"basetype"}

Returns: dict e.g. {"col1name":"basetype"}

prop column_descriptions : dict
Expand source code
@property
@deprecated(version='1.5.1', reason="Column datatypes were moved to dao.TableDefinition.schema property."
                                    " Please use the dao.ColumnDefinition objects")
def column_descriptions(self) -> dict:
    """
    Return dictionary of column descriptions
    e.g. {"col1name":"desc"}

    Returns: dict e.g. {"col1name":"desc"}

    """

    return self.get_columns_metadata_by_key(KBCMetadataKeys.description.value)

Return dictionary of column descriptions e.g. {"col1name":"desc"}

Returns: dict e.g. {"col1name":"desc"}

prop table_description : str
Expand source code
@property
@deprecated(version='1.5.1', reason="Please use TableDefinition.description instead of TableMetadata")
def table_description(self) -> str:
    """
    Returns table description (KBC.description)

    Returns: str

    """
    return self.table_metadata.get(KBCMetadataKeys.description.value)

Returns table description (KBC.description)

Returns: str

Methods

def add_column_data_type(self,
column: str,
data_type: Union[SupportedDataTypes, str],
source_data_type: str = None,
nullable: bool = False,
length: str = None,
default=None)
Expand source code
@deprecated(version='1.5.1', reason="Column datatypes were moved to dao.TableDefinition.schema property."
                                    "Please use the dao.ColumnDefinition objects and associated"
                                    "dao.TableDefinition methods to define columns. e.g."
                                    "dao.TableDefinition.add_column()")
def add_column_data_type(self, column: str, data_type: Union[SupportedDataTypes, str],
                         source_data_type: str = None,
                         nullable: bool = False,
                         length: str = None, default=None):
    """
    Add single column data type
    Args:
        column (str): name of the column
        data_type (Union[SupportedDataTypes, str]):
            Either instance of ColumnDataTypes enum or a valid string. Basetype supported by KBC.
            base type of a column as defined in
            [php-datatypes](https://github.com/keboola/php-datatypes#base-types);
            see getBaseType implementations (e.g., [mysql](https://github.com/keboola/
            php-datatypes/blob/325fe4eff3e3dfae986ebbdb769eaefd18be6086/src/Definition/MySQL.php#L225))
            for mapping between KBC.datatype.type and KBC.datatype.basetype
        source_data_type (str):
            Optional. Data type of a column - extracted value from the source.
        nullable (bool): Is column nullable? KBC input mapping converts empty values to NULL
        length (str): Column length when applicable e.g. 39,8; 4000
        default: Default value

    Raises:
        ValueError when the provided data_type is not recognized

    """
    if isinstance(data_type, SupportedDataTypes):
        base_type = data_type.value
    else:
        self._validate_data_types({column: data_type})
        base_type = data_type

    self.add_column_metadata(column, KBCMetadataKeys.base_data_type.value, base_type)
    self.add_column_metadata(column, KBCMetadataKeys.data_type_nullable.value, nullable)

    if source_data_type is not None:
        self.add_column_metadata(column, KBCMetadataKeys.source_data_type.value, source_data_type)

    if length is not None:
        self.add_column_metadata(column, KBCMetadataKeys.data_type_length.value, length)
    if default is not None:
        self.add_column_metadata(column, KBCMetadataKeys.data_type_default.value, default)

Add single column data type

Args

column : str
name of the column
data_type (Union[SupportedDataTypes, str]):
Either instance of ColumnDataTypes enum or a valid string. Basetype supported by KBC.
base type of a column as defined in
php-datatypes;
see getBaseType implementations (e.g., [mysql](https://github.com/keboola/
php-datatypes/blob/325fe4eff3e3dfae986ebbdb769eaefd18be6086/src/Definition/MySQL.php#L225))
for mapping between KBC.datatype.type and KBC.datatype.basetype
source_data_type (str):
Optional. Data type of a column - extracted value from the source.
nullable : bool
Is column nullable? KBC input mapping converts empty values to NULL
length : str
Column length when applicable e.g. 39,8; 4000
default
Default value

Raises

ValueError when the provided data_type is not recognized

def add_column_data_types(self,
column_types: Dict[str, Union[SupportedDataTypes, str]])
Expand source code
@deprecated(version='1.5.1', reason="Column datatypes were moved to dao.TableDefinition.schema property."
                                    "Please use the dao.ColumnDefinition objects and associated"
                                    "dao.TableDefinition methods to define columns. e.g."
                                    "dao.TableDefinition.add_columns()")
def add_column_data_types(self, column_types: Dict[str, Union[SupportedDataTypes, str]]):
    """
    Add column types metadata. Note that only supported datatypes
    (<keboola.component.dao.ColumnDataTypes>) may be provided. The value accepts either instance of ColumnDataTypes
    or a valid string.

    Args:
        column_types (Dict[str, Union[SupportedDataTypes, str]]): dict -> {"colname":"datatype"}

    Raises:
        ValueError when the provided data type value is not recognized
    """

    for col in column_types:
        self.add_column_data_type(col, column_types[col])

Add column types metadata. Note that only supported datatypes () may be provided. The value accepts either instance of ColumnDataTypes or a valid string.

Args

column_types : Dict[str, Union[SupportedDataTypes, str]]
dict -> {"colname":"datatype"}

Raises

ValueError when the provided data type value is not recognized

def add_column_descriptions(self, column_descriptions: dict)
Expand source code
def add_column_descriptions(self, column_descriptions: dict):
    """
            Add column description metadata. It will be shown in the KBC Storage UI.

            Args:
                column_descriptions: dict -> {"colname":"description"}

            """
    for col in column_descriptions:
        self.add_column_metadata(col, KBCMetadataKeys.description.value, column_descriptions[col])

Add column description metadata. It will be shown in the KBC Storage UI.

Args

column_descriptions
dict -> {"colname":"description"}
def add_column_metadata(self, column: str, key: str, value: Union[str, bool, int], backend='base')
Expand source code
@deprecated(version='1.5.1', reason="Column metadata ere moved to dao.TableDefinition.schema property."
                                    "Please use the dao.ColumnDefinition.metadata")
def add_column_metadata(self, column: str, key: str, value: Union[str, bool, int], backend="base"):
    """
    Add/Updates column metadata and ensures the Key is unique.
    Args:

    """
    if value is None:
        return
    if not self.column_metadata.get(column):
        self.column_metadata[column] = dict()

    self.column_metadata[column][key] = value

    # self.schema = [ColumnDefinition(name=column, data_type={backend: DataType(type=value)})]

Add/Updates column metadata and ensures the Key is unique. Args:

def add_multiple_column_metadata(self, column_metadata: Dict[str, List[dict]])
Expand source code
@deprecated(version='1.5.1', reason="Column metadata ere moved to dao.TableDefinition.schema property."
                                    "Please use the dao.ColumnDefinition.metadata")
def add_multiple_column_metadata(self, column_metadata: Dict[str, List[dict]]):
    """
    Add key-value pairs to column metadata.

    **NOTE:** Ensures uniqueness
    Args:
        column_metadata: dict {"column_name":[{"some_key":"some_value"}]}
    """
    for column, metadata_list in column_metadata:
        for metadata in metadata_list:
            key = metadata.items()[0]
            value = metadata[key]
            self.add_column_metadata(column, key, value)

Add key-value pairs to column metadata.

NOTE: Ensures uniqueness

Args

column_metadata
dict {"column_name":[{"some_key":"some_value"}]}
def add_table_description(self, description: str)
Expand source code
def add_table_description(self, description: str):
    """
    Adds/Updates table description that is displayed in the Storage UI
    Args:
        description: str
    """
    self.add_table_metadata(KBCMetadataKeys.description.value, description)

Adds/Updates table description that is displayed in the Storage UI

Args

description
str
def add_table_metadata(self, key: str, value: str)
Expand source code
def add_table_metadata(self, key: str, value: str):
    """
            Add/Updates table metadata and ensures the Key is unique.
            Args:

    """
    if value is None:
        return
    self.table_metadata = {**self.table_metadata, **{key: value}}

Add/Updates table metadata and ensures the Key is unique. Args:

def get_column_metadata_for_manifest(self) ‑> dict
Expand source code
@deprecated(version='1.5.1', reason="Please use schema instead of Column Metadata")
def get_column_metadata_for_manifest(self) -> dict:
    """
            Returns column metadata dict as required by the
            [manifest format](https://developers.keboola.com/extend/common-interface/manifest-files/#dataintables
            -manifests)

            e.g.
            tm = TableMetadata()
            manifest['column_metadata'] = tm.column_metadata

            Returns: dict

    """

    return self._get_legacy_column_metadata_for_manifest()

Returns column metadata dict as required by the manifest format

e.g. tm = TableMetadata() manifest['column_metadata'] = tm.column_metadata

Returns: dict

def get_columns_metadata_by_key(self, metadata_key) ‑> dict
Expand source code
@deprecated(version='1.5.1', reason="Please use schema instead of Table Metadata")
def get_columns_metadata_by_key(self, metadata_key) -> dict:
    """
    Returns all columns with specified metadata_key as dictionary of column:metadata_key pairs
    e.g. {"col1name":"value_of_metadata_with_the_key"}

    Returns: dict e.g. {"col1name":"value_of_metadata_with_the_key"}

    """
    column_types = dict()
    for col in self.column_metadata:
        if col.get(metadata_key):
            column_types[col] = col[metadata_key]

    return column_types

Returns all columns with specified metadata_key as dictionary of column:metadata_key pairs e.g. {"col1name":"value_of_metadata_with_the_key"}

Returns: dict e.g. {"col1name":"value_of_metadata_with_the_key"}

def get_table_metadata_for_manifest(self, legacy_manifest: bool = False) ‑> List[dict]
Expand source code
def get_table_metadata_for_manifest(self, legacy_manifest: bool = False) -> List[dict]:
    """
    Returns table metadata list as required by the
    [manifest format]
    (https://developers.keboola.com/extend/common-interface/manifest-files/#dataintables-manifests)

    e.g.
    tm = TableMetadata()
    manifest['metadata'] = tm.table_metadata

    Returns: List[dict]

    """
    if legacy_manifest:
        final_metadata_list = [{'key': key,
                                'value': value}
                               for key, value in self.table_metadata.items() if value not in [None, '']]
    else:
        final_metadata_list = {key: value
                               for key, value in self.table_metadata.items() if value not in [None, '']}

    return final_metadata_list

Returns table metadata list as required by the [manifest format] (https://developers.keboola.com/extend/common-interface/manifest-files/#dataintables-manifests)

e.g. tm = TableMetadata() manifest['metadata'] = tm.table_metadata

Returns: List[dict]

def load_table_metadata_from_manifest(self, manifest: dict)
Expand source code
def load_table_metadata_from_manifest(self, manifest: dict):
    """
    Load metadata from manifest file.

    Args:
        manifest:

    Returns:TableMetadata

    """

    if manifest.get('schema') and (
            manifest.get('metadata') or manifest.get('column_metadata') or manifest.get('columns')):  # noqa
        raise UserException("Manifest can't contain new 'schema' and old 'metadata'/'column_metadata'/'columns'")

    if not manifest.get('schema'):

        # column metadata
        for column, metadata_list in manifest.get('column_metadata', {}).items():
            for metadata in metadata_list:
                if not metadata.get('key') and metadata.get('value'):
                    continue
                key = metadata['key']
                value = metadata['value']
                self.add_column_metadata(column, key, value)

    # table metadata
    for metadata in manifest.get('metadata', []):
        if not metadata.get('key') and metadata.get('value'):
            continue
        key = metadata['key']
        value = metadata['value']
        self.add_table_metadata(key, value)

Load metadata from manifest file.

Args

manifest: Returns:TableMetadata

class TableOutputMapping (source: str,
destination: str,
incremental: bool = False,
columns: str = '',
primary_key: str = '',
delete_where_column: str = '',
delete_where_operator: str = '',
delete_where_values: str = '',
delimiter: str = '',
enclosure: str = '')
Expand source code
@dataclass
class TableOutputMapping(SubscriptableDataclass):
    """
    Abstraction of [output mapping definition](
    https://developers.keboola.com/extend/common-interface/config-file/#tables) in the config file
    """
    source: str
    destination: str
    incremental: bool = False
    columns: str = ''
    primary_key: str = ''
    delete_where_column: str = ''
    delete_where_operator: str = ''
    delete_where_values: str = ''
    delimiter: str = ''
    enclosure: str = ''

Abstraction of output mapping definition in the config file

Ancestors

Class variables

var columns : str
var delete_where_column : str
var delete_where_operator : str
var delete_where_values : str
var delimiter : str
var destination : str
var enclosure : str
var incremental : bool
var primary_key : str
var source : str