Source code for dicee.read_preprocess_save_load_kg.preprocess

import pandas as pd
import polars as pl
from .util import (timeit, pandas_dataframe_indexer, dataset_sanity_checking, 
                   get_er_vocab, get_re_vocab, get_ee_vocab, apply_reciprocal_or_noise, 
                   polars_dataframe_indexer)
from dicee.static_funcs import numpy_data_type_changer
import numpy as np
import concurrent.futures
from typing import List, Tuple, Union


class PreprocessKG:
    """ Preprocess the data in memory """

    def __init__(self, kg):
        self.kg = kg
    def start(self) -> None:
        """
        Preprocess train, valid and test datasets stored in the knowledge graph instance

        Parameters
        ----------

        Returns
        -------
        None
        """
        # Process
        if self.kg.byte_pair_encoding and self.kg.padding:
            self.preprocess_with_byte_pair_encoding_with_padding()
        elif self.kg.byte_pair_encoding:
            self.preprocess_with_byte_pair_encoding()
        elif self.kg.backend == "polars":
            self.preprocess_with_polars()
        elif self.kg.backend in ["pandas", "rdflib"]:
            self.preprocess_with_pandas()
        else:
            raise KeyError(f'{self.kg.backend} not found')

        if self.kg.eval_model:
            if self.kg.byte_pair_encoding:
                data = []
                data.extend(self.kg.raw_train_set.values.tolist())
                if self.kg.raw_valid_set is not None:
                    data.extend(self.kg.raw_valid_set.values.tolist())
                if self.kg.raw_test_set is not None:
                    data.extend(self.kg.raw_test_set.values.tolist())
            else:
                if isinstance(self.kg.valid_set, np.ndarray) and isinstance(self.kg.test_set, np.ndarray):
                    data = np.concatenate([self.kg.train_set, self.kg.valid_set, self.kg.test_set])
                else:
                    data = self.kg.train_set
            print('Submit er-vocab, re-vocab, and ee-vocab via ProcessPoolExecutor...')
            # We need to benchmark the benefits of using futures ?
            executor = concurrent.futures.ProcessPoolExecutor()
            self.kg.er_vocab = executor.submit(get_er_vocab, data, self.kg.path_for_serialization + '/er_vocab.p')
            self.kg.re_vocab = executor.submit(get_re_vocab, data, self.kg.path_for_serialization + '/re_vocab.p')
            self.kg.ee_vocab = executor.submit(get_ee_vocab, data, self.kg.path_for_serialization + '/ee_vocab.p')

        assert isinstance(self.kg.raw_train_set, (pd.DataFrame, pl.DataFrame))

        if self.kg.byte_pair_encoding and self.kg.padding:
            assert isinstance(self.kg.train_set, list)
            assert isinstance(self.kg.train_set[0], tuple)
            assert isinstance(self.kg.train_set[0][0], tuple)
            assert isinstance(self.kg.train_set[0][1], tuple)
            assert isinstance(self.kg.train_set[0][2], tuple)
            if self.kg.training_technique == "NegSample":
                """No need to do anything"""
            elif self.kg.training_technique == "KvsAll":
                # Construct the training data: a single data point is a unique pair of
                # a sequence of sub-words representing an entity and
                # a sequence of sub-words representing a relation.
                # Mapping from a sequence of entity sub-words to its unique integer index
                entity_to_idx = {shaped_bpe_ent: idx for idx, (str_ent, bpe_ent, shaped_bpe_ent) in
                                 enumerate(self.kg.ordered_bpe_entities)}
                er_tails = dict()
                # Iterate over bpe encoded triples to obtain a mapping from a pair of bpe entity and relation
                # to the indices of bpe tail entities
                bpe_entities = []
                for (h, r, t) in self.kg.train_set:
                    er_tails.setdefault((h, r), list()).append(entity_to_idx[t])
                    bpe_entities.append(t)
                # Generate the training data
                self.kg.train_set = []
                self.kg.train_target_indices = []
                for (shaped_bpe_h, shaped_bpe_r), list_of_indices_of_tails in er_tails.items():
                    self.kg.train_set.append((shaped_bpe_h, shaped_bpe_r))
                    # List of integers denoting the index of shaped_bpe_entities
                    self.kg.train_target_indices.append(list_of_indices_of_tails)
                self.kg.train_set = np.array(self.kg.train_set)
                self.kg.target_dim = len(self.kg.ordered_bpe_entities)
            elif self.kg.training_technique == "AllvsAll":
                entity_to_idx = {shaped_bpe_ent: idx for idx, (str_ent, bpe_ent, shaped_bpe_ent) in
                                 enumerate(self.kg.ordered_bpe_entities)}
                er_tails = dict()
                bpe_entities = []
                for (h, r, t) in self.kg.train_set:
                    er_tails.setdefault((h, r), list()).append(entity_to_idx[t])
                    bpe_entities.append(t)
                bpe_tokens = {shaped_bpe_token for (str_entity, bpe_entity, shaped_bpe_token) in
                              self.kg.ordered_bpe_entities + self.kg.ordered_bpe_relations}
                # Iterate over all pairs of sub-word sequences so that every pair
                # has a (possibly empty) list of target indices
                for i in bpe_tokens:
                    for j in bpe_tokens:
                        if er_tails.get((i, j), None) is None:
                            er_tails[(i, j)] = list()
                # Generate the training data
                self.kg.train_set = []
                self.kg.train_target_indices = []
                for (shaped_bpe_h, shaped_bpe_r), list_of_indices_of_tails in er_tails.items():
                    self.kg.train_set.append((shaped_bpe_h, shaped_bpe_r))
                    # List of integers denoting the index of shaped_bpe_entities
                    self.kg.train_target_indices.append(list_of_indices_of_tails)
                self.kg.train_set = np.array(self.kg.train_set)
                self.kg.target_dim = len(self.kg.ordered_bpe_entities)
            else:
                raise NotImplementedError(
                    f"Training technique {self.kg.training_technique} with BPE not implemented")
            if self.kg.max_length_subword_tokens is None and self.kg.byte_pair_encoding:
                self.kg.max_length_subword_tokens = len(self.kg.train_set[0][0])
        elif self.kg.byte_pair_encoding:
            space_token = self.kg.enc.encode(" ")[0]
            end_token = self.kg.enc.encode(".")[0]
            triples = []
            for (h, r, t) in self.kg.train_set:
                x = [*h, space_token, *r, space_token, *t, end_token]
                triples.extend(x)
            self.kg.train_set = np.array(triples)
        else:
            # Clear raw data to free memory
            self.kg.raw_train_set = None
            self.kg.raw_valid_set = None
            self.kg.raw_test_set = None
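    # Illustrative sketch (not part of the module): the KvsAll branch above groups
    # BPE-encoded triples by their (head, relation) pair and collects the indices of
    # all observed tail entities. With made-up sub-word tuples:
    #   >>> train = [((1, 2), (3,), (4, 5)), ((1, 2), (3,), (6,))]
    #   >>> entity_to_idx = {(4, 5): 0, (6,): 1, (1, 2): 2}
    #   >>> er_tails = {}
    #   >>> for h, r, t in train:
    #   ...     er_tails.setdefault((h, r), []).append(entity_to_idx[t])
    #   >>> er_tails
    #   {((1, 2), (3,)): [0, 1]}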
    @staticmethod
    def __replace_values_df(df: pd.DataFrame = None, f=None) -> Union[None, List[Tuple[Tuple[int], Tuple[int], Tuple[int]]]]:
        """
        Map an n-by-3 pandas dataframe containing n triples into a list of n tuples,
        where each tuple contains three tuples corresponding to the sub-word sequences
        of the head entity, the relation, and the tail entity, respectively.

        Parameters
        ----------
        df: pandas.DataFrame
        f: an encoder function.

        Returns
        -------
        A list of BPE-encoded triples, or an empty list if df is None.
        """
        if df is None:
            return []
        else:
            bpe_triples = list(df.map(lambda x: tuple(f(x))).itertuples(index=False, name=None))
            assert isinstance(bpe_triples, list)
            assert isinstance(bpe_triples[0], tuple)
            assert len(bpe_triples[0]) == 3
            assert isinstance(bpe_triples[0][0], tuple)
            assert isinstance(bpe_triples[0][0][0], int)
            return bpe_triples

    def __finding_max_token(self, concat_of_train_val_test) -> int:
        max_length_subword_tokens = 0
        for i in concat_of_train_val_test:
            max_token_length_per_triple = max(len(i[0]), len(i[1]), len(i[2]))
            if max_token_length_per_triple > max_length_subword_tokens:
                max_length_subword_tokens = max_token_length_per_triple
        return max_length_subword_tokens

    def __padding_in_place(self, x, max_length_subword_tokens, bpe_subwords_to_shaped_bpe_entities,
                           bpe_subwords_to_shaped_bpe_relations):
        for i, (s, p, o) in enumerate(x):
            if len(s) < max_length_subword_tokens:
                s_encoded = s + tuple(self.kg.dummy_id for _ in range(max_length_subword_tokens - len(s)))
            else:
                s_encoded = s
            if len(p) < max_length_subword_tokens:
                p_encoded = p + tuple(self.kg.dummy_id for _ in range(max_length_subword_tokens - len(p)))
            else:
                p_encoded = p
            if len(o) < max_length_subword_tokens:
                o_encoded = o + tuple(self.kg.dummy_id for _ in range(max_length_subword_tokens - len(o)))
            else:
                o_encoded = o
            bpe_subwords_to_shaped_bpe_entities[o] = o_encoded
            bpe_subwords_to_shaped_bpe_entities[s] = s_encoded
            bpe_subwords_to_shaped_bpe_relations[p] = p_encoded
            x[i] = (s_encoded, p_encoded, o_encoded)
        return x
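    # Illustrative sketch (not part of the module): __padding_in_place right-pads each
    # sub-word tuple with the knowledge graph's dummy id up to the maximum length.
    # Assuming a dummy id of 0 and a maximum length of 4:
    #   >>> s, dummy_id, max_len = (11, 12), 0, 4
    #   >>> s + tuple(dummy_id for _ in range(max_len - len(s)))
    #   (11, 12, 0, 0)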
    def preprocess_with_byte_pair_encoding(self):
        assert isinstance(self.kg.raw_train_set, pd.DataFrame)
        assert self.kg.raw_train_set.columns.tolist() == ['subject', 'relation', 'object']
        # Add reciprocal or noisy triples
        self.kg.raw_train_set = apply_reciprocal_or_noise(add_reciprocal=self.kg.add_reciprocal,
                                                          eval_model=self.kg.eval_model,
                                                          df=self.kg.raw_train_set, info="Train")
        self.kg.raw_valid_set = apply_reciprocal_or_noise(add_reciprocal=self.kg.add_reciprocal,
                                                          eval_model=self.kg.eval_model,
                                                          df=self.kg.raw_valid_set, info="Validation")
        self.kg.raw_test_set = apply_reciprocal_or_noise(add_reciprocal=self.kg.add_reciprocal,
                                                         eval_model=self.kg.eval_model,
                                                         df=self.kg.raw_test_set, info="Test")
        # Transform DataFrames to list of tuples with BPE encoding
        self.kg.train_set = self.__replace_values_df(df=self.kg.raw_train_set, f=self.kg.enc.encode)
        self.kg.valid_set = self.__replace_values_df(df=self.kg.raw_valid_set, f=self.kg.enc.encode)
        self.kg.test_set = self.__replace_values_df(df=self.kg.raw_test_set, f=self.kg.enc.encode)
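    # Illustrative sketch (not part of the module): __replace_values_df encodes every cell
    # of the (subject, relation, object) dataframe with self.kg.enc.encode. The snippet
    # below assumes a tiktoken encoder purely for illustration; the actual encoder is
    # whatever the KG instance provides.
    #   >>> import tiktoken, pandas as pd
    #   >>> enc = tiktoken.get_encoding("gpt2")
    #   >>> df = pd.DataFrame([["Berlin", "locatedIn", "Germany"]],
    #   ...                   columns=["subject", "relation", "object"])
    #   >>> list(df.map(lambda x: tuple(enc.encode(x))).itertuples(index=False, name=None))
    #   [((...), (...), (...))]   # one 3-tuple of token-id tuples per triple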
    @timeit
    def preprocess_with_byte_pair_encoding_with_padding(self) -> None:
        """Preprocess with byte pair encoding and add padding"""
        self.preprocess_with_byte_pair_encoding()

        self.kg.max_length_subword_tokens = self.__finding_max_token(
            self.kg.train_set + self.kg.valid_set + self.kg.test_set)
        # Store padded bpe entities and relations
        bpe_subwords_to_shaped_bpe_entities = dict()
        bpe_subwords_to_shaped_bpe_relations = dict()
        print("The longest sequence of sub-word units of entities and relations is ",
              self.kg.max_length_subword_tokens)
        # Padding
        self.kg.train_set = self.__padding_in_place(self.kg.train_set, self.kg.max_length_subword_tokens,
                                                    bpe_subwords_to_shaped_bpe_entities,
                                                    bpe_subwords_to_shaped_bpe_relations)
        if self.kg.valid_set is not None:
            self.kg.valid_set = self.__padding_in_place(self.kg.valid_set, self.kg.max_length_subword_tokens,
                                                        bpe_subwords_to_shaped_bpe_entities,
                                                        bpe_subwords_to_shaped_bpe_relations)
        if self.kg.test_set is not None:
            self.kg.test_set = self.__padding_in_place(self.kg.test_set, self.kg.max_length_subword_tokens,
                                                       bpe_subwords_to_shaped_bpe_entities,
                                                       bpe_subwords_to_shaped_bpe_relations)
        # Store str_entity, bpe_entity, padded_bpe_entity
        self.kg.ordered_bpe_entities = sorted([(self.kg.enc.decode(k), k, v) for k, v in
                                               bpe_subwords_to_shaped_bpe_entities.items()], key=lambda x: x[0])
        self.kg.ordered_bpe_relations = sorted([(self.kg.enc.decode(k), k, v) for k, v in
                                                bpe_subwords_to_shaped_bpe_relations.items()], key=lambda x: x[0])
        del bpe_subwords_to_shaped_bpe_entities
        del bpe_subwords_to_shaped_bpe_relations
    @timeit
    def preprocess_with_pandas(self) -> None:
        """Preprocess with pandas: add reciprocal triples, construct vocabulary, and index datasets"""
        # Add reciprocal or noisy triples
        self.kg.raw_train_set = apply_reciprocal_or_noise(add_reciprocal=self.kg.add_reciprocal,
                                                          eval_model=self.kg.eval_model,
                                                          df=self.kg.raw_train_set, info="Train")
        self.kg.raw_valid_set = apply_reciprocal_or_noise(add_reciprocal=self.kg.add_reciprocal,
                                                          eval_model=self.kg.eval_model,
                                                          df=self.kg.raw_valid_set, info="Validation")
        self.kg.raw_test_set = apply_reciprocal_or_noise(add_reciprocal=self.kg.add_reciprocal,
                                                         eval_model=self.kg.eval_model,
                                                         df=self.kg.raw_test_set, info="Test")
        # Construct vocabulary
        self.sequential_vocabulary_construction()
        self.kg.num_entities, self.kg.num_relations = len(self.kg.entity_to_idx), len(self.kg.relation_to_idx)
        max_idx = max(self.kg.num_entities, self.kg.num_relations)

        # Index and convert datasets
        def index_and_convert(df, name):
            print(f'Indexing {name} data with shape {df.shape}...')
            indexed = pandas_dataframe_indexer(df, self.kg.entity_to_idx, self.kg.relation_to_idx).values
            dataset_sanity_checking(indexed, self.kg.num_entities, self.kg.num_relations)
            return numpy_data_type_changer(indexed, num=max_idx)

        self.kg.train_set = index_and_convert(self.kg.raw_train_set, "train")
        if self.kg.raw_valid_set is not None:
            self.kg.valid_set = index_and_convert(self.kg.raw_valid_set, "valid")
        if self.kg.raw_test_set is not None:
            self.kg.test_set = index_and_convert(self.kg.raw_test_set, "test")
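    # Illustrative sketch (not part of the module): pandas_dataframe_indexer is defined in
    # .util; the dictionary-based stand-in below only shows the kind of string-to-integer
    # replacement expected from it, with hypothetical vocabularies.
    #   >>> import pandas as pd
    #   >>> triples = pd.DataFrame([["a", "p", "b"]], columns=["subject", "relation", "object"])
    #   >>> ent_idx, rel_idx = {"a": 0, "b": 1}, {"p": 0}
    #   >>> pd.DataFrame({"subject": triples["subject"].map(ent_idx),
    #   ...               "relation": triples["relation"].map(rel_idx),
    #   ...               "object": triples["object"].map(ent_idx)}).values
    #   array([[0, 0, 1]])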
    @timeit
    def preprocess_with_polars(self) -> None:
        """Preprocess with polars: add reciprocal triples and create indexed datasets"""
        print(f'*** Preprocessing Train Data:{self.kg.raw_train_set.shape} with Polars ***')

        # Add reciprocal triples
        if self.kg.add_reciprocal and self.kg.eval_model:
            def add_reciprocal(df):
                if df is not None:
                    return df.extend(df.select([
                        pl.col("object").alias('subject'),
                        pl.col("relation") + '_inverse',
                        pl.col("subject").alias('object')
                    ]))
                return df

            print('Adding Reciprocal Triples...')
            self.kg.raw_train_set = add_reciprocal(self.kg.raw_train_set)
            self.kg.raw_valid_set = add_reciprocal(self.kg.raw_valid_set)
            self.kg.raw_test_set = add_reciprocal(self.kg.raw_test_set)

        # Type checking
        assert isinstance(self.kg.raw_train_set, pl.DataFrame)
        assert self.kg.raw_valid_set is None or isinstance(self.kg.raw_valid_set, pl.DataFrame)
        assert self.kg.raw_test_set is None or isinstance(self.kg.raw_test_set, pl.DataFrame)

        # Concatenate all splits for vocabulary construction
        print('Concat Splits...')
        splits = [self.kg.raw_train_set]
        if self.kg.raw_valid_set is not None:
            splits.append(self.kg.raw_valid_set)
        if self.kg.raw_test_set is not None:
            splits.append(self.kg.raw_test_set)
        df_str_kg = pl.concat(splits)

        # Build entity vocabulary (sorted alphabetically for deterministic indexing)
        print("Collecting entities...")
        subjects = df_str_kg.select(pl.col("subject").unique().alias("entity"))
        objects = df_str_kg.select(pl.col("object").unique().alias("entity"))
        self.kg.entity_to_idx = pl.concat([subjects, objects], how="vertical").unique().sort("entity")
        self.kg.entity_to_idx = self.kg.entity_to_idx.with_row_index("index").select(["index", "entity"])
        print(f"Unique entities: {len(self.kg.entity_to_idx)}")

        # Build relation vocabulary (sorted alphabetically for deterministic indexing)
        print('Relation Indexing...')
        self.kg.relation_to_idx = df_str_kg.select(pl.col("relation").unique()).sort("relation")
        self.kg.relation_to_idx = self.kg.relation_to_idx.with_row_index("index").select(["index", "relation"])
        del df_str_kg

        # Index datasets
        print(f'Indexing Training Data {self.kg.raw_train_set.shape}...')
        self.kg.train_set = polars_dataframe_indexer(self.kg.raw_train_set, self.kg.entity_to_idx,
                                                     self.kg.relation_to_idx).to_numpy()
        if self.kg.raw_valid_set is not None:
            print(f'Indexing Val Data {self.kg.raw_valid_set.shape}...')
            self.kg.valid_set = polars_dataframe_indexer(self.kg.raw_valid_set, self.kg.entity_to_idx,
                                                         self.kg.relation_to_idx).to_numpy()
        if self.kg.raw_test_set is not None:
            print(f'Indexing Test Data {self.kg.raw_test_set.shape}...')
            self.kg.test_set = polars_dataframe_indexer(self.kg.raw_test_set, self.kg.entity_to_idx,
                                                        self.kg.relation_to_idx).to_numpy()

        self.kg.num_entities, self.kg.num_relations = len(self.kg.entity_to_idx), len(self.kg.relation_to_idx)
        print(f'*** Preprocessing Train Data:{self.kg.train_set.shape} with Polars DONE ***')
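    # Illustrative sketch (not part of the module): polars_dataframe_indexer is defined in
    # .util; the joins below show one way the (index, entity) and (index, relation) frames
    # built above could be applied to a triple frame, not necessarily what the helper does.
    #   >>> import polars as pl
    #   >>> triples = pl.DataFrame({"subject": ["a"], "relation": ["p"], "object": ["b"]})
    #   >>> e2i = pl.DataFrame({"index": [0, 1], "entity": ["a", "b"]})
    #   >>> r2i = pl.DataFrame({"index": [0], "relation": ["p"]})
    #   >>> (triples
    #   ...  .join(e2i, left_on="subject", right_on="entity").rename({"index": "s_idx"})
    #   ...  .join(r2i, on="relation").rename({"index": "r_idx"})
    #   ...  .join(e2i, left_on="object", right_on="entity").rename({"index": "o_idx"})
    #   ...  .select(["s_idx", "r_idx", "o_idx"]).to_numpy())
    #   array([[0, 0, 1]])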
    def sequential_vocabulary_construction(self) -> None:
        """
        (1) Read input data into memory
        (2) Remove triples with a condition
        (3) Serialize vocabularies in a pandas dataframe where
            => the index is an integer and
            => a single column is a string (e.g. a URI)
        """
        assert isinstance(self.kg.raw_train_set, pd.DataFrame)
        assert self.kg.raw_valid_set is None or isinstance(self.kg.raw_valid_set, pd.DataFrame)
        assert self.kg.raw_test_set is None or isinstance(self.kg.raw_test_set, pd.DataFrame)

        # Concatenate all data splits
        print('Concatenating data to build vocabulary...')
        splits = [self.kg.raw_train_set]
        if self.kg.raw_valid_set is not None:
            splits.append(self.kg.raw_valid_set)
        if self.kg.raw_test_set is not None:
            splits.append(self.kg.raw_test_set)
        df_str_kg = pd.concat(splits, ignore_index=True)

        print('Creating a mapping from entities to integer indexes...')
        # Create a bijective mapping from entities to integer indexes.
        # Build entity vocabulary (sorted alphabetically for deterministic indexing).
        # This ensures the same entity always gets the same index regardless of input order.
        self.kg.entity_to_idx = pd.concat((df_str_kg['subject'], df_str_kg['object'])
                                          ).drop_duplicates().sort_values().reset_index(drop=True).to_frame("entity")
        # Build relation vocabulary (sorted alphabetically for deterministic indexing)
        self.kg.relation_to_idx = df_str_kg['relation'].drop_duplicates().sort_values(
        ).reset_index(drop=True).to_frame("relation")
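    # Illustrative sketch (not part of the module): the entity vocabulary built above is a
    # single-column dataframe whose integer index serves as the entity id, e.g.
    #   >>> import pandas as pd
    #   >>> df = pd.DataFrame([["b", "p", "a"], ["a", "q", "c"]],
    #   ...                   columns=["subject", "relation", "object"])
    #   >>> pd.concat((df["subject"], df["object"])).drop_duplicates().sort_values(
    #   ...     ).reset_index(drop=True).to_frame("entity")
    #     entity
    #   0      a
    #   1      b
    #   2      c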
# del ordered_list