Building a distributed data platform with Node.js, Storm, Kafka, and ZeroMQ
Kyle Quest
@kcqon
https://linkedin.com/in/kylequest
#gluecon
To build a data platform you need...
Tools
1. Node.js
2. Scaling
3. APIs
4. IPC
5. Queues
6. Data processing
7. Data storage
Node.js
I/O
Productivity!
Fun :-)
Not perfect...
// naive approach: process the file line by line, by line number
for (var i = 0; i < maxLineNumber; i++) {
    DoSomething(fileName, i);
}
var lineReader = require('line-reader');
var async = require('async');
//...
// queue that stores lines in the DB with bounded concurrency
var loaderQ = async.queue(function (data, next) {
    StoreDataInDB(data, next);
}, queueConcurrency);

lineReader.eachLine(fileName, function (line, last, next) {
    loaderQ.push({data: line});
    next();
});
Scaling...
...with Node.js (cluster sketch below)
...with system tweaks
...with multiple machines
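One way to scale with Node.js on a single box is the built-in cluster module; a minimal sketch (the port number is illustrative):

var cluster = require('cluster');
var http = require('http');
var numCPUs = require('os').cpus().length;

if (cluster.isMaster) {
    // fork one worker per CPU core
    for (var i = 0; i < numCPUs; i++) {
        cluster.fork();
    }
    // restart workers that die
    cluster.on('exit', function (worker) {
        cluster.fork();
    });
} else {
    // workers share the same listening socket
    http.createServer(function (req, res) {
        res.end('handled by worker ' + process.pid);
    }).listen(13000);
}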
Talking to your platform...
APIs!
http(s)
express.js
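A minimal express.js sketch of the same style of endpoint as the examples below (route and port are illustrative):

var express = require('express');
var app = express();

app.get('/test/:name', function (req, res) {
    res.json({status: 'ok', name: req.params.name});
});

app.listen(13000);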
restify
var restify = require('restify');

function OnTest(req, res, next) {
    res.json({status: 'ok', name: req.params.name});
    return next();
}

var server = restify.createServer();
server.get('/test/:name', OnTest);
server.listen(13000);
hapi
var Hapi = require('hapi');

var apiConfig =
{
    handler: function (request) {
        request.reply({status: 'ok', userdata: request.payload.userdata});
    },
    validate: {
        payload: {
            userdata: Hapi.Types.String().required()
        }
    }
};

var apiServer = Hapi.createServer(port);
apiServer.route({
    method: 'POST',
    path: '/test/data',
    config: apiConfig
});
apiServer.start();
Communication
inside the platform...
IPC Options
HTTP APIs
hook.io
var Hook = require('hook.io').Hook;
var hook = new Hook({name: 'MyHookService1'});

hook.on('hook::ready', function (data) {
    console.log('MyHookService1 started');
});

// '*::DoStuff' matches DoStuff events from any hook
hook.on('*::DoStuff', function (data, callback) {
    console.log('event=%s => data=%s', this.event, data);
    callback(null, 'ok');
});

hook.start();
var Hook = require('hook.io').Hook;
var hook = new Hook({name: 'MyHookService2'});

hook.on('hook::ready', function (data) {
    hook.emit('DoStuff', 'data to process',
        function (error, result) {
            console.log(result);
        });
});

hook.start();
hook.io problems
dnode
var dnode = require('dnode');

var server = dnode(function (remote, conn) {
    this.DoStuff = function (data, callback) {
        console.log('data=%s', data);
        callback('ok');
    };
});

server.listen(13000);
var dnode = require('dnode');

dnode.connect(13000, function (remote, conn) {
    remote.DoStuff('data to process',
        function (result) {
            console.log(result);
            conn.end();
        });
});
ZeroMQ
ZeroMQ communication patterns
var zmq = require('zmq');
var sock = zmq.socket('req');
sock.connect('tcp://127.0.0.1:13000');

var msgId = 1;
setInterval(function () {
    sock.send('msg#' + msgId++);
}, 500);

sock.on('message', function (rep) {
    console.log('reply => ' + rep);
});
var zmq = require('zmq');
var sock = zmq.socket('rep');
sock.bindSync('tcp://127.0.0.1:13000');

sock.on('message', function (msg) {
    sock.send('rep: ' + msg);
});
var zmq = require('zmq');
var sock = zmq.socket('pub');
sock.bindSync('tcp://127.0.0.1:13000');

var msgId = 1;
setInterval(function () {
    sock.send('msg#' + msgId++);
    sock.send('other#' + msgId++);
}, 500);
var zmq = require('zmq');
var sock = zmq.socket('sub');
sock.connect('tcp://127.0.0.1:13000');
sock.subscribe('msg');

sock.on('message', function (msg) {
    //will not see messages prefixed with 'other'
    console.log(msg.toString());
});
var zmq = require('zmq');
var sock = zmq.socket('push');
sock.bindSync('tcp://127.0.0.1:13000');

var msgId = 1;
setInterval(function () {
    sock.send('msg#' + msgId++);
}, 500);
var zmq = require('zmq');
var sock = zmq.socket('pull');
sock.connect('tcp://127.0.0.1:13000');

sock.on('message', function (msg) {
    console.log(msg.toString());
});
Things to keep in mind about ZeroMQ
Other options:
stack.io, zerorpc
How to encode data
Serialized data size for
{title: 'something'}:
Protobufs: 11 bytes
MsgPack: 17 bytes
JSON: 21 bytes
Serializing/deserializing objects:
Protobufs: 15.79 µs / object
MsgPack: 11.47 µs / object
JSON: 2.37 µs / object
package feeds;

message Feed
{
    optional string title = 1;

    message Entry {
        optional string title = 1;
    }

    repeated Entry entry = 2;
}
var fs = require('fs');
var msgpack = require('msgpack');
var Schema = require('protobuf').Schema;

// feeds.desc is the compiled schema (protoc --descriptor_set_out)
var schema = new Schema(fs.readFileSync('feeds.desc'));
var Feed = schema['feeds.Feed'];

var o = {title: 'something'};

//Protobufs:
var pbSo = Feed.serialize(o);
var pbDo = Feed.parse(pbSo);

//MsgPack:
var mpSo = msgpack.pack(o);
var mpDo = msgpack.unpack(mpSo);

//JSON:
var jsSo = JSON.stringify(o);
var jsDo = JSON.parse(jsSo);
Queues
Task Queues
kue (sketch below)
Message Queues
Kafka
RabbitMQ
Redis*
CouchDB*
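A minimal task-queue sketch with kue (job type and payload are illustrative; kue keeps its jobs in Redis):

var kue = require('kue');
var jobs = kue.createQueue();

// producer: enqueue a job
jobs.create('email', {to: 'user@example.com', body: 'hi'}).save();

// worker: process jobs of that type
jobs.process('email', function (job, done) {
    console.log('sending to %s', job.data.to);
    done();
});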
Kafka
var Producer = require('Prozess').Producer;

var producer = new Producer('demotopic', {host: 'kafkaBrokerServer'});
producer.connect();

producer.on('error', function (err) { console.log("Error: ", err); });
producer.on('brokerReconnectError', function (err) {
    console.log("could not reconnect: ", err);
});

var msgId = 1;
setInterval(function () {
    producer.send(JSON.stringify({"id": msgId++}), function (err) {
        if (err) { console.log("[%d] send error: ", msgId, err); }
        else { console.log("[%d] sent", msgId); }
    });
}, 1000);
var Consumer = require('Prozess').Consumer;

var consumer = new Consumer({
    host: 'kafkaBrokerServer', topic: 'demotopic',
    partition: 0, offset: 0});

consumer.connect(function (err) {
    if (err) { throw new Error("Kafka connection failure"); }
    console.log("consuming: " + consumer.topic);
    setInterval(function () {
        consumer.consume(function (err, messages) {
            if (err) { console.log('Error => %j', err); }
            else {
                messages.forEach(function (m) {
                    //m: bytesLengthVal, magic, compression, checksum, payload
                    console.log(m.payload.toString('utf-8'));
                });
            }
        });
    }, 3000);
});
Kafka libraries
Prozess
franz-kafka
node-kafka
Prozess
*Most active/recently updated node.js library
*Only low-level Producer/Consumer APIs
*Consumer API gets only the latest messages
unless you know the exact message offset
(a version without this problem will be released)
*No support for Kafka 0.8 yet
franz-kafka
*Best high-level Producer/Consumer APIs
*Latest code fails to install
(due to a missing git dependency)
node-kafka
*3 versions
*Mostly low-level Producer/Consumer API
*One version has basic high-level APIs
*NPM installs the version from terrancesnyder
(which doesn't have high-level APIs or ZooKeeper support)
Things to keep in mind about Kafka
New Kafka
Data Processing Tools
Realtime vs batch processing
Need both!
Realtime processing with Storm
Node.js with Storm
var Storm = require('./storm.js');
var util = require('util');

function TestSpout()
{
    Storm.Spout.call(this);
    var self = this;

    self.on('ack', function (id) {});
    self.on('fail', function (id) {});

    self.on('next', function (done) {
        var sentences =
            ["the cow jumped over the moon",
             "an apple a day keeps the doctor away",
             "four score and seven years ago",
             "snow white and the seven dwarfs",
             "i am at two with nature"];
        // emit a random sentence as a one-field tuple
        self.stormEmit([sentences[Math.floor(Math.random() * sentences.length)]]);
        done();
    });
}

util.inherits(TestSpout, Storm.Spout);

var test = new TestSpout();
test.run();
var Storm = require('./storm.js');
var util = require('util');

function TestBolt()
{
    Storm.Bolt.call(this);
    var self = this;

    self.on('tuple', function (data, done) {
        var words = data.tuple[0].split(" ");
        words.forEach(function (word, idx, all) {
            self.stormEmit([word], self);
            if ((all.length - 1) == idx) {
                done(); //'acks' this tuple
            }
        });
    });
}

util.inherits(TestBolt, Storm.Bolt);

var test = new TestBolt();
test.run();
public static class TestSpout extends ShellSpout implements IRichSpout {
    public TestSpout() {
        super("node", "storm-spout.js");
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("line"));
    }

    @Override
    public Map getComponentConfiguration() {
        return null;
    }
}

public static class TestBolt extends ShellBolt implements IRichBolt {
    public TestBolt() {
        super("node", "storm-bolt.js");
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }

    @Override
    public Map getComponentConfiguration() {
        return null;
    }
}
TopologyBuilder builder = new TopologyBuilder();

builder.setSpout("spout", new TestSpout(), 5);

builder.setBolt("split", new TestBolt(), 8)
       .shuffleGrouping("spout");

builder.setBolt("count", new WordCount(), 12)
       .fieldsGrouping("split", new Fields("word"));
SpoutConfig spoutConfig = new SpoutConfig(
    SpoutConfig.fromHostStrings(
        //Kafka brokers
        ImmutableList.of("kafkaBrokerHost1", "kafkaBrokerHost2"),
        8),             //Number of Kafka partitions
    "sentences",        //Kafka topic to read
    "/kafkastorm",      //Zookeeper root path to store consumer offsets
    "wordcounter");     //An id for this consumer (group)

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", new KafkaSpout(spoutConfig));
Storm libraries
storm-node
storm-node-multilang
storm-node
*"Official" library
*Implements only Bolts
*Doesn't work as-is :-)
(requires a one-line fix)
storm-node-multilang
*Implements Spouts and Bolts
*Doesn't work as-is :-)
Batch processing with Hadoop
Node.js and JavaScript with Hadoop
DIY Hadoop streaming with node.js (mapper sketch below)
'timothy' NPM module
Pig JavaScript UDFs
Pig 'STREAM' command with node.js
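Hadoop streaming only needs a program that reads stdin and writes tab-separated key/value pairs to stdout, so here is a minimal word-count mapper sketch in node.js (file name and tokenizing are illustrative):

// mapper.js: reads lines from stdin, emits "word<TAB>1" pairs
process.stdin.resume();
process.stdin.setEncoding('utf8');

var leftover = '';
process.stdin.on('data', function (chunk) {
    var lines = (leftover + chunk).split('\n');
    leftover = lines.pop(); // keep the partial last line for the next chunk
    lines.forEach(function (line) {
        line.split(/\s+/).forEach(function (word) {
            if (word) { process.stdout.write(word + '\t1\n'); }
        });
    });
});

Wire it up with hadoop-streaming.jar via something like -mapper 'node mapper.js' (jar path and reducer omitted here).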
Storing Data
Cassandra
Cassandra libraries
node-cassandra-client
CQL API only
From Rackspace
Recommended
helenus
CQL and (partial) Thrift API
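A minimal helenus sketch, assuming its ConnectionPool/cql interface and a local Cassandra (hosts, keyspace, and query are illustrative):

var helenus = require('helenus');

var pool = new helenus.ConnectionPool({
    hosts: ['localhost:9160'],
    keyspace: 'demo'
});

pool.connect(function (err) {
    if (err) { throw err; }
    // parameterized CQL query
    pool.cql("SELECT * FROM users WHERE KEY = ?", ['kyle'],
        function (err, results) {
            console.log(results);
        });
});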
Redis
node_redis
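A minimal node_redis sketch (key and value are illustrative):

var redis = require('redis');
var client = redis.createClient();

client.on('error', function (err) { console.log('Error: ' + err); });

client.set('greeting', 'hello');
client.get('greeting', function (err, reply) {
    console.log(reply); // 'hello'
    client.quit();
});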
PostgreSQL
node-postgres
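A minimal node-postgres sketch (connection string and query are illustrative):

var pg = require('pg');

var client = new pg.Client('tcp://user:password@localhost/demo');
client.connect(function (err) {
    if (err) { throw err; }
    // parameterized query
    client.query('SELECT $1::text AS greeting', ['hello'],
        function (err, result) {
            console.log(result.rows[0].greeting);
            client.end();
        });
});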
CouchDB
CouchDB libraries
nano (sketch below)
cradle
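A minimal nano sketch (database name and document are illustrative):

var nano = require('nano')('http://localhost:5984');
var db = nano.db.use('demo');

db.insert({title: 'something'}, 'doc1', function (err, body) {
    if (err) { return console.log(err); }
    db.get('doc1', function (err, doc) {
        console.log(doc.title);
    });
});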
MongoDB
MongoDB libraries
node-mongoskin
light wrapper for node-mongodb-native
mongoose
Requires schemas
(if you want to deal with schemas just use PostgreSQL)
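Since mongoose is schema-driven, a minimal sketch (model, fields, and connection string are illustrative):

var mongoose = require('mongoose');
mongoose.connect('mongodb://localhost/demo');

// every mongoose model needs a schema up front
var entrySchema = new mongoose.Schema({
    title: String,
    created: {type: Date, default: Date.now}
});
var Entry = mongoose.model('Entry', entrySchema);

new Entry({title: 'something'}).save(function (err) {
    if (err) { console.log(err); }
});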
Riak
Riak libraries
riak-js (sketch below)
node_riak
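A minimal riak-js sketch, assuming its getClient/save/get interface (bucket, key, and host are illustrative):

var db = require('riak-js').getClient({host: 'localhost', port: 8098});

db.save('airlines', 'KLM', {fleet: 111, country: 'NL'}, function (err) {
    if (err) { return console.log(err); }
    db.get('airlines', 'KLM', function (err, airline) {
        console.log(airline);
    });
});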
Now you are armed and ready for action :-)