Module keboola.json_to_csv.analyzer

Classes

class Analyzer (table_mapping: TableMapping | None = None,
root_name: str | None = None)
Expand source code
class Analyzer:
    """
    The Analyzer class is designed to analyze a data structure and build a node hierarchy
    that represents the structure and data types of the objects within it. It can be used
    to create mappings for tabular data and handle complex nested structures.

    The Analyzer works based on a node hierarchy, where each node represents an object or
    a field in the data structure. The hierarchy is built by analyzing the provided data
    using the `analyze_object` method, or by having it be initialized by a predefined table
    mapping by the user.

    The Analyzer supports different data types, such as dictionaries, lists, scalars, and
    nested structures. It can handle various scenarios, including lists of dictionaries,
    lists of scalars, and more. The data types are represented using the `NodeType` enum.

    To use the Analyzer, you can initialize an instance with an optional root name and
    TableMapping. The TableMapping helps to specify the mapping of the data structure to
    tabular data. Then, call the `analyze_object` method to analyze each object within
    the data structure. The `get_mapping_dict_fom_structure` method can be used to get
    the resulting mapping dictionary.

    The class also provides methods to upgrade node data types and get column mappings at
    specific paths in the hierarchy.

    """

    def __init__(self,
                 table_mapping: Optional[TableMapping] = None,
                 root_name: Optional[str] = None) -> None:
        """
        Initialize the Analyzer object.

        Args:
            table_mapping (Optional[TableMapping]): The TableMapping object to use for the initialization of the
                                                    node_hierarchy
            root_name (Optional[str]): The root name for the data structure (optional).
        """

        self.root_name = root_name
        self.node_hierarchy = {"children": {}, "node": Node([], root_name, NodeType.LIST)}

        if table_mapping:
            self.update_with_table_mapping(table_mapping, None)

    def get_mapping_dict_fom_structure(self) -> Dict:
        return self._get_table_mapping_of_node_hierarchy()

    def analyze_object(self, path_to_object: List[Union[Any, str]], name: str, value: Any) -> None:
        """
            Analyzes an object within the data structure and updates the node hierarchy accordingly.

            This method is a recursive function that analyzes each object and its nested objects
            within the data structure. It updates the node hierarchy with the appropriate data types
            and mappings for each object.

            Args:
                path_to_object (List[Union[Any, str]]): The path to the current object within the data structure.
                    This list represents the sequence of keys or indices to reach the current object from the root.
                name (str): The name of the current object. In the case of dictionaries, it represents the key name.
                    In the case of lists, it represents the index.
                value (Any): The value of the current object. It can be a scalar, dictionary, list, or None..
            """

        object_path = self.create_path_to_child_object(path_to_object, name)

        expected_node = self.get_node_dict(object_path)
        if not expected_node:
            real_type = self.get_value_type(path_to_object, value)
            real_node = self.add_node(object_path, real_type)
        else:
            real_type = self.get_value_type(path_to_object, value)
            expected_type = expected_node.get("node").data_type
            if real_type != expected_type and not expected_node.get("node").force_type:
                if self._check_node_type_upgrade(expected_node.get('node'), expected_type, real_type):
                    self._perform_node_type_upgrade(expected_node.get('node'), expected_type, real_type)
            real_node = self.get_node_dict(object_path)

        if real_node["node"].data_type == NodeType.DICT:
            for sub_obj_name, sub_obj_value in value.items():
                self.analyze_object(object_path, sub_obj_name, sub_obj_value)

    def get_value_type(self, path_to_object: List[Union[Any, str]],
                       value: Any) -> NodeType:
        """
           Get the data type of the given value within the data structure.

           This method determines the NodeType of the provided value based on its data characteristics.
           It checks whether the value is a scalar, a dictionary, a list, or None. In the case of lists,
           it further analyzes the elements to determine if it's a list of scalars or a list of dictionaries.

           Args:
               path_to_object (List[Union[Any, str]]): The path to the current object within the data structure.
                   This list represents the sequence of keys or indices to reach the current object from the root.
               value (Any): The value to determine the data type for.
           """
        if is_scalar(value):
            node_type = NodeType.SCALAR
        elif is_dict(value):
            node_type = NodeType.DICT
        elif value is None:
            node_type = NodeType.NULL
        elif is_list(value) and len(value) > 0:
            final_element_type = self.get_value_type_of_array_elements(path_to_object, value)
            if final_element_type == NodeType.DICT:
                node_type = NodeType.LIST_OF_DICTS
            elif final_element_type == NodeType.SCALAR:
                node_type = NodeType.LIST_OF_SCALARS
            else:
                node_type = NodeType.LIST
        elif is_list(value):
            node_type = NodeType.LIST
        else:
            raise JsonParserException(f"Unsupported data in path {path_to_object}")
        return node_type

    def get_value_type_of_array_elements(self, path_to_object: List[Any], element_list: List[Any]) -> NodeType:
        """
        Get the common data type of elements within a list in the data structure.

        This method determines the common NodeType of elements in the provided list. It iterates
        through the elements and calls the `get_value_type` method to determine the data type of
        each element. If all elements have the same data type, it returns that data type; otherwise,
        it raises a JsonParserException indicating that the list has inconsistent element data types.

        Args:
            path_to_object (List[Any]): The path to the current object within the data structure.
                This list represents the sequence of keys or indices to reach the current object from the root.
            element_list (List[Any]): The list containing elements to analyze.

        Returns:
            NodeType: The common data type of elements within the list.
        """
        final_element_type = NodeType.NULL
        for element in element_list:
            element_type = self.get_value_type(path_to_object, element)
            if element_type != final_element_type:
                if final_element_type == NodeType.NULL or element_type == NodeType.NULL:
                    final_element_type = final_element_type if final_element_type != NodeType.NULL else element_type
                else:
                    raise JsonParserException(f"Value types of list {path_to_object} "
                                              f"are inconsistent : {element_list}")
        return final_element_type

    def _perform_node_type_upgrade(self, node, expected_type, real_type):
        """
        Upgrade the data type of a node in the node hierarchy.

        This method upgrades the data type of the provided node to the expected data type based
        on the comparison of the current data type (`real_type`) and the expected data type
        (`expected_type`). The method checks for specific scenarios where upgrading the data type
        is possible, such as converting a NodeType.LIST to NodeType.LIST_OF_SCALARS or
        NodeType.LIST_OF_DICTS.

        Args:
            node: The node object to upgrade in the node hierarchy.
            expected_type (NodeType): The expected data type of the node.
            real_type (NodeType): The current data type of the node.
        """
        if expected_type == NodeType.NULL:
            self.upgrade_node_type(node, real_type)
        elif expected_type == NodeType.LIST and real_type == NodeType.LIST_OF_SCALARS:
            self.upgrade_node_type(node, NodeType.LIST_OF_SCALARS)
        elif expected_type == NodeType.LIST and real_type == NodeType.LIST_OF_DICTS:
            self.upgrade_node_type(node, NodeType.LIST_OF_DICTS)
        elif expected_type == NodeType.LIST and real_type == NodeType.SCALAR:
            self.upgrade_node_type(node, NodeType.LIST_OF_SCALARS)
        elif expected_type == NodeType.LIST and real_type == NodeType.DICT:
            self.upgrade_node_type(node, NodeType.LIST_OF_DICTS)
        elif expected_type == NodeType.DICT and real_type == NodeType.LIST_OF_DICTS:
            self.upgrade_node_type(node, NodeType.LIST_OF_DICTS)
        elif expected_type == NodeType.DICT and real_type == NodeType.LIST:
            self.upgrade_node_type(node, NodeType.LIST_OF_DICTS)
        elif expected_type == NodeType.SCALAR and real_type == NodeType.LIST_OF_SCALARS:
            self.upgrade_node_type(node, NodeType.LIST_OF_SCALARS)
        elif expected_type == NodeType.SCALAR and real_type == NodeType.LIST:
            self.upgrade_node_type(node, NodeType.LIST_OF_SCALARS)

    @staticmethod
    def _check_node_type_upgrade(node, expected_type, real_type):
        """
        Check if upgrading the data type of a node is compatible.

        This method checks whether upgrading the data type of the provided node to the expected
        data type is a compatible operation. It analyzes the current data type (`real_type`) and
        the expected data type (`expected_type`) and determines if the upgrade scenario is supported.

        Args:
            node: The node object to check for data type upgrade compatibility.
            expected_type (NodeType): The expected data type of the node.
            real_type (NodeType): The current data type of the node.

        Returns:
            bool: True if the data type upgrade is compatible; otherwise, raises a JsonParserException.

        """
        varying_node_types = [expected_type, real_type]

        one_type_is_dict = NodeType.DICT in varying_node_types
        one_type_is_scalar = NodeType.SCALAR in varying_node_types
        one_type_is_list_of_scalars = NodeType.LIST_OF_SCALARS in varying_node_types
        one_type_is_list_of_dicts = NodeType.LIST_OF_DICTS in varying_node_types

        if one_type_is_dict and one_type_is_scalar:
            is_compatible = False
        elif one_type_is_dict and one_type_is_list_of_scalars:
            is_compatible = False
        elif one_type_is_scalar and one_type_is_list_of_dicts:
            is_compatible = False
        elif one_type_is_list_of_scalars and one_type_is_list_of_dicts:
            is_compatible = False
        else:
            is_compatible = True

        if not is_compatible:
            raise JsonParserException(f"Incompatible types of {expected_type} and {real_type} "
                                      f"in node_path {node.path}")
        return True

    def update_with_table_mapping(self, table_mapping: TableMapping, parent_path: Optional[List] = None) -> None:
        if not parent_path:
            parent_path = []

        self._process_column_mappings(table_mapping, parent_path)
        self._process_user_data_mappings(table_mapping, parent_path)
        self._process_child_table_mappings(table_mapping, parent_path)

    def get_node_dict(self, path_to_object: List, parent_path: List = None) -> Dict:
        if parent_path:
            path_to_object = parent_path + path_to_object
        if len(path_to_object) > 0:
            node = self.node_hierarchy.get("children").get(path_to_object[0], {})
        else:
            node = self.node_hierarchy
        if len(path_to_object) > 1:
            for path_step in path_to_object[1:]:
                node = node.get("children").get(path_step, {})
        return node or None

    def add_node(self, path_to_object: List[str], value_type: NodeType, force_type: bool = False,
                 destination_name: Optional[str] = None,
                 is_primary_key: bool = False, default_value: Optional[str] = None) -> Dict[str, Node]:
        def recursive_add(node_dict, path, value_node):
            if len(path) == 1:
                node_dict[path[0]] = value_node
            else:
                first_level = path[0]
                if first_level not in node_dict:
                    node_dict[first_level] = {"children": {}}
                recursive_add(node_dict[first_level]["children"], path[1:], value_node)

        parent_name = None
        if len(path_to_object) > 1:
            parent_node = self.get_node_dict(path_to_object[:-1])
            if parent_node.get("node").data_type == NodeType.DICT:
                parent_name = parent_node.get("node").header_name

        new_node = {"node": Node(path_to_object, path_to_object[-1], value_type,
                                 parent_name=parent_name, force_type=force_type, destination_name=destination_name,
                                 is_primary_key=is_primary_key, default_value=default_value),
                    "children": {}}

        # Start the recursive addition
        recursive_add(self.node_hierarchy["children"], path_to_object, new_node)

        return new_node

    def upgrade_node_type_recursive(self, hierarchy, path, new_node_type):
        if len(path) == 1:
            hierarchy["children"][path[0]]["node"].data_type = new_node_type
        else:
            next_level = hierarchy["children"][path[0]]
            self.upgrade_node_type_recursive(next_level, path[1:], new_node_type)

    def upgrade_node_type(self, node, new_node_type):
        node_dict_to_update = self.get_node_dict(node.path)

        node_dict_to_update["node"].data_type = new_node_type

        self.upgrade_node_type_recursive(self.node_hierarchy, node.path, new_node_type)

    @staticmethod
    def create_path_to_child_object(parent_path: List[str], child_name: str) -> List[str]:
        new_path = copy.copy(parent_path)
        new_path.append(child_name)
        return new_path

    @staticmethod
    def is_nested_node_name(node_name: str) -> bool:
        return "." in node_name

    def get_column_types(self, path: List[str]) -> Dict[str, Tuple[NodeType, bool]]:
        nodes = self.get_node_dict(path)
        if not nodes:
            return {}
        headers = {}
        for node in nodes.get("children"):
            decoded_name = nodes.get("children")[node].get("node").data_name

            data_type = nodes.get("children")[node].get("node").data_type
            force_type = nodes.get("children")[node].get("node").force_type
            headers[decoded_name] = (data_type, force_type)
        return headers

    def get_table_name(self, path: List[str]) -> str:
        name = self.root_name
        for p in path:
            name += f"_{p}"
        return name

    def _process_column_mappings(self, table_mapping: TableMapping, parent_path: List[str]) -> None:
        for column_name in table_mapping.column_mappings:
            if self.is_nested_node_name(column_name):
                self._process_nested_column_mapping(column_name, parent_path, table_mapping)
            else:
                self._process_column_mapping(column_name, parent_path, table_mapping)

    def _process_column_mapping(self, column_name: str, parent_path: List[str],
                                table_mapping: TableMapping) -> None:
        path = self.create_path_to_child_object(parent_path, column_name)

        force_type = column_name in table_mapping.force_types
        is_primary_key = column_name in table_mapping.primary_keys
        destination_name = table_mapping.column_mappings.get(column_name)

        self.add_node(path, NodeType.SCALAR, force_type=force_type, destination_name=destination_name,
                      is_primary_key=is_primary_key)

    def _process_nested_column_mapping(self, column_name: str, parent_path: List[Any],
                                       table_mapping: TableMapping) -> None:
        split_name = column_name.split(".")
        paths_added = []

        for i, item in enumerate(split_name):
            paths_added.append(item)
            node = self.get_node_dict(paths_added, parent_path)
            if not node:
                path = parent_path.copy()
                path.extend(paths_added)
                force_type = column_name in table_mapping.force_types
                is_primary_key = column_name in table_mapping.primary_keys
                destination_name = table_mapping.column_mappings.get(column_name)

                node_type = NodeType.DICT if i + 1 != len(split_name) else NodeType.SCALAR
                self.add_node(path, node_type, force_type=force_type, destination_name=destination_name,
                              is_primary_key=is_primary_key)

    def _process_user_data_mappings(self, table_mapping: TableMapping, parent_path: List[str]) -> None:
        for user_data_name, default_value in table_mapping.user_data.items():
            path = parent_path + [user_data_name]
            self.add_node(path, NodeType.SCALAR, destination_name=user_data_name, default_value=default_value)

    def _process_child_table_mappings(self, table_mapping: TableMapping, parent_path: List[str]) -> None:
        for child_table in table_mapping.child_tables:
            path = parent_path.copy()
            path.append(child_table)
            self.add_node(path, NodeType.LIST)
            self.update_with_table_mapping(table_mapping.child_tables.get(child_table), path)

    def _get_table_mapping_of_node_hierarchy(self, node_hierarchy=None) -> Dict:
        if not node_hierarchy:
            node_hierarchy = self.node_hierarchy
        table_name = node_hierarchy.get("node").header_name

        primary_keys = []
        force_types = []
        columns = {}
        child_tables = {}
        for child in node_hierarchy.get("children"):
            (child_columns, child_child_tables, child_primary_keys,
             child_force_types) = self._analyze_child_node_mapping(node_hierarchy, child)
            columns.update(child_columns)
            child_tables.update(child_child_tables)
            force_types.extend(child_force_types)
            primary_keys.extend(child_primary_keys)
        return {"table_name": table_name,
                "column_mappings": columns,
                "primary_keys": primary_keys,
                "force_types": force_types,
                "child_tables": child_tables}

    def _analyze_child_node_mapping(self, node_hierarchy, child_name):
        primary_keys = []
        force_types = []
        columns = {}
        child_tables = {}

        child_node = node_hierarchy.get("children").get(child_name).get("node")
        if child_node.data_type == NodeType.SCALAR:
            columns[child_name] = child_node.header_name
            if child_node.force_type:
                force_types.append(child_name)
            if child_node.is_primary_key:
                primary_keys.append(child_name)

        if child_node.data_type in [NodeType.LIST, NodeType.LIST_OF_SCALARS, NodeType.LIST_OF_DICTS]:
            child_tables[child_name] = self._get_table_mapping_of_node_hierarchy(
                node_hierarchy=node_hierarchy.get("children").get(child_name))

        if child_node.data_type == NodeType.DICT:
            (child_columns,
             child_child_tables,
             child_primary_keys,
             child_force_types) = self._get_table_mapping_of_dict_node(node_hierarchy.get("children").get(child_name))
            columns.update(child_columns)
            child_tables.update(child_child_tables)
            force_types.extend(child_force_types)
            primary_keys.extend(child_primary_keys)

        return columns, child_tables, primary_keys, force_types

    def _get_table_mapping_of_dict_node(self, node_thing):
        dict_node_name = node_thing.get("node").data_name
        primary_keys = []
        force_types = []
        columns = {}
        child_tables = {}
        for child in node_thing.get("children"):
            (child_columns, child_child_tables,
             child_primary_keys,
             child_force_types) = self._analyze_child_node_mapping(node_thing, child)
            columns.update(child_columns)
            child_tables.update(child_child_tables)
            force_types.extend(child_force_types)
            primary_keys.extend(child_primary_keys)

        primary_keys = self.add_prefix_to_list_items(primary_keys, dict_node_name + ".")
        force_types = self.add_prefix_to_list_items(force_types, dict_node_name + ".")
        columns = self.add_prefix_to_dict_keys(columns, dict_node_name + ".")
        child_tables = self.add_prefix_to_dict_keys(child_tables, dict_node_name + ".")
        return columns, child_tables, primary_keys, force_types

    def get_column_mappings_at_path(self, node_path: List[str]) -> Dict[str, str]:
        headers = {}
        node_data = self.get_node_dict(node_path) or {}
        if "node" in node_data:
            node_type = node_data.get("node").data_type
            if node_type == NodeType.SCALAR:
                headers[".".join(node_data.get("node").path)] = node_data.get("node").header_name

        for node_name, data in node_data.get("children").items():
            if data.get("node").data_type == NodeType.DICT:
                ch = self.get_column_mappings_at_path(self.create_path_to_child_object(node_path, node_name))
                headers.update(ch)
            elif data.get("node").data_type == NodeType.LIST:
                continue
            else:
                headers[".".join(data.get("node").path)] = data.get("node").header_name

        return headers

    @staticmethod
    def add_prefix_to_list_items(input_list, prefix):
        return [prefix + item for item in input_list]

    @staticmethod
    def add_prefix_to_dict_keys(input_dict, prefix):
        return {prefix + key: value for key, value in input_dict.items()}

The Analyzer class is designed to analyze a data structure and build a node hierarchy that represents the structure and data types of the objects within it. It can be used to create mappings for tabular data and handle complex nested structures.

The Analyzer works based on a node hierarchy, where each node represents an object or a field in the data structure. The hierarchy is built by analyzing the provided data using the analyze_object method, or by having it be initialized by a predefined table mapping by the user.

The Analyzer supports different data types, such as dictionaries, lists, scalars, and nested structures. It can handle various scenarios, including lists of dictionaries, lists of scalars, and more. The data types are represented using the NodeType enum.

To use the Analyzer, you can initialize an instance with an optional root name and TableMapping. The TableMapping helps to specify the mapping of the data structure to tabular data. Then, call the analyze_object method to analyze each object within the data structure. The get_mapping_dict_fom_structure method can be used to get the resulting mapping dictionary.

The class also provides methods to upgrade node data types and get column mappings at specific paths in the hierarchy.

Initialize the Analyzer object.

Args

table_mapping : Optional[TableMapping]
The TableMapping object to use for the initialization of the node_hierarchy
root_name : Optional[str]
The root name for the data structure (optional).

Static methods

def add_prefix_to_dict_keys(input_dict, prefix)
Expand source code
@staticmethod
def add_prefix_to_dict_keys(input_dict, prefix):
    return {prefix + key: value for key, value in input_dict.items()}
def add_prefix_to_list_items(input_list, prefix)
Expand source code
@staticmethod
def add_prefix_to_list_items(input_list, prefix):
    return [prefix + item for item in input_list]
def create_path_to_child_object(parent_path: List[str], child_name: str) ‑> List[str]
Expand source code
@staticmethod
def create_path_to_child_object(parent_path: List[str], child_name: str) -> List[str]:
    new_path = copy.copy(parent_path)
    new_path.append(child_name)
    return new_path
def is_nested_node_name(node_name: str) ‑> bool
Expand source code
@staticmethod
def is_nested_node_name(node_name: str) -> bool:
    return "." in node_name

Methods

def add_node(self,
path_to_object: List[str],
value_type: NodeType,
force_type: bool = False,
destination_name: str | None = None,
is_primary_key: bool = False,
default_value: str | None = None) ‑> Dict[str, Node]
Expand source code
def add_node(self, path_to_object: List[str], value_type: NodeType, force_type: bool = False,
             destination_name: Optional[str] = None,
             is_primary_key: bool = False, default_value: Optional[str] = None) -> Dict[str, Node]:
    def recursive_add(node_dict, path, value_node):
        if len(path) == 1:
            node_dict[path[0]] = value_node
        else:
            first_level = path[0]
            if first_level not in node_dict:
                node_dict[first_level] = {"children": {}}
            recursive_add(node_dict[first_level]["children"], path[1:], value_node)

    parent_name = None
    if len(path_to_object) > 1:
        parent_node = self.get_node_dict(path_to_object[:-1])
        if parent_node.get("node").data_type == NodeType.DICT:
            parent_name = parent_node.get("node").header_name

    new_node = {"node": Node(path_to_object, path_to_object[-1], value_type,
                             parent_name=parent_name, force_type=force_type, destination_name=destination_name,
                             is_primary_key=is_primary_key, default_value=default_value),
                "children": {}}

    # Start the recursive addition
    recursive_add(self.node_hierarchy["children"], path_to_object, new_node)

    return new_node
def analyze_object(self, path_to_object: List[Any | str], name: str, value: Any) ‑> None
Expand source code
def analyze_object(self, path_to_object: List[Union[Any, str]], name: str, value: Any) -> None:
    """
        Analyzes an object within the data structure and updates the node hierarchy accordingly.

        This method is a recursive function that analyzes each object and its nested objects
        within the data structure. It updates the node hierarchy with the appropriate data types
        and mappings for each object.

        Args:
            path_to_object (List[Union[Any, str]]): The path to the current object within the data structure.
                This list represents the sequence of keys or indices to reach the current object from the root.
            name (str): The name of the current object. In the case of dictionaries, it represents the key name.
                In the case of lists, it represents the index.
            value (Any): The value of the current object. It can be a scalar, dictionary, list, or None..
        """

    object_path = self.create_path_to_child_object(path_to_object, name)

    expected_node = self.get_node_dict(object_path)
    if not expected_node:
        real_type = self.get_value_type(path_to_object, value)
        real_node = self.add_node(object_path, real_type)
    else:
        real_type = self.get_value_type(path_to_object, value)
        expected_type = expected_node.get("node").data_type
        if real_type != expected_type and not expected_node.get("node").force_type:
            if self._check_node_type_upgrade(expected_node.get('node'), expected_type, real_type):
                self._perform_node_type_upgrade(expected_node.get('node'), expected_type, real_type)
        real_node = self.get_node_dict(object_path)

    if real_node["node"].data_type == NodeType.DICT:
        for sub_obj_name, sub_obj_value in value.items():
            self.analyze_object(object_path, sub_obj_name, sub_obj_value)

Analyzes an object within the data structure and updates the node hierarchy accordingly.

This method is a recursive function that analyzes each object and its nested objects within the data structure. It updates the node hierarchy with the appropriate data types and mappings for each object.

Args

path_to_object : List[Union[Any, str]]
The path to the current object within the data structure. This list represents the sequence of keys or indices to reach the current object from the root.
name : str
The name of the current object. In the case of dictionaries, it represents the key name. In the case of lists, it represents the index.
value : Any
The value of the current object. It can be a scalar, dictionary, list, or None..
def get_column_mappings_at_path(self, node_path: List[str]) ‑> Dict[str, str]
Expand source code
def get_column_mappings_at_path(self, node_path: List[str]) -> Dict[str, str]:
    headers = {}
    node_data = self.get_node_dict(node_path) or {}
    if "node" in node_data:
        node_type = node_data.get("node").data_type
        if node_type == NodeType.SCALAR:
            headers[".".join(node_data.get("node").path)] = node_data.get("node").header_name

    for node_name, data in node_data.get("children").items():
        if data.get("node").data_type == NodeType.DICT:
            ch = self.get_column_mappings_at_path(self.create_path_to_child_object(node_path, node_name))
            headers.update(ch)
        elif data.get("node").data_type == NodeType.LIST:
            continue
        else:
            headers[".".join(data.get("node").path)] = data.get("node").header_name

    return headers
def get_column_types(self, path: List[str]) ‑> Dict[str, Tuple[NodeType, bool]]
Expand source code
def get_column_types(self, path: List[str]) -> Dict[str, Tuple[NodeType, bool]]:
    nodes = self.get_node_dict(path)
    if not nodes:
        return {}
    headers = {}
    for node in nodes.get("children"):
        decoded_name = nodes.get("children")[node].get("node").data_name

        data_type = nodes.get("children")[node].get("node").data_type
        force_type = nodes.get("children")[node].get("node").force_type
        headers[decoded_name] = (data_type, force_type)
    return headers
def get_mapping_dict_fom_structure(self) ‑> Dict
Expand source code
def get_mapping_dict_fom_structure(self) -> Dict:
    return self._get_table_mapping_of_node_hierarchy()
def get_node_dict(self, path_to_object: List, parent_path: List = None) ‑> Dict
Expand source code
def get_node_dict(self, path_to_object: List, parent_path: List = None) -> Dict:
    if parent_path:
        path_to_object = parent_path + path_to_object
    if len(path_to_object) > 0:
        node = self.node_hierarchy.get("children").get(path_to_object[0], {})
    else:
        node = self.node_hierarchy
    if len(path_to_object) > 1:
        for path_step in path_to_object[1:]:
            node = node.get("children").get(path_step, {})
    return node or None
def get_table_name(self, path: List[str]) ‑> str
Expand source code
def get_table_name(self, path: List[str]) -> str:
    name = self.root_name
    for p in path:
        name += f"_{p}"
    return name
def get_value_type(self, path_to_object: List[Any | str], value: Any) ‑> NodeType
Expand source code
def get_value_type(self, path_to_object: List[Union[Any, str]],
                   value: Any) -> NodeType:
    """
       Get the data type of the given value within the data structure.

       This method determines the NodeType of the provided value based on its data characteristics.
       It checks whether the value is a scalar, a dictionary, a list, or None. In the case of lists,
       it further analyzes the elements to determine if it's a list of scalars or a list of dictionaries.

       Args:
           path_to_object (List[Union[Any, str]]): The path to the current object within the data structure.
               This list represents the sequence of keys or indices to reach the current object from the root.
           value (Any): The value to determine the data type for.
       """
    if is_scalar(value):
        node_type = NodeType.SCALAR
    elif is_dict(value):
        node_type = NodeType.DICT
    elif value is None:
        node_type = NodeType.NULL
    elif is_list(value) and len(value) > 0:
        final_element_type = self.get_value_type_of_array_elements(path_to_object, value)
        if final_element_type == NodeType.DICT:
            node_type = NodeType.LIST_OF_DICTS
        elif final_element_type == NodeType.SCALAR:
            node_type = NodeType.LIST_OF_SCALARS
        else:
            node_type = NodeType.LIST
    elif is_list(value):
        node_type = NodeType.LIST
    else:
        raise JsonParserException(f"Unsupported data in path {path_to_object}")
    return node_type

Get the data type of the given value within the data structure.

This method determines the NodeType of the provided value based on its data characteristics. It checks whether the value is a scalar, a dictionary, a list, or None. In the case of lists, it further analyzes the elements to determine if it's a list of scalars or a list of dictionaries.

Args

path_to_object : List[Union[Any, str]]
The path to the current object within the data structure. This list represents the sequence of keys or indices to reach the current object from the root.
value : Any
The value to determine the data type for.
def get_value_type_of_array_elements(self, path_to_object: List[Any], element_list: List[Any]) ‑> NodeType
Expand source code
def get_value_type_of_array_elements(self, path_to_object: List[Any], element_list: List[Any]) -> NodeType:
    """
    Get the common data type of elements within a list in the data structure.

    This method determines the common NodeType of elements in the provided list. It iterates
    through the elements and calls the `get_value_type` method to determine the data type of
    each element. If all elements have the same data type, it returns that data type; otherwise,
    it raises a JsonParserException indicating that the list has inconsistent element data types.

    Args:
        path_to_object (List[Any]): The path to the current object within the data structure.
            This list represents the sequence of keys or indices to reach the current object from the root.
        element_list (List[Any]): The list containing elements to analyze.

    Returns:
        NodeType: The common data type of elements within the list.
    """
    final_element_type = NodeType.NULL
    for element in element_list:
        element_type = self.get_value_type(path_to_object, element)
        if element_type != final_element_type:
            if final_element_type == NodeType.NULL or element_type == NodeType.NULL:
                final_element_type = final_element_type if final_element_type != NodeType.NULL else element_type
            else:
                raise JsonParserException(f"Value types of list {path_to_object} "
                                          f"are inconsistent : {element_list}")
    return final_element_type

Get the common data type of elements within a list in the data structure.

This method determines the common NodeType of elements in the provided list. It iterates through the elements and calls the get_value_type method to determine the data type of each element. If all elements have the same data type, it returns that data type; otherwise, it raises a JsonParserException indicating that the list has inconsistent element data types.

Args

path_to_object : List[Any]
The path to the current object within the data structure. This list represents the sequence of keys or indices to reach the current object from the root.
element_list : List[Any]
The list containing elements to analyze.

Returns

NodeType
The common data type of elements within the list.
def update_with_table_mapping(self,
table_mapping: TableMapping,
parent_path: List | None = None) ‑> None
Expand source code
def update_with_table_mapping(self, table_mapping: TableMapping, parent_path: Optional[List] = None) -> None:
    if not parent_path:
        parent_path = []

    self._process_column_mappings(table_mapping, parent_path)
    self._process_user_data_mappings(table_mapping, parent_path)
    self._process_child_table_mappings(table_mapping, parent_path)
def upgrade_node_type(self, node, new_node_type)
Expand source code
def upgrade_node_type(self, node, new_node_type):
    node_dict_to_update = self.get_node_dict(node.path)

    node_dict_to_update["node"].data_type = new_node_type

    self.upgrade_node_type_recursive(self.node_hierarchy, node.path, new_node_type)
def upgrade_node_type_recursive(self, hierarchy, path, new_node_type)
Expand source code
def upgrade_node_type_recursive(self, hierarchy, path, new_node_type):
    if len(path) == 1:
        hierarchy["children"][path[0]]["node"].data_type = new_node_type
    else:
        next_level = hierarchy["children"][path[0]]
        self.upgrade_node_type_recursive(next_level, path[1:], new_node_type)