"""Source code for owlapy.agen_kg.graph_extracting_models.domain_graph_extractor."""

from typing import List, Union, Optional
import os
import dspy
import uuid
from pathlib import Path

from owlapy.class_expression import OWLClass
from owlapy.iri import IRI
from owlapy.owl_axiom import OWLObjectPropertyAssertionAxiom, OWLClassAssertionAxiom, OWLDataPropertyAssertionAxiom, \
    OWLSubClassOfAxiom
from owlapy.owl_individual import OWLNamedIndividual
from owlapy.owl_ontology import Ontology
from owlapy.owl_property import OWLObjectProperty, OWLDataProperty
from owlapy.agen_kg.signatures import (Entity, Triple, TypeAssertion, TypeGeneration, Literal, SPLTriples, Domain,
    DomainSpecificFewShotGenerator)
from owlapy.agen_kg.helper import extract_hierarchy_from_dbpedia, task_example_mapping
from owlapy.agen_kg.domain_examples_cache import DomainExamplesCache
from owlapy.agen_kg.graph_extractor import GraphExtractor



class DomainGraphExtractor(GraphExtractor):
    """Extracts an RDF graph from domain-specific text input.

    Detects the input text's domain, generates (and caches on disk)
    domain-specific few-shot examples, and drives the dspy-based
    extraction predictors to build an OWL ontology.
    """

    def __init__(self, enable_logging=False, examples_cache_dir: Optional[str] = None):
        """Initialize the extractor and its dspy predictors.

        Args:
            enable_logging: Whether to enable logging.
            examples_cache_dir: Directory to cache domain-specific examples.
                If None, uses current working directory.
        """
        super().__init__(enable_logging)
        # One dspy predictor per extraction sub-task.
        self.domain_detector = dspy.Predict(Domain)
        self.few_shot_generator = dspy.Predict(DomainSpecificFewShotGenerator)
        self.entity_extractor = dspy.Predict(Entity)
        self.triples_extractor = dspy.Predict(Triple)
        self.type_asserter = dspy.Predict(TypeAssertion)
        self.type_generator = dspy.Predict(TypeGeneration)
        self.literal_extractor = dspy.Predict(Literal)
        self.spl_triples_extractor = dspy.Predict(SPLTriples)
        # Disk-backed cache manager for generated few-shot examples.
        self.examples_cache = DomainExamplesCache(cache_dir=examples_cache_dir)
[docs] def generate_domain_specific_examples(self, domain: str) -> dict: """ Generate domain-specific few-shot examples for all task types. Automatically caches examples to disk for future reuse. If examples have been previously generated for the domain, they will be loaded from cache. Args: domain: The domain for which to generate examples. Returns: Dictionary containing few-shot examples for each task type, keyed by: 'entity_extraction', 'triples_extraction', 'type_assertion', 'type_generation', 'literal_extraction', 'triples_with_numeric_literals_extraction' """ # Check if examples are already cached cached_examples = self.examples_cache.load_examples(domain) if cached_examples is not None: if self.logging: cache_file = self.examples_cache.get_cache_file_path(domain) print(f"DomainGraphExtractor: INFO :: Loaded cached examples for domain '{domain}' from {cache_file}") return cached_examples # Generate new examples if not cached examples = {} task_types = [ 'entity_extraction', 'triples_extraction', 'type_assertion', 'type_generation', 'literal_extraction', 'triples_with_numeric_literals_extraction' ] if self.logging: print(f"DomainGraphExtractor: INFO :: Generating domain-specific few-shot examples for domain: {domain}") for task_type in task_types: result = self.few_shot_generator( domain=domain, task_type=task_type, num_examples=2, examples_example_structure=task_example_mapping[task_type] ) examples[task_type] = result.few_shot_examples if self.logging: print(f"DomainGraphExtractor: INFO :: Generated examples for {task_type}") # Save examples to cache if self.examples_cache.save_examples(domain, examples): if self.logging: cache_file = self.examples_cache.get_cache_file_path(domain) print(f"DomainGraphExtractor: INFO :: Cached examples for domain '{domain}' to {cache_file}") else: if self.logging: print(f"DomainGraphExtractor: WARNING :: Failed to cache examples for domain '{domain}'") return examples
[docs] def clear_domain_cache(self, domain: str) -> bool: """ Clear the cached examples for a specific domain. Args: domain: The domain for which to clear cached examples. Returns: True if the cache was cleared successfully, False otherwise. """ success = self.examples_cache.clear_domain_cache(domain) if success and self.logging: print(f"DomainGraphExtractor: INFO :: Cleared cache for domain '{domain}'") return success
[docs] def clear_all_domain_caches(self) -> bool: """ Clear all cached domain examples. Returns: True if all caches were cleared successfully, False otherwise. """ success = self.examples_cache.clear_all_caches() if success and self.logging: print("DomainGraphExtractor: INFO :: Cleared all domain example caches") return success
[docs] def list_cached_domains(self) -> list: """ List all domains that have cached examples. Returns: List of domain names with cached examples. """ domains = self.examples_cache.list_cached_domains() if self.logging and domains: print(f"DomainGraphExtractor: INFO :: Cached domains: {', '.join(domains)}") return domains
[docs] def is_domain_cached(self, domain: str) -> bool: """ Check if examples exist for a domain in the cache. Args: domain: The domain to check. Returns: True if examples are cached for the domain, False otherwise. """ return self.examples_cache.examples_exist(domain)
[docs] def get_cache_file_path(self, domain: str) -> str: """ Get the full path to the cache file for a domain. Args: domain: The domain name. Returns: String path to the cache file. """ return self.examples_cache.get_cache_file_path(domain)
[docs] def generate_ontology(self, text: Union[str, Path], domain: str = None, query: str = None, ontology_namespace=f"http://ontology.local/{uuid.uuid4()}#", entity_types: List[str] = None, generate_types=False, extract_spl_triples=False, create_class_hierarchy=False, use_chunking: bool = None, use_incremental_merging = False, fact_reassurance: bool = True, save_path="generated_ontology.owl") -> Ontology: """ Generate a domain-specific ontology from text. Supports automatic chunking for large texts that exceed the LLM's context window. Args: text: Input text or file path to extract ontology from. Supports files: .txt, .pdf, .docx, .doc, .rtf, .html, .htm domain: The domain of the text. If None, will be detected automatically. query: A custom prompt to give directions to the agent. ontology_namespace: Namespace for the ontology. entity_types: List of entity types to assign. generate_types: Whether to generate types automatically. extract_spl_triples: Whether to extract subject-property-literal triples. create_class_hierarchy: Whether to create class hierarchy from DBpedia. use_chunking: Whether to use text chunking for large documents. - None (default): Auto-detect based on text size (uses auto_chunk_threshold). - True: Force chunking even for smaller texts. - False: Disable chunking (may fail for very large texts). use_incremental_merging: Whether to use incremental merging when chunking is enabled (default: False). fact_reassurance: Whether to perform the coherence check on triples (default: True). save_path: Path to save the ontology. Returns: Generated Ontology object. 
""" self.plan_decompose(query) # Load text from file if necessary if isinstance(text, (str, Path)): # Check if it's a file path try: source_path = Path(text) if not isinstance(text, Path) else text if source_path.is_file(): text = self.load_text(text) # else: treat as raw text string except OSError: pass # Determine whether to use chunking if use_chunking is None: use_chunking = self.should_chunk_text(text) self.use_incremental_merging = use_incremental_merging if use_chunking: chunks = self.chunk_text(text) if self.logging: chunk_info = self.get_chunking_info(text) print(f"DomainGraphExtractor: INFO :: Text will be processed in {chunk_info['num_chunks']} chunks") print(f"DomainGraphExtractor: INFO :: Total chars: {chunk_info['total_chars']}, " f"Est. tokens: {chunk_info['estimated_tokens']}") else: chunks = [text] # Use a representative sample for domain detection domain_detection_text = text[:self.auto_chunk_threshold] if len(text) > self.auto_chunk_threshold else text # Step 1: Detect domain if not provided if domain is None: domain_result = self.domain_detector(text=domain_detection_text) domain = domain_result.domain if self.logging: print(f"DomainGraphExtractor: INFO :: Detected domain: {domain}") else: if self.logging: print(f"DomainGraphExtractor: INFO :: Using provided domain: {domain}") # Step 2: Generate domain-specific few-shot examples if not provided generated_examples = self.generate_domain_specific_examples(domain) # Use generated examples only if user hasn't provided their own examples_for_entity_extraction = generated_examples['entity_extraction'] examples_for_triples_extraction = generated_examples['triples_extraction'] examples_for_type_assertion = generated_examples['type_assertion'] examples_for_type_generation = generated_examples['type_generation'] examples_for_literal_extraction = generated_examples['literal_extraction'] examples_for_spl_triples_extraction = generated_examples['triples_with_numeric_literals_extraction'] # Step 3: Extract 
entities (from chunks if needed) # if self.logging: # print( # "DomainGraphExtractor: INFO :: In the generated triples, you may see entities or literals that were not " # "part of the extracted entities or literals. They are filtered before added to the ontology.") chunk_summaries = None if use_chunking and len(chunks) > 1: entities, chunk_summaries = self._extract_entities_from_chunks( chunks, examples_for_entity_extraction, "DomainGraphExtractor", task_instructions=self.entity_extraction_instructions ) else: entities = self.entity_extractor(text=text, few_shot_examples=examples_for_entity_extraction, task_instructions=self.entity_extraction_instructions).entities if self.logging: print(f"DomainGraphExtractor: INFO :: Generated the following entities: {entities}") # Step 4: Cluster entities to identify and merge duplicates # Use summaries if available from chunked extraction, or create clustering context if chunk_summaries: clustering_context = self.create_combined_summary(chunk_summaries) else: clustering_context = self.get_clustering_context(text) canonical_entities = self.filter_entities(entities, clustering_context) if self.logging and len(entities) != len(canonical_entities): print(f"DomainGraphExtractor: INFO :: After filtering: {canonical_entities}") # Step 5: Extract triples using canonical entities (from chunks if needed) if use_chunking and len(chunks) > 1: triples = self._extract_triples_from_chunks( chunks, canonical_entities, examples_for_triples_extraction, "DomainGraphExtractor", chunk_summaries, task_instructions=self.triple_extraction_instructions ) else: triples = self.triples_extractor(text=text, entities=canonical_entities, few_shot_examples=examples_for_triples_extraction, task_instructions=self.triple_extraction_instructions).triples if self.logging: print(f"DomainGraphExtractor: INFO :: Generated the following triples: {triples}") # Step 5.5: Cluster relations (object properties) and update triples programmatically BEFORE coherence check 
relations = list(set([triple[1] for triple in triples])) relation_mapping = self.cluster_relations(relations, clustering_context, query) # Update triples with canonical relations updated_triples = [(triple[0], relation_mapping.get(triple[1], triple[1]), triple[2]) for triple in triples] if self.logging and len(relations) != len(set(relation_mapping.values())): print(f"DomainGraphExtractor: INFO :: After relation clustering: {list(set(relation_mapping.values()))}") # Step 6: Check coherence of the relation-normalized triples if fact_reassurance: coherent_triples = self.check_coherence(updated_triples, clustering_context, self.fact_checking_instructions) if self.logging: print(f"DomainGraphExtractor: INFO :: After coherence check, kept {len(coherent_triples)} triples") else: coherent_triples = updated_triples if self.logging: print(f"DomainGraphExtractor: INFO :: Skipped coherence check, using all {len(coherent_triples)} triples") # Step 7: Create ontology and add triples onto = Ontology(ontology_iri=IRI.create("http://example.com/ontogen"), load=False) for triple in coherent_triples: subject = OWLNamedIndividual(ontology_namespace + self.snake_case(triple[0])) prop = OWLObjectProperty(ontology_namespace + self.snake_case(triple[1])) object = OWLNamedIndividual(ontology_namespace + self.snake_case(triple[2])) # TODO: `and triple[2] in canonical_entities` is removed from the condition below # because its was too strict. May need to reconsider that decision. 
if triple[0] in canonical_entities: ax = OWLObjectPropertyAssertionAxiom(subject, prop, object) onto.add_axiom(ax) # Step 8: Handle type assertions if entity_types is not None or generate_types: type_assertions = None if use_chunking and len(chunks) > 1: type_assertions = self._extract_types_from_chunks( chunks, canonical_entities, entity_types, generate_types, examples_for_type_assertion, examples_for_type_generation, "DomainGraphExtractor", chunk_summaries, task_instructions_assertion=self.type_assertion_instructions, task_instructions_generation=self.type_generation_instructions ) else: if entity_types is not None and not generate_types: type_assertions = self.type_asserter(text=text, entities=canonical_entities, entity_types=entity_types, task_instructions=self.type_assertion_instructions, few_shot_examples=examples_for_type_assertion).pairs if self.logging: print( f"DomainGraphExtractor: INFO :: Assigned types for entities as following: {type_assertions}") elif generate_types: type_assertions = self.type_generator(text=text, entities=canonical_entities, task_instructions=self.type_generation_instructions, few_shot_examples=examples_for_type_generation).pairs if self.logging: print( f"DomainGraphExtractor: INFO :: Finished generating types and assigned them to entities as following: {type_assertions}") # Cluster types and update type assertions programmatically types = list(set([pair[1] for pair in type_assertions])) type_mapping = self.cluster_types(types, clustering_context) # Update type assertions with canonical types type_assertions = [(pair[0], type_mapping.get(pair[1], pair[1])) for pair in type_assertions] if self.logging and len(types) != len(set(type_mapping.values())): print(f"DomainGraphExtractor: INFO :: After type clustering: {list(set(type_mapping.values()))}") # Add class assertion axioms for pair in type_assertions: subject = OWLNamedIndividual(ontology_namespace + self.snake_case(pair[0])) entity_type = OWLClass(ontology_namespace + 
self.format_type_name(pair[1])) ax = OWLClassAssertionAxiom(subject, entity_type) try: onto.add_axiom(ax) except Exception as e: print(e) print(f"Subject: {subject}, Entity Type: {entity_type}") # Step 9: Extract SPL triples if requested if extract_spl_triples: # Extract literals (from chunks if needed) if use_chunking and len(chunks) > 1: literals = self._extract_literals_from_chunks( chunks, examples_for_literal_extraction, "DomainGraphExtractor", task_instructions=self.literal_extraction_instructions ) else: literals = self.literal_extractor(text=text, task_instructions=self.literal_extraction_instructions, few_shot_examples=examples_for_literal_extraction).l_values if self.logging: print(f"DomainGraphExtractor: INFO :: Generated the following numeric literals: {literals}") # Extract SPL triples (from chunks if needed) if use_chunking and len(chunks) > 1: spl_triples = self._extract_spl_triples_from_chunks( chunks, canonical_entities, literals, examples_for_spl_triples_extraction, "DomainGraphExtractor", task_instructions=self.triple_with_literal_extraction_instructions ) else: spl_triples = self.spl_triples_extractor(text=text, entities=canonical_entities, numeric_literals=literals, task_instructions=self.triple_with_literal_extraction_instructions, few_shot_examples=examples_for_spl_triples_extraction).triples if self.logging: print(f"DomainGraphExtractor: INFO :: Generated the following s-p-l triples: {spl_triples}") # Cluster relations (data properties) in SPL triples and update programmatically spl_relations = list(set([triple[1] for triple in spl_triples])) spl_relation_mapping = self.cluster_relations(spl_relations, clustering_context) # Update SPL triples with canonical relations spl_triples = [(triple[0], spl_relation_mapping.get(triple[1], triple[1]), triple[2]) for triple in spl_triples] if self.logging and len(spl_relations) != len(set(spl_relation_mapping.values())): print( f"DomainGraphExtractor: INFO :: After SPL relation clustering: " 
f"{list(set(spl_relation_mapping.values()))}") for triple in spl_triples: subject = OWLNamedIndividual(ontology_namespace + self.snake_case(triple[0])) prop = OWLDataProperty(ontology_namespace + self.snake_case(triple[1])) literal = self.get_corresponding_literal(triple[2]) if triple[2] in literals: try: ax = OWLDataPropertyAssertionAxiom(subject, prop, literal) onto.add_axiom(ax) except Exception: pass # Step 10: Create class hierarchy if requested if create_class_hierarchy: for cls in onto.classes_in_signature(): try: superclasses, subclasses = extract_hierarchy_from_dbpedia(cls.remainder) except Exception: continue if self.logging: print( f"DomainGraphExtractor: INFO :: For class {cls.remainder} found superclasses: {[IRI.create(s).remainder for s in superclasses]} and subclasses: {[IRI.create(s).remainder for s in subclasses]}") for superclass in superclasses: dbpedia_class_remainder = IRI.create(superclass).remainder sup_cls = OWLClass(ontology_namespace + dbpedia_class_remainder) ax = OWLSubClassOfAxiom(cls, sup_cls) try: onto.add_axiom(ax) except Exception: pass for subclass in subclasses: dbpedia_class_remainder = IRI.create(subclass).remainder sub_cls = OWLClass(ontology_namespace + dbpedia_class_remainder) ax = OWLSubClassOfAxiom(sub_cls, cls) try: onto.add_axiom(ax) except Exception: pass # Step 11: Save ontology onto.save(path=save_path) if self.logging: print( f"DomainGraphExtractor: INFO :: Successfully saved the ontology at {os.path.join(os.getcwd(), save_path)}") return onto