from typing import List, Union, Optional
import os
import dspy
import uuid
from pathlib import Path
from owlapy.class_expression import OWLClass
from owlapy.iri import IRI
from owlapy.owl_axiom import OWLObjectPropertyAssertionAxiom, OWLClassAssertionAxiom, OWLDataPropertyAssertionAxiom, \
OWLSubClassOfAxiom
from owlapy.owl_individual import OWLNamedIndividual
from owlapy.owl_ontology import Ontology
from owlapy.owl_property import OWLObjectProperty, OWLDataProperty
from owlapy.agen_kg.signatures import (Entity, Triple, TypeAssertion, TypeGeneration, Literal, SPLTriples, Domain,
DomainSpecificFewShotGenerator)
from owlapy.agen_kg.helper import extract_hierarchy_from_dbpedia, task_example_mapping
from owlapy.agen_kg.domain_examples_cache import DomainExamplesCache
from owlapy.agen_kg.graph_extractor import GraphExtractor
class DomainGraphExtractor(GraphExtractor):
    """Agent that extracts an RDF graph / ontology from domain-specific text."""

    def __init__(self, enable_logging=False, examples_cache_dir: Optional[str] = None):
        """
        A module to extract an RDF graph from domain-specific text input.

        Args:
            enable_logging: Whether to enable logging.
            examples_cache_dir: Directory to cache domain-specific examples. If None, uses current working directory.
        """
        super().__init__(enable_logging)
        # One dspy predictor per extraction sub-task, built from its signature class.
        predictor_signatures = {
            "domain_detector": Domain,
            "few_shot_generator": DomainSpecificFewShotGenerator,
            "entity_extractor": Entity,
            "triples_extractor": Triple,
            "type_asserter": TypeAssertion,
            "type_generator": TypeGeneration,
            "literal_extractor": Literal,
            "spl_triples_extractor": SPLTriples,
        }
        for attribute_name, signature_cls in predictor_signatures.items():
            setattr(self, attribute_name, dspy.Predict(signature_cls))
        # Manages on-disk caching of generated few-shot examples.
        self.examples_cache = DomainExamplesCache(cache_dir=examples_cache_dir)
[docs]
def generate_domain_specific_examples(self, domain: str) -> dict:
"""
Generate domain-specific few-shot examples for all task types.
Automatically caches examples to disk for future reuse. If examples have been
previously generated for the domain, they will be loaded from cache.
Args:
domain: The domain for which to generate examples.
Returns:
Dictionary containing few-shot examples for each task type, keyed by:
'entity_extraction', 'triples_extraction', 'type_assertion', 'type_generation',
'literal_extraction', 'triples_with_numeric_literals_extraction'
"""
# Check if examples are already cached
cached_examples = self.examples_cache.load_examples(domain)
if cached_examples is not None:
if self.logging:
cache_file = self.examples_cache.get_cache_file_path(domain)
print(f"DomainGraphExtractor: INFO :: Loaded cached examples for domain '{domain}' from {cache_file}")
return cached_examples
# Generate new examples if not cached
examples = {}
task_types = [
'entity_extraction',
'triples_extraction',
'type_assertion',
'type_generation',
'literal_extraction',
'triples_with_numeric_literals_extraction'
]
if self.logging:
print(f"DomainGraphExtractor: INFO :: Generating domain-specific few-shot examples for domain: {domain}")
for task_type in task_types:
result = self.few_shot_generator(
domain=domain,
task_type=task_type,
num_examples=2,
examples_example_structure=task_example_mapping[task_type]
)
examples[task_type] = result.few_shot_examples
if self.logging:
print(f"DomainGraphExtractor: INFO :: Generated examples for {task_type}")
# Save examples to cache
if self.examples_cache.save_examples(domain, examples):
if self.logging:
cache_file = self.examples_cache.get_cache_file_path(domain)
print(f"DomainGraphExtractor: INFO :: Cached examples for domain '{domain}' to {cache_file}")
else:
if self.logging:
print(f"DomainGraphExtractor: WARNING :: Failed to cache examples for domain '{domain}'")
return examples
[docs]
def clear_domain_cache(self, domain: str) -> bool:
"""
Clear the cached examples for a specific domain.
Args:
domain: The domain for which to clear cached examples.
Returns:
True if the cache was cleared successfully, False otherwise.
"""
success = self.examples_cache.clear_domain_cache(domain)
if success and self.logging:
print(f"DomainGraphExtractor: INFO :: Cleared cache for domain '{domain}'")
return success
[docs]
def clear_all_domain_caches(self) -> bool:
"""
Clear all cached domain examples.
Returns:
True if all caches were cleared successfully, False otherwise.
"""
success = self.examples_cache.clear_all_caches()
if success and self.logging:
print("DomainGraphExtractor: INFO :: Cleared all domain example caches")
return success
[docs]
def list_cached_domains(self) -> list:
"""
List all domains that have cached examples.
Returns:
List of domain names with cached examples.
"""
domains = self.examples_cache.list_cached_domains()
if self.logging and domains:
print(f"DomainGraphExtractor: INFO :: Cached domains: {', '.join(domains)}")
return domains
[docs]
def is_domain_cached(self, domain: str) -> bool:
"""
Check if examples exist for a domain in the cache.
Args:
domain: The domain to check.
Returns:
True if examples are cached for the domain, False otherwise.
"""
return self.examples_cache.examples_exist(domain)
[docs]
def get_cache_file_path(self, domain: str) -> str:
"""
Get the full path to the cache file for a domain.
Args:
domain: The domain name.
Returns:
String path to the cache file.
"""
return self.examples_cache.get_cache_file_path(domain)
[docs]
def generate_ontology(self, text: Union[str, Path],
domain: str = None,
query: str = None,
ontology_namespace=f"http://ontology.local/{uuid.uuid4()}#",
entity_types: List[str] = None,
generate_types=False,
extract_spl_triples=False,
create_class_hierarchy=False,
use_chunking: bool = None,
use_incremental_merging = False,
fact_reassurance: bool = True,
save_path="generated_ontology.owl") -> Ontology:
"""
Generate a domain-specific ontology from text.
Supports automatic chunking for large texts that exceed the LLM's context window.
Args:
text: Input text or file path to extract ontology from.
Supports files: .txt, .pdf, .docx, .doc, .rtf, .html, .htm
domain: The domain of the text. If None, will be detected automatically.
query: A custom prompt to give directions to the agent.
ontology_namespace: Namespace for the ontology.
entity_types: List of entity types to assign.
generate_types: Whether to generate types automatically.
extract_spl_triples: Whether to extract subject-property-literal triples.
create_class_hierarchy: Whether to create class hierarchy from DBpedia.
use_chunking: Whether to use text chunking for large documents.
- None (default): Auto-detect based on text size (uses auto_chunk_threshold).
- True: Force chunking even for smaller texts.
- False: Disable chunking (may fail for very large texts).
use_incremental_merging: Whether to use incremental merging when chunking is enabled (default: False).
fact_reassurance: Whether to perform the coherence check on triples (default: True).
save_path: Path to save the ontology.
Returns:
Generated Ontology object.
"""
self.plan_decompose(query)
# Load text from file if necessary
if isinstance(text, (str, Path)):
# Check if it's a file path
try:
source_path = Path(text) if not isinstance(text, Path) else text
if source_path.is_file():
text = self.load_text(text)
# else: treat as raw text string
except OSError:
pass
# Determine whether to use chunking
if use_chunking is None:
use_chunking = self.should_chunk_text(text)
self.use_incremental_merging = use_incremental_merging
if use_chunking:
chunks = self.chunk_text(text)
if self.logging:
chunk_info = self.get_chunking_info(text)
print(f"DomainGraphExtractor: INFO :: Text will be processed in {chunk_info['num_chunks']} chunks")
print(f"DomainGraphExtractor: INFO :: Total chars: {chunk_info['total_chars']}, "
f"Est. tokens: {chunk_info['estimated_tokens']}")
else:
chunks = [text]
# Use a representative sample for domain detection
domain_detection_text = text[:self.auto_chunk_threshold] if len(text) > self.auto_chunk_threshold else text
# Step 1: Detect domain if not provided
if domain is None:
domain_result = self.domain_detector(text=domain_detection_text)
domain = domain_result.domain
if self.logging:
print(f"DomainGraphExtractor: INFO :: Detected domain: {domain}")
else:
if self.logging:
print(f"DomainGraphExtractor: INFO :: Using provided domain: {domain}")
# Step 2: Generate domain-specific few-shot examples if not provided
generated_examples = self.generate_domain_specific_examples(domain)
# Use generated examples only if user hasn't provided their own
examples_for_entity_extraction = generated_examples['entity_extraction']
examples_for_triples_extraction = generated_examples['triples_extraction']
examples_for_type_assertion = generated_examples['type_assertion']
examples_for_type_generation = generated_examples['type_generation']
examples_for_literal_extraction = generated_examples['literal_extraction']
examples_for_spl_triples_extraction = generated_examples['triples_with_numeric_literals_extraction']
# Step 3: Extract entities (from chunks if needed)
# if self.logging:
# print(
# "DomainGraphExtractor: INFO :: In the generated triples, you may see entities or literals that were not "
# "part of the extracted entities or literals. They are filtered before added to the ontology.")
chunk_summaries = None
if use_chunking and len(chunks) > 1:
entities, chunk_summaries = self._extract_entities_from_chunks(
chunks, examples_for_entity_extraction, "DomainGraphExtractor",
task_instructions=self.entity_extraction_instructions
)
else:
entities = self.entity_extractor(text=text,
few_shot_examples=examples_for_entity_extraction,
task_instructions=self.entity_extraction_instructions).entities
if self.logging:
print(f"DomainGraphExtractor: INFO :: Generated the following entities: {entities}")
# Step 4: Cluster entities to identify and merge duplicates
# Use summaries if available from chunked extraction, or create clustering context
if chunk_summaries:
clustering_context = self.create_combined_summary(chunk_summaries)
else:
clustering_context = self.get_clustering_context(text)
canonical_entities = self.filter_entities(entities, clustering_context)
if self.logging and len(entities) != len(canonical_entities):
print(f"DomainGraphExtractor: INFO :: After filtering: {canonical_entities}")
# Step 5: Extract triples using canonical entities (from chunks if needed)
if use_chunking and len(chunks) > 1:
triples = self._extract_triples_from_chunks(
chunks, canonical_entities, examples_for_triples_extraction,
"DomainGraphExtractor", chunk_summaries,
task_instructions=self.triple_extraction_instructions
)
else:
triples = self.triples_extractor(text=text, entities=canonical_entities,
few_shot_examples=examples_for_triples_extraction,
task_instructions=self.triple_extraction_instructions).triples
if self.logging:
print(f"DomainGraphExtractor: INFO :: Generated the following triples: {triples}")
# Step 5.5: Cluster relations (object properties) and update triples programmatically BEFORE coherence check
relations = list(set([triple[1] for triple in triples]))
relation_mapping = self.cluster_relations(relations, clustering_context, query)
# Update triples with canonical relations
updated_triples = [(triple[0], relation_mapping.get(triple[1], triple[1]), triple[2]) for triple in triples]
if self.logging and len(relations) != len(set(relation_mapping.values())):
print(f"DomainGraphExtractor: INFO :: After relation clustering: {list(set(relation_mapping.values()))}")
# Step 6: Check coherence of the relation-normalized triples
if fact_reassurance:
coherent_triples = self.check_coherence(updated_triples,
clustering_context,
self.fact_checking_instructions)
if self.logging:
print(f"DomainGraphExtractor: INFO :: After coherence check, kept {len(coherent_triples)} triples")
else:
coherent_triples = updated_triples
if self.logging:
print(f"DomainGraphExtractor: INFO :: Skipped coherence check, using all {len(coherent_triples)} triples")
# Step 7: Create ontology and add triples
onto = Ontology(ontology_iri=IRI.create("http://example.com/ontogen"), load=False)
for triple in coherent_triples:
subject = OWLNamedIndividual(ontology_namespace + self.snake_case(triple[0]))
prop = OWLObjectProperty(ontology_namespace + self.snake_case(triple[1]))
object = OWLNamedIndividual(ontology_namespace + self.snake_case(triple[2]))
# TODO: `and triple[2] in canonical_entities` is removed from the condition below
# because its was too strict. May need to reconsider that decision.
if triple[0] in canonical_entities:
ax = OWLObjectPropertyAssertionAxiom(subject, prop, object)
onto.add_axiom(ax)
# Step 8: Handle type assertions
if entity_types is not None or generate_types:
type_assertions = None
if use_chunking and len(chunks) > 1:
type_assertions = self._extract_types_from_chunks(
chunks, canonical_entities, entity_types, generate_types,
examples_for_type_assertion, examples_for_type_generation,
"DomainGraphExtractor", chunk_summaries,
task_instructions_assertion=self.type_assertion_instructions,
task_instructions_generation=self.type_generation_instructions
)
else:
if entity_types is not None and not generate_types:
type_assertions = self.type_asserter(text=text, entities=canonical_entities,
entity_types=entity_types,
task_instructions=self.type_assertion_instructions,
few_shot_examples=examples_for_type_assertion).pairs
if self.logging:
print(
f"DomainGraphExtractor: INFO :: Assigned types for entities as following: {type_assertions}")
elif generate_types:
type_assertions = self.type_generator(text=text, entities=canonical_entities,
task_instructions=self.type_generation_instructions,
few_shot_examples=examples_for_type_generation).pairs
if self.logging:
print(
f"DomainGraphExtractor: INFO :: Finished generating types and assigned them to entities as following: {type_assertions}")
# Cluster types and update type assertions programmatically
types = list(set([pair[1] for pair in type_assertions]))
type_mapping = self.cluster_types(types, clustering_context)
# Update type assertions with canonical types
type_assertions = [(pair[0], type_mapping.get(pair[1], pair[1])) for pair in type_assertions]
if self.logging and len(types) != len(set(type_mapping.values())):
print(f"DomainGraphExtractor: INFO :: After type clustering: {list(set(type_mapping.values()))}")
# Add class assertion axioms
for pair in type_assertions:
subject = OWLNamedIndividual(ontology_namespace + self.snake_case(pair[0]))
entity_type = OWLClass(ontology_namespace + self.format_type_name(pair[1]))
ax = OWLClassAssertionAxiom(subject, entity_type)
try:
onto.add_axiom(ax)
except Exception as e:
print(e)
print(f"Subject: {subject}, Entity Type: {entity_type}")
# Step 9: Extract SPL triples if requested
if extract_spl_triples:
# Extract literals (from chunks if needed)
if use_chunking and len(chunks) > 1:
literals = self._extract_literals_from_chunks(
chunks, examples_for_literal_extraction, "DomainGraphExtractor",
task_instructions=self.literal_extraction_instructions
)
else:
literals = self.literal_extractor(text=text,
task_instructions=self.literal_extraction_instructions,
few_shot_examples=examples_for_literal_extraction).l_values
if self.logging:
print(f"DomainGraphExtractor: INFO :: Generated the following numeric literals: {literals}")
# Extract SPL triples (from chunks if needed)
if use_chunking and len(chunks) > 1:
spl_triples = self._extract_spl_triples_from_chunks(
chunks, canonical_entities, literals,
examples_for_spl_triples_extraction,
"DomainGraphExtractor",
task_instructions=self.triple_with_literal_extraction_instructions
)
else:
spl_triples = self.spl_triples_extractor(text=text, entities=canonical_entities,
numeric_literals=literals,
task_instructions=self.triple_with_literal_extraction_instructions,
few_shot_examples=examples_for_spl_triples_extraction).triples
if self.logging:
print(f"DomainGraphExtractor: INFO :: Generated the following s-p-l triples: {spl_triples}")
# Cluster relations (data properties) in SPL triples and update programmatically
spl_relations = list(set([triple[1] for triple in spl_triples]))
spl_relation_mapping = self.cluster_relations(spl_relations, clustering_context)
# Update SPL triples with canonical relations
spl_triples = [(triple[0], spl_relation_mapping.get(triple[1], triple[1]), triple[2])
for triple in spl_triples]
if self.logging and len(spl_relations) != len(set(spl_relation_mapping.values())):
print(
f"DomainGraphExtractor: INFO :: After SPL relation clustering: "
f"{list(set(spl_relation_mapping.values()))}")
for triple in spl_triples:
subject = OWLNamedIndividual(ontology_namespace + self.snake_case(triple[0]))
prop = OWLDataProperty(ontology_namespace + self.snake_case(triple[1]))
literal = self.get_corresponding_literal(triple[2])
if triple[2] in literals:
try:
ax = OWLDataPropertyAssertionAxiom(subject, prop, literal)
onto.add_axiom(ax)
except Exception:
pass
# Step 10: Create class hierarchy if requested
if create_class_hierarchy:
for cls in onto.classes_in_signature():
try:
superclasses, subclasses = extract_hierarchy_from_dbpedia(cls.remainder)
except Exception:
continue
if self.logging:
print(
f"DomainGraphExtractor: INFO :: For class {cls.remainder} found superclasses: {[IRI.create(s).remainder for s in superclasses]} and subclasses: {[IRI.create(s).remainder for s in subclasses]}")
for superclass in superclasses:
dbpedia_class_remainder = IRI.create(superclass).remainder
sup_cls = OWLClass(ontology_namespace + dbpedia_class_remainder)
ax = OWLSubClassOfAxiom(cls, sup_cls)
try:
onto.add_axiom(ax)
except Exception:
pass
for subclass in subclasses:
dbpedia_class_remainder = IRI.create(subclass).remainder
sub_cls = OWLClass(ontology_namespace + dbpedia_class_remainder)
ax = OWLSubClassOfAxiom(sub_cls, cls)
try:
onto.add_axiom(ax)
except Exception:
pass
# Step 11: Save ontology
onto.save(path=save_path)
if self.logging:
print(
f"DomainGraphExtractor: INFO :: Successfully saved the ontology at {os.path.join(os.getcwd(), save_path)}")
return onto