from dataclasses import dataclass
import functools
from typing import Any
from lxml import etree
from ontolearn.learners.spell_kit import o2p_ontology, o2p_owl_parser
from ontolearn.learners.spell_kit.o2p_ontology import (
ClassIdentifier,
Intersection,
NameFactory,
Ontology,
Restriction,
SomeValues,
SubClassOf,
Thing,
TopClass,
)
namespaces = {
"owl": "http://www.w3.org/2002/07/owl#",
"rdf": "http://www.w3.org/1999/02/22-rdf-syntax-ns#",
"rdfs": "http://www.w3.org/2000/01/rdf-schema#",
}
Signature = tuple[list[str], list[str]]
[docs]
@dataclass(slots=True)
class Structure:
max_ind: int
cn_ext: dict[str, set[int]]
rn_ext: dict[int, set[tuple[int, str]]]
indmap: dict[str, int]
nsmap: dict[str | None, str]
[docs]
def ind(A: Structure) -> range:
return range(A.max_ind)
[docs]
def conceptnames(sigma: Signature) -> list[str]:
return sigma[0]
[docs]
def rolenames(sigma: Signature) -> list[str]:
return sigma[1]
[docs]
def conceptname_ext(A: Structure, cn: str) -> set[int]:
return A.cn_ext[cn]
[docs]
def expand_namespace(namespace: str, item: str):
return "{%s}%s" % (namespaces.get(namespace), item)
[docs]
@functools.cache
def tag2name(tag: str):
q = etree.QName(tag)
res = "{}{}".format(q.namespace, q.localname)
return res
[docs]
def name2sparql(name: str):
name = name.replace("{", "")
name = name.replace("}", "")
return "<{}>".format(name)
[docs]
def expand_curie(curie, nsmap):
assert ":" in curie
s = curie.split(":")
assert s[0] in nsmap
s[0] = nsmap[s[0]]
return "".join(s)
[docs]
def map_ind_name(A: Structure, name: str) -> int:
if "://" not in name and ":" in name:
name = expand_curie(name, A.nsmap)
return A.indmap[name]
[docs]
def add_ns(n: str):
a = n.split("/")
an = a[: len(a) - 1]
return "{{{}/}}{}".format("/".join(an), a[len(a) - 1])
[docs]
class ABoxBuilder:
A: Structure
indmap: dict[str, int]
role_names = set()
def __init__(self):
self.indmap = {}
self.A = Structure(max_ind=0, cn_ext={}, rn_ext={}, indmap={}, nsmap={})
[docs]
def map_ind(self, a: str):
if a not in self.indmap:
n = self.A.max_ind
self.indmap[a] = n
self.A.max_ind += 1
self.A.rn_ext[n] = set()
self.A.indmap[a] = n
return self.indmap[a]
[docs]
def declare_cn(self, cn):
assert "{" not in cn
if cn not in self.A.cn_ext:
self.A.cn_ext[cn] = set()
return
[docs]
def declare_rn(self, rn):
self.role_names.add(rn)
return
[docs]
def concept_assertion(self, a: int, concept: str):
self.declare_cn(concept)
self.A.cn_ext[concept].add(a)
[docs]
def role_assertion(self, idx1: int, ind2: str, role: str):
assert "{" not in role
idx2 = self.indmap.get(ind2)
if not idx2:
idx2 = self.map_ind(ind2)
self.A.rn_ext[idx1].add((idx2, role))
tag_onto = expand_namespace("owl", "Ontology")
tag_ni = expand_namespace("owl", "NamedIndividual")
tag_type = expand_namespace("rdf", "type")
tag_class = expand_namespace("owl", "Class")
tag_thing = expand_namespace("owl", "Thing")
tag_object_prop = expand_namespace("owl", "ObjectProperty")
tag_data_prop = expand_namespace("owl", "DatatypeProperty")
tag_annotation_prop = expand_namespace("owl", "AnnotationProperty")
attr_resource = expand_namespace("rdf", "resource")
attr_about = expand_namespace("rdf", "about")
attr_datatype = expand_namespace("rdf", "datatype")
[docs]
def make_res_absolute(nsmap: dict[Any, str], res: str) -> str:
if res[0] == "#":
assert None in nsmap
return nsmap[None] + res[1:]
else:
return res
[docs]
def load_owl(file: str):
reader = o2p_owl_parser.OWLReader("")
onto = o2p_ontology.Ontology()
facts = 0
nsmap = {}
abox = ABoxBuilder()
num = len(abox.indmap)
print("Loaded {} individuals".format(num), end="\r")
abox.declare_cn("http://www.w3.org/2002/07/owl#NamedIndividual")
for _, elem in etree.iterparse(file, events=("end",), remove_blank_text=True):
# We are only interested in top-level statements
if elem.getparent() != None and elem.getparent().getparent() != None:
continue
if elem.tag == tag_onto:
nsmap = elem.nsmap
if elem.tag == tag_class:
if attr_about in elem.attrib:
abox.declare_cn(make_res_absolute(nsmap, elem.attrib[attr_about]))
for r in reader.parse_rule(elem):
onto.add_rule(r)
elif elem.tag == tag_object_prop:
abox.declare_rn(make_res_absolute(nsmap, elem.attrib[attr_about]))
onto.add_property(reader.parse_property(elem))
elem.clear()
elif elem.tag == tag_data_prop:
# TODO: handle dataproperties here
elem.clear()
elif elem.tag == tag_annotation_prop:
elem.clear()
elif (
elem.tag == tag_ni
or elem.tag == tag_thing
or (attr_about in elem.attrib and elem.tag != tag_onto)
):
a = make_res_absolute(nsmap, elem.attrib[attr_about])
ind_idx = abox.map_ind(a)
if elem.tag != tag_ni and elem.tag != tag_thing:
facts += 1
abox.concept_assertion(ind_idx, tag2name(elem.tag))
num = len(abox.indmap)
if num % 100000 == 0:
print("\rLoaded {} individuals".format(num), end="\r")
for child in elem:
if child.tag in tag_type:
# TODO: handle complex concepts here
if attr_resource not in child.attrib:
continue
conceptname = make_res_absolute(nsmap, child.attrib[attr_resource])
facts += 1
abox.concept_assertion(ind_idx, conceptname)
elif attr_datatype in child.attrib:
# TODO: handle dataproperties here
continue
elif attr_resource in child.attrib:
role = tag2name(child.tag)
other = make_res_absolute(nsmap, child.attrib[attr_resource])
facts += 1
if role in abox.role_names:
abox.role_assertion(ind_idx, other, role)
elem.clear()
num = len(abox.indmap)
abox.A.nsmap = nsmap
print("\rLoaded {} individuals and {} facts".format(num, facts))
return onto, abox
[docs]
@functools.cache
def structure_from_owl(file) -> Structure:
onto, abox = load_owl(file)
tbox = construct_normalized_tbox(onto)
tbox.saturate()
compact_canonical_model(abox, tbox)
return abox.A
# ELTBox
[docs]
class TBox:
fresh_names: set[str]
implic: dict[str, set[str]]
conjs: dict[str, set[frozenset[str]]]
top: str
cns: set[str]
rns: set[str]
rBrhs: dict[str, set[tuple[str, str]]]
rBlhs: dict[str, set[tuple[str, str]]]
range_cn_ctr: int
ranges: dict[str, set[str]]
role_incs: dict[str, set[str]]
def __init__(self, top: str):
self.fresh_names = set()
self.implic = {}
self.conjs = {}
self.top = top
self.rBrhs = {}
self.rBlhs = {}
self.ranges = {}
self.range_cn_ctr = 0
self.role_incs = {}
self.cns = set()
self.rns = set()
self.register_cn(top)
self.register_rn(tag2name(expand_namespace("owl", "sameAs")))
[docs]
def non_empty_conjs(self):
return {A for A in self.conjs.keys() if len(self.conjs[A]) > 0}
[docs]
def non_empty_lhs(self):
return {A for A in self.rBlhs.keys() if len(self.rBlhs[A]) > 0}
[docs]
def non_empty_rhs(self):
return {A for A in self.rBrhs.keys() if len(self.rBrhs[A]) > 0}
[docs]
def register_cn(self, A: str):
if A not in self.cns:
assert "{" not in A
self.cns.add(A)
self.implic[A] = set([A, self.top])
self.conjs[A] = set()
self.rBrhs[A] = set()
self.rBlhs[A] = set()
[docs]
def register_rn(self, r: str):
if r not in self.rns:
self.rns.add(r)
self.ranges[r] = set()
self.role_incs[r] = set([r])
[docs]
def add_axiom1(self, A: str, B: str):
self.register_cn(A)
self.register_cn(B)
self.implic[A].add(B)
[docs]
def add_axiom2(self, A1: str, A2: str, B: str):
self.register_cn(A1)
self.register_cn(A2)
self.register_cn(B)
self.conjs[B].add(frozenset([A1, A2]))
[docs]
def add_axiom3(self, A: str, r: str, B: str):
self.register_cn(A)
self.register_cn(B)
self.register_rn(r)
self.rBrhs[A].add((r, B))
[docs]
def add_axiom4(self, r: str, A: str, B: str):
self.register_cn(A)
self.register_cn(B)
self.register_rn(r)
self.rBlhs[B].add((r, A))
[docs]
def add_range_restriction(self, r: str, A: str):
self.register_cn(A)
self.register_rn(r)
self.ranges[r].add(A)
[docs]
def add_role_inc(self, r: str, s: str):
self.register_rn(r)
self.register_rn(s)
self.role_incs[r].add(s)
[docs]
def fresh_cn(self) -> str:
self.range_cn_ctr += 1
name = "Fresh#R{}".format(self.range_cn_ctr)
self.fresh_names.add(name)
return name
[docs]
def saturate_role_incs(self):
for r in self.rns:
if r not in self.role_incs:
self.role_incs[r] = set()
change = True
while change:
change = False
for r in self.rns:
toAdd = set()
for s in self.role_incs[r]:
for t in self.role_incs[s]:
if t not in self.role_incs[r]:
change = True
toAdd.add(t)
self.role_incs[r] |= toAdd
# Add implied range restrictions
for r in self.rns:
for s in self.role_incs[r]:
for A in self.ranges[s]:
self.add_range_restriction(r, A)
# Add implied domain restrictions
for B in self.cns:
toAdd = set()
for r, A in self.rBlhs[B]:
for s in self.rns:
if r != s and r in self.role_incs[s]:
toAdd.add((s, A))
self.rBlhs[B] |= toAdd
[docs]
def saturate(self):
self.saturate_role_incs()
# Extra rule to handle range restrictions
# From
# A \sqsubseteq \exists r.B and \exists r-.\top implies C
# It follows that
# A \sqsubseteq \exists r.X and X \sqsubseteq B \sqcap C
for A, S in list(self.rBrhs.items()):
to_add = set()
for r, B in S:
if r not in self.ranges.keys():
continue
X = self.fresh_cn()
self.add_axiom1(X, B)
for C in self.ranges[r]:
self.add_axiom1(X, C)
to_add.add((r, X))
for r, X in to_add:
self.add_axiom3(A, r, X)
# TODO: implement faster algorithm
change = True
while change == True:
change = False
# CR3
for A1 in self.cns:
add = set()
for A2 in self.implic[A1]:
for A3 in self.implic[A2]:
if A3 not in self.implic[A1]:
add.add(A3)
if len(add) > 0:
change = True
self.implic[A1] |= add
# CR4
for B in self.non_empty_conjs():
for A in self.cns:
if B not in self.implic[A]:
for s in self.conjs[B]:
if s.issubset(self.implic[A]):
self.add_axiom1(A, B)
change = True
# CR5
for A in self.non_empty_rhs():
for B in self.non_empty_lhs():
if B not in self.implic[A]:
for r, A1 in self.rBrhs[A]:
for r2, B1 in self.rBlhs[B]:
if r == r2 and B1 in self.implic[A1]:
self.add_axiom1(A, B)
change = True
[docs]
def construct_normalized_tbox(onto: Ontology):
ignored_rules = 0
ignored_domain = 0
ignored_range = 0
cis = 0
onto = onto.normalize()
# TODO: functional roles, reasoning rules for ELI, etc
# TODO: inverse roles in ABox
t = TBox(tag2name(expand_namespace("owl", "Thing")))
t.fresh_names = set(NameFactory.created_names)
for rule in onto.rules:
if type(rule) != SubClassOf:
ignored_rules += 1
# print("Ignoring tbox rule {}".format(rule))
continue
if type(rule.subject) == ClassIdentifier:
if type(rule.object) == ClassIdentifier:
cis += 1
t.add_axiom1(rule.subject.identifier, rule.object.identifier)
if type(rule.object) == Restriction:
if (
type(rule.object.quantifier) == SomeValues
and rule.object.prop.identifier != None
):
role = rule.object.prop.identifier
B = rule.object.quantifier.from_class.identifier
cis += 1
t.add_axiom3(rule.subject.identifier, role, B)
else:
ignored_rules += 1
# print("Ignoring tbox rule with rhs {}".format(rule.object))
pass
# TODO handle inverse roles here
elif type(rule.subject) == Restriction:
if (
type(rule.subject.quantifier) == SomeValues
and rule.subject.prop.identifier != None
):
role = rule.subject.prop.identifier
B = rule.subject.quantifier.from_class.identifier
cis += 1
t.add_axiom4(role, B, rule.object.identifier)
else:
ignored_rules += 1
# print("Ignoring tbox rule with lhs {}".format(rule.subject))
# Inverse roles here
elif type(rule.subject) == Intersection:
assert len(rule.subject.children) == 2
A1 = rule.subject.children[0].identifier
A2 = rule.subject.children[1].identifier
B = rule.object.identifier
cis += 1
t.add_axiom2(A1, A2, B)
else:
ignored_rules += 1
# print("Ignoring tbox rule with lhs {}".format(rule.subject))
for r in onto.properties:
t.register_rn(r.identifier)
for s in onto.subproperties:
t.add_role_inc(s.subject.identifier, s.object.identifier)
for a, b in onto.property_domains:
if type(b) != ClassIdentifier:
ignored_domain += 1
# print("Ignoring domain restriction with concept {}".format(b))
continue
role = a
B = t.top
A = b.identifier
t.add_axiom4(role, B, A)
for a, b in onto.property_ranges:
if type(b) == Thing or type(b) == TopClass:
continue
if type(b) != ClassIdentifier:
ignored_range += 1
# print("Ignoring range restriction with concept {}".format(b))
continue
t.add_range_restriction(a, b.identifier)
if ignored_rules > 0:
print(
"Ignoring {} TBox statements due to unsupported features".format(
ignored_rules
)
)
if ignored_domain > 0:
print(
"Ignoring {} domain restrictions due to unsupported features".format(
ignored_domain
)
)
if ignored_range > 0:
print(
"Ignoring {} range restrictions due to unsupported features".format(
ignored_range
)
)
print(
"Loaded {} concept names, {} role names, {} concept inclusions".format(
len(t.cns), len(t.rns), cis
)
)
return t
[docs]
def compact_canonical_model(abox: ABoxBuilder, tbox: TBox):
# TODO refactor. Should be more obviously correct
for cn in tbox.cns:
abox.declare_cn(cn)
# Saturate concept names in ABox
for A in tbox.implic.keys():
for B in tbox.implic[A]:
if A == B:
continue
for a in conceptname_ext(abox.A, A):
if a in conceptname_ext(abox.A, B):
continue
abox.concept_assertion(a, B)
# Apply range restrictions to ABox
for a in ind(abox.A):
for b, r in abox.A.rn_ext[a]:
if r in tbox.ranges.keys():
for B in tbox.ranges[r]:
abox.concept_assertion(b, B)
rev_succs: dict[int, dict[str, set[int]]] = {b: {} for b in ind(abox.A)}
for a in ind(abox.A):
for b, r in abox.A.rn_ext[a]:
if r not in rev_succs[b]:
rev_succs[b][r] = set()
rev_succs[b][r].add(a)
# Propagate concept names through ABox
# TODO faster algorithm for ABox saturation
change = True
while change:
change = False
for A, S in tbox.rBlhs.items():
for r, B in S:
B_ext = set(conceptname_ext(abox.A, B))
for b in B_ext:
if r not in rev_succs[b]:
continue
for a in rev_succs[b][r]:
if a not in conceptname_ext(abox.A, A):
change = True
for C in tbox.implic[A]:
abox.concept_assertion(a, C)
# Create an anonymous individual for every \exists r. B on the rhs CIs
for A, S in tbox.rBrhs.items():
for r, B in S:
idx = abox.map_ind(B)
for C in tbox.implic[B]:
abox.concept_assertion(idx, C)
# Connect anonymous individuals
for A, S in tbox.rBrhs.items():
for r, B in S:
for a in conceptname_ext(abox.A, A):
abox.role_assertion(a, B, r)
# Saturate with role inclusions
for a in ind(abox.A):
toadd = set()
for b, r in abox.A.rn_ext[a]:
for s in tbox.role_incs[r]:
toadd.add((b, s))
abox.A.rn_ext[a] |= toadd
# Remove fresh concept names from model
for A in tbox.fresh_names:
abox.A.cn_ext[A] = set()
[docs]
def structure_to_dot(A: Structure, indmap: dict[str, int]):
print("digraph D {")
for name, val in indmap.items():
if "#" in name:
print('N{} [label="{}"];'.format(val, name.split("#")[1]))
else:
print('N{} [label="{}"];'.format(val, name))
for a in ind(A):
for b, r in A.rn_ext[a]:
if "#" in r:
r = r.split("#")[1]
print('N{} -> N{} [label="{}"];'.format(a, b, r))
print("}")
[docs]
def not_owl_thing(cn):
return "/Thing>" not in cn and "#Thing>" not in cn
[docs]
def solution2sparql(q: Structure):
clauses: list[str] = []
for a in ind(q):
for cn in q.cn_ext.keys():
if a in q.cn_ext[cn] and not_owl_thing(name2sparql(cn)):
clauses.append("?{} a {} .".format(a, name2sparql(cn)))
for b, rn in q.rn_ext[a]:
clauses.append("?{} {} ?{} .".format(a, name2sparql(rn), b))
if len(clauses) == 0:
clauses.append("?0 a <http://www.w3.org/2002/07/owl#Thing> .")
return "SELECT DISTINCT ?0 WHERE {{\n {}\n}}".format("\n ".join(clauses))
# Returns A restricted to individuals that can be reached in k steps from a
# Renames individuals
[docs]
def restrict_to_neighborhood(k: int, A: Structure, starts: list[int]):
cns = [cn for cn in A.cn_ext.keys() if A.cn_ext[cn]]
# This has its own distance calculation to avoid computing the distance
# for the entirety of A
inds = set(starts)
dist = {a: 0 for a in starts}
for r in range(k):
step = set()
for i1 in inds:
for i2, rn in A.rn_ext[i1]:
step.add(i2)
inds = inds.union(step)
for i in step:
if i in dist:
dist[i] = min(r + 1, dist[i])
else:
dist[i] = r + 1
mapping = {old_ind: new_ind for (new_ind, old_ind) in enumerate(inds)}
n_indmap = {
name: mapping[old_ind]
for name, old_ind in A.indmap.items()
if old_ind in mapping
}
B = Structure(
max_ind=len(inds),
cn_ext={cn: set() for cn in cns},
rn_ext={a: set() for a in range(len(inds))},
indmap=n_indmap,
nsmap=A.nsmap,
)
for cn in cns:
B.cn_ext[cn] = {mapping[ind] for ind in A.cn_ext[cn] & inds}
for i1 in inds:
B.rn_ext[mapping[i1]] = set()
for i2, rn in A.rn_ext[i1]:
if i2 in inds and dist[i1] < k:
B.rn_ext[mapping[i1]].add((mapping[i2], rn))
return (B, mapping)
[docs]
def generate_all_trees(order: int):
layout = list(range(order))
while layout is not None:
yield levels_to_preds(layout)
layout = next_rooted_tree(layout)
[docs]
def next_rooted_tree(predecessor: list[int]):
p = len(predecessor) - 1
while predecessor[p] == 1:
p -= 1
if p == 0:
return None
q = p - 1
while predecessor[q] != predecessor[p] - 1:
q -= 1
result = list(predecessor)
for i in range(p, len(result)):
result[i] = result[i - p + q]
return result
[docs]
def levels_to_preds(layout: list[int]) -> list[int]:
result = [0] * (len(layout) - 1)
stack = []
for i in range(len(layout)):
if stack:
while layout[stack[-1]] >= layout[i]:
stack.pop()
result[i - 1] = stack[-1]
stack.append(i)
return result
[docs]
def copy_structure(A: Structure) -> Structure:
cns = {}
for cn in A.cn_ext.keys():
cns[cn] = set(A.cn_ext[cn])
rns = {}
for a in ind(A):
rns[a] = set(A.rn_ext[a])
# TODO not a deep copy
return Structure(
max_ind=A.max_ind, cn_ext=cns, rn_ext=rns, indmap=A.indmap, nsmap=A.nsmap
)