Documentation

The configuration of TripleWave is controlled through the config.properties file, stored inside the config folder. TripleWave accepts a wide range of configuration parameters, depending on the type of usage. We can broadly group them into three categories: general, input and output. The rest of the page goes into the details.

Installation

Requirements

To install TripleWave, clone the repository:

git clone https://github.com/streamreasoning/TripleWave.git

Then simply run

npm install

to install the Node.js dependencies.

General configuration

These parameters set information about the TripleWave instance, such as its hostname, path and port.
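For instance, a hypothetical fragment of config.properties could look like the following (the parameter names here are illustrative; check the supplied config.properties for the exact keys):

```
hostname=localhost
port=8124
path=/tw
```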

Input

Convert a Web stream

TripleWave allows generating an RDF stream from an existing stream on the Web. As an example, consider the change stream of Wikipedia. This stream features all the changes that occur on the Wikipedia website. It includes not only elements related to the creation or modification of pages (e.g., articles and books), but also events related to users (new registrations and blocked users), and discussions among them.

For example, the following JSON excerpt (collected with the API provided here) shows a fragment of the stream of changes of Wikipedia. In particular, it shows that the user Jmorrison230582 modified an article of the English Wikipedia about Naruto: Ultimate Ninja. Furthermore, the delta attribute tells us that the user deleted some words, and the url attribute refers to the Wikipedia page that describes the event.

{
  "page": "Naruto: Ultimate Ninja",
  "pageUrl": "http://en.wikipedia.org/wiki/Naruto:_Ultimate_Ninja",
  "url": "https://en.wikipedia.org/w/index.php?diff=669355471&oldid=669215360",
  "delta": -7,
  "comment": "/ Characters /",
  "wikipediaUrl": "http://en.wikipedia.org",
  "channel": "#en.wikipedia",
  "wikipediaShort": "en",
  "user": "Jmorrison230582",
  "userUrl": "http://en.wikipedia.org/wiki/User/Jmorrison230582",
  "unpatrolled": false,
  "newPage": false,
  "robot": false,
  "namespace": "article"
}

To transform a Web stream you need two components:

Web Stream Connector

A Web Stream connector is a JavaScript file that transforms the data retrieved from some Web API into a NodeJS stream.

Essentially, you need to implement a Transform stream (a Readable stream works as well).

Let’s have a look at the Wikipedia example:

var stream = require('stream');
var util = require('util');
var wikichanges = require("wikichanges");

var Transform = stream.Transform || require('readable-stream').Transform;

function WikiStream(options) {
  // allow use without new
  if (!(this instanceof WikiStream)) {
    return new WikiStream(options);
  }

  this.close = false;
  this.w = new wikichanges.WikiChanges({
    ircNickname: "jsonLDBot",
    wikipedias: ["#en.wikipedia"]
  });
  var _this = this;

  this.w.listen(function(c) {
    if (!_this.close) {
      _this.push(JSON.stringify(c));
    } else {
      _this.push(null);
    }
  });

  // init Transform
  Transform.call(this, options);
}

util.inherits(WikiStream, Transform);

WikiStream.prototype._read = function(enc, cb) {};

WikiStream.prototype.closeStream = function() {
  this.close = true;
};

exports = module.exports = WikiStream;

The lines var stream = require('stream'); and var util = require('util'); import the stream module and the util module; the latter is needed to implement the inheritance.

Then, var Transform = stream.Transform || require('readable-stream').Transform; requires the actual Transform stream class

Then all the logic is implemented inside the WikiStream function

Whenever you want to put some data in the stream, you need to call the this.push(/* some data */) function (remember that you can pass only strings into the stream).

In this particular example the code works like this:

var wikichanges = require("wikichanges"); requires the library to connect to the stream of changes of wikipedia

The code

this.w = new wikichanges.WikiChanges({
  ircNickname: "jsonLDBot",
  wikipedias: ["#en.wikipedia"]
});

opens the stream.

Then with the lines

this.w.listen(function(c) {
  if (!_this.close) {
    _this.push(JSON.stringify(c));
  } else {
    _this.push(null);
  }
});

we create a handler that pushes the data into our stream whenever it is available from Wikipedia.

To use a custom stream, put your file in the stream/input_stream folder, and then set the stream_name parameter in the configuration file to the name of your .js file.

You can also use the SampleStream.js file as a stub to create your own connector.

R2RML Transformation

To adapt and transform Web streams to RDF streams we use a generic transformation process that is specified as R2RML mappings. The example below specifies how a Wikipedia stream update can be mapped to a graph of an RDF stream. This mapping first defines a triple that indicates that the generated subject is of type schema:UpdateAction. The predicateObjectMap clauses add two more triples: one specifying the object of the update (e.g., the modified wiki page) and one specifying the author of the update.

:wikiUpdateMap a rr:TriplesMap;
  rr:logicalTable :wikistream;
  rr:subjectMap [ rr:template "http://131.175.141.249/TripleWave/{time}";
                  rr:class schema:UpdateAction; rr:graphMap :streamGraph ];
  rr:predicateObjectMap [ rr:predicate schema:object;
                          rr:objectMap [ rr:column "pageUrl" ] ];
  rr:predicateObjectMap [ rr:predicate schema:agent;
                          rr:objectMap [ rr:column "userUrl" ] ] .

Additional mappings can be specified, as in the example below, for providing more information about the user (e.g. user name):

:wikiUserMap a rr:TriplesMap;
  rr:logicalTable :wikistream;
  rr:subjectMap [ rr:column "userUrl";
                  rr:class schema:Person; rr:graphMap :streamGraph ];
  rr:predicateObjectMap [ rr:predicate schema:name;
                          rr:objectMap [ rr:column "user" ] ] .

A snippet of the resulting RDF Stream graph, serialized in JSON-LD, is shown below.

{
  "http://www.w3.org/ns/prov#generatedAtTime": "2015-06-30T16:44:59.587Z",
  "@id": "http://131.175.141.249/TripleWave/1435682699587",
  "@graph": [
    {
      "@id": "http://en.wikipedia.org/wiki/User:Jmorrison230582",
      "@type": "https://schema.org/Person",
      "name": "Jmorrison230582"
    },
    {
      "@id": "http://131.175.141.249/TripleWave/1435682699587",
      "@type": "https://schema.org/UpdateAction",
      "object": { "@id": "http://en.wikipedia.org/wiki/Naruto_Ultimate_Ninja" },
      "agent":  { "@id": "http://en.wikipedia.org/wiki/User:Jmorrison230582" }
    }
  ],
  "@context": "https://schema.org/"
}
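A consumer can process such an item with ordinary JSON tooling. The following sketch (the function name is illustrative, not part of TripleWave) extracts the graph identifier, the timestamp and the identifiers of the described resources:

```javascript
// Extract the basic structure of a TripleWave stream item serialized in JSON-LD.
// The function name is illustrative; any JSON-LD or plain JSON library can do this.
function describeStreamItem(jsonld) {
  var item = JSON.parse(jsonld);
  return {
    graph: item['@id'],                                             // stream item identifier
    generatedAt: item['http://www.w3.org/ns/prov#generatedAtTime'], // timestamp of the item
    nodes: (item['@graph'] || []).map(function (n) {                // described resources
      return n['@id'];
    })
  };
}

module.exports = describeStreamItem;
```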

To use your transformation, put the R2RML file in the transformation folder and set the stream_mapping parameter to the name of the transformation file.

Stream your own RDF data

TripleWave can convert an existing dataset (containing some temporal information) into an RDF stream and stream it out. In the following, we explain how to configure TripleWave to work in this setting.

Set the execution mode and the input file

To stream your own RDF file, you should first set one of the two execution modes (replay or endless) in the mode parameter of the configuration file.
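For instance, a hypothetical line in config.properties selecting the endless mode (the two values, replay and endless, are discussed in the Restream section below; the exact syntax may differ in your version) would be:

```
mode=endless
```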

Moreover, you should set the location of the RDF file to be converted. This is done by filling the rdf_file field in the config file, e.g.,

rdf_file=../rdf/data.ttl
Create the stream item structure

The first conversion step consists in specifying how to create the RDF stream items, i.e., a set of pairs (g,t) where g denotes an RDF graph and t a time stamp.

Since the imported file is an RDF graph, i.e., a set of triples, it is necessary to specify the criteria to (1) group the data into RDF graphs and (2) associate a time instant with each of them. This is done through the following parametric SPARQL query:

PREFIX sr: <http://streamreasoning.org/>
WITH sr:sgraph
INSERT {
  ?g prov:generatedAt ?ts ;
     sr:hasKey ?key
}
WHERE {
  GRAPH sr:input {
    [rdf_stream_item_pattern]
    BIND (iri(concat("http://streamreasoning.org/igraph/", afn:localname(?key))) AS ?g)
  }
}

The above query depends on the input data; this dependency is captured by the [rdf_stream_item_pattern] placeholder, whose value must be set through the rdf_stream_item_pattern parameter in the config file.

TripleWave assumes that the supplied pattern satisfies its constraints (in particular, the special variable ?key must appear in it), otherwise it may not behave properly. With reference to the supplied example file data.ttl, the rdf_stream_item_pattern parameter can be set as (in one line):

rdf_stream_item_pattern =
  ?key <http://knoesis.wright.edu/ssw/ont/sensor-observation.owl#samplingTime> ?time .
  ?time <http://www.w3.org/2006/time#inXSDDateTime> ?ts

Consequently, the following query is executed over the input data:

PREFIX sr: <http://streamreasoning.org/>
WITH sr:sgraph
INSERT {
  ?g prov:generatedAt ?ts ;
     sr:hasKey ?key
}
WHERE {
  GRAPH sr:input {
    ?key <http://knoesis.wright.edu/ssw/ont/sensor-observation.owl#samplingTime> ?time .
    ?time <http://www.w3.org/2006/time#inXSDDateTime> ?ts
    BIND (iri(concat("http://streamreasoning.org/igraph/", afn:localname(?key))) AS ?g)
  }
}

Fill the stream items

The previous step creates the stream item structure, with element names and their time instants. To complete the conversion, it is necessary to fill the stream elements with their content. This operation is done with a set of SPARQL queries of the form:

PREFIX sr: <http://streamreasoning.org/>
WITH [g]
INSERT {
  [stream_item_content_pattern]
}
WHERE {
  GRAPH sr:input {
    [stream_item_content_pattern]
  }
}

[g] denotes a stream element identifier, while [stream_item_content_pattern] indicates the Basic Graph Pattern that extracts the window content.

[g] is automatically set by TripleWave, while [stream_item_content_pattern] is loaded from the config file. That is, you should set its value through the stream_item_content_pattern parameter. As before, the special variable ?key has to be used to compose the basic graph pattern.

Continuing the example, the stream_item_content_pattern parameter can be set as:

stream_item_content_pattern =
  ?key ?p ?o .

Consequently, the following example query is executed over the input data:

PREFIX sr: <http://streamreasoning.org/>
WITH <http://example.org/data.ttl#item05>
INSERT {
  <http://example.org/data.ttl#key5> ?p ?o .
}
WHERE {
  GRAPH sr:input {
    <http://example.org/data.ttl#key5> ?p ?o .
  }
}

It is worth noting that TripleWave automatically replaces the ?key variable with the value associated with the graph to be created.

Restream your own RDF stream

If the RDF stream is already serialized in JSON-LD according to the TripleWave schema (i.e. a sequence of graphs representing the events and described in the default graph), TripleWave can replay it. It can be enabled by setting

sources=rdfstream

and specifying the location of the RDF stream as follows:

rdfstream_file=examples/rdfstream/stream.rs

It is possible to use both the replay and endless modes.

Output

WebSocket

WebSocket is a protocol that enables full-duplex communication. TripleWave implements a publish/subscribe mechanism based on this protocol, which can be enabled by setting

ws_enabled=true

Additional parameters are required to configure the WebSocket address:

ws_stream_location=/stream
ws_port=8124
ws_address=ws://localhost:8124/tw/stream
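A subscriber can then connect to the configured address. The following sketch assumes the 'ws' npm package (an external dependency, not bundled with TripleWave); the function names are illustrative:

```javascript
// Turn a raw WebSocket message into a stream item object
function toStreamItem(data) {
  return JSON.parse(data.toString());
}

// Subscribe to a TripleWave WebSocket endpoint.
// Requires the 'ws' npm package (npm install ws).
function subscribe(address, onItem) {
  var WebSocket = require('ws'); // loaded lazily; hypothetical client wiring
  var socket = new WebSocket(address);
  socket.on('message', function (data) {
    onItem(toStreamItem(data));
  });
  return socket;
}

// usage (assumes a running TripleWave instance):
// subscribe('ws://localhost:8124/tw/stream', function (item) {
//   console.log(item['@id']);
// });
```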

MQTT

MQTT is a protocol for exchanging data in the Internet of Things. It is a publish/subscribe protocol mediated by a broker. Several MQTT brokers are available; we tested TripleWave in this mode with Mosquitto.

MQTT in TripleWave can be enabled by setting

mqtt_enabled=true

When MQTT is enabled, TripleWave expects a set of additional parameters associated with the broker. In particular, it requires the address and port of the MQTT broker, and the topic to which messages are sent:

mqtt_broker_address=localhost
mqtt_broker_port=1883
mqtt_topic=twave
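On the consumer side, a hypothetical subscriber sketch assuming the 'mqtt' npm package (an external dependency; the helper names are illustrative) could look like:

```javascript
// Build the broker URL from the configuration parameters above
function brokerUrl(address, port) {
  return 'mqtt://' + address + ':' + port;
}

// Subscribe to the TripleWave topic.
// Requires the 'mqtt' npm package (npm install mqtt).
function consume(address, port, topic, onItem) {
  var mqtt = require('mqtt'); // loaded lazily; hypothetical client wiring
  var client = mqtt.connect(brokerUrl(address, port));
  client.on('connect', function () { client.subscribe(topic); });
  client.on('message', function (t, payload) {
    onItem(JSON.parse(payload.toString()));
  });
  return client;
}

// usage: consume('localhost', 1883, 'twave', function (item) { console.log(item); });
```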

Consume RDF streams

RSP service interfaces

The user can consume a TripleWave stream by exploiting RSP service interfaces. The RSP services offer simple HTTP calls to interact with an RSP engine: register streams, register queries and consume results. The user can interact with the RSP engine (here we exemplify the operation flow using the C-SPARQL engine) and consume the stream as follows.

A compressed file is available here containing running examples that exploit online RDF streams created with TripleWave and the C-SPARQL Engine (via RSP Services).

The source code of the RSP Services can be found on GitHub.

The source code of the running examples client can be found on GitHub.