Module keboola.component.interface
import argparse
import csv
import glob
import json
import logging
import os
import sys
from datetime import datetime
from pathlib import Path
from typing import List, Dict, Optional, Union
from deprecated import deprecated
from pygelf import GelfUdpHandler, GelfTcpHandler
from pytz import utc
from . import dao
from .exceptions import UserException
def register_csv_dialect():
"""
Register the KBC CSV dialect
"""
csv.register_dialect('kbc', lineterminator='\n', delimiter=',',
quotechar='"')
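# Usage sketch (not part of the module): once register_csv_dialect() has run,
# the dialect can be referenced by name through the standard csv module. The
# output path below is hypothetical.
#
#     import csv
#     with open('/data/out/tables/result.csv', 'w', newline='') as out:
#         writer = csv.writer(out, dialect='kbc')
#         writer.writerow(['id', 'name'])
#         writer.writerow(['1', 'Alice'])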
def init_environment_variables() -> dao.EnvironmentVariables:
"""
Initializes environment variables available in the docker environment
https://developers.keboola.com/extend/common-interface/environment/#environment-variables
Returns:
dao.EnvironmentVariables:
"""
return dao.EnvironmentVariables(data_dir=os.environ.get('KBC_DATADIR', None),
run_id=os.environ.get('KBC_RUNID', None),
project_id=os.environ.get('KBC_PROJECTID', None),
stack_id=os.environ.get('KBC_STACKID', None),
config_id=os.environ.get('KBC_CONFIGID', None),
component_id=os.environ.get('KBC_COMPONENTID', None),
config_row_id=os.environ.get('KBC_CONFIGROWID', None),
branch_id=os.environ.get('KBC_BRANCHID', None),
staging_file_provider=os.environ.get('KBC_STAGING_FILE_PROVIDER', None),
project_name=os.environ.get('KBC_PROJECTNAME', None),
token_id=os.environ.get('KBC_TOKENID', None),
token_desc=os.environ.get('KBC_TOKENDESC', None),
token=os.environ.get('KBC_TOKEN', None),
url=os.environ.get('KBC_URL', None),
real_user=os.environ.get('KBC_REALUSER', None),
logger_addr=os.environ.get('KBC_LOGGER_ADDR', None),
logger_port=os.environ.get('KBC_LOGGER_PORT', None)
)
class CommonInterface:
"""
A class handling standard tasks related to the
[Keboola Common Interface](https://developers.keboola.com/extend/common-interface/)
e.g. config load, validation, component state, I/O handling, I/O metadata and manifest files.
It initializes the environment injected into the Docker container the KBC component runs in and abstracts the tasks
related to the Common Interface interaction.
Attributes:
data_folder_path (str):
Full path to the /data folder
"""
LOGGING_TYPE_STD = 'std'
LOGGING_TYPE_GELF = 'gelf'
def __init__(self, data_folder_path: str = None, log_level=logging.INFO, logging_type=None):
"""
Initializes the CommonInterface environment. If the data_folder_path is not specified, the folder
is resolved in the following order:
- From provided argument if present: `-d` or `--data`
- From environment variable if present (KBC_DATADIR)
- Defaults to /data/ if none of the above is specified
Args:
data_folder_path (str): path to a data folder.
log_level (int): logging.INFO or logging.DEBUG
logging_type (str): optional 'std' or 'gelf', if left empty determined automatically
"""
self.environment_variables = init_environment_variables()
register_csv_dialect()
# init logging
logging_type_inf = CommonInterface.LOGGING_TYPE_GELF if os.getenv('KBC_LOGGER_ADDR',
None) else CommonInterface.LOGGING_TYPE_STD
if not logging_type:
logging_type = logging_type_inf
if logging_type == CommonInterface.LOGGING_TYPE_STD:
self.set_default_logger(log_level)
elif logging_type == CommonInterface.LOGGING_TYPE_GELF:
self.set_gelf_logger(log_level)
# init data folder
if not data_folder_path:
data_folder_path = self._get_data_folder_from_context()
# validate
if not os.path.isdir(data_folder_path):
raise ValueError(
f"The data directory does not exist, verify that the data directory is correct. Dir: "
f"{data_folder_path}"
)
self.data_folder_path = data_folder_path
def _get_data_folder_from_context(self):
# try to get from argument parameter
# get from parameters
argparser = argparse.ArgumentParser()
argparser.add_argument(
'-d',
'--data',
dest='data_dir',
default='',
help='Data directory'
)
# unknown is to ignore extra arguments
args, unknown = argparser.parse_known_args()
data_folder_path = args.data_dir
if data_folder_path == '' and self.environment_variables.data_dir:
data_folder_path = self.environment_variables.data_dir
elif data_folder_path == '':
data_folder_path = '/data/'
return data_folder_path
# ================================= Logging ==============================
@staticmethod
def set_default_logger(log_level: int = logging.INFO): # noqa: E301
"""
Sets default console logger.
Args:
log_level: logging level, default: 'logging.INFO'
Returns:
Logger object
"""
class InfoFilter(logging.Filter):
def filter(self, rec):
return rec.levelno in (logging.DEBUG, logging.INFO)
hd1 = logging.StreamHandler(sys.stdout)
hd1.addFilter(InfoFilter())
hd2 = logging.StreamHandler(sys.stderr)
hd2.setLevel(logging.WARNING)
logging.getLogger().setLevel(log_level)
# remove default handler
for h in logging.getLogger().handlers:
logging.getLogger().removeHandler(h)
logging.getLogger().addHandler(hd1)
logging.getLogger().addHandler(hd2)
logger = logging.getLogger()
return logger
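# Behaviour sketch (illustrative only): with the default logger, DEBUG/INFO
# records pass the InfoFilter and go to stdout, while WARNING and above go to
# stderr, so the platform can separate normal output from errors.
#
#     CommonInterface.set_default_logger(logging.DEBUG)
#     logging.info('goes to stdout')
#     logging.warning('goes to stderr')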
@staticmethod
def set_gelf_logger(log_level: int = logging.INFO, transport_layer='TCP',
stdout=False, include_extra_fields=True, **gelf_kwargs): # noqa: E301
"""
Sets up a GELF logger. A handler for console output is not included by default;
for testing in non-GELF environments use stdout=True.
Args:
log_level: logging level, default: 'logging.INFO'
transport_layer: 'TCP' or 'UDP', default: 'TCP'
stdout: if set to True, a stdout handler is also included
include_extra_fields:
Include extra GELF fields in the log messages.
e.g. logging.warning('Some warning',
extra={"additional_info": "Extra info to be displayed in the detail"}
Returns: Logger object
"""
# remove existing handlers
for h in logging.getLogger().handlers:
logging.getLogger().removeHandler(h)
if stdout:
CommonInterface.set_default_logger(log_level)
# gelf handler setup
gelf_kwargs['include_extra_fields'] = include_extra_fields
host = os.getenv('KBC_LOGGER_ADDR', 'localhost')
port = int(os.getenv('KBC_LOGGER_PORT', 12201))  # env values are strings
if transport_layer == 'TCP':
gelf = GelfTcpHandler(host=host, port=port, **gelf_kwargs)
elif transport_layer == 'UDP':
gelf = GelfUdpHandler(host=host, port=port, **gelf_kwargs)
else:
raise ValueError(F'Unsupported gelf transport layer: {transport_layer}. Choose TCP or UDP')
logging.getLogger().setLevel(log_level)
logging.getLogger().addHandler(gelf)
logger = logging.getLogger()
return logger
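# Usage sketch (illustrative only): with include_extra_fields=True, values
# passed via the `extra` argument are forwarded as additional GELF fields.
#
#     CommonInterface.set_gelf_logger(logging.INFO, transport_layer='UDP')
#     logging.warning('Some warning',
#                     extra={'additional_info': 'shown in the event detail'})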
def get_state_file(self) -> dict:
"""
Returns dict representation of state file or empty dict if not present
Returns:
dict:
"""
logging.info('Loading state file..')
state_file_path = os.path.join(self.data_folder_path, 'in', 'state.json')
if not os.path.isfile(state_file_path):
logging.info('State file not found. First run?')
return {}
try:
with open(state_file_path, 'r') as state_file:
return json.load(state_file)
except (OSError, IOError):
raise ValueError(
"State file state.json could not be read"
)
def write_state_file(self, state_dict: dict):
"""
Stores [state file](https://developers.keboola.com/extend/common-interface/config-file/#state-file).
Args:
state_dict (dict):
"""
if not isinstance(state_dict, dict):
raise TypeError('Dictionary expected as a state file datatype!')
with open(os.path.join(self.configuration.data_dir, 'out', 'state.json'), 'w+') as state_file:
json.dump(state_dict, state_file)
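# Usage sketch (hypothetical state keys): a typical incremental pattern reads
# the previous state at the start of a run and persists a new one at the end.
#
#     ci = CommonInterface()
#     state = ci.get_state_file()
#     last_run = state.get('last_run')  # None on the first run
#     # ... process only data newer than last_run ...
#     ci.write_state_file({'last_run': '2023-01-01T00:00:00Z'})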
def get_input_table_definition_by_name(self, table_name: str) -> dao.TableDefinition:
"""
Return dao.TableDefinition object by table name.
If neither the table itself nor its manifest exists, a ValueError is raised.
The dao.TableDefinition will contain the full path of the source file, its name and manifest (if present). It also
provides methods for updating the manifest metadata.
Args:
table_name: Destination table name (name of .csv file). e.g. input.csv
Returns:
dao.TableDefinition
"""
manifest_path = os.path.join(
self.tables_in_path,
table_name + '.manifest'
)
return dao.TableDefinition.build_from_manifest(manifest_path)
def get_input_tables_definitions(self, orphaned_manifests=False) -> List[dao.TableDefinition]:
"""
Return dao.TableDefinition objects by scanning the `data/in/tables` folder.
The dao.TableDefinition will contain the full path of the source file, its name and manifest (if present). It also
provides methods for updating the manifest metadata.
By default, orphaned manifests are skipped.
See Also: keboola.component.dao.TableDefinition
Args:
orphaned_manifests (bool): If True, manifests without corresponding files are fetched. This is useful
in scenarios where [workspaces exchange](
https://developers.keboola.com/extend/common-interface/folders/#exchanging-data-via-workspace) is used
e.g. when only manifest files are present in the `data/in/tables` folder.
Returns: List[dao.TableDefinition]
"""
table_files = [f for f in glob.glob(self.tables_in_path + "/**", recursive=False) if
not f.endswith('.manifest')]
table_defs = list()
for t in table_files:
p = Path(t)
manifest_path = t + '.manifest'
if p.is_dir() and not Path(manifest_path).exists():
# skip folders that do not have matching manifest
logging.warning(f'Folder {t} does not have matching manifest, it will be ignored!')
continue
table_defs.append(dao.TableDefinition.build_from_manifest(manifest_path))
if orphaned_manifests:
files_w_manifest = [t.name + '.manifest' for t in table_defs]
manifest_files = [f for f in glob.glob(self.tables_in_path + "/**.manifest", recursive=False)
if Path(f).name not in files_w_manifest]
for t in manifest_files:
p = Path(t)
if p.is_dir():
# skip manifest paths that are directories
logging.warning(f'Manifest {t} is a folder, skipping!')
continue
table_defs.append(dao.TableDefinition.build_from_manifest(t))
return table_defs
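# Usage sketch (illustrative only, assumes non-sliced tables): iterating over
# all input tables and reading their rows with the registered 'kbc' dialect.
#
#     ci = CommonInterface()
#     for table in ci.get_input_tables_definitions():
#         with open(table.full_path, newline='') as inp:
#             for row in csv.reader(inp, dialect='kbc'):
#                 pass  # process row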
def _create_table_definition(self, name: str,
storage_stage: str = 'out',
is_sliced: bool = False,
destination: str = '',
primary_key: List[str] = None,
columns: List[str] = None,
incremental: bool = None,
table_metadata: dao.TableMetadata = None,
enclosure: str = '"',
delimiter: str = ',',
delete_where: dict = None,
write_always: bool = False) -> dao.TableDefinition:
"""
Helper method for dao.TableDefinition creation along with the "manifest".
It initializes path according to the storage_stage type.
Args:
name: Table / file name. e.g. `'my_table.csv'`.
storage_stage:
default value: 'out'
either `'in'` or `'out'`. Determines the path to result file.
E.g. `data/tables/in/my_table.csv`
is_sliced: True if the full_path points to a folder with sliced tables
destination: String name of the table in Storage.
primary_key: List with names of columns used for primary key.
columns: List of columns for headless CSV files
incremental: Set to true to enable incremental loading
table_metadata: <.dao.TableMetadata> object containing column and table metadata
enclosure: str: CSV enclosure, by default "
delimiter: str: CSV delimiter, by default ,
delete_where: Dict with settings for deleting rows
write_always: Bool: If true, the table will be saved to Storage even when the job execution
fails.
"""
if storage_stage == 'in':
full_path = os.path.join(self.tables_in_path, name)
elif storage_stage == 'out':
full_path = os.path.join(self.tables_out_path, name)
else:
raise ValueError(f'Invalid storage_stage value "{storage_stage}". Supported values are: "in" or "out"!')
return dao.TableDefinition(name=name,
full_path=full_path,
is_sliced=is_sliced,
destination=destination,
primary_key=primary_key,
columns=columns,
incremental=incremental,
table_metadata=table_metadata,
enclosure=enclosure,
delimiter=delimiter,
delete_where=delete_where,
stage=storage_stage,
write_always=write_always)
def create_in_table_definition(self, name: str,
is_sliced: bool = False,
destination: str = '',
primary_key: List[str] = None,
columns: List[str] = None,
incremental: bool = None,
table_metadata: dao.TableMetadata = None,
delete_where: dict = None) -> dao.TableDefinition:
"""
Helper method for input dao.TableDefinition creation along with the "manifest".
It initializes path in data/tables/in/ folder.
Args:
name: Table / file name. e.g. `'my_table.csv'`.
is_sliced: True if the full_path points to a folder with sliced tables
destination: String name of the table in Storage.
primary_key: List with names of columns used for primary key.
columns: List of columns for headless CSV files
incremental: Set to true to enable incremental loading
table_metadata: <.dao.TableMetadata> object containing column and table metadata
delete_where: Dict with settings for deleting rows
"""
return self._create_table_definition(name=name,
storage_stage='in',
is_sliced=is_sliced,
destination=destination,
primary_key=primary_key,
columns=columns,
incremental=incremental,
table_metadata=table_metadata,
delete_where=delete_where)
def create_out_table_definition(self, name: str,
is_sliced: bool = False,
destination: str = '',
primary_key: List[str] = None,
columns: List[str] = None,
incremental: bool = None,
table_metadata: dao.TableMetadata = None,
enclosure: str = '"',
delimiter: str = ',',
delete_where: dict = None,
write_always: bool = False) -> dao.TableDefinition:
"""
Helper method for output dao.TableDefinition creation along with the "manifest".
It initializes path in data/tables/out/ folder.
Args:
name: Table / file name. e.g. `'my_table.csv'`.
is_sliced: True if the full_path points to a folder with sliced tables
destination: String name of the table in Storage.
primary_key: List with names of columns used for primary key.
columns: List of columns for headless CSV files
incremental: Set to true to enable incremental loading
table_metadata: <.dao.TableMetadata> object containing column and table metadata
enclosure: str: CSV enclosure, by default "
delimiter: str: CSV delimiter, by default ,
delete_where: Dict with settings for deleting rows
write_always: Bool: If true, the table will be saved to Storage even when the job execution
fails.
"""
return self._create_table_definition(name=name,
storage_stage='out',
is_sliced=is_sliced,
destination=destination,
primary_key=primary_key,
columns=columns,
incremental=incremental,
table_metadata=table_metadata,
enclosure=enclosure,
delimiter=delimiter,
delete_where=delete_where,
write_always=write_always)
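# Usage sketch (hypothetical table name): create an output table definition,
# write data to its full_path and store the accompanying manifest.
#
#     ci = CommonInterface()
#     out_table = ci.create_out_table_definition('result.csv',
#                                                primary_key=['id'],
#                                                incremental=True)
#     with open(out_table.full_path, 'w', newline='') as out:
#         csv.writer(out, dialect='kbc').writerow(['1', 'Alice'])
#     ci.write_manifest(out_table)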
# # File processing
def get_input_file_definitions_grouped_by_tag_group(self, orphaned_manifests=False,
only_latest_files=True,
tags: List[str] = None,
include_system_tags=False) \
-> Dict[str, List[dao.FileDefinition]]:
"""
Convenience method returning lists of files in dictionary grouped by tag group.
(a tag group is a string built from alphabetically ordered and concatenated tags, e.g. 'tag1;tag2')
Args:
orphaned_manifests (bool): If True, manifests without corresponding files are fetched. Otherwise
a ValueError is raised.
only_latest_files (bool): If True, only the latest version of each file is included.
tags (List[str]): optional list of tags. If specified only files containing specified tags will be fetched.
include_system_tags (bool): optional flag that will use system generated tags in groups as well.
See FileDefinition.SYSTEM_TAG_PREFIXES
Returns:
Dict[str,List[dao.FileDefinition]] indexed by tag group => string built from alphabetically ordered
and concatenated tags, e.g. `tag1;tag2`
"""
file_definitions = self.get_input_files_definitions(orphaned_manifests, only_latest_files, tags)
return self.__group_file_defs_by_tag_group(file_definitions, include_system_tags=include_system_tags)
def get_input_file_definitions_grouped_by_name(self, orphaned_manifests=False, only_latest_files=True,
tags: List[str] = None) -> Dict[str, List[dao.FileDefinition]]:
"""
Convenience method returning lists of files in dictionary grouped by file name.
Args:
orphaned_manifests (bool): If True, manifests without corresponding files are fetched. Otherwise
a ValueError is raised.
only_latest_files (bool): If True, only the latest version of each file is included.
tags (List[str]): optional list of tags. If specified only files with matching tag group will be fetched.
Returns:
"""
file_definitions = self.get_input_files_definitions(orphaned_manifests, only_latest_files, tags)
return self.__group_files_by_name(file_definitions)
def __group_file_defs_by_tag_group(self, file_definitions: List[dao.FileDefinition], include_system_tags=False) \
-> Dict[str, List[dao.FileDefinition]]:
files_per_tag: dict = {}
for f in file_definitions:
# sorted() avoids mutating the definition's tag list in place
tag_group_v1 = sorted(f.tags if include_system_tags else f.user_tags)
tag_group_key = ';'.join(tag_group_v1)
if not files_per_tag.get(tag_group_key):
files_per_tag[tag_group_key] = []
files_per_tag[tag_group_key].append(f)
return files_per_tag
def _filter_files(self, file_definitions: List[dao.FileDefinition], tags: List[str] = None,
only_latest: bool = True) -> List[dao.FileDefinition]:
filtered_files = file_definitions
if only_latest:
filtered_files = self.__filter_filedefs_by_latest(filtered_files)
# filter by tags
if tags:
new_filtered = []
filter_set = set(tags)
for fd in filtered_files:
tags_set = set(fd.tags)
if filter_set.issubset(tags_set):
new_filtered.append(fd)
filtered_files = new_filtered
return filtered_files
def __group_files_by_name(self, file_definitions: List[dao.FileDefinition]) -> Dict[str, List[dao.FileDefinition]]:
files_per_name: dict = {}
for f in file_definitions:
if not files_per_name.get(f.name):
files_per_name[f.name] = []
files_per_name[f.name].append(f)
return files_per_name
def __filter_filedefs_by_latest(self, file_definitions: List[dao.FileDefinition]) -> List[dao.FileDefinition]:
"""
Get the latest file (according to the creation timestamp) for each filename
Args:
file_definitions:
Returns:
"""
filtered_files = list()
files_per_name = self.__group_files_by_name(file_definitions)
for group in files_per_name:
max_file = None
max_timestamp = utc.localize(datetime(1900, 5, 17))
for f in files_per_name[group]:
creation_date = f.created
# if the date is not present, include the file anyway but keep the running maximum
if creation_date is None or creation_date > max_timestamp:
max_timestamp = creation_date or max_timestamp
max_file = f
filtered_files.append(max_file)
return filtered_files
def get_input_files_definitions(self, orphaned_manifests=False,
only_latest_files=True,
tags: Optional[List[str]] = None) -> List[dao.FileDefinition]:
"""
Return dao.FileDefinition objects by scanning the `data/in/files` folder.
The dao.FileDefinition will contain the full path of the source file, its name and manifest.
By default, only the latest version of each file is included.
By default, orphaned manifests are skipped.
A filter may be specified to match only some tags. All files containing the specified tags will be returned.
See Also: keboola.component.dao.FileDefinition
Args:
orphaned_manifests (bool): If True, manifests without corresponding files are fetched. Otherwise
a ValueError is raised.
only_latest_files (bool): If True, only the latest version of each file is included.
tags (List[str]): optional list of tags. If specified only files with matching tag group will be fetched.
Returns: List[dao.FileDefinition]
"""
in_files = [f for f in glob.glob(self.files_in_path + "/**", recursive=False) if
not f.endswith('.manifest')]
file_defs = list()
for t in in_files:
manifest_path = t + '.manifest'
file_defs.append(dao.FileDefinition.build_from_manifest(manifest_path))
if orphaned_manifests:
files_w_manifest = [Path(t.full_path).name + '.manifest' for t in file_defs]
manifest_files = [f for f in glob.glob(self.files_in_path + "/**.manifest", recursive=False)
if Path(f).name not in files_w_manifest]
for t in manifest_files:
p = Path(t)
if p.is_dir():
# skip manifest paths that are directories
logging.warning(f'Manifest {t} is a folder, skipping!')
continue
file_defs.append(dao.FileDefinition.build_from_manifest(t))
return self._filter_files(file_defs, tags, only_latest_files)
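# Usage sketch (hypothetical tag): fetch only the latest version of input
# files that carry all of the given tags.
#
#     ci = CommonInterface()
#     reports = ci.get_input_files_definitions(tags=['monthly_report'],
#                                              only_latest_files=True)
#     for file_def in reports:
#         print(file_def.name, file_def.full_path)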
def _create_file_definition(self,
name: str,
storage_stage: str = 'out',
tags: List[str] = None,
is_public: bool = False,
is_permanent: bool = False,
is_encrypted: bool = False,
notify: bool = False) -> dao.FileDefinition:
"""
Helper method for dao.FileDefinition creation along with the "manifest".
It initializes path according to the storage_stage type.
Args:
name (str): Name of the file, e.g. file.jpg.
tags (list):
List of tags that are assigned to this file
is_public: When true, the file URL will be permanent and publicly accessible.
is_permanent: Keeps a file forever. If false, the file will be deleted after default
period of time (e.g.
15 days)
is_encrypted: If true, the file content will be encrypted in the storage.
notify: Notifies project administrators that a file was uploaded.
"""
if storage_stage == 'in':
full_path = os.path.join(self.files_in_path, name)
elif storage_stage == 'out':
full_path = os.path.join(self.files_out_path, name)
else:
raise ValueError(f'Invalid storage_stage value "{storage_stage}". Supported values are: "in" or "out"!')
return dao.FileDefinition(
full_path=full_path,
tags=tags,
is_public=is_public,
is_permanent=is_permanent,
is_encrypted=is_encrypted,
notify=notify)
def create_out_file_definition(self, name: str,
tags: List[str] = None,
is_public: bool = False,
is_permanent: bool = False,
is_encrypted: bool = False,
notify: bool = False) -> dao.FileDefinition:
"""
Helper method for output dao.FileDefinition creation along with the "manifest".
It initializes the path in the data/files/out/ folder.
Args:
name (str): Name of the file, e.g. file.jpg.
tags (list):
List of tags that are assigned to this file
is_public: When true, the file URL will be permanent and publicly accessible.
is_permanent: Keeps a file forever. If false, the file will be deleted after default
period of time (e.g.
15 days)
is_encrypted: If true, the file content will be encrypted in the storage.
notify: Notifies project administrators that a file was uploaded.
"""
return self._create_file_definition(name=name,
storage_stage='out',
tags=tags,
is_public=is_public,
is_permanent=is_permanent,
is_encrypted=is_encrypted,
notify=notify)
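# Usage sketch (hypothetical file name): register an output file with tags,
# write its content and store the accompanying manifest.
#
#     ci = CommonInterface()
#     file_def = ci.create_out_file_definition('report.xml',
#                                              tags=['reports', '2023'])
#     with open(file_def.full_path, 'w') as f:
#         f.write('<report/>')
#     ci.write_manifest(file_def)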
# TODO: refactor the validate config so it's more user-friendly
"""
- Support for nested params?
"""
def validate_configuration_parameters(self, mandatory_params=None):
"""
Validates config parameters based on provided mandatory parameters.
All provided parameters must be present in config to pass.
ex1.:
par1 = 'par1'
par2 = 'par2'
mandatory_params = [par1, par2]
Validation will fail when one of the above parameters is not found
Two levels of nesting:
Parameters can be grouped as arrays par3 = [groupPar1, groupPar2]
=> at least one of the pars has to be present
ex2.
par1 = 'par1'
par2 = 'par2'
par3 = 'par3'
groupPar1 = 'groupPar1'
groupPar2 = 'groupPar2'
group1 = [groupPar1, groupPar2]
group3 = [par3, group1]
mandatory_params = [par1, par2, group1]
The following logical expression is evaluated:
par1 AND par2 AND (groupPar1 OR groupPar2)
ex3
par1 = 'par1'
par2 = 'par2'
par3 = 'par3'
groupPar1 = 'groupPar1'
groupPar2 = 'groupPar2'
group1 = [groupPar1, groupPar2]
group3 = [par3, group1]
mandatory_params = [par1, par2, group3]
Following logical expression is evaluated:
par1 AND par2 AND (par3 OR (groupPar1 AND groupPar2))
"""
if not mandatory_params:
mandatory_params = []
return self._validate_parameters(self.configuration.parameters, mandatory_params, 'config parameters')
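# Usage sketch (hypothetical parameter names) of the grouping rules described
# above: 'username' and 'password' must both be present, and at least one of
# '#api_token' / 'token_file' must be present.
#
#     ci = CommonInterface()
#     ci.validate_configuration_parameters(
#         ['username', 'password', ['#api_token', 'token_file']])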
def validate_image_parameters(self, mandatory_params):
"""
Validates image parameters based on provided mandatory parameters.
All provided parameters must be present in config to pass.
ex1.:
par1 = 'par1'
par2 = 'par2'
mandatory_params = [par1, par2]
Validation will fail when one of the above parameters is not found
Two levels of nesting:
Parameters can be grouped as arrays par3 = [groupPar1, groupPar2]
=> at least one of the pars has to be present
ex2.
par1 = 'par1'
par2 = 'par2'
par3 = 'par3'
groupPar1 = 'groupPar1'
groupPar2 = 'groupPar2'
group1 = [groupPar1, groupPar2]
group3 = [par3, group1]
mandatory_params = [par1, par2, group1]
The following logical expression is evaluated:
par1 AND par2 AND (groupPar1 OR groupPar2)
ex3
par1 = 'par1'
par2 = 'par2'
par3 = 'par3'
groupPar1 = 'groupPar1'
groupPar2 = 'groupPar2'
group1 = [groupPar1, groupPar2]
group3 = [par3, group1]
mandatory_params = [par1, par2, group3]
Following logical expression is evaluated:
par1 AND par2 AND (par3 OR (groupPar1 AND groupPar2))
"""
return self._validate_parameters(self.configuration.image_parameters,
mandatory_params, 'image/stack parameters')
def _validate_parameters(self, parameters, mandatory_params, _type):
"""
Validates provided parameters based on provided mandatory parameters.
All provided parameters must be present in config to pass.
ex1.:
par1 = 'par1'
par2 = 'par2'
mandatory_params = [par1, par2]
Validation will fail when one of the above parameters is not found
Two levels of nesting:
Parameters can be grouped as arrays par3 = [groupPar1, groupPar2] => at least one of the pars has to be
present
ex2.
par1 = 'par1'
par2 = 'par2'
par3 = 'par3'
groupPar1 = 'groupPar1'
groupPar2 = 'groupPar2'
group1 = [groupPar1, groupPar2]
group3 = [par3, group1]
mandatory_params = [par1, par2, group1]
The following logical expression is evaluated:
par1 AND par2 AND (groupPar1 OR groupPar2)
ex3
par1 = 'par1'
par2 = 'par2'
par3 = 'par3'
groupPar1 = 'groupPar1'
groupPar2 = 'groupPar2'
group1 = [groupPar1, groupPar2]
group3 = [par3, group1]
mandatory_params = [par1, par2, group3]
Following logical expression is evaluated:
par1 AND par2 AND (par3 OR (groupPar1 AND groupPar2))
Raises:
UserException: on missing parameters
"""
missing_fields = []
for par in mandatory_params:
if isinstance(par, list):
missing_fields.extend(self._validate_par_group(par, parameters))
elif parameters.get(par) is None:
missing_fields.append(par)
if missing_fields:
raise UserException(
'Missing mandatory {} fields: [{}] '.format(_type, ', '.join(missing_fields)))
def _validate_par_group(self, par_group, parameters):
missing_fields = []
is_present = False
for par in par_group:
if isinstance(par, list):
missing_subset = self._get_par_missing_fields(par, parameters)
missing_fields.extend(missing_subset)
if not missing_subset:
is_present = True
elif parameters.get(par):
is_present = True
else:
missing_fields.append(par)
if not is_present:
return missing_fields
else:
return []
def _get_par_missing_fields(self, mand_params, parameters):
missing_fields = []
for par in mand_params:
if not parameters.get(par):
missing_fields.append(par)
return missing_fields
# ### PROPERTIES
@property
def configuration(self):
# try to load the configuration
# raises ValueError
return Configuration(self.data_folder_path)
@property
def tables_out_path(self):
return os.path.join(self.data_folder_path, 'out', 'tables')
@property
def tables_in_path(self):
return os.path.join(self.data_folder_path, 'in', 'tables')
@property
def files_out_path(self):
return os.path.join(self.data_folder_path, 'out', 'files')
@property
def files_in_path(self):
return os.path.join(self.data_folder_path, 'in', 'files')
@property
def is_legacy_queue(self) -> bool:
"""
Check if the project is running on legacy queue (v1)
Returns:
"""
features = os.environ.get('KBC_PROJECT_FEATURE_GATES')
is_legacy_queue = True
if not features or 'queuev2' in features:
is_legacy_queue = False
return is_legacy_queue
def write_manifest(self, io_definition: Union[dao.FileDefinition, dao.TableDefinition]):
"""
Write a manifest from a dao.IODefinition (table or file). Creates the appropriate manifest file in the proper location.
**Usage:**
```python
from keboola.component import CommonInterface
from keboola.component import dao
ci = CommonInterface()
# build table definition
tm = dao.TableMetadata()
table_def = ci.create_out_table_definition(name='my_new_table.csv',
incremental=True,
table_metadata=tm)
ci.write_manifest(table_def)
# build file definition
file_def = ci.create_out_file_definition(name='my_file.xml', tags=['tag', 'tag2'])
ci.write_manifest(file_def)
```
Args:
io_definition Union[dao.FileDefinition, dao.TableDefinition]: Initialized dao.IODefinition
object containing manifest.
Returns:
"""
manifest = io_definition.get_manifest_dictionary(legacy_queue=self.is_legacy_queue)
# make dirs if not exist
os.makedirs(os.path.dirname(io_definition.full_path), exist_ok=True)
with open(io_definition.full_path + '.manifest', 'w') as manifest_file:
json.dump(manifest, manifest_file)
def write_manifests(self, io_definitions: List[Union[dao.FileDefinition, dao.TableDefinition]]):
"""
Process all table/file definition objects and create appropriate manifest files.
Args:
io_definitions:
Returns:
"""
for io_def in io_definitions:
self.write_manifest(io_def)
# ############# DEPRECATED METHODS, TODO: remove
@deprecated(version='1.3.0', reason="You should use write_manifest function")
def write_filedef_manifest(self, file_definition: dao.FileDefinition):
"""
Write a file manifest from dao.FileDefinition. Creates the appropriate manifest file in the proper location.
**Usage:**
```python
from keboola.component import CommonInterface
from keboola.component import dao
ci = CommonInterface()
# build file definition
file_def = ci.create_out_file_definition(name='my_file.xml', tags=['tag', 'tag2'])
ci.write_filedef_manifest(file_def)
```
Args:
file_definition (dao.FileDefinition): Initialized dao.FileDefinition object containing manifest.
Returns:
"""
self.write_manifest(file_definition)
@deprecated(version='1.3.0', reason="You should use write_manifests function")
def write_filedef_manifests(self, file_definitions: List[dao.FileDefinition]):
"""
Process all table definition objects and create appropriate manifest files.
Args:
file_definitions:
Returns:
"""
self.write_manifests(file_definitions)
@deprecated(version='1.3.0', reason="You should use write_manifest function")
def write_tabledef_manifest(self, table_definition: dao.TableDefinition):
"""
Write a table manifest from dao.TableDefinition. Creates the appropriate manifest file in the proper location.
**Usage:**
```python
from keboola.component import CommonInterface
from keboola.component import dao
ci = CommonInterface()
tm = dao.TableMetadata()
tm.add_table_description("My new table")
# build table definition
table_def = ci.create_out_table_definition(name='my_new_table.csv',
incremental=True,
table_metadata=tm)
ci.write_tabledef_manifest(table_def)
```
Args:
table_definition (dao.TableDefinition): Initialized dao.TableDefinition object containing manifest.
Returns:
"""
self.write_manifest(table_definition)
@deprecated(version='1.3.0', reason="You should use write_manifests function")
def write_tabledef_manifests(self, table_definitions: List[dao.TableDefinition]):
"""
Process all table definition objects and create appropriate manifest files.
Args:
table_definitions:
Returns:
"""
self.write_manifests(table_definitions)
# ########## CONFIGURATION
class Configuration:
"""
Class representing configuration file generated and read
by KBC for docker applications
See docs:
https://developers.keboola.com/extend/common-interface/config-file/
"""
def __init__(self, data_folder_path: str):
"""
Args:
data_folder_path (object):
"""
self.config_data = {}
self.data_dir = data_folder_path
try:
with open(os.path.join(data_folder_path, 'config.json'), 'r') as config_file:
self.config_data = json.load(config_file)
except (OSError, IOError):
raise ValueError(
f"Configuration file config.json not found, verify that the data directory is correct and that the "
f"config file is present. Dir: "
f"{self.data_dir}"
)
self.parameters = self.config_data.get('parameters', {})
self.image_parameters = self.config_data.get('image_parameters', {})
self.action = self.config_data.get('action', '')
self.workspace_credentials = self.config_data.get('authorization', {}).get('workspace', {})
# ################ PROPERTIES
@property
def oauth_credentials(self) -> dao.OauthCredentials:
"""
Returns subscriptable class OauthCredentials
Returns: OauthCredentials
"""
oauth_credentials = self.config_data.get('authorization', {}).get('oauth_api', {}).get('credentials', {})
credentials = None
if oauth_credentials:
credentials = dao.OauthCredentials(
id=oauth_credentials.get("id", ''),
created=oauth_credentials.get("created", ''),
data=json.loads(oauth_credentials.get("#data", '{}')),
oauthVersion=oauth_credentials.get("oauthVersion", ''),
appKey=oauth_credentials.get("appKey", ''),
appSecret=oauth_credentials.get("#appSecret", '')
)
return credentials
@property
def tables_input_mapping(self) -> List[dao.TableInputMapping]:
"""
List of table [input mappings](https://developers.keboola.com/extend/common-interface/config-file/#tables)
Tables specified in the configuration file.
Returns: List[TableInputMapping]
"""
tables_defs = self.config_data.get('storage', {}).get('input', {}).get('tables', [])
tables = []
for table in tables_defs:
# nested dataclass
table['column_types'] = [dao.build_dataclass_from_dict(dao.TableColumnTypes, coltype) for coltype in
table.get('column_types', [])]
im = dao.build_dataclass_from_dict(dao.TableInputMapping, table)
im.full_path = os.path.normpath(
os.path.join(
self.data_dir,
'in',
'tables',
table['destination']
)
)
tables.append(im)
return tables
@property
def tables_output_mapping(self) -> List[dao.TableOutputMapping]:
"""
List of table [output mappings](https://developers.keboola.com/extend/common-interface/config-file/#tables)
Get tables which are supposed to be returned when the application finishes (from the configuration 'storage' section).
Returns: List[TableOutputMapping]
"""
tables_defs = self.config_data.get('storage', {}).get('output', {}).get('tables', [])
tables = []
for table in tables_defs:
om = dao.build_dataclass_from_dict(dao.TableOutputMapping, table)
tables.append(om)
return tables
@property
def files_input_mapping(self) -> List[dao.FileInputMapping]:
"""
List of file [input mappings](https://developers.keboola.com/extend/common-interface/config-file/#files)
Files specified in the configuration file (defined on the component's input mapping, from the configuration 'storage' section).
Returns: List[FileInputMapping]
"""
defs = self.config_data.get('storage', {}).get('input', {}).get('files', [])
files = []
for file in defs:
om = dao.build_dataclass_from_dict(dao.FileInputMapping, file)
files.append(om)
return files
@property
def files_output_mapping(self) -> List[dao.FileOutputMapping]:
"""
List of file [output mappings](https://developers.keboola.com/extend/common-interface/config-file/#files)
Get files which are supposed to be returned when the application finishes (from the configuration 'storage' section).
Returns:
"""
defs = self.config_data.get('storage', {}).get('output', {}).get('files', [])
files = []
for file in defs:
om = dao.build_dataclass_from_dict(dao.FileOutputMapping, file)
files.append(om)
return files
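# Usage sketch (illustrative only): the Configuration object is normally
# obtained through CommonInterface.configuration and gives typed access to
# config.json.
#
#     ci = CommonInterface()
#     cfg = ci.configuration
#     params = cfg.parameters  # the 'parameters' section as a dict
#     for im in cfg.tables_input_mapping:
#         print(im.destination, im.full_path)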
Functions
def init_environment_variables() -> dao.EnvironmentVariables
Initializes environment variables available in the Docker environment:
https://developers.keboola.com/extend/common-interface/environment/#environment-variables
Returns: dao.EnvironmentVariables
def register_csv_dialect()
Register the KBC CSV dialect.
Classes
class CommonInterface(data_folder_path: str = None, log_level=logging.INFO, logging_type=None)
A class handling standard tasks related to the Keboola Common Interface, e.g. config load, validation, component state, I/O handling, I/O metadata and manifest files.
It initializes the environment injected into the Docker container the KBC component runs in and abstracts the tasks related to the Common Interface interaction.
Attributes
data_folder_path (str): Full path to the /data folder.
If the data_folder_path is not specified, the folder is resolved in the following order:
- From the provided argument if present: `-d` or `--data`
- From the environment variable if present (KBC_DATADIR)
- Defaults to /data/ if none of the above is specified
Args
data_folder_path (str): path to a data folder.
log_level (int): logging.INFO or logging.DEBUG
logging_type (str): optional 'std' or 'gelf'; if left empty, determined automatically
Returns: """ manifest = io_definition.get_manifest_dictionary(legacy_queue=self.is_legacy_queue) # make dirs if not exist os.makedirs(os.path.dirname(io_definition.full_path), exist_ok=True) with open(io_definition.full_path + '.manifest', 'w') as manifest_file: json.dump(manifest, manifest_file) def write_manifests(self, io_definitions: List[Union[dao.FileDefinition, dao.TableDefinition]]): """ Process all table definition objects and create appropriate manifest files. Args: io_definitions: Returns: """ for io_def in io_definitions: self.write_manifest(io_def) # ############# DEPRECATED METHODS, TODO: remove @deprecated(version='1.3.0', reason="You should use write_manifest function") def write_filedef_manifest(self, file_definition: dao.FileDefinition): """ Write a table manifest from dao.FileDefinition. Creates the appropriate manifest file in the proper location. ** Usage:** ```python from keboola.component import CommonInterface from keboola.component import dao ci = CommonInterface() # build table definition file_def = ci.create_out_file_definition(name='my_file.xml', tags=['tag', 'tag2']) ci.write_filedef_manifest(file_def) ``` Args: file_definition (dao.FileDefinition): Initialized dao.FileDefinition object containing manifest. Returns: """ self.write_manifest(file_definition) @deprecated(version='1.3.0', reason="You should use write_manifests function") def write_filedef_manifests(self, file_definitions: List[dao.FileDefinition]): """ Process all table definition objects and create appropriate manifest files. Args: file_definitions: Returns: """ self.write_manifests(file_definitions) @deprecated(version='1.3.0', reason="You should use write_manifest function") def write_tabledef_manifest(self, table_definition: dao.TableDefinition): """ Write a table manifest from dao.TableDefinition. Creates the appropriate manifest file in the proper location. ** Usage:** ```python from keboola.component import CommonInterface from keboola.component import dao ci = CommonInterface() tm = dao.TableMetadata() tm.add_table_description("My new table") # build table definition table_def = ci.create_out_table_definition(name='my_new_table', mytable.csv' , incremental = True , table_metadata = tm )) ci.write_tabledef_manifest(table_def) ``` Args: table_definition (dao.TableDefinition): Initialized dao.TableDefinition object containing manifest. Returns: """ self.write_manifest(table_definition) @deprecated(version='1.3.0', reason="You should use write_manifests function") def write_tabledef_manifests(self, table_definitions: List[dao.TableDefinition]): """ Process all table definition objects and create appropriate manifest files. Args: table_definitions: Returns: """ self.write_manifests(table_definitions)
Subclasses
Class variables
var LOGGING_TYPE_GELF
var LOGGING_TYPE_STD
Static methods
def set_default_logger(log_level: int = 20)
-
Sets default console logger.
Args
log_level
- logging level, default: 'logging.INFO'
Returns
Logger object
Expand source code
@staticmethod
def set_default_logger(log_level: int = logging.INFO):  # noqa: E301
    """
    Sets default console logger.

    Args:
        log_level: logging level, default: 'logging.INFO'

    Returns: Logger object
    """

    class InfoFilter(logging.Filter):
        def filter(self, rec):
            return rec.levelno in (logging.DEBUG, logging.INFO)

    hd1 = logging.StreamHandler(sys.stdout)
    hd1.addFilter(InfoFilter())
    hd2 = logging.StreamHandler(sys.stderr)
    hd2.setLevel(logging.WARNING)

    logging.getLogger().setLevel(log_level)
    # remove default handler
    for h in logging.getLogger().handlers:
        logging.getLogger().removeHandler(h)

    logging.getLogger().addHandler(hd1)
    logging.getLogger().addHandler(hd2)

    logger = logging.getLogger()
    return logger
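The handler split matters in KBC, where stdout and stderr messages are rendered as different event types in the job log. For illustration, a minimal sketch of calling the method directly (the constructor normally does this for you when logging_type is 'std'):

```python
import logging

from keboola.component import CommonInterface

CommonInterface.set_default_logger(logging.INFO)
logging.info('Goes to stdout (DEBUG/INFO pass the InfoFilter)')
logging.warning('Goes to stderr (WARNING and above)')
```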
def set_gelf_logger(log_level: int = 20, transport_layer='TCP', stdout=False, include_extra_fields=True, **gelf_kwargs)
-
Sets the GELF logger. A handler for console output is not included by default; for testing in non-GELF environments use stdout=True.
Args
log_level
- logging level, default: 'logging.INFO'
transport_layer
- 'TCP' or 'UDP', default: 'TCP'
stdout
- if set to True, a stdout handler is also included
include_extra_fields
- Include extra GELF fields in the log messages, e.g. logging.warning('Some warning', extra={"additional_info": "Extra info to be displayed in the detail"})
Returns
Logger object
Expand source code
@staticmethod
def set_gelf_logger(log_level: int = logging.INFO, transport_layer='TCP', stdout=False,
                    include_extra_fields=True, **gelf_kwargs):  # noqa: E301
    """
    Sets the GELF logger. A handler for console output is not included by default;
    for testing in non-GELF environments use stdout=True.

    Args:
        log_level: logging level, default: 'logging.INFO'
        transport_layer: 'TCP' or 'UDP', default: 'TCP'
        stdout: if set to True, a stdout handler is also included
        include_extra_fields: Include extra GELF fields in the log messages,
            e.g. logging.warning('Some warning', extra={"additional_info": "Extra info to be displayed in the detail"})

    Returns: Logger object
    """
    # remove existing handlers
    for h in logging.getLogger().handlers:
        logging.getLogger().removeHandler(h)

    if stdout:
        CommonInterface.set_default_logger(log_level)

    # gelf handler setup
    gelf_kwargs['include_extra_fields'] = include_extra_fields
    host = os.getenv('KBC_LOGGER_ADDR', 'localhost')
    port = os.getenv('KBC_LOGGER_PORT', 12201)
    if transport_layer == 'TCP':
        gelf = GelfTcpHandler(host=host, port=port, **gelf_kwargs)
    elif transport_layer == 'UDP':
        gelf = GelfUdpHandler(host=host, port=port, **gelf_kwargs)
    else:
        raise ValueError(f'Unsupported gelf transport layer: {transport_layer}. Choose TCP or UDP')

    logging.getLogger().setLevel(log_level)
    logging.getLogger().addHandler(gelf)
    logger = logging.getLogger()
    return logger
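The handler reads the endpoint from KBC_LOGGER_ADDR and KBC_LOGGER_PORT at call time. A minimal sketch for local testing, assuming a GELF listener on localhost (the env values below are placeholders, not real platform values):

```python
import logging
import os

from keboola.component import CommonInterface

# assumed local endpoint; in KBC these variables are injected by the platform
os.environ.setdefault('KBC_LOGGER_ADDR', 'localhost')
os.environ.setdefault('KBC_LOGGER_PORT', '12201')

CommonInterface.set_gelf_logger(log_level=logging.INFO, transport_layer='UDP', stdout=True)
logging.warning('Some warning', extra={"additional_info": "Extra info to be displayed in the detail"})
```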
Instance variables
var configuration
-
Expand source code
@property
def configuration(self):
    # try to load the configuration
    # raises ValueError
    return Configuration(self.data_folder_path)
var files_in_path
-
Expand source code
@property
def files_in_path(self):
    return os.path.join(self.data_folder_path, 'in', 'files')
var files_out_path
-
Expand source code
@property
def files_out_path(self):
    return os.path.join(self.data_folder_path, 'out', 'files')
var is_legacy_queue : bool
-
Check if the project is running on the legacy queue (v1).
Returns:
Expand source code
@property
def is_legacy_queue(self) -> bool:
    """
    Check if the project is running on the legacy queue (v1).

    Returns:

    """
    features = os.environ.get('KBC_PROJECT_FEATURE_GATES')
    is_legacy_queue = True
    if not features or 'queuev2' in features:
        is_legacy_queue = False
    return is_legacy_queue
var tables_in_path
-
Expand source code
@property
def tables_in_path(self):
    return os.path.join(self.data_folder_path, 'in', 'tables')
var tables_out_path
-
Expand source code
@property
def tables_out_path(self):
    return os.path.join(self.data_folder_path, 'out', 'tables')
Methods
def create_in_table_definition(self, name: str, is_sliced: bool = False, destination: str = '', primary_key: List[str] = None, columns: List[str] = None, incremental: bool = None, table_metadata: TableMetadata = None, delete_where: str = None) ‑> TableDefinition
-
Helper method for input dao.TableDefinition creation along with the "manifest". It initializes the path in the data/in/tables/ folder.
Args
name
- Table / file name, e.g. 'my_table.csv'.
is_sliced
- True if the full_path points to a folder with sliced tables
destination
- String name of the table in Storage.
primary_key
- List with names of columns used for primary key.
columns
- List of columns for headless CSV files
incremental
- Set to true to enable incremental loading
table_metadata
- <.dao.TableMetadata> object containing column and table metadata
delete_where
- Dict with settings for deleting rows
Expand source code
def create_in_table_definition(self, name: str, is_sliced: bool = False, destination: str = '',
                               primary_key: List[str] = None, columns: List[str] = None,
                               incremental: bool = None, table_metadata: dao.TableMetadata = None,
                               delete_where: str = None) -> dao.TableDefinition:
    """
    Helper method for input dao.TableDefinition creation along with the "manifest".
    It initializes the path in the data/in/tables/ folder.

    Args:
        name: Table / file name, e.g. `'my_table.csv'`.
        is_sliced: True if the full_path points to a folder with sliced tables
        destination: String name of the table in Storage.
        primary_key: List with names of columns used for the primary key.
        columns: List of columns for headless CSV files
        incremental: Set to true to enable incremental loading
        table_metadata: <.dao.TableMetadata> object containing column and table metadata
        delete_where: Dict with settings for deleting rows
    """
    return self._create_table_definition(name=name, storage_stage='in', is_sliced=is_sliced,
                                         destination=destination, primary_key=primary_key,
                                         columns=columns, incremental=incremental,
                                         table_metadata=table_metadata, delete_where=delete_where)
def create_out_file_definition(self, name: str, tags: List[str] = None, is_public: bool = False, is_permanent: bool = False, is_encrypted: bool = False, notify: bool = False) ‑> FileDefinition
-
Helper method for output dao.FileDefinition creation along with the "manifest". It initializes the path in the data/out/files/ folder.
Args
name
:str
- Name of the file, e.g. file.jpg.
tags
:list
- List of tags that are assigned to this file
is_public
- When true, the file URL will be permanent and publicly accessible.
is_permanent
- Keeps a file forever. If false, the file will be deleted after a default period of time (e.g. 15 days).
is_encrypted
- If true, the file content will be encrypted in the storage.
notify
- Notifies project administrators that a file was uploaded.
Expand source code
def create_out_file_definition(self, name: str, tags: List[str] = None, is_public: bool = False,
                               is_permanent: bool = False, is_encrypted: bool = False,
                               notify: bool = False) -> dao.FileDefinition:
    """
    Helper method for output dao.FileDefinition creation along with the "manifest".
    It initializes the path in the data/out/files/ folder.

    Args:
        name (str): Name of the file, e.g. file.jpg.
        tags (list): List of tags that are assigned to this file
        is_public: When true, the file URL will be permanent and publicly accessible.
        is_permanent: Keeps a file forever. If false, the file will be deleted after a default period of time
            (e.g. 15 days)
        is_encrypted: If true, the file content will be encrypted in the storage.
        notify: Notifies project administrators that a file was uploaded.
    """
    return self._create_file_definition(name=name, storage_stage='out', tags=tags, is_public=is_public,
                                        is_permanent=is_permanent, is_encrypted=is_encrypted, notify=notify)
def create_out_table_definition(self, name: str, is_sliced: bool = False, destination: str = '', primary_key: List[str] = None, columns: List[str] = None, incremental: bool = None, table_metadata: TableMetadata = None, enclosure: str = '"', delimiter: str = ',', delete_where: dict = None, write_always: bool = False) ‑> TableDefinition
-
Helper method for output dao.TableDefinition creation along with the "manifest". It initializes the path in the data/out/tables/ folder.
Args
name
- Table / file name, e.g. 'my_table.csv'.
is_sliced
- True if the full_path points to a folder with sliced tables
destination
- String name of the table in Storage.
primary_key
- List with names of columns used for primary key.
columns
- List of columns for headless CSV files
incremental
- Set to true to enable incremental loading
table_metadata
- <.dao.TableMetadata> object containing column and table metadata
enclosure
- str: CSV enclosure, by default "
delimiter
- str: CSV delimiter, by default ,
delete_where
- Dict with settings for deleting rows
write_always
- Bool: If true, the table will be saved to Storage even when the job execution
fails.
Expand source code
def create_out_table_definition(self, name: str, is_sliced: bool = False, destination: str = '',
                                primary_key: List[str] = None, columns: List[str] = None,
                                incremental: bool = None, table_metadata: dao.TableMetadata = None,
                                enclosure: str = '"', delimiter: str = ',',
                                delete_where: dict = None, write_always: bool = False) -> dao.TableDefinition:
    """
    Helper method for output dao.TableDefinition creation along with the "manifest".
    It initializes the path in the data/out/tables/ folder.

    Args:
        name: Table / file name, e.g. `'my_table.csv'`.
        is_sliced: True if the full_path points to a folder with sliced tables
        destination: String name of the table in Storage.
        primary_key: List with names of columns used for the primary key.
        columns: List of columns for headless CSV files
        incremental: Set to true to enable incremental loading
        table_metadata: <.dao.TableMetadata> object containing column and table metadata
        enclosure: str: CSV enclosure, by default "
        delimiter: str: CSV delimiter, by default ,
        delete_where: Dict with settings for deleting rows
        write_always: Bool: If true, the table will be saved to Storage even when the job execution fails.
    """
    return self._create_table_definition(name=name, storage_stage='out', is_sliced=is_sliced,
                                         destination=destination, primary_key=primary_key,
                                         columns=columns, incremental=incremental,
                                         table_metadata=table_metadata, enclosure=enclosure,
                                         delimiter=delimiter, delete_where=delete_where,
                                         write_always=write_always)
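A minimal usage sketch inside a running component (the table name, primary key and description below are illustrative): build the definition, write the CSV to its full_path, then persist the manifest.

```python
from keboola.component import CommonInterface
from keboola.component import dao

ci = CommonInterface()

tm = dao.TableMetadata()
tm.add_table_description("Orders enriched with currency rates")  # illustrative description

table_def = ci.create_out_table_definition(name='orders.csv',
                                           primary_key=['id'],
                                           incremental=True,
                                           table_metadata=tm)

# ... write the CSV data to table_def.full_path here ...
ci.write_manifest(table_def)
```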
def get_input_file_definitions_grouped_by_name(self, orphaned_manifests=False, only_latest_files=True, tags: List[str] = None) ‑> Dict[str, List[FileDefinition]]
-
Convenience method returning lists of files in a dictionary grouped by file name.
Args
orphaned_manifests
:bool
- If True, manifests without corresponding files are fetched. Otherwise a ValueError is raised.
only_latest_files
:bool
- If True, only latest versions of each files are included.
tags
:List[str]
- optional list of tags. If specified only files with matching tag group will be fetched.
Returns:
Expand source code
def get_input_file_definitions_grouped_by_name(self, orphaned_manifests=False,
                                               only_latest_files=True,
                                               tags: List[str] = None) -> Dict[str, List[dao.FileDefinition]]:
    """
    Convenience method returning lists of files in a dictionary grouped by file name.

    Args:
        orphaned_manifests (bool): If True, manifests without corresponding files are fetched.
            Otherwise a ValueError is raised.
        only_latest_files (bool): If True, only the latest version of each file is included.
        tags (List[str]): optional list of tags. If specified, only files with a matching tag group
            will be fetched.

    Returns:

    """
    file_definitions = self.get_input_files_definitions(orphaned_manifests, only_latest_files, tags)
    return self.__group_files_by_name(file_definitions)
def get_input_file_definitions_grouped_by_tag_group(self, orphaned_manifests=False, only_latest_files=True, tags: List[str] = None, include_system_tags=False) ‑> Dict[str, List[FileDefinition]]
-
Convenience method returning lists of files in dictionary grouped by tag group.
(A tag group is a string built from alphabetically ordered and concatenated tags, e.g. 'tag1;tag2'.)
Args
orphaned_manifests
:bool
- If True, manifests without corresponding files are fetched. Otherwise a ValueError is raised.
only_latest_files
:bool
- If True, only latest versions of each files are included.
tags
:List[str]
- optional list of tags. If specified only files containing specified tags will be fetched.
include_system_tags
:bool
- optional flag that will use system generated tags in groups as well. See FileDefinition.SYSTEM_TAG_PREFIXES
Returns
Dict[str,List[dao.FileDefinition]] indexed by tag group => string built from alphabetically ordered and concatenated tags, e.g.
tag1;tag2
Expand source code
def get_input_file_definitions_grouped_by_tag_group(self, orphaned_manifests=False,
                                                    only_latest_files=True,
                                                    tags: List[str] = None,
                                                    include_system_tags=False) \
        -> Dict[str, List[dao.FileDefinition]]:
    """
    Convenience method returning lists of files in a dictionary grouped by tag group.
    (A tag group is a string built from alphabetically ordered and concatenated tags, e.g. 'tag1;tag2'.)

    Args:
        orphaned_manifests (bool): If True, manifests without corresponding files are fetched.
            Otherwise a ValueError is raised.
        only_latest_files (bool): If True, only the latest version of each file is included.
        tags (List[str]): optional list of tags. If specified, only files containing the specified tags
            will be fetched.
        include_system_tags (bool): optional flag that will use system generated tags in groups as well.
            See FileDefinition.SYSTEM_TAG_PREFIXES

    Returns: Dict[str, List[dao.FileDefinition]] indexed by tag group => a string built from
        alphabetically ordered and concatenated tags, e.g. `tag1;tag2`

    """
    file_definitions = self.get_input_files_definitions(orphaned_manifests, only_latest_files, tags)
    return self.__group_file_defs_by_tag_group(file_definitions, include_system_tags=include_system_tags)
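A short sketch of the grouping behaviour, assuming the component runs inside a standard /data environment (the tag names are illustrative):

```python
from keboola.component import CommonInterface

ci = CommonInterface()

grouped = ci.get_input_file_definitions_grouped_by_tag_group()
# a file tagged ['monthly', 'dimension'] lands under the key 'dimension;monthly'
for tag_group, files in grouped.items():
    print(tag_group, [f.name for f in files])
```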
def get_input_files_definitions(self, orphaned_manifests=False, only_latest_files=True, tags: Optional[List[str]] = None) ‑> List[FileDefinition]
-
Return dao.FileDefinition objects by scanning the
data/in/files
folder. The dao.FileDefinition will contain the full path of the source file, its name and manifest.
By default only latest versions of each file are included.
By default, orphaned manifests are skipped, otherwise fails with ValueError.
A filter may be specified to match only some tags. All files containing specified tags will be returned.
See Also: keboola.component.dao.FileDefinition
Args
orphaned_manifests
:bool
- If True, manifests without corresponding files are fetched. Otherwise a ValueError is raised.
only_latest_files
:bool
- If True, only latest versions of each files are included.
tags
:List[str]
- optional list of tags. If specified only files with matching tag group will be fetched.
Returns: List[dao.FileDefinition]
Expand source code
def get_input_files_definitions(self, orphaned_manifests=False,
                                only_latest_files=True,
                                tags: Optional[List[str]] = None) -> List[dao.FileDefinition]:
    """
    Returns dao.FileDefinition objects by scanning the `data/in/files` folder.

    The dao.FileDefinition contains the full path of the source file, its name and manifest.

    By default, only the latest version of each file is included.
    By default, orphaned manifests are skipped, otherwise a ValueError is raised.

    A tag filter may be specified; all files containing the specified tags will be returned.

    See Also: keboola.component.dao.FileDefinition

    Args:
        orphaned_manifests (bool): If True, manifests without corresponding files are fetched.
            Otherwise a ValueError is raised.
        only_latest_files (bool): If True, only the latest version of each file is included.
        tags (List[str]): optional list of tags. If specified, only files with a matching tag group
            will be fetched.

    Returns: List[dao.FileDefinition]

    """
    in_files = [f for f in glob.glob(self.files_in_path + "/**", recursive=False)
                if not f.endswith('.manifest')]
    file_defs = list()
    for t in in_files:
        manifest_path = t + '.manifest'
        file_defs.append(dao.FileDefinition.build_from_manifest(manifest_path))

    if orphaned_manifests:
        files_w_manifest = [t.full_path for t in file_defs]
        # NOTE: scans the files folder; the original source scanned tables_in_path here,
        # which appears to be a copy-paste slip
        manifest_files = [f for f in glob.glob(self.files_in_path + "/**.manifest", recursive=False)
                          if Path(f).name not in files_w_manifest]
        for t in manifest_files:
            p = Path(t)
            if p.is_dir():
                # skip folders that do not have a matching manifest
                logging.warning(f'Manifest {t} is a folder, skipping!')
                continue
            file_defs.append(dao.FileDefinition.build_from_manifest(t))
    return self._filter_files(file_defs, tags, only_latest_files)
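A minimal usage sketch combining the tag filter with the latest-version filter (the tag names are illustrative):

```python
from keboola.component import CommonInterface

ci = CommonInterface()

# latest version of every input file carrying both tags
file_defs = ci.get_input_files_definitions(tags=['export', 'daily'], only_latest_files=True)
for file_def in file_defs:
    print(file_def.name, file_def.full_path, file_def.tags)
```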
def get_input_table_definition_by_name(self, table_name: str) ‑> TableDefinition
-
Return dao.TableDefinition object by table name.
If neither the table itself nor its manifest exists, a ValueError is thrown.
The dao.TableDefinition will contain the full path of the source file, its name and manifest (if present). It also provides methods for updating the manifest metadata.
Args
table_name
- Destination table name (name of .csv file). e.g. input.csv
Returns
dao.TableDefinition
Expand source code
def get_input_table_definition_by_name(self, table_name: str) -> dao.TableDefinition:
    """
    Returns a dao.TableDefinition object by table name.
    If neither the table itself nor its manifest exists, a ValueError is thrown.

    The dao.TableDefinition contains the full path of the source file, its name and manifest (if present).
    It also provides methods for updating the manifest metadata.

    Args:
        table_name: Destination table name (name of the .csv file), e.g. input.csv

    Returns: dao.TableDefinition

    """
    manifest_path = os.path.join(self.tables_in_path, table_name + '.manifest')
    return dao.TableDefinition.build_from_manifest(manifest_path)
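A sketch of reading a named input table with the 'kbc' CSV dialect (registered by the CommonInterface constructor); 'input.csv' is an illustrative table name and the file is assumed to have a header row:

```python
import csv

from keboola.component import CommonInterface

ci = CommonInterface()

table_def = ci.get_input_table_definition_by_name('input.csv')
with open(table_def.full_path, encoding='utf-8') as input_file:
    for row in csv.DictReader(input_file, dialect='kbc'):
        print(row)
```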
def get_input_tables_definitions(self, orphaned_manifests=False) ‑> List[TableDefinition]
-
Return dao.TableDefinition objects by scanning the
data/in/tables
folder. The dao.TableDefinition will contain the full path of the source file, its name and manifest (if present). It also provides methods for updating the manifest metadata.
By default, orphaned manifests are skipped.
See Also: keboola.component.dao.TableDefinition
Args
orphaned_manifests
:bool
- If True, manifests without corresponding files are fetched. This is useful in scenarios where workspaces exchange is used, e.g. when only manifest files are present in the
data/in/tables
folder.
Returns: List[dao.TableDefinition]
Expand source code
def get_input_tables_definitions(self, orphaned_manifests=False) -> List[dao.TableDefinition]:
    """
    Returns dao.TableDefinition objects by scanning the `data/in/tables` folder.

    The dao.TableDefinition contains the full path of the source file, its name and manifest (if present).
    It also provides methods for updating the manifest metadata.

    By default, orphaned manifests are skipped.

    See Also: keboola.component.dao.TableDefinition

    Args:
        orphaned_manifests (bool): If True, manifests without corresponding files are fetched.
            This is useful in scenarios where [workspaces exchange](
            https://developers.keboola.com/extend/common-interface/folders/#exchanging-data-via-workspace)
            is used, e.g. when only manifest files are present in the `data/in/tables` folder.

    Returns: List[dao.TableDefinition]

    """
    table_files = [f for f in glob.glob(self.tables_in_path + "/**", recursive=False)
                   if not f.endswith('.manifest')]
    table_defs = list()
    for t in table_files:
        p = Path(t)
        manifest_path = t + '.manifest'
        if p.is_dir() and not Path(manifest_path).exists():
            # skip folders that do not have a matching manifest
            logging.warning(f'Folder {t} does not have a matching manifest, it will be ignored!')
            continue
        table_defs.append(dao.TableDefinition.build_from_manifest(manifest_path))

    if orphaned_manifests:
        files_w_manifest = [t.name + '.manifest' for t in table_defs]
        manifest_files = [f for f in glob.glob(self.tables_in_path + "/**.manifest", recursive=False)
                          if Path(f).name not in files_w_manifest]
        for t in manifest_files:
            p = Path(t)
            if p.is_dir():
                # skip folders
                logging.warning(f'Manifest {t} is a folder, skipping!')
                continue
            table_defs.append(dao.TableDefinition.build_from_manifest(t))
    return table_defs
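A minimal sketch iterating over all mapped input tables inside a running component:

```python
from keboola.component import CommonInterface

ci = CommonInterface()

for table_def in ci.get_input_tables_definitions():
    print(table_def.name, table_def.full_path)
```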
def get_state_file(self) ‑> dict
-
Returns dict representation of state file or empty dict if not present
Returns
dict:
Expand source code
def get_state_file(self) -> dict:
    """
    Returns dict representation of state file or empty dict if not present

    Returns:
        dict:
    """
    logging.info('Loading state file..')
    state_file_path = os.path.join(self.data_folder_path, 'in', 'state.json')
    if not os.path.isfile(state_file_path):
        logging.info('State file not found. First run?')
        return {}
    try:
        with open(state_file_path, 'r') as state_file:
            return json.load(state_file)
    except (OSError, IOError):
        raise ValueError(
            "State file state.json unable to read "
        )
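A typical pattern is to read the state at the start of a run and store an updated state at the end via write_state_file (documented further below). A minimal sketch; 'last_timestamp' is an illustrative key, not a platform convention:

```python
from keboola.component import CommonInterface

ci = CommonInterface()

state = ci.get_state_file()                   # {} on the first run
last_timestamp = state.get('last_timestamp')  # illustrative key

# ... run the incremental extraction from last_timestamp ...

ci.write_state_file({'last_timestamp': '2021-05-17T00:00:00Z'})
```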
def validate_configuration_parameters(self, mandatory_params=None)
-
Validates config parameters based on provided mandatory parameters. All provided parameters must be present in the config to pass.

ex1.:
par1 = 'par1'
par2 = 'par2'
mandatory_params = [par1, par2]
Validation will fail when one of the above parameters is not found.

Two levels of nesting: parameters can be grouped as arrays, par3 = [groupPar1, groupPar2] => at least one of the parameters has to be present.

ex2.:
par1 = 'par1'
par2 = 'par2'
par3 = 'par3'
groupPar1 = 'groupPar1'
groupPar2 = 'groupPar2'
group1 = [groupPar1, groupPar2]
group3 = [par3, group1]
mandatory_params = [par1, par2, group1]
The following logical expression is evaluated: par1 AND par2 AND (groupPar1 OR groupPar2)

ex3.:
par1 = 'par1'
par2 = 'par2'
par3 = 'par3'
groupPar1 = 'groupPar1'
groupPar2 = 'groupPar2'
group1 = [groupPar1, groupPar2]
group3 = [par3, group1]
mandatory_params = [par1, par2, group3]
The following logical expression is evaluated: par1 AND par2 AND (par3 OR (groupPar1 AND groupPar2))
Expand source code
def validate_configuration_parameters(self, mandatory_params=None):
    """
    Validates config parameters based on provided mandatory parameters.
    All provided parameters must be present in the config to pass.

    ex1.:
    par1 = 'par1'
    par2 = 'par2'
    mandatory_params = [par1, par2]
    Validation will fail when one of the above parameters is not found.

    Two levels of nesting:
    Parameters can be grouped as arrays, par3 = [groupPar1, groupPar2] => at least one of the parameters
    has to be present.

    ex2.:
    par1 = 'par1'
    par2 = 'par2'
    par3 = 'par3'
    groupPar1 = 'groupPar1'
    groupPar2 = 'groupPar2'
    group1 = [groupPar1, groupPar2]
    group3 = [par3, group1]
    mandatory_params = [par1, par2, group1]

    The following logical expression is evaluated:
    par1 AND par2 AND (groupPar1 OR groupPar2)

    ex3.:
    par1 = 'par1'
    par2 = 'par2'
    par3 = 'par3'
    groupPar1 = 'groupPar1'
    groupPar2 = 'groupPar2'
    group1 = [groupPar1, groupPar2]
    group3 = [par3, group1]
    mandatory_params = [par1, par2, group3]

    The following logical expression is evaluated:
    par1 AND par2 AND (par3 OR (groupPar1 AND groupPar2))
    """
    if not mandatory_params:
        mandatory_params = []
    return self._validate_parameters(self.configuration.parameters, mandatory_params, 'config parameters')
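A minimal sketch of the grouping rules in practice; the parameter names ('#api_token', 'endpoint', 'start_date', 'days_back') are illustrative. A nested list expresses OR, top-level entries express AND, and a failure surfaces as a UserException:

```python
import logging

from keboola.component import CommonInterface
from keboola.component.exceptions import UserException

ci = CommonInterface()

try:
    # '#api_token' AND 'endpoint' AND ('start_date' OR 'days_back')
    ci.validate_configuration_parameters(['#api_token', 'endpoint', ['start_date', 'days_back']])
except UserException as e:
    logging.error(f'Invalid configuration: {e}')
```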
def validate_image_parameters(self, mandatory_params)
-
Validates image parameters based on provided mandatory parameters. All provided parameters must be present in the config to pass.
The validation logic (including parameter grouping) is the same as in validate_configuration_parameters; see the examples there.
Expand source code
def validate_image_parameters(self, mandatory_params):
    """
    Validates image parameters based on provided mandatory parameters.
    All provided parameters must be present in the config to pass.
    The validation logic (including parameter grouping) is the same as in
    validate_configuration_parameters; see the examples there.
    """
    return self._validate_parameters(self.configuration.image_parameters, mandatory_params,
                                     'image/stack parameters')
def write_filedef_manifest(self, file_definition: FileDefinition)
-
Write a file manifest from dao.FileDefinition. Creates the appropriate manifest file in the proper location.
** Usage:**
from keboola.component import CommonInterface
from keboola.component import dao

ci = CommonInterface()

# build file definition
file_def = ci.create_out_file_definition(name='my_file.xml', tags=['tag', 'tag2'])
ci.write_filedef_manifest(file_def)
Args
file_definition
:dao.FileDefinition
- Initialized dao.FileDefinition object containing manifest.
Returns:
Expand source code
@deprecated(version='1.3.0', reason="You should use write_manifest function")
def write_filedef_manifest(self, file_definition: dao.FileDefinition):
    """
    Write a file manifest from dao.FileDefinition. Creates the appropriate manifest file in the proper location.

    **Usage:**

    ```python
    from keboola.component import CommonInterface
    from keboola.component import dao

    ci = CommonInterface()

    # build file definition
    file_def = ci.create_out_file_definition(name='my_file.xml', tags=['tag', 'tag2'])
    ci.write_filedef_manifest(file_def)
    ```

    Args:
        file_definition (dao.FileDefinition): Initialized dao.FileDefinition object containing the manifest.

    Returns:

    """
    self.write_manifest(file_definition)
def write_filedef_manifests(self, file_definitions: List[FileDefinition])
-
Process all file definition objects and create the appropriate manifest files.
Args
file_definitions: Returns:
Expand source code
@deprecated(version='1.3.0', reason="You should use write_manifests function")
def write_filedef_manifests(self, file_definitions: List[dao.FileDefinition]):
    """
    Process all file definition objects and create the appropriate manifest files.

    Args:
        file_definitions:

    Returns:

    """
    self.write_manifests(file_definitions)
def write_manifest(self, io_definition: Union[FileDefinition, TableDefinition])
-
Write a table manifest from dao.IODefinition. Creates the appropriate manifest file in the proper location.
** Usage:**
from keboola.component import CommonInterface
from keboola.component import dao

ci = CommonInterface()

# build table definition
tm = dao.TableMetadata()
table_def = ci.create_out_table_definition(name='mytable.csv',
                                           incremental=True,
                                           table_metadata=tm)
ci.write_manifest(table_def)

# build file definition
file_def = ci.create_out_file_definition(name='my_file.xml', tags=['tag', 'tag2'])
ci.write_manifest(file_def)
Args
io_definition Union[dao.FileDefinition, dao.TableDefinition]: Initialized dao.IODefinition object containing manifest. Returns:
Expand source code
def write_manifest(self, io_definition: Union[dao.FileDefinition, dao.TableDefinition]):
    """
    Write a manifest from a dao.IODefinition. Creates the appropriate manifest file in the proper location.

    **Usage:**

    ```python
    from keboola.component import CommonInterface
    from keboola.component import dao

    ci = CommonInterface()

    # build table definition
    tm = dao.TableMetadata()
    table_def = ci.create_out_table_definition(name='mytable.csv',
                                               incremental=True,
                                               table_metadata=tm)
    ci.write_manifest(table_def)

    # build file definition
    file_def = ci.create_out_file_definition(name='my_file.xml', tags=['tag', 'tag2'])
    ci.write_manifest(file_def)
    ```

    Args:
        io_definition (Union[dao.FileDefinition, dao.TableDefinition]):
            Initialized dao.IODefinition object containing the manifest.

    Returns:

    """
    manifest = io_definition.get_manifest_dictionary(legacy_queue=self.is_legacy_queue)
    # make dirs if they do not exist
    os.makedirs(os.path.dirname(io_definition.full_path), exist_ok=True)
    with open(io_definition.full_path + '.manifest', 'w') as manifest_file:
        json.dump(manifest, manifest_file)
def write_manifests(self, io_definitions: List[Union[FileDefinition, TableDefinition]])
-
Process all table definition objects and create appropriate manifest files.
Args
io_definitions: Returns:
Expand source code
def write_manifests(self, io_definitions: List[Union[dao.FileDefinition, dao.TableDefinition]]):
    """
    Process all definition objects and create the appropriate manifest files.

    Args:
        io_definitions:

    Returns:

    """
    for io_def in io_definitions:
        self.write_manifest(io_def)
def write_state_file(self, state_dict: dict)
-
Expand source code
def write_state_file(self, state_dict: dict):
    """
    Stores the [state file](https://developers.keboola.com/extend/common-interface/config-file/#state-file).

    Args:
        state_dict (dict):
    """
    if not isinstance(state_dict, dict):
        raise TypeError('Dictionary expected as a state file datatype!')
    with open(os.path.join(self.configuration.data_dir, 'out', 'state.json'), 'w+') as state_file:
        json.dump(state_dict, state_file)
def write_tabledef_manifest(self, table_definition: TableDefinition)
-
Write a table manifest from dao.TableDefinition. Creates the appropriate manifest file in the proper location.
** Usage:**
from keboola.component import CommonInterface
from keboola.component import dao

ci = CommonInterface()

tm = dao.TableMetadata()
tm.add_table_description("My new table")

# build table definition
table_def = ci.create_out_table_definition(name='mytable.csv',
                                           incremental=True,
                                           table_metadata=tm)
ci.write_tabledef_manifest(table_def)
Args
table_definition
:dao.TableDefinition
- Initialized dao.TableDefinition object containing manifest.
Returns:
Expand source code
@deprecated(version='1.3.0', reason="You should use write_manifest function")
def write_tabledef_manifest(self, table_definition: dao.TableDefinition):
    """
    Write a table manifest from dao.TableDefinition. Creates the appropriate manifest file in the proper location.

    **Usage:**

    ```python
    from keboola.component import CommonInterface
    from keboola.component import dao

    ci = CommonInterface()

    tm = dao.TableMetadata()
    tm.add_table_description("My new table")

    # build table definition
    table_def = ci.create_out_table_definition(name='mytable.csv',
                                               incremental=True,
                                               table_metadata=tm)
    ci.write_tabledef_manifest(table_def)
    ```

    Args:
        table_definition (dao.TableDefinition): Initialized dao.TableDefinition object containing the manifest.

    Returns:

    """
    self.write_manifest(table_definition)
def write_tabledef_manifests(self, table_definitions: List[TableDefinition])
-
Process all table definition objects and create appropriate manifest files.
Args
table_definitions: Returns:
Expand source code
@deprecated(version='1.3.0', reason="You should use write_manifests function")
def write_tabledef_manifests(self, table_definitions: List[dao.TableDefinition]):
    """
    Process all table definition objects and create the appropriate manifest files.

    Args:
        table_definitions:

    Returns:

    """
    self.write_manifests(table_definitions)
class Configuration (data_folder_path: str)
-
Class representing the configuration file generated and read by KBC for docker applications. See docs: https://developers.keboola.com/extend/common-interface/config-file/
Args
data_folder_path (object):
Expand source code
class Configuration:
    """
    Class representing the configuration file generated and read by KBC for docker applications.
    See docs: https://developers.keboola.com/extend/common-interface/config-file/
    """

    def __init__(self, data_folder_path: str):
        """
        Args:
            data_folder_path (object):
        """
        self.config_data = {}
        self.data_dir = data_folder_path

        try:
            with open(os.path.join(data_folder_path, 'config.json'), 'r') as config_file:
                self.config_data = json.load(config_file)
        except (OSError, IOError):
            raise ValueError(
                f"Configuration file config.json not found, verify that the data directory is correct and that the "
                f"config file is present. Dir: "
                f"{self.data_dir}"
            )

        self.parameters = self.config_data.get('parameters', {})
        self.image_parameters = self.config_data.get('image_parameters', {})
        self.action = self.config_data.get('action', '')
        self.workspace_credentials = self.config_data.get('authorization', {}).get('workspace', {})

    # ################ PROPERTIES

    @property
    def oauth_credentials(self) -> dao.OauthCredentials:
        """
        Returns the subscriptable class OauthCredentials.

        Returns: OauthCredentials
        """
        oauth_credentials = self.config_data.get('authorization', {}).get('oauth_api', {}).get('credentials', {})
        credentials = None
        if oauth_credentials:
            credentials = dao.OauthCredentials(
                id=oauth_credentials.get("id", ''),
                created=oauth_credentials.get("created", ''),
                data=json.loads(oauth_credentials.get("#data", '{}')),
                oauthVersion=oauth_credentials.get("oauthVersion", ''),
                appKey=oauth_credentials.get("appKey", ''),
                appSecret=oauth_credentials.get("#appSecret", '')
            )
        return credentials

    @property
    def tables_input_mapping(self) -> List[dao.TableInputMapping]:
        """
        List of table [input mappings](https://developers.keboola.com/extend/common-interface/config-file/#tables):
        tables specified in the configuration file.

        Returns: List[TableInputMapping]
        """
        tables_defs = self.config_data.get('storage', {}).get('input', {}).get('tables', [])
        tables = []
        for table in tables_defs:
            # nested dataclass
            table['column_types'] = [dao.build_dataclass_from_dict(dao.TableColumnTypes, coltype)
                                     for coltype in table.get('column_types', [])]
            im = dao.build_dataclass_from_dict(dao.TableInputMapping, table)
            im.full_path = os.path.normpath(
                os.path.join(self.data_dir, 'in', 'tables', table['destination'])
            )
            tables.append(im)
        return tables

    @property
    def tables_output_mapping(self) -> List[dao.TableOutputMapping]:
        """
        List of table [output mappings](https://developers.keboola.com/extend/common-interface/config-file/#tables):
        tables which are supposed to be returned when the application finishes
        (from the configuration['storage'] section).

        Returns: List[TableOutputMapping]
        """
        tables_defs = self.config_data.get('storage', {}).get('output', {}).get('tables', [])
        tables = []
        for table in tables_defs:
            om = dao.build_dataclass_from_dict(dao.TableOutputMapping, table)
            tables.append(om)
        return tables

    @property
    def files_input_mapping(self) -> List[dao.FileInputMapping]:
        """
        List of file [input mappings](https://developers.keboola.com/extend/common-interface/config-file/#files):
        files specified in the configuration file, defined on the component's input mapping
        (from the configuration['storage'] section).

        Returns: List[FileInputMapping]
        """
        # NOTE: reads the 'input' section; the original source read 'output' here,
        # which appears to be a copy-paste slip
        defs = self.config_data.get('storage', {}).get('input', {}).get('files', [])
        files = []
        for file in defs:
            om = dao.build_dataclass_from_dict(dao.FileInputMapping, file)
            files.append(om)
        return files

    @property
    def files_output_mapping(self) -> List[dao.FileOutputMapping]:
        """
        List of file [output mappings](https://developers.keboola.com/extend/common-interface/config-file/#files):
        files which are supposed to be returned when the application finishes
        (from the configuration['storage'] section).

        Returns: List[FileOutputMapping]
        """
        defs = self.config_data.get('storage', {}).get('output', {}).get('files', [])
        files = []
        for file in defs:
            om = dao.build_dataclass_from_dict(dao.FileOutputMapping, file)
            files.append(om)
        return files
Instance variables
var files_input_mapping : List[FileInputMapping]
-
List of file input mappings
Files specified in the configuration file, defined on the component's input mapping (from the configuration['storage'] section). Returns: List[FileInputMapping]
Expand source code
@property
def files_input_mapping(self) -> List[dao.FileInputMapping]:
    """
    List of file [input mappings](https://developers.keboola.com/extend/common-interface/config-file/#files):
    files specified in the configuration file, defined on the component's input mapping
    (from the configuration['storage'] section).

    Returns: List[FileInputMapping]
    """
    # NOTE: reads the 'input' section; the original source read 'output' here,
    # which appears to be a copy-paste slip
    defs = self.config_data.get('storage', {}).get('input', {}).get('files', [])
    files = []
    for file in defs:
        om = dao.build_dataclass_from_dict(dao.FileInputMapping, file)
        files.append(om)
    return files
var files_output_mapping : List[FileOutputMapping]
-
List of file output mappings
Get files which are supposed to be returned when the application finishes (from the configuration['storage'] section). Returns:
Expand source code
@property
def files_output_mapping(self) -> List[dao.FileOutputMapping]:
    """
    List of file [output mappings](https://developers.keboola.com/extend/common-interface/config-file/#files):
    files which are supposed to be returned when the application finishes
    (from the configuration['storage'] section).

    Returns: List[FileOutputMapping]
    """
    defs = self.config_data.get('storage', {}).get('output', {}).get('files', [])
    files = []
    for file in defs:
        om = dao.build_dataclass_from_dict(dao.FileOutputMapping, file)
        files.append(om)
    return files
var oauth_credentials : OauthCredentials
-
Returns subscriptable class OauthCredentials
Returns: OauthCredentials
Expand source code
@property
def oauth_credentials(self) -> dao.OauthCredentials:
    """
    Returns the subscriptable class OauthCredentials.

    Returns: OauthCredentials
    """
    oauth_credentials = self.config_data.get('authorization', {}).get('oauth_api', {}).get('credentials', {})
    credentials = None
    if oauth_credentials:
        credentials = dao.OauthCredentials(
            id=oauth_credentials.get("id", ''),
            created=oauth_credentials.get("created", ''),
            data=json.loads(oauth_credentials.get("#data", '{}')),
            oauthVersion=oauth_credentials.get("oauthVersion", ''),
            appKey=oauth_credentials.get("appKey", ''),
            appSecret=oauth_credentials.get("#appSecret", '')
        )
    return credentials
var tables_input_mapping : List[TableInputMapping]
-
List of table input mappings
Tables specified in the configuration file.
Returns: List[TableInputMapping]
Expand source code
@property
def tables_input_mapping(self) -> List[dao.TableInputMapping]:
    """
    List of table [input mappings](https://developers.keboola.com/extend/common-interface/config-file/#tables):
    tables specified in the configuration file.

    Returns: List[TableInputMapping]
    """
    tables_defs = self.config_data.get('storage', {}).get('input', {}).get('tables', [])
    tables = []
    for table in tables_defs:
        # nested dataclass
        table['column_types'] = [dao.build_dataclass_from_dict(dao.TableColumnTypes, coltype)
                                 for coltype in table.get('column_types', [])]
        im = dao.build_dataclass_from_dict(dao.TableInputMapping, table)
        im.full_path = os.path.normpath(
            os.path.join(self.data_dir, 'in', 'tables', table['destination'])
        )
        tables.append(im)
    return tables
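A minimal sketch of reading the table input mapping directly from a Configuration instance; it assumes the standard /data folder and that the TableInputMapping dataclass exposes the storage.input.tables fields from config.json (source, destination) plus the computed full_path:

```python
from keboola.component.interface import Configuration

cfg = Configuration('/data')  # standard data folder path

for im in cfg.tables_input_mapping:
    # 'source' and 'destination' mirror the storage.input.tables entries in config.json
    print(im.source, im.destination, im.full_path)
```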
var tables_output_mapping : List[TableOutputMapping]
-
List of table output mappings
Get tables which are supposed to be returned when the application finishes (from the configuration['storage'] section). Returns: List[TableOutputMapping]
Expand source code
@property
def tables_output_mapping(self) -> List[dao.TableOutputMapping]:
    """
    List of table [output mappings](https://developers.keboola.com/extend/common-interface/config-file/#tables):
    tables which are supposed to be returned when the application finishes
    (from the configuration['storage'] section).

    Returns: List[TableOutputMapping]
    """
    tables_defs = self.config_data.get('storage', {}).get('output', {}).get('tables', [])
    tables = []
    for table in tables_defs:
        om = dao.build_dataclass_from_dict(dao.TableOutputMapping, table)
        tables.append(om)
    return tables