Implementing a new Sink

To create a new sink, implement the Sink interface:

     
public interface Sink extends TripleBasedSink, QuadBasedSink, UnstructuredDataSink, Closeable {

    public default void addMetaData(Model model) {
        CrawleableUri uri = new CrawleableUri(Constants.DEFAULT_META_DATA_GRAPH_URI);
        StmtIterator iterator = model.listStatements();
        while (iterator.hasNext()) {
            addTriple(uri, iterator.next().asTriple());
        }
        flushMetadata();
    }

    @Override
    public default void close() throws IOException {
        closeSinkForUri(new CrawleableUri(Constants.DEFAULT_META_DATA_GRAPH_URI));
    }
    
    public void flushMetadata();
    
}
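Because addMetaData and close are default methods, an implementation inherits them and only has to supply the abstract methods they delegate to. The following is a minimal sketch of that delegation, not the Squirrel code itself: MiniSink, RecordingSink and the META_DATA_GRAPH placeholder are hypothetical names, and plain Strings stand in for CrawleableUri, Triple and Model.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for Sink (assumption: Strings replace the real types).
interface MiniSink {
    // Placeholder value, not the real Constants.DEFAULT_META_DATA_GRAPH_URI.
    String META_DATA_GRAPH = "urn:example:metadata";

    void addTriple(String uri, String triple);

    void closeSinkForUri(String uri);

    void flushMetadata();

    // Mirrors addMetaData: every statement is written to the metadata graph, then flushed.
    default void addMetaData(List<String> statements) {
        for (String statement : statements) {
            addTriple(META_DATA_GRAPH, statement);
        }
        flushMetadata();
    }

    // Mirrors close(): closing the sink closes the metadata graph.
    default void close() {
        closeSinkForUri(META_DATA_GRAPH);
    }
}

// Records every call so the order of the delegated calls can be inspected.
class RecordingSink implements MiniSink {
    final List<String> calls = new ArrayList<>();

    @Override
    public void addTriple(String uri, String triple) {
        calls.add("addTriple " + uri + " " + triple);
    }

    @Override
    public void closeSinkForUri(String uri) {
        calls.add("closeSinkForUri " + uri);
    }

    @Override
    public void flushMetadata() {
        calls.add("flushMetadata");
    }
}
```

Calling addMetaData with two statements on a RecordingSink records two addTriple calls followed by one flushMetadata call; a later close() records a single closeSinkForUri call for the metadata graph.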
      

Note that Sink extends the interfaces TripleBasedSink, QuadBasedSink, UnstructuredDataSink and Closeable, so an implementation has to provide the following methods:

     
public class CustomSink implements Sink {

	@Override
	public void addTriple(CrawleableUri uri, Triple triple) {
		// TODO Auto-generated method stub
		
	}

	@Override
	public void openSinkForUri(CrawleableUri uri) {
		// TODO Auto-generated method stub
		
	}

	@Override
	public void closeSinkForUri(CrawleableUri uri) {
		// TODO Auto-generated method stub
		
	}

	@Override
	public void addQuad(CrawleableUri uri, Quad quad) {
		// TODO Auto-generated method stub
		
	}

	@Override
	public void addData(CrawleableUri uri, InputStream stream) {
		// TODO Auto-generated method stub
		
	}

	@Override
	public void flushMetadata() {
		// TODO Auto-generated method stub
		
	}

}
    
      

The purpose of each method is as follows:

  • openSinkForUri: Opens a stream for the given URI. In general, an implementation can keep a Map<URI, stream> that holds the open URIs and their respective streams.
  • addTriple: Writes the triple to the stream of the given URI.
  • addQuad: Writes the quad to the stream of the given URI.
  • addData: Stores the data from the given stream for the given URI.
  • flushMetadata: Flushes the current metadata information into the metadata graph.
  • closeSinkForUri: Closes the stream for the given URI.
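The per-URI stream handling described above can be sketched as follows. This is only an illustration under simplifying assumptions, not a Squirrel implementation: the class name PerUriStreamSketch and the helper storedContent are hypothetical, plain Strings stand in for CrawleableUri, Triple and Quad, the streams are in-memory buffers, and flushMetadata is left out.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Simplified stand-in, not the Squirrel API (assumption: Strings replace the real types).
class PerUriStreamSketch {

    // One open stream per URI (the "Map<URI, stream>" idea).
    private final Map<String, ByteArrayOutputStream> openStreams = new ConcurrentHashMap<>();
    // Content of closed streams; a real sink would persist it instead of keeping it in memory.
    private final Map<String, String> stored = new ConcurrentHashMap<>();

    void openSinkForUri(String uri) {
        // Keep an already open stream if the URI is opened twice.
        openStreams.putIfAbsent(uri, new ByteArrayOutputStream());
    }

    void addTriple(String uri, String triple) {
        writeLine(uri, triple);
    }

    void addQuad(String uri, String quad) {
        writeLine(uri, quad);
    }

    void addData(String uri, InputStream stream) throws IOException {
        ByteArrayOutputStream out = streamFor(uri);
        byte[] buffer = new byte[4096];
        int read;
        // Copy the unstructured data chunk by chunk into the stream of the URI.
        while ((read = stream.read(buffer)) != -1) {
            out.write(buffer, 0, read);
        }
    }

    void closeSinkForUri(String uri) {
        // Removing the entry makes later writes for this URI fail fast.
        ByteArrayOutputStream out = openStreams.remove(uri);
        if (out != null) {
            stored.put(uri, new String(out.toByteArray(), StandardCharsets.UTF_8));
        }
    }

    // Hypothetical helper for inspecting the result; returns null for URIs never closed.
    String storedContent(String uri) {
        return stored.get(uri);
    }

    private void writeLine(String uri, String line) {
        byte[] bytes = (line + "\n").getBytes(StandardCharsets.UTF_8);
        streamFor(uri).write(bytes, 0, bytes.length);
    }

    private ByteArrayOutputStream streamFor(String uri) {
        ByteArrayOutputStream out = openStreams.get(uri);
        if (out == null) {
            throw new IllegalStateException("openSinkForUri was not called for " + uri);
        }
        return out;
    }
}
```

The intended call order per URI is openSinkForUri, then any number of addTriple, addQuad or addData calls, then closeSinkForUri. Rejecting writes to a URI that was never opened is a design choice of this sketch; whether a real sink should do the same is left open here.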

Good examples of implementations are org.dice_research.squirrel.sink.impl.file.FileBasedSink and org.dice_research.squirrel.sink.impl.sparql.SparqlBasedSink. These sinks implement storage on the file system and in SPARQL endpoints, respectively.

The sink is also managed by the Spring context, so do not forget to register your implementation in worker-context.xml:

       
  <bean id="sinkBean"
        class="org.dice_research.squirrel.sink.impl.custom.CustomSink" />