# -----------------------------------------------------------------------------
# MIT License
#
# Copyright (c) 2024 Ontolearn Team
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
# -----------------------------------------------------------------------------
"""
NERO - Neural Class Expression Learning with Reinforcement.
This module implements NERO, a neural-symbolic concept learner that combines
neural networks with symbolic reasoning for OWL class expression learning.
"""
from typing import Dict, List, Set, Tuple, Optional
import time
import torch
from owlapy import dl_to_owl_expression
from owlapy.class_expression import OWLClassExpression, OWLThing
from owlapy.owl_individual import OWLNamedIndividual
from owlapy.render import DLSyntaxObjectRenderer
from ontolearn.knowledge_base import KnowledgeBase
from ontolearn.learning_problem import PosNegLPStandard
from ontolearn.nero_architectures import DeepSet, SetTransformerNet
from ontolearn.nero_utils import SearchTree, TargetClassExpression
from ontolearn.refinement_operators import NERORefinement
from ontolearn.utils.static_funcs import compute_f1_score
# =============================================================================
# NERO Main Class
# =============================================================================
class NERO:
"""
NERO - Neural Class Expression Learning with Reinforcement.
NERO combines neural networks with symbolic reasoning for learning OWL class expressions.
It uses set-based neural architectures (DeepSet or SetTransformer) to predict quality scores
for candidate class expressions.
Args:
knowledge_base: The knowledge base to learn from
namespace: Ontology namespace used when converting predictions to OWL expressions (default: None)
num_embedding_dim: Dimensionality of entity embeddings (default: 50)
neural_architecture: Neural architecture to use ('DeepSet' or 'SetTransformer', default: 'DeepSet')
learning_rate: Learning rate for training (default: 0.001)
num_epochs: Number of training epochs (default: 100)
batch_size: Batch size for training (default: 32)
num_workers: Number of workers for data loading (default: 4)
quality_func: Quality function for evaluating expressions (default: F1-score)
max_runtime: Maximum search runtime in seconds (default: 10)
verbose: Verbosity level (default: 0)
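Example:
A minimal usage sketch; the ontology path, namespace and example IRIs below are
placeholders, not part of this module::

    from owlapy.iri import IRI
    from owlapy.owl_individual import OWLNamedIndividual
    from ontolearn.knowledge_base import KnowledgeBase
    from ontolearn.learning_problem import PosNegLPStandard

    kb = KnowledgeBase(path="father.owl")
    model = NERO(knowledge_base=kb, namespace="http://example.com/father#", verbose=1)
    pos = {OWLNamedIndividual(IRI.create("http://example.com/father#markus"))}
    neg = {OWLNamedIndividual(IRI.create("http://example.com/father#anna"))}
    model.fit(PosNegLPStandard(pos=pos, neg=neg))
    print(model.best_hypothesis(), model.best_hypothesis_quality())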
"""
name = 'NERO'
def __init__(self,
knowledge_base: KnowledgeBase,
namespace: Optional[str] = None,
num_embedding_dim: int = 50,
neural_architecture: str = 'DeepSet',
learning_rate: float = 0.001,
num_epochs: int = 100,
batch_size: int = 32,
num_workers: int = 4,
quality_func=None,
max_runtime: Optional[int] = 10,
verbose: int = 0):
self.kb = knowledge_base
self.ns = namespace
self.num_embedding_dim = num_embedding_dim
self.neural_architecture = neural_architecture
self.learning_rate = learning_rate
self.num_epochs = num_epochs
self.batch_size = batch_size
self.num_workers = num_workers
self.max_runtime = max_runtime
self.verbose = verbose
self.search_tree = SearchTree()
self.refinement_op = None
# Quality function
if quality_func is None:
self.quality_func = compute_f1_score
else:
self.quality_func = quality_func
# Device
self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Model components (initialized during training)
self.model = None
self.instance_idx_mapping = None
self.idx_to_instance_mapping = None
self.target_class_expressions = None
self.expression = {}
self._best_predictions = None
# Training state
self._is_trained = False
if self.verbose > 0:
print(f"NERO initialized with {self.neural_architecture} architecture")
print(f"Device: {self.device}")
def _initialize_instance_mapping(self):
"""Initialize mapping from individuals to indices."""
self.instance_idx_mapping = {
ind.str: i
for i, ind in enumerate(self.kb.individuals())
}
self.idx_to_instance_mapping = {
v: k for k, v in self.instance_idx_mapping.items()
}
def _initialize_refinement_operator(self):
"""Initialize the refinement operator."""
if self.refinement_op is None:
if self.verbose > 0:
print("Initializing refinement operator...")
self.refinement_op = NERORefinement(self.kb)
self.expression.update(self.refinement_op.expressions)
def _extract_target_expressions(self) -> List[TargetClassExpression]:
"""Extract target class expressions from the knowledge base."""
renderer = DLSyntaxObjectRenderer()
target_expressions = []
# Get all named classes
for idx, owl_class in enumerate(self.kb.ontology.classes_in_signature()):
individuals = set(
ind.str
for ind in self.kb.individuals(owl_class)
)
idx_individuals = set(
self.instance_idx_mapping[iri]
for iri in individuals
)
target_exp = TargetClassExpression(
label_id=idx,
name=renderer.render(owl_class),
str_individuals=individuals,
idx_individuals=idx_individuals,
expression_chain=[renderer.render(OWLThing)],
length=1,
_type='atomic_expression'
)
target_expressions.append(target_exp)
return target_expressions
def _create_model(self, num_outputs: int) -> torch.nn.Module:
"""Create the neural model based on architecture choice."""
num_instances = len(self.instance_idx_mapping)
if self.neural_architecture == 'DeepSet':
model = DeepSet(
num_instances=num_instances,
num_embedding_dim=self.num_embedding_dim,
num_outputs=num_outputs
)
elif self.neural_architecture == 'SetTransformer':
model = SetTransformerNet(
num_instances=num_instances,
num_embedding_dim=self.num_embedding_dim,
num_outputs=num_outputs
)
else:
raise ValueError(f"Unknown architecture: {self.neural_architecture}")
return model
def train(self, learning_problems: List[Tuple[List[str], List[str]]]):
"""
Train the NERO model on learning problems.
Args:
learning_problems: List of (positive_examples, negative_examples) tuples
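Example (a minimal sketch; ``model`` is assumed to be a NERO instance and the IRIs are placeholders)::

    lps = [
        (["http://example.com#markus", "http://example.com#stefan"],
         ["http://example.com#anna"]),
    ]
    model.train(lps)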
"""
if self.verbose > 0:
print("Training NERO model...")
start_time = time.time()
# Initialize mappings
self._initialize_instance_mapping()
# Extract target expressions
self.target_class_expressions = self._extract_target_expressions()
if len(self.target_class_expressions) == 0:
raise ValueError("No target class expressions found in knowledge base")
# Create model
num_outputs = len(self.target_class_expressions)
self.model = self._create_model(num_outputs)
self.model.to(self.device)
self.model.train()
# Setup optimizer and loss
optimizer = torch.optim.Adam(self.model.parameters(), lr=self.learning_rate)
loss_func = torch.nn.MSELoss()
# Convert learning problems to tensors
X_pos_list, X_neg_list, Y_list = [], [], []
for pos_examples, neg_examples in learning_problems:
pos_idx = [self.instance_idx_mapping[uri] for uri in pos_examples]
neg_idx = [self.instance_idx_mapping[uri] for uri in neg_examples]
# Compute labels (F1 scores for each target expression)
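# Illustrative arithmetic: if a target expression covers 3 of 4 positives and 1 of 6
# negatives, precision = 3/4, recall = 3/4 and F1 = 2*0.75*0.75/(0.75+0.75) = 0.75.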
labels = []
for target_exp in self.target_class_expressions:
f1 = self.quality_func(
individuals=target_exp.str_individuals,
pos=set(pos_examples),
neg=set(neg_examples)
)
labels.append(f1)
X_pos_list.append(pos_idx)
X_neg_list.append(neg_idx)
Y_list.append(labels)
# Pad sequences to same length
max_pos_len = max(len(x) for x in X_pos_list) if X_pos_list else 0
max_neg_len = max(len(x) for x in X_neg_list) if X_neg_list else 0
X_pos_padded = torch.zeros(len(X_pos_list), max_pos_len, dtype=torch.long)
X_neg_padded = torch.zeros(len(X_neg_list), max_neg_len, dtype=torch.long)
for i, (pos, neg) in enumerate(zip(X_pos_list, X_neg_list)):
X_pos_padded[i, :len(pos)] = torch.tensor(pos)
X_neg_padded[i, :len(neg)] = torch.tensor(neg)
Y = torch.tensor(Y_list, dtype=torch.float32)
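# Shape sketch (illustrative): with 3 learning problems, max_pos_len=5, max_neg_len=7 and
# 20 target expressions, X_pos_padded is (3, 5), X_neg_padded is (3, 7) and Y is (3, 20).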
# Training loop
dataset = torch.utils.data.TensorDataset(X_pos_padded, X_neg_padded, Y)
dataloader = torch.utils.data.DataLoader(
dataset, batch_size=self.batch_size, shuffle=True
)
for epoch in range(self.num_epochs):
epoch_loss = 0.0
for xpos, xneg, y in dataloader:
xpos = xpos.to(self.device)
xneg = xneg.to(self.device)
y = y.to(self.device)
optimizer.zero_grad()
predictions = self.model(xpos, xneg)
loss = loss_func(predictions, y)
loss.backward()
optimizer.step()
epoch_loss += loss.item()
if self.verbose > 0 and (epoch + 1) % 10 == 0:
print(f"Epoch {epoch+1}/{self.num_epochs}, Loss: {epoch_loss:.4f}")
self.model.eval()
self._is_trained = True
if self.verbose > 0:
training_time = time.time() - start_time
print(f"Training completed in {training_time:.2f} seconds")
def search(self, pos: Set[str], neg: Set[str], top_k: int = 10, max_child_length: int = 10, max_queue_size: int = 10000) -> Dict:
"""
Perform reinforcement learning-based search for complex class expressions.
Uses neural predictions to initialize and guide the search.
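Returns a dictionary of the form (values illustrative)::

    {'Prediction': 'Male ⊓ (∃ hasChild.⊤)', 'F-measure': 0.93, 'Runtime': 1.42, 'Quality': 0.93}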
"""
if self.verbose > 0:
print("Starting reinforcement learning search...")
start_time = time.time()
self.search_tree = SearchTree()
# Initialize with neural predictions OR top refinements
if self._is_trained and len(self.target_class_expressions) > 0:
idx_pos = torch.LongTensor([[self.instance_idx_mapping[i] for i in pos]]).to(self.device)
idx_neg = torch.LongTensor([[self.instance_idx_mapping[i] for i in neg]]).to(self.device)
with torch.no_grad():
sort_scores, sort_idxs = torch.topk(
self.model(idx_pos, idx_neg).flatten(),
min(top_k, len(self.target_class_expressions)),
largest=True
)
# Initialize queue with top neural predictions
for idx_target in sort_idxs.detach().cpu().numpy():
target_ce = self.target_class_expressions[idx_target]
try:
target_ce.quality = self.quality_func(
individuals=target_ce.str_individuals, pos=pos, neg=neg
)
self.search_tree.put(target_ce)
except (AttributeError, KeyError):
continue
# Always add top refinements to initial queue for diversity
# This ensures we explore complex expressions, not just atomic ones
if self.refinement_op and self.refinement_op.top_refinements:
for length, refinement_set in self.refinement_op.top_refinements.items():
for concept in refinement_set:
if concept.name not in self.search_tree.gate:
try:
concept.quality = self.quality_func(
individuals=concept.str_individuals, pos=pos, neg=neg
)
self.search_tree.put(concept)
except (AttributeError, KeyError):
continue
if len(self.search_tree.gate) == 0:
raise ValueError("Failed to initialize search tree with any concepts")
best_hypothesis = None
best_quality = -1.0
# Find initial best
for name, expr in list(self.search_tree.gate.items()):
if expr.quality > best_quality:
best_quality = expr.quality
best_hypothesis = expr
if self.verbose > 0:
print(f"Initial best hypothesis: {best_hypothesis.name} (Quality: {best_quality:.3f})")
print(f"Initial queue size: {len(self.search_tree.gate)}")
# Iterative refinement-based search
iteration = 0
max_iterations = 100000
num_explored = len(self.search_tree.gate)
# Allow exploring up to 50% more concepts than were in the initial queue (at least 100 more)
exploration_limit = num_explored + max(num_explored // 2, 100)
while (not self.search_tree.items_in_queue.empty() and
(time.time() - start_time) < self.max_runtime and
iteration < max_iterations and
num_explored < exploration_limit):
iteration += 1
queue_size = self.search_tree.items_in_queue.qsize()
if queue_size > max_queue_size:
if self.verbose > 0:
print(f"WARNING: Queue size ({queue_size}) exceeded max. Stopping search.")
break
try:
current_concept = self.search_tree.get()
except Exception:
break
if current_concept.length > max_child_length:
continue
# Generate refinements
refined_concepts = self.refinement_op.refine(current_concept)
if not refined_concepts:
continue
# Limit refinements
if len(refined_concepts) > 100:
refined_concepts = refined_concepts[:100]
# Evaluate refinements
for next_concept in refined_concepts:
if next_concept.name not in self.search_tree.gate:
try:
quality = self.quality_func(
individuals=next_concept.str_individuals, pos=pos, neg=neg
)
next_concept.quality = quality
if next_concept.length <= max_child_length and queue_size < max_queue_size:
self.search_tree.put(next_concept)
num_explored += 1
if quality > best_quality:
best_quality = quality
best_hypothesis = next_concept
if self.verbose > 1:
print(f"New best at iter {iteration}: {best_hypothesis.name} (Quality: {best_quality:.3f})")
# Early stopping if perfect solution found
if quality >= 1.0:
if self.verbose > 0:
print(f"Perfect solution found at iteration {iteration}")
runtime = time.time() - start_time
return {
'Prediction': best_hypothesis.name,
'F-measure': best_quality,
'Runtime': runtime,
'Quality': best_quality
}
except Exception as e:
# Skip concepts that cause errors during evaluation
if self.verbose > 1:
print(f"Skipping concept due to error: {e}")
continue
if iteration % 10 == 0:
time.sleep(0.001)
if self.verbose > 0 and iteration % 1000 == 0:
elapsed = time.time() - start_time
print(f"Iter {iteration}, Queue: {queue_size}, Best: {best_quality:.3f}, Time: {elapsed:.1f}s")
runtime = time.time() - start_time
if self.verbose > 0:
print(f"Search finished in {runtime:.2f}s after {iteration} iterations.")
print(f"Best: {best_hypothesis.name} (Quality: {best_quality:.3f})")
return {
'Prediction': best_hypothesis.name,
'F-measure': best_quality,
'Runtime': runtime,
'Quality': best_quality
}
def search_with_smart_init(self, pos: Set[str], neg: Set[str], top_k: int = 10) -> Dict:
"""
Search with smart initialization from neural predictions (model.py compatible).
This uses neural model predictions to guide the symbolic refinement search.
"""
if not self._is_trained:
return self.search(pos, neg, top_k=top_k)
start_time = time.time()
# Get neural predictions
idx_pos = torch.LongTensor([[self.instance_idx_mapping[i] for i in pos]]).to(self.device)
idx_neg = torch.LongTensor([[self.instance_idx_mapping[i] for i in neg]]).to(self.device)
with torch.no_grad():
sort_scores, sort_idxs = torch.topk(
self.model(idx_pos, idx_neg).flatten(),
min(top_k, len(self.target_class_expressions)),
largest=True
)
# Initialize priority queue with top predictions
top_predictions = SearchTree()
for idx_target in sort_idxs.detach().cpu().numpy():
target_ce = self.target_class_expressions[idx_target]
target_ce.quality = self.quality_func(
individuals=target_ce.str_individuals, pos=pos, neg=neg
)
top_predictions.put(target_ce)
# Refine top predictions
n = len(top_predictions)
exploration_budget = n  # evaluate at most as many refinements as there are initial predictions
refinements_explored = SearchTree()
while len(top_predictions) > (n * 0.99) and exploration_budget > 0:
current = top_predictions.get()
for refined in self.refinement_op.refine(current):
if refined.name in refinements_explored.gate or refined.name in top_predictions.gate:
continue
refined.quality = self.quality_func(
individuals=refined.str_individuals, pos=pos, neg=neg
)
exploration_budget -= 1
refinements_explored.put(refined)
top_predictions.put(refined)
if exploration_budget <= 0:
break
if exploration_budget <= 0:
break
best_pred = top_predictions.get()
runtime = time.time() - start_time
return {
'Prediction': best_pred.name,
'F-measure': best_pred.quality,
'Runtime': runtime,
'Quality': best_pred.quality
}
def fit(self, learning_problem: PosNegLPStandard, max_runtime: Optional[int] = None):
"""
Fit the model to a learning problem (Ontolearn-compatible interface).
This now includes training the neural model and performing the search.
"""
if max_runtime:
self.max_runtime = max_runtime
# 1. Initialize components
self._initialize_instance_mapping()
self._initialize_refinement_operator()
# 2. Train the neural model; its predictions seed the search queue with promising target expressions
pos_examples = [ind.str for ind in learning_problem.pos]
neg_examples = [ind.str for ind in learning_problem.neg]
self.train([(pos_examples, neg_examples)])
# 3. Perform the search to find the best complex expression
self._best_predictions = self.search(pos=set(pos_examples), neg=set(neg_examples))
return self
def best_hypothesis(self) -> Optional[OWLClassExpression]:
"""
Return the best hypothesis (Ontolearn-compatible interface).
Returns:
The best prediction converted to an OWL class expression, or None if the model has not been fitted
"""
if not self._is_trained or self._best_predictions is None:
return None
assert self.ns is not None, "Namespace must be set for OWL expression conversion"
return dl_to_owl_expression(self._best_predictions['Prediction'], self.ns)
def best_hypothesis_quality(self) -> float:
"""
Return the quality of the best hypothesis.
Returns:
The F-measure/quality of the best prediction
"""
if not self._is_trained or self._best_predictions is None:
return 0.0
return self._best_predictions['Quality']
def forward(self, xpos: torch.Tensor, xneg: torch.Tensor) -> torch.Tensor:
"""
Forward pass through the neural model.
Args:
xpos: Tensor of positive example indices
xneg: Tensor of negative example indices
Returns:
Predictions for target class expressions
"""
return self.model(xpos, xneg)
def positive_expression_embeddings(self, individuals: List[str]) -> torch.Tensor:
"""
Get embeddings for positive individuals.
Args:
individuals: List of individual URIs
Returns:
Tensor of embeddings
"""
indices = torch.LongTensor([[self.instance_idx_mapping[i] for i in individuals]]).to(self.device)
return self.model.positive_expression_embeddings(indices)
def negative_expression_embeddings(self, individuals: List[str]) -> torch.Tensor:
"""
Get embeddings for negative individuals.
Args:
individuals: List of individual URIs
Returns:
Tensor of embeddings
"""
indices = torch.LongTensor([[self.instance_idx_mapping[i] for i in individuals]]).to(self.device)
return self.model.negative_expression_embeddings(indices)
def downward_refine(self, expression, max_length: Optional[int] = None) -> Set:
"""
Top-down/downward refinement operator from original NERO.
This implements the refinement logic from model.py:
∀s ∈ StateSpace : ρ(s) ⊆ {s^i ∈ StateSpace | s^i ⊑ s}
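For example, refining ``Person`` may yield more specific expressions such as
``Person ⊓ Male`` or ``Person ⊓ (∃ hasChild.⊤)`` (illustrative; the actual refinements
depend on the knowledge base and the refinement operator).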
Args:
expression: Expression to refine
max_length: Maximum length constraint for refinements
Returns:
Set of refined expressions
"""
if isinstance(expression, str):
if expression == '⊤':
return set() # Can return top_refinements if needed
return set()
refinements = set()
# Get expression from refinement operator's expressions dict
if hasattr(expression, 'name') and expression.name in self.refinement_op.expressions:
expression = self.refinement_op.expressions[expression.name]
# Get all refinements from the refinement operator
refined_list = self.refinement_op.refine(expression)
for ref in refined_list:
if max_length is None or ref.length <= max_length:
refinements.add(ref)
return refinements
def upward_refine(self, expression) -> Set:
"""
Bottom-up/upward refinement operator from original NERO.
This implements the generalization logic:
∀s ∈ StateSpace : ρ(s) ⊆ {s^i ∈ StateSpace | s ⊑ s^i}
Args:
expression: Expression to generalize
Returns:
Set of generalized expressions
"""
# Note: Upward refinement is less commonly used in NERO's main search
# It's included for completeness but the main search uses downward refinement
refinements = set()
# This would require implementing upward refinement in the refinement operator
# For now, return empty set as it's not critical for the main search functionality
return refinements
def search_with_init(self, top_prediction_queue: SearchTree, set_pos: Set[str], set_neg: Set[str]) -> SearchTree:
"""
Standard search with smart initialization (from original model.py).
This is the key search method that combines neural predictions with symbolic refinement.
Args:
top_prediction_queue: Priority queue initialized with neural predictions
set_pos: Set of positive examples
set_neg: Set of negative examples
Returns:
SearchTree with explored and refined expressions
"""
# Initialize final predictions
top_predictions = SearchTree()
top_predictions.extend_queue(top_prediction_queue)
refinements_of_top_predictions = SearchTree()
n = len(top_prediction_queue)
exploration_counter = n
# Iterate over advantageous states
while len(top_prediction_queue) > (n * 0.99):  # stop once ~1% of the initial queue has been popped
# Get top ranked Description Logic Expression
c = top_prediction_queue.get()
# Refine with length constraint
for a in self.downward_refine(c, max_length=c.length + 3):
if a.name in refinements_of_top_predictions.gate or a.name in top_predictions.gate:
# Already seen
continue
else:
a.quality = self.quality_func(
individuals=a.str_individuals,
pos=set_pos,
neg=set_neg
)
# Add refinements
exploration_counter -= 1
refinements_of_top_predictions.put(a, key=-a.quality)
top_predictions.put(a, key=-a.quality)
if exploration_counter == 0:
break
if exploration_counter == 0:
break
return top_predictions
def fit_from_iterable(self, pos: List[str], neg: List[str], top_k: int = 10, use_search: str = 'SmartInit') -> Dict:
"""
Fit method compatible with original NERO's model.py interface.
This implements the complete prediction pipeline from the original NERO:
1. Neural prediction to get top-k candidates
2. Quality evaluation
3. Optional symbolic search for refinement
Args:
pos: List of positive example URIs
neg: List of negative example URIs
top_k: Number of top neural predictions to consider
use_search: Search strategy ('SmartInit', 'None', or None)
Returns:
Dictionary with prediction results
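Example (a minimal sketch; the URIs are placeholders and ``model`` must already be trained)::

    report = model.fit_from_iterable(
        pos=["http://example.com#markus"],
        neg=["http://example.com#anna"],
        top_k=10,
        use_search='SmartInit')
    print(report['Prediction'], report['F-measure'])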
"""
if not self._is_trained:
raise ValueError("Model must be trained before calling fit_from_iterable")
start_time = time.time()
set_pos, set_neg = set(pos), set(neg)
# Get neural predictions
idx_pos = torch.LongTensor([[self.instance_idx_mapping[i] for i in pos]]).to(self.device)
idx_neg = torch.LongTensor([[self.instance_idx_mapping[i] for i in neg]]).to(self.device)
goal_found = False
top_prediction_queue = SearchTree()
# Get top-k predictions from neural model
with torch.no_grad():
sort_scores, sort_idxs = torch.topk(
self.forward(xpos=idx_pos, xneg=idx_neg).flatten(),
min(top_k, len(self.target_class_expressions)),
largest=True
)
# Evaluate predictions
for idx_target in sort_idxs.detach().cpu().numpy():
target_ce = self.target_class_expressions[idx_target]
target_ce.quality = self.quality_func(
individuals=target_ce.str_individuals,
pos=set_pos,
neg=set_neg
)
top_prediction_queue.put(target_ce, key=-target_ce.quality)
if target_ce.quality == 1.0:
goal_found = True
break
assert len(top_prediction_queue) > 0
# If goal not found, perform search
if not goal_found:
if use_search == 'SmartInit':
best_pred = top_prediction_queue.get()
top_prediction_queue.put(best_pred, key=-best_pred.quality)
top_prediction_queue = self.search_with_init(
top_prediction_queue, set_pos, set_neg
)
best_constructed_expression = top_prediction_queue.get()
if best_constructed_expression.quality > best_pred.quality:
best_pred = best_constructed_expression
elif use_search == 'None' or use_search is None:
best_pred = top_prediction_queue.get()
else:
raise ValueError(f"Unknown search strategy: {use_search}")
else:
best_pred = top_prediction_queue.get()
runtime = time.time() - start_time
report = {
'Prediction': best_pred.name,
'Instances': best_pred.str_individuals,
'F-measure': round(best_pred.quality, 3),
'Runtime': round(runtime, 3),
'Quality': best_pred.quality
}
return report
def predict(self, pos: Set[OWLNamedIndividual], neg: Set[OWLNamedIndividual], top_k: int = 10) -> Dict:
"""
Predict class expressions for given positive and negative examples.
This now uses the search mechanism.
"""
if not self._is_trained:
self.fit(PosNegLPStandard(pos=pos, neg=neg))
return self._best_predictions
def __str__(self):
return f"NERO(architecture={self.neural_architecture}, embedding_dim={self.num_embedding_dim})"
def __repr__(self):
return self.__str__()