Source code for ontolearn.concept_abstract_syntax_tree

from abc import ABC, abstractmethod
import random
from random import choice
from typing import List, Optional
from ontolearn.knowledge_base import KnowledgeBase
from owlapy.render import DLSyntaxObjectRenderer
from owlapy.class_expression import (OWLClass, OWLClassExpression, OWLObjectUnionOf, OWLObjectIntersectionOf, 
                                     OWLObjectSomeValuesFrom, OWLObjectAllValuesFrom, OWLObjectComplementOf, OWLCardinalityRestriction)
import pathlib
import json

class Expr(ABC):
    @abstractmethod
    def to_string(self):
        pass

    @abstractmethod
    def to_dict(self):
        pass
class Atoms(Expr):
    def __init__(self, name):
        self.name = name

    def to_string(self):
        return self.name

    def __repr__(self):
        return self.to_string()

    def to_dict(self):
        return {"type": OWLClass.__name__, "name": self.name}
class Not(Expr):
    def __init__(self, expr: Expr):
        self.expr = expr

    def to_string(self):
        return f"¬{self.expr.to_string()}"

    def __repr__(self):
        return self.to_string()

    def to_dict(self):
        return {"type": OWLObjectComplementOf.__name__, "expr": self.expr.to_dict()}
class And(Expr):
    def __init__(self, left: Expr, right: Expr):
        self.left = left
        self.right = right

    def to_string(self):
        return f"({self.left.to_string()} ⊓ {self.right.to_string()})"

    def __repr__(self):
        return self.to_string()

    def to_dict(self):
        return {"type": OWLObjectIntersectionOf.__name__, "left": self.left.to_dict(), "right": self.right.to_dict()}
class Or(Expr):
    def __init__(self, left: Expr, right: Expr):
        self.left = left
        self.right = right

    def to_string(self):
        return f"({self.left.to_string()} ⊔ {self.right.to_string()})"

    def __repr__(self):
        return self.to_string()

    def to_dict(self):
        return {"type": OWLObjectUnionOf.__name__, "left": self.left.to_dict(), "right": self.right.to_dict()}
class Exists(Expr):
    def __init__(self, role: str, filler: Expr):
        self.role = role
        self.filler = filler

    def to_string(self):
        return f"∃{self.role}.{self.filler.to_string()}"

    def __repr__(self):
        return self.to_string()

    def to_dict(self):
        return {"type": OWLObjectSomeValuesFrom.__name__, "role": self.role, "filler": self.filler.to_dict()}
class Forall(Expr):
    def __init__(self, role: str, filler: Expr):
        self.role = role
        self.filler = filler

    def to_string(self):
        return f"∀{self.role}.{self.filler.to_string()}"

    def __repr__(self):
        return self.to_string()

    def to_dict(self):
        return {"type": OWLObjectAllValuesFrom.__name__, "role": self.role, "filler": self.filler.to_dict()}
class Cardinality(Expr):
    def __init__(self, kind: str, n: int, role: str, filler: Expr):
        self.kind = kind
        self.n = n
        self.role = role
        self.filler = filler

    def to_string(self):
        return f"{self.kind}{self.n} {self.role}.{self.filler.to_string()}"

    def __repr__(self):
        return f"({self.kind}{self.n} {self.role}.{self.filler})"

    def to_dict(self):
        return {"type": OWLCardinalityRestriction.__name__, "kind": self.kind, "n": self.n, "role": self.role,
                "filler": self.filler.to_dict()}
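# Illustrative sketch (not part of the original module): the expression classes above can be
# composed by hand and serialised without a knowledge base. The concept and role names used
# here ("Person", "Parent", "hasChild") are hypothetical placeholders.
def _example_expression_tree():
    expr = Exists("hasChild", And(Atoms("Person"), Not(Atoms("Parent"))))
    # With the renderers above this yields the DL string "∃hasChild.(Person ⊓ ¬Parent)"
    # together with its nested dictionary serialisation.
    return expr.to_string(), expr.to_dict()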
class ConceptAbstractSyntaxTreeBuilder:
    def __init__(self, knowledge_base: KnowledgeBase, max_length: Optional[int] = None):
        assert isinstance(knowledge_base, KnowledgeBase), "A knowledge base instance is required"
        self.knowledge_base = knowledge_base
        self.max_length = max_length
        ontology = self.knowledge_base.ontology
        atoms_concepts = list(ontology.classes_in_signature())
        self.unique_atom_concept_names = {'⊤', '⊥'}.union(
            {DLSyntaxObjectRenderer().render(atom) for atom in atoms_concepts})
        # self.atom_concepts_with_negation = self.unique_atom_concept_names | {("¬", atom) for atom in self.unique_atom_concept_names}
        self.unique_roles = {relation.iri.get_remainder() for relation in ontology.object_properties_in_signature()}
        self.negation = {"¬"}
        self.binary_ops = {"⊓", "⊔"}
        self.quantifiers = {"∃", "∀"}
        self.cardinals = {"≤", "≥"}
        self.parenthesis = {"(", ")"}
        self.dot = {'.'}
        self.digits = {str(i) for i in range(10)}
        # TODO: handle concrete roles and other extended vocabs
        self.vocabs = (self.unique_atom_concept_names | self.unique_roles | self.binary_ops | self.negation
                       | self.quantifiers | self.parenthesis | self.dot | self.cardinals | self.digits)
        self.atom_concepts_with_negation = None

    def _negate_unique_atomic_concepts(self, replace_with_negation=False):
        if replace_with_negation:
            return self.unique_atom_concept_names | {("¬", atom) for atom in self.unique_atom_concept_names}
        return self.unique_atom_concept_names

    def _current_token(self):
        return self.tokens[self.index] if self.index < self.length else None

    def _advance(self):
        self.index += 1

    def _sanitize_tokens(self, tokens):
        return [token for token in (t.strip() for t in tokens) if token]

    def _strip_trailing_parentheses(self, concept_str: str):
        if concept_str.startswith('(') and concept_str.endswith(')'):
            concept_str = concept_str[1:-1]
        return concept_str

    def _fix_mid_tokens_errors(self, tokens: list[str]) -> list[str]:
        container = []
        i = 0
        while i < len(tokens):
            prev_token = tokens[i - 1] if i - 1 >= 0 else None
            token = tokens[i]
            next_token = tokens[i + 1] if i + 1 < len(tokens) else None
            next_next_token = tokens[i + 2] if i + 2 < len(tokens) else None
            if prev_token == '(' and token in self.binary_ops and next_token == '.' and next_next_token:
                if next_next_token in self.unique_atom_concept_names | self.negation:
                    i += 2
                else:
                    container.append(choice(list(self.unique_atom_concept_names)))
                    container.append(token)
                    i += 1
                i += 1
                continue
            if prev_token in self.unique_roles and token == '(':
                if next_token:
                    if next_token in self.unique_atom_concept_names | self.negation:
                        container.append('.')
                        container.append(token)
                    elif next_token in self.dot and next_next_token and \
                            next_next_token in self.unique_atom_concept_names | self.negation:
                        i += 1
                        container.append('.')
                        container.append(token)
                i += 1
                continue
            if prev_token == '(' and token == '.' and next_token == ')':
                container.append(choice(list(self.unique_atom_concept_names)))
                i += 1
                continue
            if token == prev_token and token in self.binary_ops | self.quantifiers | self.dot:
                i += 1
                continue
            if prev_token in self.binary_ops and token in self.binary_ops:
                i += 1
                continue
            if token == '.' and prev_token in {'(', ')'}:
                i += 1
                continue
            if prev_token == '(' and token in self.binary_ops:
                i += 1
                continue
            if prev_token in self.binary_ops and token == ')':
                container.pop()
                container.append(token)
                i += 1
                continue
            if prev_token == ')' and token == '(':
                container.append(choice(list(self.binary_ops)))
                container.append(token)
                i += 1
                continue
            if prev_token == ')' and token not in self.binary_ops and next_token == '(':
                container.append(choice(list(self.binary_ops)))
                i += 1
                continue
            if prev_token in self.unique_atom_concept_names | {')'} and \
                    token in self.unique_atom_concept_names | self.negation | {'('} | self.dot:
                container.append(choice(list(self.binary_ops)))
                if token in self.dot:
                    i += 1
                else:
                    container.append(token)
                    i += 1
                continue
            if prev_token in self.binary_ops and token in self.quantifiers and next_token in self.binary_ops:
                container.append(choice(list(self.unique_atom_concept_names)))
                i += 1
                continue
            container.append(token)
            i += 1
        return container

    def _postprocess_tail_fix(self, tokens: list[str], max_length: int) -> list[str]:
        def is_incomplete_tail(toks):
            if not toks:
                return True
            return toks[-1] in self.binary_ops | self.quantifiers | self.negation | {'.', '(', *self.unique_roles}

        def minimal_completion_after(toks):
            last = toks[-1] if toks else None
            remaining = max_length - len(toks)
            if last is None:
                return [choice(list(self.unique_atom_concept_names))]
            if last in self.binary_ops:
                return [choice(list(self.unique_atom_concept_names))] if remaining >= 1 else []
            if last in self.quantifiers:
                return [choice(list(self.unique_roles)), '.',
                        choice(list(self.unique_atom_concept_names))] if remaining >= 3 else []
            # if last in self.digits:
            #     return [choice(list(self.unique_roles)), '.',
            #             choice(list(self.unique_atom_concept_names))] if remaining >= 3 else []
            # if last in self.cardinals:
            #     return ['1', choice(list(self.unique_roles)), '.', choice(list(self.unique_atom_concept_names))]
            if last in self.unique_roles:
                if len(toks) >= 2 and toks[-2] in self.quantifiers:
                    return ['.', choice(list(self.unique_atom_concept_names))] if remaining >= 2 else []
                return []
            if last == '.':
                if len(toks) >= 2 and toks[-2] in self.unique_roles:
                    return [choice(list(self.unique_atom_concept_names))] if remaining >= 1 else []
                return []
            if last in self.negation:
                return [choice(list(self.unique_atom_concept_names))] if remaining >= 1 else []
            if last == '(':
                return [choice(list(self.unique_atom_concept_names)), ')'] if remaining >= 2 else []
            return []

        if len(tokens) == max_length and not is_incomplete_tail(tokens):
            return tokens
        while len(tokens) < max_length and is_incomplete_tail(tokens):
            patch = minimal_completion_after(tokens)
            if not patch:
                break
            tokens += patch
        tokens = tokens[:max_length]
        if len(tokens) == max_length and is_incomplete_tail(tokens):
            for i in reversed(range(len(tokens))):
                if not is_incomplete_tail(tokens[:i]):
                    tokens = tokens[:i]
                    break
        return tokens
    def balance_flatten_parentheses(self, sequences: list[str], max_length: int = None) -> list[str]:
        stack, result = [], []
        for sequence in sequences:
            if sequence == '(':
                stack.append(len(result))
                result.append(sequence)
            elif sequence == ')':
                if stack:
                    stack.pop()
                    result.append(sequence)
            else:
                result.append(sequence)
        if stack:
            if max_length is not None:
                for pos in reversed(stack):
                    if len(result) < max_length:
                        result.append(')')
                    else:
                        result.pop(pos)
            else:
                for pos in reversed(stack):
                    result.pop(pos)
        i = 0
        while i < len(result) - 3:
            if result[i] == '(' and result[i + 1] == '(':
                j = i + 2
                depth = 1
                while j < len(result) and depth > 0:
                    if result[j] == '(':
                        depth += 1
                    elif result[j] == ')':
                        depth -= 1
                    j += 1
                if j < len(result) and result[j] == ')':
                    result = result[:i + 1] + result[i + 2:j] + result[j + 1:]
                    continue
            i += 1
        return result
    def parse(self, token_sequence: List[str], relax_parentheses: bool = False,
              enforce_validity: Optional[bool] = False, replace_with_negation: bool = False):
        assert isinstance(token_sequence, list) and len(token_sequence) > 0, \
            "Token sequence must be a non-empty list of non-empty strings"
        tokens = (token for token in token_sequence if token.strip() not in {'(', ')'}) \
            if relax_parentheses else token_sequence
        self.tokens = self._sanitize_tokens(tokens)
        if not self.max_length:
            self.max_length = len(self.tokens) + 10
        if enforce_validity:
            self.tokens = self._fix_mid_tokens_errors(self._enforce(replace_with_negation=replace_with_negation))
            self.tokens = self.balance_flatten_parentheses(self._postprocess_tail_fix(self.tokens.copy(), self.max_length))
        self.index = 0
        self.length = len(self.tokens)
        try:
            ast = self._parse_expression()
            if self.index != self.length:
                return None, {"error": "Extra tokens remain after generation.",
                              "expr": self.render_tokens_as_class_expr(self.tokens[:self.index])}
            return self._strip_trailing_parentheses(str(ast)), {"type": OWLClassExpression.__name__,
                                                                "concept": ast.to_dict()}
        except Exception as e:
            # can be extended to the full tokens
            return None, {"error": str(e), "expr": self.render_tokens_as_class_expr(self.tokens[:self.index + 1])}
    def _parse_expression(self):
        node = self._parse_term()
        while self._current_token() in self.binary_ops:
            operation = self._current_token()
            self._advance()
            right = self._parse_term()
            node = And(node, right) if operation == '⊓' else Or(node, right)
        return node

    def _parse_term(self):
        token = self._current_token()
        if token is None:
            raise Exception("Unexpected end of tokens during generation.")
        if token in self.negation:
            self._advance()
            return Not(self._parse_term())
        if token in self.quantifiers:
            quantifier = token
            self._advance()
            if self._current_token() not in self.unique_roles:
                raise Exception(f"Expected role after quantifier, got '{self._current_token()}'.")
            role = self._current_token()
            self._advance()
            if self._current_token() != '.':  # list(self.dot)[0]
                raise Exception("Expected '.' after role in quantified expression.")
            self._advance()
            filler = self._parse_term()
            return Exists(role, filler) if quantifier == '∃' else Forall(role, filler)
        if token in self.cardinals:
            kind = token
            self._advance()
            num_token = self._current_token()
            if num_token is None or not num_token.isdigit():
                raise Exception(f"Expected number after '{kind}', got '{num_token}'.")
            number = int(num_token)
            self._advance()
            role = self._current_token()
            if role not in self.unique_roles:
                raise Exception(f"Expected role after number in cardinality, got '{role}'.")
            self._advance()
            if self._current_token() != '.':
                raise Exception("Expected '.' after role in cardinality.")
            self._advance()
            filler = self._parse_term()
            return Cardinality(kind, number, role, filler)
        if token == '(':
            self._advance()
            expression = self._parse_expression()
            if self._current_token() != ')':
                raise Exception("Expected ')' after expression.")
            self._advance()
            return expression
        if token in self.unique_atom_concept_names:
            self._advance()
            return Atoms(token)
        raise Exception(f"Unexpected token '{token}' at position {self.index}.")
    def render_tokens_as_class_expr(self, _tokens):
        formatted_tokens = []
        indx = 0
        while indx < len(_tokens):
            token = _tokens[indx]
            if token in self.quantifiers:
                formatted_tokens.append(token)
                indx += 1
                if indx < len(_tokens):
                    formatted_tokens.append(_tokens[indx])
                    indx += 1
                if indx < len(_tokens) and _tokens[indx] in self.dot:
                    formatted_tokens.append(_tokens[indx])
                    indx += 1
                continue
            if token in self.negation:
                formatted_tokens.append(token)
            elif token in self.binary_ops:
                formatted_tokens.append(f" {token} ")
            elif token in self.parenthesis:
                formatted_tokens.append(token)
            else:
                if formatted_tokens and formatted_tokens[-1] not in {"(", " "}:
                    formatted_tokens.append(token)
                else:
                    formatted_tokens.append(token)
            indx += 1
        return "".join(formatted_tokens).replace("  ", " ").strip()
    def _lookahead_grammar_strategy(self, context_tokens):
        if not context_tokens:
            return self.negation | self.quantifiers | self.cardinals | self.unique_atom_concept_names | {'('}
        last = context_tokens[-1]
        if last in self.quantifiers:
            return self.unique_roles
        if last in self.cardinals:
            return self.digits
        if last in self.digits:
            return self.digits | self.unique_roles
        if last in self.unique_roles:
            return self.dot
        if last == '.':
            return self.negation | self.quantifiers | self.cardinals | self.unique_atom_concept_names | {'('}
        if last == '(':
            return self.negation | self.quantifiers | self.cardinals | self.unique_atom_concept_names | {'('}
        if last in self.unique_atom_concept_names:
            return self.binary_ops | {')'}
        if last in self.binary_ops:
            return self.negation | self.quantifiers | self.cardinals | self.unique_atom_concept_names | {'('}
        if last in self.negation:
            return self.unique_atom_concept_names | self.quantifiers | self.cardinals | {'('}
        if last == ')':
            return self.binary_ops | {')'}
        return self.vocabs

    def _is_valid_next_token(self, token, context_tokens):
        valid_candidate_tokens = sorted(self._lookahead_grammar_strategy(context_tokens))
        return token in valid_candidate_tokens

    def _enforce(self, max_length: Optional[int] = None, replace_with_negation: bool = False):
        if not max_length:
            max_length = len(self.tokens) + 50
        self.atom_concepts_with_negation = self._negate_unique_atomic_concepts(replace_with_negation=replace_with_negation)
        corrected_tokens, curr_valid_cum = [], []
        choices, cap = None, None
        cap_curr_val = {'atom': 1, 'neg_atom': 2, 'role': 3, 'roleCard': 4}
        indx = 0
        while indx < len(self.tokens) and len(corrected_tokens) < max_length:
            token = self.tokens[indx]
            prev_token = self.tokens[indx - 1] if indx != 0 else None
            ahead_token = self.tokens[indx + 1] if (indx + 1) < len(self.tokens) else None
            next_ahead_token = self.tokens[indx + 2] if (indx + 2) < len(self.tokens) else None
            if not curr_valid_cum and token in self.quantifiers | self.negation | self.binary_ops | self.parenthesis \
                    | self.unique_atom_concept_names | self.dot | self.unique_roles:
                if indx != 0 and prev_token:
                    if token in self.negation:
                        if prev_token in self.negation | self.unique_atom_concept_names:
                            ops_choice = random.choice(list(self.binary_ops))
                            corrected_tokens.extend([ops_choice, token])
                            indx += 1
                            continue
                    elif token in self.dot and prev_token not in self.unique_roles:
                        if ahead_token and ahead_token not in {')'}:
                            if ahead_token in self.unique_atom_concept_names | self.negation | {'('}:
                                ops_choice = random.choice(list(self.binary_ops))
                                corrected_tokens.append(ops_choice)
                                indx += 1
                                continue
                    elif token in self.unique_roles and prev_token not in self.quantifiers:
                        if ahead_token and prev_token in {')'}:
                            ops_choice = random.choice(list(self.binary_ops))
                            corrected_tokens.append(ops_choice)
                            if ahead_token in self.dot:
                                quant_choice = random.choice(list(self.quantifiers))
                                corrected_tokens.extend([quant_choice, token, ahead_token])
                                curr_valid_cum.extend([token, ahead_token])
                                cap = 3
                                choices = self.atom_concepts_with_negation
                                indx += 2
                                continue
                            elif ahead_token in self.unique_atom_concept_names:
                                indx += 1
                                continue
                            elif ahead_token in {')'}:
                                indx += 2
                                continue
                        elif ahead_token and ahead_token in self.negation | self.unique_atom_concept_names:
                            if prev_token in self.binary_ops | self.parenthesis:
                                indx += 1
                                continue
                    elif token in self.quantifiers and prev_token not in self.binary_ops:
                        ops_choice = random.choice(list(self.binary_ops))
                        if prev_token in self.unique_atom_concept_names and \
                                (ahead_token in self.unique_roles or
                                 (next_ahead_token and next_ahead_token in self.unique_roles)):
                            corrected_tokens.extend([ops_choice, token])
                            choices = self.unique_roles
                            indx += 1
                            continue
                    elif token in {')'} and prev_token not in self.unique_atom_concept_names:
                        if prev_token in self.quantifiers and ahead_token in self.unique_roles:
                            self.tokens.pop(indx)
                            continue
                    elif token in {'('} and prev_token in {')'} | self.unique_atom_concept_names:
                        ops_choice = random.choice(list(self.binary_ops))
                        corrected_tokens.extend([ops_choice, token])
                        choices = self.atom_concepts_with_negation
                        indx += 1
                        continue
                if ahead_token:
                    if token in self.quantifiers:
                        if ahead_token in self.dot:
                            role_choice = random.choice(list(self.unique_roles))
                            corrected_tokens.extend([token, role_choice])
                            curr_valid_cum.append(role_choice)
                            cap = 3
                            indx += 1
                            continue
                        elif ahead_token in self.parenthesis:
                            if prev_token and prev_token in {')'}:
                                corrected_tokens.append(random.choice(list(self.binary_ops)))
                            role_choice = random.choice(list(self.unique_roles))
                            corrected_tokens.extend([token, role_choice])
                            curr_valid_cum.append(role_choice)
                            cap = 3
                            choices = self.dot
                            indx += 1
                            continue
                        elif ahead_token in self.binary_ops:
                            atomic_choice = random.choice(list(self.atom_concepts_with_negation))
                            if isinstance(atomic_choice, tuple):
                                corrected_tokens.extend(atomic_choice)
                            else:
                                corrected_tokens.append(atomic_choice)
                            indx += 1
                            continue
                        elif ahead_token in self.unique_atom_concept_names:
                            self.tokens.pop(indx)
                            continue
                    if token in self.binary_ops:
                        if ahead_token in self.binary_ops:
                            atomic_choice = random.choice(list(self.atom_concepts_with_negation))
                            if isinstance(atomic_choice, tuple):
                                _token = [token] + list(atomic_choice)
                            else:
                                _token = [token, atomic_choice]
                            corrected_tokens.extend(_token)
                            indx += 2
                            continue
                        elif ahead_token == ')':
                            atomic_choice = random.choice(list(self.atom_concepts_with_negation))
                            if isinstance(atomic_choice, tuple):
                                _token = [token] + list(atomic_choice)
                            else:
                                _token = [token, atomic_choice]
                            corrected_tokens.extend(_token)
                            indx += 1
                            continue
                        elif ahead_token in self.dot:
                            if next_ahead_token:
                                if next_ahead_token in self.parenthesis | self.unique_atom_concept_names:
                                    corrected_tokens.append(token)
                                    indx += 2
                                    continue
                    if token == ')':
                        if indx != 0:
                            if prev_token and prev_token in self.unique_atom_concept_names:
                                corrected_tokens.append(token)
                                indx += 1
                                continue
                            elif ahead_token in self.parenthesis | self.quantifiers | self.dot:
                                if ahead_token in {')'}:
                                    indx += 1
                                    continue
                                ops_choice = random.choice(list(self.binary_ops))
                                corrected_tokens.extend([token, ops_choice])
                                indx += 1
                                continue
                        else:
                            if ahead_token in self.unique_atom_concept_names:
                                corrected_tokens.append('(')
                                indx += 1
                                continue
                            elif ahead_token in self.binary_ops:
                                atomic_choice = random.choice(list(self.atom_concepts_with_negation))
                                if isinstance(atomic_choice, tuple):
                                    _token = list(atomic_choice)
                                else:
                                    _token = [atomic_choice]
                                corrected_tokens.extend(['('] + _token)
                                indx += 1
                                continue
                    if token in self.negation:
                        if ahead_token in self.quantifiers:
                            if next_ahead_token and next_ahead_token in self.unique_roles:
                                atomic_choice = random.choice(list(self.unique_atom_concept_names))
                                ops_choice = random.choice(list(self.binary_ops))
                                corrected_tokens.extend([token, atomic_choice, ops_choice])
                                indx += 1
                                continue
                            else:
                                corrected_tokens.append(token)
                                if ahead_token not in self.unique_atom_concept_names | self.negation:
                                    self.tokens[indx + 1] = random.choice(list(self.unique_atom_concept_names))
                                indx += 1
                                continue
                        elif ahead_token in self.unique_roles:
                            if next_ahead_token and next_ahead_token in {'.'}:
                                quant_choice = random.choice(list(self.quantifiers))
                                self.tokens[indx] = quant_choice
                                corrected_tokens.append(quant_choice)
                                indx += 1
                                continue
                            corrected_tokens.append(token)
                            atomic_choice = random.choice(list(self.unique_atom_concept_names))
                            self.tokens[indx + 1] = atomic_choice
                            indx += 1
                            continue
                    if token in self.unique_roles and prev_token not in self.quantifiers:
                        if prev_token in self.unique_atom_concept_names and ahead_token in self.unique_atom_concept_names:
                            ops_choice = random.choice(list(self.binary_ops))
                            corrected_tokens.append(ops_choice)
                            indx += 1
                            continue
            if token in self.quantifiers:
                if curr_valid_cum:
                    if cap == 3 and len(curr_valid_cum) == 2:
                        atomic_choice = random.choice(list(choices))
                        if isinstance(atomic_choice, tuple):
                            corrected_tokens.extend(atomic_choice)
                            self.tokens[indx] = atomic_choice[1]
                        else:
                            corrected_tokens.append(atomic_choice)
                            self.tokens[indx] = atomic_choice
                        cap, choices, curr_valid_cum = None, None, []
                        indx += 1
                        continue
            if token == '(':
                if ahead_token or prev_token:
                    if ahead_token in self.negation | self.binary_ops | {')'} | self.dot:
                        if not curr_valid_cum:
                            if indx == 0:
                                if ahead_token not in self.negation | self.unique_atom_concept_names:
                                    token = random.choice(list(self.atom_concepts_with_negation))
                                    if isinstance(token, tuple):
                                        corrected_tokens.extend(token)
                                    else:
                                        corrected_tokens.append(token)
                                    indx += 1
                                    continue
                            else:
                                if next_ahead_token:
                                    if ahead_token in self.negation:
                                        if next_ahead_token not in self.unique_atom_concept_names:
                                            atomic_choice = random.choice(list(self.unique_atom_concept_names))
                                            if next_ahead_token in self.binary_ops:
                                                corrected_tokens.extend([token, atomic_choice])
                                                ahead_token = atomic_choice
                                            elif next_ahead_token in self.quantifiers:
                                                ops_choice = random.choice(list(self.binary_ops))
                                                corrected_tokens.extend([token, atomic_choice, ops_choice])
                                                ahead_token = ops_choice
                                            indx += 2
                                            continue
                                    else:
                                        if ahead_token in self.binary_ops:
                                            if next_ahead_token and \
                                                    next_ahead_token in self.negation | self.unique_atom_concept_names:
                                                if prev_token and prev_token in self.binary_ops:
                                                    indx += 2
                                                    continue
                                        elif ahead_token in {')'} | self.dot:
                                            atomic_choice = random.choice(list(self.atom_concepts_with_negation))
                                            ops_choice = random.choice(list(self.binary_ops))
                                            if isinstance(atomic_choice, tuple):
                                                _token = list(atomic_choice)
                                            else:
                                                _token = [atomic_choice]
                                            if prev_token and prev_token not in self.binary_ops:
                                                _token = [ops_choice] + _token
                                            _token = [token] + _token
                                            corrected_tokens.extend(_token if ahead_token in self.dot else _token + [')'])
                                            indx += 2
                                            continue
                        else:
                            if ahead_token in {')'}:
                                if all(choice in self.atom_concepts_with_negation for choice in choices):
                                    atomic_choice = random.choice(list(choices))
                                else:
                                    atomic_choice = random.choice(list(self.atom_concepts_with_negation))
                                if isinstance(atomic_choice, tuple):
                                    _token = [token] + list(atomic_choice)
                                else:
                                    _token = [token, atomic_choice]
                                if prev_token and prev_token in self.unique_roles:
                                    _token = ['.'] + _token
                                corrected_tokens.extend(_token)
                                cap, choices, curr_valid_cum = None, None, []
                                indx += 1
                                continue
                            elif ahead_token in self.negation:
                                if prev_token and prev_token in self.unique_roles:
                                    if next_ahead_token and next_ahead_token in self.unique_atom_concept_names:
                                        corrected_tokens.extend([token, '.', ahead_token, next_ahead_token])
                                        cap, choices, curr_valid_cum = None, None, []
                                        indx += 3
                                        continue
                                elif (indx - 2 != 0 and self.tokens[indx - 2] in self.unique_roles) \
                                        and prev_token and prev_token in self.dot:
                                    corrected_tokens.extend([token, ahead_token])
                                    if next_ahead_token and (indx + 3) < len(self.tokens):
                                        if next_ahead_token in self.unique_atom_concept_names \
                                                and self.tokens[indx + 3] in {')'}:
                                            corrected_tokens.extend([next_ahead_token, self.tokens[indx + 3]])
                                            indx += 2
                                    indx += 1
                                    cap, choices, curr_valid_cum = None, None, []
                                    continue
                                elif cap == 3 and len(curr_valid_cum) == 1:
                                    dot_choice = list(choices) if choices in self.dot else '.'
                                    corrected_tokens.append(dot_choice)
                                    atomic_choice = random.choice(list(self.unique_atom_concept_names))
                                    corrected_tokens.extend([token, ahead_token, atomic_choice])
                                    cap, choices, curr_valid_cum = None, None, []
                                    indx += 2
                                    continue
                    elif ahead_token in self.quantifiers:
                        if curr_valid_cum:
                            if all(choice in self.atom_concepts_with_negation for choice in choices):
                                atomic_choice = random.choice(list(choices))
                            else:
                                atomic_choice = random.choice(list(self.atom_concepts_with_negation))
                            if isinstance(atomic_choice, tuple):
                                _token = [token] + list(atomic_choice)
                            else:
                                _token = [token, atomic_choice]
                            if next_ahead_token and next_ahead_token not in {')'} or next_ahead_token not in self.binary_ops:
                                _token += [')']
                            corrected_tokens.extend(_token)
                            cap, choices, curr_valid_cum = None, None, []
                            if next_ahead_token and next_ahead_token in self.unique_roles:
                                ops_choice = random.choice(list(self.binary_ops))
                                corrected_tokens.append(ops_choice)
                                indx += 1
                                continue
                            indx += 2
                            continue
                        else:
                            if not next_ahead_token:
                                atomic_choice = random.choice(list(self.unique_atom_concept_names))
                                corrected_tokens.append(token)
                                self.tokens[indx + 1] = atomic_choice
                                indx += 1
                                continue
                    elif prev_token:
                        if prev_token in self.unique_atom_concept_names:
                            ops_choice = random.choice(list(self.binary_ops))
                            corrected_tokens.extend([ops_choice, token])
                            indx += 1
                            continue
            if token == ')':
                if curr_valid_cum:
                    if cap == 3 and len(curr_valid_cum) == 2:
                        atomic_choice = random.choice(list(self.unique_atom_concept_names))
                        corrected_tokens.append(atomic_choice)
                        cap, choices, curr_valid_cum = None, None, []
                        indx += 1
                        continue
            if not curr_valid_cum and token in self.unique_atom_concept_names | self.unique_roles and ahead_token:
                if token in self.unique_atom_concept_names:
                    ops_choice = random.choice(list(self.binary_ops))
                    if ahead_token in self.unique_atom_concept_names and prev_token not in {')'}:
                        corrected_tokens.extend([token, ops_choice])
                        indx += 1
                        continue
                    elif ahead_token in self.unique_roles:
                        quant_choice = random.choice(list(self.quantifiers))
                        corrected_tokens.extend([token, ops_choice, quant_choice])
                        indx += 1
                        continue
                else:
                    _token = [token, '.']
                    if ahead_token and ahead_token in self.unique_atom_concept_names:
                        corrected_tokens.extend(_token + [ahead_token])
                        indx += 2
                        continue
                    elif ahead_token and ahead_token in self.binary_ops:
                        atomic_choice = random.choice(list(self.atom_concepts_with_negation))
                        if isinstance(atomic_choice, tuple):
                            choice = list(atomic_choice)
                        else:
                            choice = [atomic_choice]
                        corrected_tokens.extend(_token + choice)
                        indx += 1
                        continue
            if token not in self.binary_ops | self.parenthesis | self.quantifiers:
                _token = token
                if _token in self.negation | self.unique_atom_concept_names:
                    if _token in self.negation:
                        if not cap:
                            cap = cap_curr_val['neg_atom']
                        choices = self.unique_atom_concept_names
                    else:
                        if not cap:
                            cap = cap_curr_val['atom']
                        choices = self.binary_ops
                elif _token in self.digits | self.dot | self.unique_roles:
                    if _token in self.digits:
                        # TODO: Work on this later
                        if not cap:
                            cap = cap_curr_val['roleCard']
                        if curr_valid_cum and (ahead_token and ahead_token in self.digits):
                            _token += ahead_token
                            indx += 1
                        choices = self.unique_roles
                    elif _token in self.unique_roles:
                        if not cap:
                            cap = cap_curr_val['role']
                        choices = self.dot
                    elif _token in self.dot:
                        choices = self.atom_concepts_with_negation
                curr_valid_cum.append(_token)
                if not self._is_valid_next_token(token, corrected_tokens):
                    token = random.choice(list(choices))
                    curr_valid_cum.append(token)
            if isinstance(token, tuple):
                corrected_tokens.extend(token)
            else:
                corrected_tokens.append(token)
            if len(curr_valid_cum) == cap or (curr_valid_cum and curr_valid_cum[0] == '.' and len(curr_valid_cum) == 2):
                cap, choices, curr_valid_cum = None, None, []
            indx += 1
        return corrected_tokens
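# Illustrative usage sketch (not part of the original module). It assumes an OWL ontology is
# available at the path below (the Family benchmark shipped with Ontolearn examples is used
# here as a placeholder) and that the role/concept names in the token sequence ("hasChild",
# "Male", "Female") exist in that ontology.
def _example_parse():
    kb = KnowledgeBase(path="KGs/Family/family-benchmark_rich_background.owl")
    builder = ConceptAbstractSyntaxTreeBuilder(kb, max_length=20)
    tokens = ['∃', 'hasChild', '.', '(', 'Male', '⊔', 'Female', ')']
    rendered, payload = builder.parse(tokens, enforce_validity=False)
    # `rendered` is the DL string (or None on failure) and `payload` is either the nested
    # dict produced by Expr.to_dict or an error dict containing a partial rendering.
    return rendered, payload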