from queue import PriorityQueue
from abc import ABC
from typing import Set, List
class Role:
    """Represents an OWL object property/role.

    The role is identified only by its ``name``.
    """

    def __init__(self, *, name: str):
        """Store the role name.

        :param name: name of the role; must be a ``str``.
        """
        assert isinstance(name, str)
        self.name = name

    def __str__(self):
        """Return a description containing the object's address and its name."""
        address = hex(id(self))
        return 'Role at {} | {}'.format(address, self.name)

    def __repr__(self):
        """Return the same text as ``__str__``."""
        return self.__str__()
class TargetClassExpression:
    """Represents a target class expression for neural training.

    ``quality`` starts out as ``None``; ordering with ``<`` compares
    ``quality`` values, so it has to be assigned before instances are compared.
    """

    def __init__(self, *, label_id, name: str, idx_individuals: Set = None,
                 expression_chain: List = None, length: int = None,
                 str_individuals: Set = None, _type=None):
        """Store the given fields and derive the number of individuals.

        :param label_id: label identifier, stored as given.
        :param name: name of the expression.
        :param idx_individuals: individuals as indices, stored as given.
        :param expression_chain: stored as given.
        :param length: stored as given.
        :param str_individuals: individuals as strings; its size becomes
            ``num_individuals`` (0 when missing or empty).
        :param _type: stored on the instance as ``type``.
        """
        # Identification.
        self.label_id = label_id
        self.name = name
        self.type = _type
        # Individuals covered by the expression.
        self.idx_individuals = idx_individuals
        self.str_individuals = str_individuals
        self.num_individuals = len(str_individuals or ())
        # Structure and score.
        self.expression_chain = expression_chain
        self.length = length
        self.quality = None

    @property
    def size(self):
        """Number of individuals (alias of ``num_individuals``)."""
        return self.num_individuals

    def __lt__(self, other):
        """Order expressions by ``quality``."""
        return self.quality < other.quality

    def __str__(self):
        """Return a one-line summary with name, individual count and quality."""
        return 'TargetClassExpression | {} | Indv:{} | Quality:{}'.format(
            self.name, self.num_individuals, self.quality)

    def __repr__(self):
        """Return the same text as ``__str__``."""
        return self.__str__()
class ClassExpression(ABC):
    """Base class for class expressions.

    An instance stores a ``name``, the set of individuals it covers
    (``str_individuals``), an ``expression_chain``, a ``quality`` score and a
    ``length``. ``a * b`` builds the intersection (A ⊓ B), ``a + b`` builds
    the union (A ⊔ B), and ``a < b`` compares ``quality`` values.
    """

    def __init__(self, *, name: str, str_individuals: Set, expression_chain: List,
                 owl_class=None, quality=None, length=None):
        """Validate and store the fields of the expression.

        :param name: name of the expression; must be a ``str``.
        :param str_individuals: individuals as strings; must be a ``set``.
        :param expression_chain: a ``list`` or ``tuple``.
        :param owl_class: stored as given.
        :param quality: score; ``-1.0`` is stored when omitted.
        :param length: length of the expression; when omitted, the number of
            whitespace-separated tokens in ``name`` is used.
        """
        assert isinstance(name, str)
        assert isinstance(str_individuals, set)
        assert isinstance(expression_chain, (list, tuple))
        self.name = name
        self.str_individuals = str_individuals
        self.expression_chain = expression_chain
        self.num_individuals = len(self.str_individuals)
        self.quality = quality if quality is not None else -1.0
        self.owl_class = owl_class
        self.length = length if length is not None else len(self.name.split())
        self.type = 'class_expression'  # Default type, overridden by subclasses

    def __str__(self):
        """Return a one-line summary: type, name, individual count, quality."""
        return f'{self.type} | {self.name} | Indv:{self.num_individuals} | Quality:{self.quality:.3f}'

    def __repr__(self):
        """Return the same text as ``__str__``."""
        return self.__str__()

    @property
    def size(self):
        """Number of individuals (alias of ``num_individuals``)."""
        return self.num_individuals

    def __lt__(self, other):
        """Order expressions by ``quality``."""
        return self.quality < other.quality

    @staticmethod
    def _operand_name(expression) -> str:
        """Return ``expression.name``, parenthesised when its length exceeds 2."""
        # Static (not bound to ``self``) so that any operand exposing ``name``
        # and ``length`` keeps working, exactly as before.
        if expression.length > 2:
            return f'({expression.name})'
        return expression.name

    @staticmethod
    def _chain_with_name(expression) -> tuple:
        """Return the operand's expression chain followed by its own name.

        The chain is normalised to a tuple first: the constructor accepts both
        lists and tuples, and ``list + tuple`` raises ``TypeError``.
        """
        return tuple(expression.expression_chain) + (expression.name,)

    def __mul__(self, other):
        """Create intersection of two class expressions (A ⊓ B).

        :param other: right operand; needs ``name``, ``length``,
            ``str_individuals`` and ``expression_chain``.
        :return: an ``IntersectionClassExpression`` over the common individuals.
        """
        left, right = self._operand_name(self), self._operand_name(other)
        return IntersectionClassExpression(
            name=f'{left} ⊓ {right}',
            # +1 accounts for the operator itself.
            length=self.length + other.length + 1,
            concepts=(self, other),
            str_individuals=self.str_individuals.intersection(other.str_individuals),
            expression_chain=(self._chain_with_name(self), 'AND',
                              self._chain_with_name(other))
        )

    def __add__(self, other):
        """Create union of two class expressions (A ⊔ B).

        :param other: right operand; needs ``name``, ``length``,
            ``str_individuals`` and ``expression_chain``.
        :return: a ``UnionClassExpression`` over the combined individuals.
        """
        left, right = self._operand_name(self), self._operand_name(other)
        return UnionClassExpression(
            name=f'{left} ⊔ {right}',
            # +1 accounts for the operator itself.
            length=self.length + other.length + 1,
            str_individuals=self.str_individuals.union(other.str_individuals),
            concepts=(self, other),
            expression_chain=(self._chain_with_name(self), 'OR',
                              self._chain_with_name(other))
        )
class AtomicExpression(ClassExpression):
    """Represents an atomic class expression.

    The length is fixed to 1 and the type tag is ``'atomic_expression'``.
    """

    def __init__(self, *, name: str, str_individuals: Set, expression_chain: List,
                 owl_class=None, quality=None, label_id=None, idx_individuals=None):
        """Initialise the base expression, then set the atomic-specific fields.

        :param name: name of the expression.
        :param str_individuals: individuals as strings.
        :param expression_chain: forwarded to the base class.
        :param owl_class: forwarded to the base class.
        :param quality: forwarded to the base class.
        :param label_id: stored as given.
        :param idx_individuals: stored as given.
        """
        base_fields = dict(
            name=name,
            str_individuals=str_individuals,
            expression_chain=expression_chain,
            quality=quality,
            owl_class=owl_class,
        )
        super().__init__(**base_fields)
        # Replace the defaults chosen by the base initialiser.
        self.type = 'atomic_expression'
        self.length = 1
        self.label_id = label_id
        self.idx_individuals = idx_individuals
class ComplementOfAtomicExpression(ClassExpression):
    """Represents a negated atomic class expression.

    The length is fixed to 2 and the type tag is ``'negated_expression'``.
    """

    def __init__(self, *, name: str, atomic_expression, str_individuals: Set,
                 expression_chain: List, quality=None, owl_class=None,
                 label_id=None, idx_individuals=None):
        """Initialise the base expression, then set the negation-specific fields.

        :param name: name of the expression.
        :param atomic_expression: the expression being negated; stored as given.
        :param str_individuals: individuals as strings.
        :param expression_chain: forwarded to the base class.
        :param quality: forwarded to the base class.
        :param owl_class: forwarded to the base class.
        :param label_id: stored as given.
        :param idx_individuals: stored as given.
        """
        base_fields = dict(
            name=name,
            str_individuals=str_individuals,
            expression_chain=expression_chain,
            quality=quality,
            owl_class=owl_class,
        )
        super().__init__(**base_fields)
        # Replace the defaults chosen by the base initialiser.
        self.type = 'negated_expression'
        self.length = 2
        self.atomic_expression = atomic_expression
        self.idx_individuals = idx_individuals
        self.label_id = label_id
class UniversalQuantifierExpression(ClassExpression):
    """Represents a universal quantifier expression (∀).

    The length is fixed to 3 and the type tag is
    ``"universal_quantifier_expression"``.
    """

    def __init__(self, *, name: str, role=None, filler=None, label_id=None,
                 idx_individuals=None, str_individuals: Set, expression_chain: List, quality=None):
        """Validate the arguments, initialise the base, then store role and filler.

        :param name: name of the expression; must be a ``str``.
        :param role: stored as given.
        :param filler: stored as given.
        :param label_id: stored as given.
        :param idx_individuals: stored as given.
        :param str_individuals: individuals as strings; must be a ``set``.
        :param expression_chain: a ``list`` or ``tuple``.
        :param quality: forwarded to the base class.
        """
        assert isinstance(name, str)
        assert isinstance(str_individuals, set)
        assert isinstance(expression_chain, (list, tuple))
        base_fields = dict(
            name=name,
            str_individuals=str_individuals,
            expression_chain=expression_chain,
            quality=quality,
        )
        super().__init__(**base_fields)
        # Replace the defaults chosen by the base initialiser.
        self.type = "universal_quantifier_expression"
        self.length = 3
        # Quantifier-specific fields.
        self.role = role
        self.filler = filler
        self.label_id = label_id
        self.idx_individuals = idx_individuals
class ExistentialQuantifierExpression(ClassExpression):
    """Represents an existential quantifier expression (∃).

    The length is fixed to 3.
    """

    def __init__(self, *, name: str, role=None, filler=None, str_individuals: Set,
                 expression_chain: List, quality=None, label_id=None, idx_individuals=None):
        """Validate the arguments, initialise the base, then store role and filler.

        :param name: name of the expression; must be a ``str``.
        :param role: stored as given.
        :param filler: stored as given.
        :param str_individuals: individuals as strings; must be a ``set``.
        :param expression_chain: a ``list`` or ``tuple``.
        :param quality: forwarded to the base class.
        :param label_id: stored as given.
        :param idx_individuals: stored as given.
        """
        assert isinstance(name, str)
        assert isinstance(str_individuals, set)
        assert isinstance(expression_chain, (list, tuple))
        base_fields = dict(
            name=name,
            str_individuals=str_individuals,
            expression_chain=expression_chain,
            quality=quality,
        )
        super().__init__(**base_fields)
        # NOTE(review): the spelling of this tag is kept exactly as it was;
        # other code may compare against the literal value - confirm before
        # correcting it.
        self.type = "existantial_quantifier_expression"
        self.length = 3
        # Quantifier-specific fields.
        self.role = role
        self.filler = filler
        self.label_id = label_id
        self.idx_individuals = idx_individuals
class IntersectionClassExpression(ClassExpression):
    """Represents an intersection of class expressions.

    The type tag is ``'intersection_expression'`` and the length must be at
    least 3.
    """

    def __init__(self, *, name: str, length: int, str_individuals: Set,
                 expression_chain: List, owl_class=None, quality=None,
                 label_id=None, concepts=None, idx_individuals=None):
        """Initialise the base expression, then store the intersection fields.

        :param name: name of the expression.
        :param length: length of the expression; asserted to be >= 3.
        :param str_individuals: individuals as strings.
        :param expression_chain: forwarded to the base class.
        :param owl_class: forwarded to the base class.
        :param quality: forwarded to the base class.
        :param label_id: stored as given.
        :param concepts: the combined operands; stored as given.
        :param idx_individuals: stored as given.
        """
        base_fields = dict(
            name=name,
            str_individuals=str_individuals,
            expression_chain=expression_chain,
            quality=quality,
            owl_class=owl_class,
        )
        super().__init__(**base_fields)
        # The explicit length replaces the one derived by the base initialiser.
        assert length >= 3
        self.length = length
        self.type = 'intersection_expression'
        self.concepts = concepts
        self.label_id = label_id
        self.idx_individuals = idx_individuals
class UnionClassExpression(ClassExpression):
    """Represents a union of class expressions.

    The type tag is ``'union_expression'`` and the length must be at least 3.
    """

    def __init__(self, *, name: str, length: int, str_individuals: Set,
                 expression_chain: List, owl_class=None, concepts=None,
                 quality=None, label_id=None, idx_individuals=None):
        """Initialise the base expression, then store the union fields.

        :param name: name of the expression.
        :param length: length of the expression; asserted to be >= 3.
        :param str_individuals: individuals as strings.
        :param expression_chain: forwarded to the base class.
        :param owl_class: forwarded to the base class.
        :param concepts: the combined operands; stored as given.
        :param quality: forwarded to the base class.
        :param label_id: stored as given.
        :param idx_individuals: stored as given.
        """
        base_fields = dict(
            name=name,
            str_individuals=str_individuals,
            expression_chain=expression_chain,
            quality=quality,
            owl_class=owl_class,
        )
        super().__init__(**base_fields)
        # The explicit length replaces the one derived by the base initialiser.
        assert length >= 3
        self.length = length
        self.type = 'union_expression'
        self.concepts = concepts
        self.label_id = label_id
        self.idx_individuals = idx_individuals
class State:
    """Pairs a quality score with a target class expression and a set of individuals.

    States are ordered by ``quality``.
    """

    def __init__(self, quality: float, tce: TargetClassExpression, str_individuals: set):
        """Store the arguments and copy the name of ``tce``.

        :param quality: score of this state.
        :param tce: target class expression; its ``name`` becomes the state's name.
        :param str_individuals: individuals as strings, stored as given.
        """
        self.tce = tce
        self.name = tce.name
        self.quality = quality
        self.str_individuals = str_individuals

    def __lt__(self, other):
        """Order states by ``quality``."""
        return self.quality < other.quality

    def __str__(self):
        """Return the text of ``tce`` followed by the quality with three decimals."""
        return '{} | Quality:{:.3f}'.format(self.tce, self.quality)
class SearchTree:
    """Priority queue for managing search states.

    Expressions are kept in a ``PriorityQueue`` and mirrored in ``gate``, a
    dict keyed by expression name that prevents the same name from being
    queued twice.
    """

    def __init__(self, maxsize=0):
        """Create an empty tree.

        :param maxsize: forwarded to ``PriorityQueue``.
        """
        self.items_in_queue = PriorityQueue(maxsize)
        self.gate = dict()

    def __contains__(self, key):
        """Return whether an expression named ``key`` is currently stored."""
        return key in self.gate

    def put(self, expression, key=None, condition=None):
        """Add ``expression`` unless an expression with the same name is stored.

        :param expression: object exposing ``name`` and ``quality``.
        :param key: priority to queue under; defaults to ``-expression.quality``.
        :param condition: must be ``None``.
        :raises ValueError: if ``condition`` is not ``None``.
        """
        if condition is not None:
            raise ValueError('Define the condition')
        if expression.name in self.gate:
            return
        # The lowest valued entries are retrieved first, hence the negation.
        priority = -expression.quality if key is None else key
        self.items_in_queue.put((priority, expression))
        self.gate[expression.name] = expression

    def get(self):
        """Remove and return the expression with the lowest priority key.

        Waits up to one second for an item to become available.
        """
        entry = self.items_in_queue.get(timeout=1)
        expression = entry[1]
        del self.gate[expression.name]
        return expression

    def get_all(self):
        """Return a list of all stored expressions."""
        return [*self.gate.values()]

    def __len__(self):
        """Return the number of stored expressions."""
        return len(self.gate)

    def __iter__(self):
        """Iterate over the queued expressions in the queue's internal order."""
        # No guarantee of returning the best expression first.
        return (queued for _, queued in self.items_in_queue.queue)

    def extend_queue(self, other) -> None:
        """
        Extend queue with other queue.

        :param other: iterable of expressions; each one is added with ``put``.
        """
        for expression in other:
            self.put(expression)