The configuration of TripleWave is controlled through the config.properties file, stored inside the config folder. TripleWave accepts a wide range of configuration parameters, depending on the type of usage. We can broadly group them into three categories: general, input and output. The rest of the page goes into the details of each category.
In order to install TripleWave, just clone the repository:
git clone https://github.com/streamreasoning/TripleWave.git
Then simply run
npm install
to install the Node.js dependencies.
During the npm install, two optional dependencies may fail to compile; these errors can be safely ignored:
- node-icu-charset-detector: you could see ../node-icu-charset-detector.cpp:5:10: fatal error: ‘unicode/ucsdet.h’ file not found (tested on OSX 10.10)
- node-iconv: you could see ../node_modules/nan/nan.h:601:20: error: no type named ‘GCPrologueCallback’ in ‘v8::Isolate’ (tested on OSX 10.10)

The general parameters set information about the instance of TripleWave, such as hostname, path and port.
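For illustration, the general part of config.properties could look like the following snippet (the parameter names below are illustrative, not authoritative; check the supplied config.properties for the exact keys):
hostname=localhost
port=8080
path=/tw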
TripleWave allows generating an RDF stream from an existing stream on the Web. As an example, consider the change stream of Wikipedia. This stream features all the changes that occur on the Wikipedia website. It includes not only elements related to the creation or modification of pages (e.g., articles and books), but also events related to users (new registrations and blocked users) and discussions among them.
For example, the following JSON excerpt (collected with the API provided here) shows a fragment of the stream of changes of Wikipedia. In particular, it shows that the user Jmorrison230582 modified an article of the English Wikipedia about Naruto: Ultimate Ninja. Furthermore, the delta attribute tells us that the user deleted some words, and the url attribute refers to the Wikipedia page that describes the event.
{
"page": "Naruto: Ultimate Ninja",
"pageUrl": "http://en.wikipedia.org/wiki/Naruto:_Ultimate_Ninja",
"url": "https://en.wikipedia.org/w/index.php?diff=669355471&oldid=669215360",
"delta": -7, "comment": "/ Characters /",
"wikipediaUrl": "http://en.wikipedia.org",
"channel": "#en.wikipedia",
"wikipediaShort": "en",
"user": "Jmorrison230582",
"userUrl": "http://en.wikipedia.org/wiki/User/Jmorrison230582",
"unpatrolled": false,
"newPage": false,
"robot": false,
"namespace": "article"
}
In order to transform a Web stream you need two components: a Web Stream connector and an R2RML transformation (described further below).
A Web Stream connector is a JavaScript file that transforms the data retrieved from some Web API into a Node.js stream. Basically, what you need to do is implement a Transform stream (a Readable stream is fine too).
Let’s have a look at the Wikipedia example:
var stream = require('stream');
var util = require('util');
var wikichanges = require("wikichanges");
var Transform = stream.Transform || require('readable-stream').Transform;

function WikiStream(options) {
  // allow use without new
  if (!(this instanceof WikiStream)) {
    return new WikiStream(options);
  }
  // init Transform
  Transform.call(this, options);

  this.close = false;
  // connect to the IRC feed of English Wikipedia changes
  this.w = new wikichanges.WikiChanges({
    ircNickname: "jsonLDBot",
    wikipedias: ["#en.wikipedia"]
  });
  var _this = this;
  // push every change event into the stream until closeStream() is called
  this.w.listen(function(c) {
    if (!_this.close) {
      _this.push(JSON.stringify(c));
    } else {
      _this.push(null);
    }
  });
}
util.inherits(WikiStream, Transform);

// no-op: data is pushed by the listener above, not pulled
WikiStream.prototype._read = function(enc, cb) {};

WikiStream.prototype.closeStream = function() {
  this.close = true;
};

exports = module.exports = WikiStream;
The lines var stream = require('stream'); var util = require('util'); import the stream module and the util module; the latter is needed to implement the inheritance.
Then, var Transform = stream.Transform || require('readable-stream').Transform; requires the actual Transform stream class.
All the logic is then implemented inside the WikiStream function.
Whenever you want to put some data in the stream, you need to call the this.push(/* some data */) function (remember that you can pass only strings to the stream).
In this particular example the code works like this:
var wikichanges = require("wikichanges");
requires the library used to connect to the stream of changes of Wikipedia.
The code
this.w = new wikichanges.WikiChanges({
ircNickname: "jsonLDBot",
wikipedias: ["#en.wikipedia"]
});
opens the stream.
Then with the lines
this.w.listen(function(c) {
if (!_this.close) {
_this.push(JSON.stringify(c));
} else {
_this.push(null);
}
});
we create a handler that puts the data into our stream whenever it is available from Wikipedia.
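As a quick standalone check of the connector (a hypothetical snippet, assuming the file is saved as WikiStream.js; in normal operation TripleWave instantiates the connector itself), you can pipe it to the console:
var WikiStream = require('./WikiStream');

// print the incoming change events, then close the stream after ten seconds
var ws = new WikiStream();
ws.pipe(process.stdout);
setTimeout(function() { ws.closeStream(); }, 10000);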
In order to use a custom stream you need to put your file in the stream/input_stream folder, and then set the stream_name parameter in the configuration file to the name of your .js file. Furthermore, you can use the SampleStream.js file as a stub to create your own connector.
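For the Wikipedia connector above, assuming the file is saved as WikiStream.js inside stream/input_stream, the configuration would be:
stream_name=WikiStream.js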
To adapt and transform Web streams to RDF streams we use a generic transformation process that is specified as R2RML mappings. The example below specifies how a Wikipedia stream update can be mapped to a graph of an RDF stream. This mapping first defines a triple indicating that the generated subject is of type schema:UpdateAction. The predicateObjectMap clauses add two more triples: one specifying the object of the update (e.g., the modified wiki page) and one specifying the author of the update.
:wikiUpdateMap a rr:TriplesMap;
  rr:logicalTable :wikistream;
  rr:subjectMap [ rr:template "http://131.175.141.249/TripleWave/{time}";
    rr:class schema:UpdateAction; rr:graphMap :streamGraph ];
  rr:predicateObjectMap [ rr:predicate schema:object;
    rr:objectMap [ rr:column "pageUrl" ] ];
  rr:predicateObjectMap [ rr:predicate schema:agent;
    rr:objectMap [ rr:column "userUrl" ] ] .
Additional mappings can be specified, as in the example below, to provide more information about the user (e.g., the user name):
:wikiUserMap a rr:TriplesMap;
  rr:logicalTable :wikistream;
  rr:subjectMap [ rr:column "userUrl";
    rr:class schema:Person; rr:graphMap :streamGraph ];
  rr:predicateObjectMap [ rr:predicate schema:name;
    rr:objectMap [ rr:column "user" ] ] .
A snippet of the resulting RDF Stream graph, serialized in JSON-LD, is shown below.
{"http://www.w3.org/ns/prov#generatedAtTime": "2015-06-30T16:44:59.587Z",
"@id": "http://131.175.141.249/TripleWave/1435682699587",
"@graph": [
{ "@id": "http://en.wikipedia.org/wiki/User:Jmorrison230582",
"@type": "https://schema.org/Person",
"name": "Jmorrison230582" },
{ "@id": "http://131.175.141.249/TripleWave/1435682699587",
"@type": "https://schema.org/UpdateAction",
"object": {"@id": "http://en.wikipedia.org/wiki/Naruto_Ultimate_Ninja"},
"agent": {"@id": "http://en.wikipedia.org/wiki/User:Jmorrison230582"}
}
],
"@context": "https://schema.org/"
}
In order to use your transformation you need to put the R2RML file in the transformation folder and set the stream_mapping parameter to the name of the transformation file.
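For instance, assuming the Wikipedia mapping above is saved as wikistream.r2rml (a hypothetical file name) in the transformation folder, the configuration would be:
stream_mapping=wikistream.r2rml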
TripleWave can convert an existing dataset (containing some temporal information) into an RDF stream and stream it out. In the following, we explain how to configure TripleWave to work in this setting.
In order to stream your own RDF file, you should first set one of the two execution modes (replay or endless) in the mode parameter of the configuration file.
Moreover, you should set the location of the RDF file to be converted. This is done by filling the rdf_file field in the config file, e.g.,
rdf_file=../rdf/data.ttl
The first conversion step consists in specifying how to create the RDF stream items, i.e., a set of pairs (g,t) where g denotes an RDF graph and t a time stamp.
Since the imported file is an RDF graph, i.e., a set of triples, it is necessary to specify the criteria to (1) group the data into RDF graphs and (2) associate a time instant with each of them. This is done through the following parametric SPARQL query:
PREFIX sr: <http://streamreasoning.org/>
PREFIX prov: <http://www.w3.org/ns/prov#>
PREFIX afn: <http://jena.apache.org/ARQ/function#>
WITH sr:sgraph
INSERT {
  ?g prov:generatedAt ?ts ; sr:hasKey ?key
}
WHERE {
  GRAPH sr:input {
    [rdf_stream_item_pattern]
    BIND (iri(concat("http://streamreasoning.org/igraph/", afn:localname(?key))) AS ?g)
  }
}
The above query depends on the input data; this dependency is captured by the [rdf_stream_item_pattern] placeholder. Its value must be set through the rdf_stream_item_pattern parameter in the config file, and it must respect the following constraints:
- it must be a valid SPARQL basic graph pattern;
- it must bind the special variable ?key, which identifies each stream item;
- it must bind the special variable ?ts, which provides the time instant associated with the item.
TripleWave assumes that these three constraints are verified, otherwise it may not behave properly. With reference to the supplied example file data.ttl, the rdf_stream_item_pattern parameter can be set as follows (in one line):
rdf_stream_item_pattern =
?key <http://knoesis.wright.edu/ssw/ont/sensor-observation.owl#samplingTime> ?time .
?time <http://www.w3.org/2006/time#inXSDDateTime> ?ts
Consequently, the following query is executed over the input data:
PREFIX sr: <http://streamreasoning.org/>
PREFIX prov: <http://www.w3.org/ns/prov#>
PREFIX afn: <http://jena.apache.org/ARQ/function#>
WITH sr:sgraph
INSERT {
  ?g prov:generatedAt ?ts ; sr:hasKey ?key
}
WHERE {
  GRAPH sr:input {
    ?key <http://knoesis.wright.edu/ssw/ont/sensor-observation.owl#samplingTime> ?time .
    ?time <http://www.w3.org/2006/time#inXSDDateTime> ?ts
    BIND (iri(concat("http://streamreasoning.org/igraph/", afn:localname(?key))) AS ?g)
  }
}
The previous step creates the stream item structure, with the element names and the associated time instants. To complete the conversion, it is necessary to fill the stream elements with their content. This operation is done with a set of SPARQL queries of the form:
PREFIX sr: <http://streamreasoning.org/>
WITH [g]
INSERT {
  [stream_item_content_pattern]
}
WHERE {
  GRAPH sr:input {
    [stream_item_content_pattern]
  }
}
[g] denotes a stream element identifier, while [stream_item_content_pattern] indicates the basic graph pattern that extracts the window content.
[g] is automatically set by TripleWave, while [stream_item_content_pattern] is loaded from the config file: you should set its value through the stream_item_content_pattern parameter. As before, the special variable ?key has to be used to compose the basic graph pattern.
Continuing the example, the stream_item_content_pattern parameter can be set as:
stream_item_content_pattern =
?key ?p ?o.
Consequently, the following example query is executed over the input data:
PREFIX sr: <http://streamreasoning.org/>
WITH <http://example.org/data.ttl#item05>
INSERT {
  <http://example.org/data.ttl#key5> ?p ?o.
}
WHERE {
  GRAPH sr:input {
    <http://example.org/data.ttl#key5> ?p ?o.
  }
}
It is worth noting that TripleWave automatically replaces the ?key variable with the value related to the graph that has to be created.
If the RDF stream is already serialized in JSON-LD according to the TripleWave schema (i.e., a sequence of graphs representing the events, described in the default graph), TripleWave can replay it. This mode can be enabled by setting:
sources=rdfstream
rdfstream_file=examples/rdfstream/stream.rs
Both the replay and the endless execution modes can be used.
WebSocket is a protocol that enables full-duplex communication. TripleWave implements a publish/subscribe mechanism based on this protocol, which can be enabled by setting:
ws_enabled=true
Additional parameters are required to configure the WebSocket address:
ws_stream_location=/stream
ws_port=8124
ws_address=ws://localhost:8124/tw/stream
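As an illustration, a minimal consumer written with the ws Node.js package (an assumption: any WebSocket client works) can subscribe to the address configured above:
var WebSocket = require('ws');

// connect to the TripleWave WebSocket endpoint configured above
var socket = new WebSocket('ws://localhost:8124/tw/stream');
socket.on('message', function(data) {
  // each message carries a stream item serialized in JSON-LD
  var graph = JSON.parse(data.toString());
  console.log(graph['@id']);
});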
MQTT is a protocol to exchange data in the Internet of Things. It is a publish/subscribe protocol mediated by a broker. Several MQTT brokers are available; we tested TripleWave in this mode with Mosquitto.
MQTT in TripleWave can be enabled by setting
mqtt_enabled=true
When MQTT is enabled, TripleWave expects a set of additional parameters associated with the broker. In particular, it requires the address and the port of the MQTT broker, and the topic on which to send the messages:
mqtt_broker_address=localhost
mqtt_broker_port=1883
mqtt_topic=twave
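As an illustration, a consumer could subscribe to the configured topic with the mqtt Node.js package (an assumption: any MQTT client able to reach the broker works):
var mqtt = require('mqtt');

// connect to the broker configured above and subscribe to the topic
var client = mqtt.connect('mqtt://localhost:1883');
client.on('connect', function() {
  client.subscribe('twave');
});
client.on('message', function(topic, message) {
  // each message contains a stream item serialized in JSON-LD
  console.log(JSON.parse(message.toString()));
});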
The user can consume a TripleWave stream by exploiting the RSP Services interfaces. The RSP Services offer simple HTTP calls to interact with an RSP engine: register a stream, register a query, and consume the results. The user can interact with the RSP (here we exemplify the operation flow using the C-SPARQL engine) and consume the stream as follows, with a sketch of the calls after this list:
- Register the stream: issue a request (at <serveraddress>/streams) to the RSP Services interface with the parameter streamIRI in the body (it represents the unique ID of the stream in the engine).
- Register the query: issue a request at <serveraddress>/streams/<queryName> with the query in the body as a raw string.
A compressed file containing running examples that exploit online RDF streams created with TripleWave and the C-SPARQL Engine (via RSP Services) is available here.
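As a sketch, the two registration calls above could be issued from Node.js as follows. The endpoint paths come from the list above, but the HTTP method, the port, and the body encodings are assumptions; check the RSP Services documentation for the exact interface:
var http = require('http');

// issue a PUT request against the RSP Services interface
// (the method, port and content types below are assumptions)
function register(path, body, contentType) {
  var req = http.request({
    host: 'localhost', port: 8175, path: path, method: 'PUT',
    headers: { 'Content-Type': contentType }
  }, function(res) {
    console.log(path, '->', res.statusCode);
  });
  req.end(body);
}

// 1. register the stream, passing its unique ID (streamIRI) in the body
register('/streams', 'streamIRI=http://131.175.141.249/TripleWave/',
         'application/x-www-form-urlencoded');
// 2. register a continuous query as a raw string
register('/streams/myQuery', 'REGISTER QUERY myQuery AS ...',
         'text/plain');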
The source code of the RSP Services can be found on GitHub.
The source code of the running examples client can be found on GitHub.