Source code for forayer.input_output.from_to_gradoop

"""Write Gradoop file format and read Gradoop file format."""
import csv
import os
from collections import defaultdict, namedtuple
from typing import Dict, List, Tuple, Union

from forayer.knowledge_graph import KG
from forayer.utils.dict_help import nested_ddict, nested_ddict2dict

TYPE_CONVERSION = {
    "string": str,
    "long": int,
    "int": int,
    "float": float,
    "double": float,
    "boolean": lambda x: x == "true",
}

INV_TYPES = {
    str: "string",
    int: "int",
    float: "float",
    bool: "boolean",
}

VertexLine = namedtuple("VertexLine", ["id", "graph_ids", "type", "props"])
EdgeLine = namedtuple(
    "EdgeLine", ["id", "graph_ids", "source_id", "target_id", "type", "props"]
)


[docs]def int_to_gradoop_id(value: int) -> str: """Casts to Gradoop id, which are 12 byte hexadecimal strings. Parameters ---------- value : int Value to cast Returns ------- str 12 byte hexadecimal string (without leading '0x'). """ # see https://stackoverflow.com/a/12638477 # first two characters are leading 0x return f"{value:#0{26}x}"[2:]
[docs]def is_gradoop_id(value) -> bool: """Check if value is a valid Gradoop id. Gradoop ids are 12 byte hexadecimal strings Parameters ---------- value Value to check Returns ------- bool True if is valid Gradoop id """ if isinstance(value, str) and len(value) == 24: try: int(value, 16) return True except ValueError: return False return False
def _load_metadata(path: str) -> Dict[str, Dict[str, List[Tuple[str, str]]]]: metadata: Dict[str, Dict] = { "g": defaultdict(list), "v": defaultdict(list), "e": defaultdict(list), } with open(path, "r") as in_file: reader = csv.reader(in_file, delimiter=";") for row in reader: if len(row) == 2: gve, label = row properties = [] else: gve, label, keys = row properties = [tuple(p.split(":")) for p in keys.split(",")] if gve not in ["g", "v", "e"]: raise ValueError( f"Unknown node type specifier '{gve}', only 'g','v' or 'e' allowed" ) metadata[gve][label] = properties # cast to normal dict return { "g": dict(metadata["g"]), "v": dict(metadata["v"]), "e": dict(metadata["e"]), } def _prop_creation(element_type, label, metadata, properties, is_edge=False): props = {} if is_edge else {"_label": label} for prop_name_type, prop_value in zip( metadata[element_type][label], properties.split("|") ): if len(prop_name_type) == 1: continue prop_name, prop_type = prop_name_type if prop_value != "": props[prop_name] = TYPE_CONVERSION[prop_type](prop_value) return props def _load_graphs(path: str, metadata: Dict[str, Dict[str, List[Tuple[str, str]]]]): graphs = {} with open(path, "r") as in_file: reader = csv.reader(in_file, delimiter=";") for row in reader: g_id, label, properties = row graphs[g_id] = _prop_creation("g", label, metadata, properties) return graphs def _graph_containment(graphs: str): return graphs[1:-1].split(",") def _load_vertices(path: str, metadata: Dict[str, Dict[str, List[Tuple[str, str]]]]): graph_vertices = nested_ddict() with open(path, "r") as in_file: reader = csv.reader(in_file, delimiter=";") for row in reader: v_id, graphs, label, properties = row v_props = _prop_creation("v", label, metadata, properties) for g in _graph_containment(graphs): graph_vertices[g][v_id] = v_props return nested_ddict2dict(graph_vertices) def _load_edges(path: str, metadata: Dict[str, Dict[str, List[Tuple[str, str]]]]): graph_edges = nested_ddict() with open(path, "r") as in_file: reader = csv.reader(in_file, delimiter=";") for row in reader: e_id, graphs, source, target, label, properties = row e_props = _prop_creation("e", label, metadata, properties, is_edge=True) for g in _graph_containment(graphs): graph_edges[g][source][target][label] = e_props return nested_ddict2dict(graph_edges)
[docs]def load_from_csv_datasource( folder_path: str, graph_name_property: str = None ) -> Dict[str, KG]: """Load Gradoop graph from csv datasource. Parameters ---------- folder_path : str Path for folder that contains graph. graph_name_property : str Name of graph property that will be used to name graphs. If None use graph id. Returns ------- Dict[str,KG] Dictionary of knowledge graphs. """ graphs_csv_path = os.path.join(folder_path, "graphs.csv") vertices_csv_path = os.path.join(folder_path, "vertices.csv") edges_csv_path = os.path.join(folder_path, "edges.csv") metadata_csv_path = os.path.join(folder_path, "metadata.csv") metadata = _load_metadata(metadata_csv_path) graphs = _load_graphs(graphs_csv_path, metadata) vertices = _load_vertices(vertices_csv_path, metadata) edges = _load_edges(edges_csv_path, metadata) if graph_name_property is not None: return { g: KG( entities=vertices[g], rel=edges[g], name=graphs[g][graph_name_property], ) for g in graphs.keys() } else: return { g: KG(entities=vertices[g], rel=edges[g], name=g) for g in graphs.keys() }
def _gather_edge_metadata(kgs: Dict[str, KG], attribute_type_mapping: Dict = None): e_metadata = nested_ddict() for kg in kgs.values(): for _, rel_dict in kg.rel.items(): for _, rel_prop_dict in rel_dict.items(): if isinstance(rel_prop_dict, str): # in this case # relation does not have attributes # and simply has the relation name # which we use as edge type for gradoop e_metadata[rel_prop_dict] = {} continue for e_type, inner_prop_dict in rel_prop_dict.items(): for e_prop_name, e_prop_value in inner_prop_dict.items(): if ( attribute_type_mapping is not None and e_type in attribute_type_mapping and e_prop_name in attribute_type_mapping[e_type] ): e_metadata[e_type][e_prop_name] = attribute_type_mapping[ e_type ][e_prop_name] elif e_metadata[e_type][e_prop_name] != {} and e_metadata[ e_type ][e_prop_name] != type(e_prop_value): raise ValueError( f"Inconsistent typing for {e_prop_name} in relation" f" type {e_type}" ) else: e_metadata[e_type][e_prop_name] = type(e_prop_value) return nested_ddict2dict(e_metadata) def _gather_vertex_metadata( kgs: Dict[str, KG], label_attr: str = "_label", attribute_type_mapping: Dict = None, vertex_id_attr_name: str = None, ): v_metadata = nested_ddict() for kg in kgs.values(): for e_id, e_attr_dict in kg.entities.items(): if label_attr not in e_attr_dict: raise ValueError( f"Entity {e_id} does not contain the required label attribute" f" '{label_attr}'" ) else: cur_label = e_attr_dict[label_attr] for attr_name, attr_val in e_attr_dict.items(): if attr_name == label_attr: continue elif ( attribute_type_mapping is not None and cur_label in attribute_type_mapping and attr_name in attribute_type_mapping[cur_label] ): v_metadata[cur_label][attr_name] = attribute_type_mapping[ cur_label ][attr_name] elif v_metadata[cur_label][attr_name] != {} and v_metadata[cur_label][ attr_name ] != type(attr_val): raise ValueError( f"Inconsistent typing for {attr_name} in Entity type" f" {cur_label}" ) else: v_metadata[cur_label][attr_name] = type(attr_val) if vertex_id_attr_name is not None: v_metadata[cur_label][vertex_id_attr_name] = str return nested_ddict2dict(v_metadata) def _fix_metadata_order(metadata: Dict): fixed_metadata = {} for element_type, ele_meta in metadata.items(): v_out = {} for inner_type, attr_dict in ele_meta.items(): inner_list: List[List] = [[], []] # name, type for a_name, a_type in attr_dict.items(): inner_list[0].append(a_name) inner_list[1].append(a_type) v_out[inner_type] = inner_list fixed_metadata[element_type] = v_out return fixed_metadata def _create_metadata( kgs: Dict[str, KG], label_attr: str = "_label", attribute_type_mapping: Dict = None, vertex_id_attr_name: str = None, graph_name_as_property: str = None, ): edge_type_mapping = None vertex_type_mapping = None if attribute_type_mapping is not None: edge_type_mapping = ( None if "e" not in attribute_type_mapping else attribute_type_mapping["e"] ) vertex_type_mapping = ( None if "v" not in attribute_type_mapping else attribute_type_mapping["v"] ) edge_metadata = _gather_edge_metadata(kgs, attribute_type_mapping=edge_type_mapping) vertex_metadata = _gather_vertex_metadata( kgs, label_attr=label_attr, attribute_type_mapping=vertex_type_mapping, vertex_id_attr_name=vertex_id_attr_name, ) graph_metadata = {} for i, k in enumerate(kgs.values(), start=1): props = {} if graph_name_as_property is None else {graph_name_as_property: str} if k.name is None: graph_metadata[f"graph{i}"] = props else: graph_metadata[k.name] = props return _fix_metadata_order( {"g": graph_metadata, "e": edge_metadata, "v": vertex_metadata} ) def _create_vertex_lines( kgs: Dict[str, KG], label_attr: str, vertex_metadata: Dict, vertex_id_attr_name: str ): v_dict: Dict = {} vid_to_gid = {} for k_name, kg in kgs.items(): for e_id, e_attr_dict in kg.entities.items(): cur_label = e_attr_dict[label_attr] prop_line = [] for attr_name, exp_type in zip(*vertex_metadata[cur_label]): if attr_name == vertex_id_attr_name: attr_value = e_id else: attr_value = e_attr_dict.get(attr_name, "") if attr_value != "": if exp_type == bool: attr_value = "true" if attr_value else "false" elif not isinstance(exp_type, str): # custom type attr_value = exp_type(attr_value) prop_line.append(str(attr_value)) prop_string = "|".join(prop_line) if e_id in v_dict: if v_dict[e_id].props != prop_string: raise ValueError( f"Entity {e_id} has inconsistent representation across" f" graphs:{prop_string}\n and\n {v_dict[e_id][3]}" ) else: v_dict[e_id].graph_ids.append(k_name) else: if not is_gradoop_id(e_id): grad_id = int_to_gradoop_id(len(v_dict)) vid_to_gid[e_id] = grad_id else: grad_id = e_id v_dict[e_id] = VertexLine(grad_id, [k_name], cur_label, prop_string) return list(v_dict.values()), vid_to_gid def _create_edge_lines(kgs: Dict[str, KG], edge_metadata: Dict, vid_to_gid: Dict): e_dict: Dict = {} for k_name, kg in kgs.items(): for source_id, target_rel_dict in kg.rel.items(): for target_id, rel_dict in target_rel_dict.items(): if isinstance(rel_dict, str): # in this case # relation does not have attributes # and simply has the relation name # which we use as edge type for gradoop rel_dict = {rel_dict: {}} for cur_label, prop_dict in rel_dict.items(): prop_line = [] for attr_name, exp_type in zip(*edge_metadata[cur_label]): attr_value = prop_dict.get(attr_name, "") if attr_value != "": if exp_type == bool: attr_value = "true" if attr_value else "false" elif not isinstance(exp_type, str): # custom type attr_value = exp_type(attr_value) prop_line.append(str(attr_value)) tmp_id = str(source_id) + str(target_id) + str(cur_label) prop_string = "|".join(prop_line) if tmp_id in e_dict: e_dict[tmp_id].graph_ids.append(k_name) else: edge_id = int_to_gradoop_id(len(e_dict)) if vid_to_gid is not None: source_id = vid_to_gid.get(source_id, source_id) target_id = vid_to_gid.get(target_id, target_id) e_dict[tmp_id] = EdgeLine( edge_id, [k_name], source_id, target_id, cur_label, prop_string, ) return list(e_dict.values()) def _create_graph_lines( kgs: Dict[str, KG], graph_metadata: Dict, default_type="graph", graph_name_as_property: str = None, ): g_lines = [] for g_id, kg in kgs.items(): if kg.name is None: g_lines.append((g_id, default_type, "")) else: metadata = graph_metadata[kg.name] # TODO graphs cannot really have attributes yet props = [] for m_att in metadata[0]: if ( graph_name_as_property is not None and m_att == graph_name_as_property ): props.append(kg.name) else: props.append("") prop_string = "|".join(props) g_lines.append((g_id, kg.name, prop_string)) return g_lines def _create_metadata_lines(metadata): m_lines = [] for ele_type, ele_dict in metadata.items(): for inner_type, prop_list in ele_dict.items(): props_list = [] for prop_name, type_class in zip(*prop_list): if isinstance(type_class, str): props_list.append(f"{prop_name}:{type_class}") else: props_list.append(f"{prop_name}:{INV_TYPES[type_class]}") props = ",".join(props_list) m_lines.append((ele_type, inner_type, props)) return m_lines def _kgs_dict_to_gradoop_id(kgs: Dict[str, KG]) -> Dict: return { int_to_gradoop_id(i): kg_name_value[1] for i, kg_name_value in enumerate(kgs.items()) } def _write_lines(lines, out_path): with open(out_path, "w") as out_file: for line in lines: if isinstance(line, (VertexLine, EdgeLine)): line = list(line) line[1] = "[" + ",".join(line[1]) + "]" out_file.write(";".join([str(ele) for ele in line]) + "\n")
[docs]def write_to_csv_datasource( kgs: Union[KG, Dict[str, KG]], out_path: str, label_attr: str = "_label", attribute_type_mapping: Dict = None, vertex_id_attr_name: str = "_forayer_id", default_graph_type: str = "graph", graph_name_as_property: str = None, overwrite: bool = False, ): """Write knowledge graph(s) to Gradoop CSV Datasource. Parameters ---------- kgs : Union[KG, Dict[str, KG]] Knowledge Graph(s) to serialize. out_path : str Folder where this data will be serialized to. label_attr : str, Default = "_label" Vertex attribute to use for Gradoop's special type attribute. attribute_type_mapping : Dict, Default=None Manually set attribute types. vertex_id_attr_name : str, Default="_forayer_id" Save the current entity id as property with this name. If set to None, entity id is not saved. default_graph_type : str Label graphs as this type if they do not have a name. graph_name_as_property : str Save the name of graphs as seperate property. overwrite : bool If True, overwrites existing files at output. """ if os.path.exists(out_path) and not overwrite: raise ValueError(f"Path {out_path} already exists") os.makedirs(out_path, exist_ok=True) graphs_csv_path = os.path.join(out_path, "graphs.csv") vertices_csv_path = os.path.join(out_path, "vertices.csv") edges_csv_path = os.path.join(out_path, "edges.csv") metadata_csv_path = os.path.join(out_path, "metadata.csv") if isinstance(kgs, KG): name = kgs.name if kgs is not None else "0" assert name is not None kgs = {name: kgs} metadata = _create_metadata( kgs=kgs, label_attr=label_attr, attribute_type_mapping=attribute_type_mapping, vertex_id_attr_name=vertex_id_attr_name, graph_name_as_property=graph_name_as_property, ) kg_dict_with_gid = _kgs_dict_to_gradoop_id(kgs) vertex_lines, vid_to_gid = _create_vertex_lines( kgs=kg_dict_with_gid, label_attr=label_attr, vertex_metadata=metadata["v"], vertex_id_attr_name=vertex_id_attr_name, ) edge_lines = _create_edge_lines( kgs=kg_dict_with_gid, edge_metadata=metadata["e"], vid_to_gid=vid_to_gid ) graph_lines = _create_graph_lines( kgs=kg_dict_with_gid, graph_metadata=metadata["g"], default_type=default_graph_type, graph_name_as_property=graph_name_as_property, ) metadata_lines = _create_metadata_lines(metadata) _write_lines(graph_lines, graphs_csv_path) _write_lines(edge_lines, edges_csv_path) _write_lines(vertex_lines, vertices_csv_path) _write_lines(metadata_lines, metadata_csv_path)