Source code for ontolearn.learners.clip

# -----------------------------------------------------------------------------
# MIT License
#
# Copyright (c) 2024 Ontolearn Team
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
# -----------------------------------------------------------------------------

"""CLIP: Concept Learner with Integrated Length Prediction."""

import os
import time
import pandas as pd
import torch
from datetime import datetime
from typing import List, Tuple, Iterable, Optional, Union

from torch.utils.data import DataLoader
from torch.functional import F
from torch.nn.utils.rnn import pad_sequence
from owlapy.owl_individual import OWLNamedIndividual
from owlapy.abstracts import AbstractOWLReasoner
from owlapy.utils import ConceptOperandSorter

from ontolearn.abstracts import AbstractKnowledgeBase, AbstractScorer, BaseRefinement, AbstractHeuristic
from ontolearn.data_struct import CLIPDataset, CLIPDatasetInference
from ontolearn.refinement_operators import ExpressRefinement
from ontolearn.learning_problem import PosNegLPStandard
from ontolearn.clip_architectures import LengthLearner_LSTM, LengthLearner_GRU, LengthLearner_CNN, \
    LengthLearner_SetTransformer
from ontolearn.clip_trainer import CLIPTrainer
from ontolearn.learners import CELOE
from ontolearn.search import OENode

_concept_operand_sorter = ConceptOperandSorter()


[docs] class CLIP(CELOE): """Concept Learner with Integrated Length Prediction. This algorithm extends the CELOE algorithm by using concept length predictors and a different refinement operator, i.e., ExpressRefinement Attributes: best_descriptions (EvaluatedDescriptionSet[OENode, QualityOrderedNode]): Best hypotheses ordered. best_only (bool): If False pick only nodes with quality < 1.0, else pick without quality restrictions. calculate_min_max (bool): Calculate minimum and maximum horizontal expansion? Statistical purpose only. heuristic_func (AbstractHeuristic): Function to guide the search heuristic. heuristic_queue (SortedSet[OENode]): A sorted set that compares the nodes based on Heuristic. iter_bound (int): Limit to stop the algorithm after n refinement steps are done. kb (AbstractKnowledgeBase): The knowledge base that the concept learner is using. max_child_length (int): Limit the length of concepts generated by the refinement operator. max_he (int): Maximal value of horizontal expansion. max_num_of_concepts_tested (int) Limit to stop the algorithm after n concepts tested. max_runtime (int): Limit to stop the algorithm after n seconds. min_he (int): Minimal value of horizontal expansion. name (str): Name of the model = 'celoe_python'. _number_of_tested_concepts (int): Yes, you got it. This stores the number of tested concepts. operator (BaseRefinement): Operator used to generate refinements. quality_func (AbstractScorer) The quality function to be used. reasoner (AbstractOWLReasoner): The reasoner that this model is using. search_tree (Dict[OWLClassExpression, TreeNode[OENode]]): Dict to store the TreeNode for a class expression. start_class (OWLClassExpression): The starting class expression for the refinement operation. start_time (float): The time when :meth:`fit` starts the execution. Used to calculate the total time :meth:`fit` takes to execute. terminate_on_goal (bool): Whether to stop the algorithm if a perfect solution is found. """ __slots__ = 'best_descriptions', 'max_he', 'min_he', 'best_only', 'calculate_min_max', 'heuristic_queue', \ 'search_tree', '_learning_problem', '_max_runtime', '_seen_norm_concepts', 'predictor_name', \ 'pretrained_predictor_name', 'load_pretrained', 'output_size', 'num_examples', 'path_of_embeddings', \ 'instance_embeddings', 'input_size', 'device', 'length_predictor', 'num_workers', 'knowledge_base' name = 'CLIP' def __init__(self, knowledge_base: AbstractKnowledgeBase, reasoner: Optional[AbstractOWLReasoner] = None, refinement_operator: Optional[BaseRefinement[OENode]] = ExpressRefinement, quality_func: Optional[AbstractScorer] = None, heuristic_func: Optional[AbstractHeuristic] = None, terminate_on_goal: Optional[bool] = None, iter_bound: Optional[int] = None, max_num_of_concepts_tested: Optional[int] = None, max_runtime: Optional[int] = None, max_results: int = 10, best_only: bool = False, calculate_min_max: bool = True, path_of_embeddings="", predictor_name=None, pretrained_predictor_name=["SetTransformer", "LSTM", "GRU", "CNN"], load_pretrained=False, num_workers=4, num_examples=1000, output_size=15 ): super().__init__(knowledge_base, reasoner, refinement_operator, quality_func, heuristic_func, terminate_on_goal, iter_bound, max_num_of_concepts_tested, max_runtime, max_results, best_only, calculate_min_max) self.predictor_name = predictor_name self.pretrained_predictor_name = pretrained_predictor_name self.knowledge_base = knowledge_base self.load_pretrained = load_pretrained self.num_workers = num_workers self.output_size = output_size self.num_examples = num_examples self.path_of_embeddings = path_of_embeddings if self.path_of_embeddings: assert os.path.isfile(self.path_of_embeddings), '!!! Wrong path for CLIP embeddings' self.instance_embeddings = pd.read_csv(path_of_embeddings, index_col=0) self.input_size = self.instance_embeddings.shape[1] self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu") self.length_predictor = self.get_length_predictor()
[docs] def get_length_predictor(self): def load_model(predictor_name, load_pretrained): if predictor_name is None or not self.path_of_embeddings: return [] if predictor_name == 'SetTransformer': model = LengthLearner_SetTransformer(self.input_size, self.output_size, proj_dim=256, num_heads=4, num_seeds=1, m=32) elif predictor_name == 'GRU': model = LengthLearner_GRU(self.input_size, self.output_size, proj_dim=256, rnn_n_layers=2, drop_prob=0.2) elif predictor_name == 'LSTM': model = LengthLearner_LSTM(self.input_size, self.output_size, proj_dim=256, rnn_n_layers=2, drop_prob=0.2) elif predictor_name == 'CNN': model = LengthLearner_CNN(self.input_size, self.output_size, self.num_examples, proj_dim=256, kernel_size=[[5, 7], [5, 7]], stride=[[3, 3], [3, 3]]) path_of_trained_models = self.path_of_embeddings.split("embeddings")[ 0] + "trained_models/trained_" + predictor_name + ".pt" if load_pretrained and os.path.isfile(path_of_trained_models): model.load_state_dict(torch.load(path_of_trained_models, map_location=self.device, weights_only=True)) model.eval() print("\n Loaded length predictor!") return model if not self.load_pretrained: return [load_model(self.predictor_name, self.load_pretrained)] elif self.load_pretrained and isinstance(self.pretrained_predictor_name, str): return [load_model(self.pretrained_predictor_name, self.load_pretrained)] elif self.load_pretrained and isinstance(self.pretrained_predictor_name, list): return [load_model(name, self.load_pretrained) for name in self.pretrained_predictor_name]
[docs] def refresh(self): self.length_predictor = self.get_length_predictor()
[docs] def collate_batch(self, batch): # pragma: no cover pos_emb_list = [] neg_emb_list = [] target_labels = [] for pos_emb, neg_emb, label in batch: if pos_emb.ndim != 2: pos_emb = pos_emb.reshape(1, -1) if neg_emb.ndim != 2: neg_emb = neg_emb.reshape(1, -1) pos_emb_list.append(pos_emb) neg_emb_list.append(neg_emb) target_labels.append(label) pos_emb_list[0] = F.pad(pos_emb_list[0], (0, 0, 0, self.num_examples - pos_emb_list[0].shape[0]), "constant", 0) pos_emb_list = pad_sequence(pos_emb_list, batch_first=True, padding_value=0) neg_emb_list[0] = F.pad(neg_emb_list[0], (0, 0, 0, self.num_examples - neg_emb_list[0].shape[0]), "constant", 0) neg_emb_list = pad_sequence(neg_emb_list, batch_first=True, padding_value=0) return pos_emb_list, neg_emb_list, torch.LongTensor(target_labels)
[docs] def collate_batch_inference(self, batch): # pragma: no cover pos_emb_list = [] neg_emb_list = [] for pos_emb, neg_emb in batch: if pos_emb.ndim != 2: pos_emb = pos_emb.reshape(1, -1) if neg_emb.ndim != 2: neg_emb = neg_emb.reshape(1, -1) pos_emb_list.append(pos_emb) neg_emb_list.append(neg_emb) pos_emb_list[0] = F.pad(pos_emb_list[0], (0, 0, 0, self.num_examples - pos_emb_list[0].shape[0]), "constant", 0) pos_emb_list = pad_sequence(pos_emb_list, batch_first=True, padding_value=0) neg_emb_list[0] = F.pad(neg_emb_list[0], (0, 0, 0, self.num_examples - neg_emb_list[0].shape[0]), "constant", 0) neg_emb_list = pad_sequence(neg_emb_list, batch_first=True, padding_value=0) return pos_emb_list, neg_emb_list
[docs] def pos_neg_to_tensor(self, pos: Union[List[OWLNamedIndividual], List[str]], neg: Union[List[OWLNamedIndividual], List[str]]): if isinstance(pos[0], OWLNamedIndividual): pos_str = [ind.str.split("/")[-1] for ind in pos][:self.num_examples] neg_str = [ind.str.split("/")[-1] for ind in neg][:self.num_examples] elif isinstance(pos[0], str): pos_str = pos[:self.num_examples] neg_str = neg[:self.num_examples] else: raise ValueError(f"Invalid input type, was expecting OWLNamedIndividual or str but found {type(pos[0])}") dataset = CLIPDatasetInference([("", pos_str, neg_str)], self.instance_embeddings, self.num_examples, False, False) dataloader = DataLoader(dataset, batch_size=1, num_workers=self.num_workers, collate_fn=self.collate_batch_inference, shuffle=False) x_pos, x_neg = next(iter(dataloader)) return x_pos, x_neg
[docs] def predict_length(self, models, x_pos, x_neg): for i, model in enumerate(models): model.eval() model.to(self.device) x_pos = x_pos.to(self.device) x_neg = x_neg.to(self.device) if i == 0: scores = model(x_pos, x_neg) else: sc = model(x_pos, x_neg) scores = scores + sc scores = scores / len(models) prediction = int(scores.argmax(1).cpu()) print(f"\n***** Predicted length: {prediction} *****\n") return prediction
[docs] def fit(self, *args, **kwargs): """ Find hypotheses that explain pos and neg. """ self.clean() max_runtime = kwargs.pop("max_runtime", None) learning_problem = self.construct_learning_problem(PosNegLPStandard, args, kwargs) assert not self.search_tree self._learning_problem = learning_problem.encode_kb(self.kb) if max_runtime is not None: self._max_runtime = max_runtime else: self._max_runtime = self.max_runtime if (self.pretrained_predictor_name is not None) and self.length_predictor[0] != []: x_pos, x_neg = self.pos_neg_to_tensor(list(self._learning_problem.kb_pos)[:self.num_examples], list(self._learning_problem.kb_neg)[:self.num_examples]) max_length = self.predict_length(self.length_predictor, x_pos, x_neg) self.operator.max_child_length = max_length print(f'***** Predicted length: {max_length} *****') else: print('\n!!! No length predictor provided, running CLIP without length predictor !!!') root = self.make_node(_concept_operand_sorter.sort(self.start_class), is_root=True) self._add_node(root, None) assert len(self.heuristic_queue) == 1 self.start_time = time.time() for j in range(1, self.iter_bound): most_promising = self.next_node_to_expand(j) tree_parent = self.tree_node(most_promising) minimum_length = most_promising.h_exp for ref in self.downward_refinement(most_promising): # we ignore all refinements with lower length # (this also avoids duplicate node children) if ref.len < minimum_length: # ignoring refinement, it does not satisfy minimum_length condition continue # note: tree_parent has to be equal to node_tree_parent(ref.parent_node)! added = self._add_node(ref, tree_parent) goal_found = added and ref.quality == 1.0 if goal_found and self.terminate_on_goal: return self.terminate() if self.calculate_min_max: # This is purely a statistical function, it does not influence CELOE self.update_min_max_horiz_exp(most_promising) if time.time() - self.start_time > self._max_runtime: return self.terminate() if self.number_of_tested_concepts >= self.max_num_of_concepts_tested: return self.terminate() return self.terminate()
[docs] def train(self, data: Iterable[List[Tuple]], epochs=300, batch_size=256, learning_rate=1e-3, decay_rate=0.0, clip_value=5.0, save_model=True, storage_path=None, optimizer='Adam', record_runtime=True, example_sizes=None, shuffle_examples=False): train_dataset = CLIPDataset(data, self.instance_embeddings, num_examples=self.num_examples, shuffle_examples=shuffle_examples, example_sizes=example_sizes) train_dataloader = DataLoader(train_dataset, batch_size=batch_size, num_workers=self.num_workers, collate_fn=self.collate_batch, shuffle=True) if storage_path is None: currentDateAndTime = datetime.now() storage_path = f'CLIP-Experiment-{currentDateAndTime.strftime("%H-%M-%S")}' elif not os.path.exists(storage_path) and (record_runtime or save_model): os.mkdir(storage_path) trainer = CLIPTrainer(self, epochs=epochs, learning_rate=learning_rate, decay_rate=decay_rate, clip_value=clip_value, storage_path=storage_path) trainer.train(train_dataloader, save_model, optimizer, record_runtime)