Storm DRPC and why it didn’t solve the case for us.

If you have a basic idea of Storm and are taking your first steps with its DRPC feature, I would suggest you read this before you do that.

Why we came to the idea of using DRPC in Storm.

Of course, this was not plan A when we started our project, which had complex calculations to be done at high throughput.

The requirements in mind were:

1. should not lose any request that comes into Storm.

2. should be able to generate results in real time.

As Storm is known for its stream processing capabilities, our attention was on the request loss factor. So a messaging queue was required to act as a shock absorber when a request flood happens. We selected Kafka.

A week later, we had our topology built on Storm with Kafka as our spout. Requests were getting processed and the loss of requests was minimal. The issue was the processing time Storm was taking.

From the moment we inserted a request into Kafka, it took 1.8 seconds to generate the result and store it in the cache, which was absolutely unacceptable.

We inspected why this delay was happening in the result generation. In the Storm UI, the message processing time taken by the bolts in our topology was minimal, in single-digit milliseconds. So naturally the suspicion fell on Kafka: as a persistent messaging queue, it has the delay of writing to and reading from disk.

Why did the Storm-Kafka combination take 1.8 seconds?

The conclusion we could arrive at in the end is below.

The 1.8 seconds taken end to end break down into:

1. write time into the file system when a message is produced into Kafka.

2. read time for the Storm spout to read the same message back from Kafka.

3. the time taken by the complex calculations in the other bolts.

Storm UI gave us the time taken by each and every bolt; the rest of the delay went into Kafka’s bag. And that was more than we could tolerate for a real-time requirement.

Red card to Kafka. So we were left with two options to keep using Storm:

1. replace Kafka with an in-memory queue.

2. use the DRPC feature of Storm and directly execute the query on the topology.

So the option we tried out first was DRPC.

Taking our first steps with Storm DRPC.

For anyone who searches Google for “storm drpc”, the first result that comes up is the Storm wiki, which tells you about DRPC.

Issues we came across while building our topology for DRPC calls:

1. The Storm wiki covers implementing the topology with LinearDRPCTopologyBuilder on plain Storm, not on Trident. But linear DRPC topologies are deprecated as of now. So what should we do? We have to use Trident.

2. We need a DRPC client written in the language from which we are going to make the DRPC calls to Storm. We had to make those calls from PHP, and we could not find a PHP client anywhere when we searched the net.

Some client implementations we could find were:

1. the java drpc client

2. node-drpc client

3. py-drpc-client
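For reference, this is roughly what a DRPC call looks like with the Java client; the host and function names here are placeholders, and 3772 is Storm's default DRPC port. The other clients, including a PHP one, just have to mirror this same Thrift interface.

```java
import backtype.storm.utils.DRPCClient;

public class DrpcCallExample {
    public static void main(String[] args) throws Exception {
        // "drpc-host" and "words" are placeholders for your DRPC server
        // and the function name your topology registered.
        DRPCClient client = new DRPCClient("drpc-host", 3772);

        // Blocks until the topology returns a result for this argument.
        String result = client.execute("words", "hello");
        System.out.println(result);
    }
}
```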

Writing a Storm DRPC client in PHP

We found this question on the storm-user list, and we ourselves put up a question on Stack Overflow. We didn't get any proper implementation of a Storm DRPC client in PHP, so we decided to write the client by mocking up the other implementations, and published the repo below:

https://github.com/mithunsatheesh/php-drpc

which worked for our purpose.

Topology ported to Trident and PHP client done? Now what else?

We had our topology built in Trident with DRPC calls. Now it was time to build the jar and test it on the cluster. Before doing that, make sure you start the DRPC daemon on the Storm cluster with the command below.

storm drpc

If you are looking for Trident DRPC sample topologies, the links below to the storm-starter project should really help:

1. TridentWordCount.java

2. TridentReach.java

Still not working for you? Check the below points:

1. While submitting to the cluster, the DRPC stream has to be created as

topology.newDRPCStream("words", null)

with null as the second argument, not a LocalDRPC instance as you see in the local test mode samples.

2. You have to emit from the last bolt, and that emission is what the DRPC client call gets back as its result.

3. Always check the log folders under the Storm path for the exact error.
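Putting those checkpoints together, a minimal Trident DRPC topology for cluster mode would look roughly like the sketch below. The Exclaim function and the names used are made up for illustration; the storm-starter samples linked above show complete versions.

```java
import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import storm.trident.TridentTopology;
import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;

public class DrpcDemoTopology {

    /** Trivial function that appends "!" to each DRPC argument. */
    public static class Exclaim extends BaseFunction {
        @Override
        public void execute(TridentTuple tuple, TridentCollector collector) {
            collector.emit(new Values(tuple.getString(0) + "!"));
        }
    }

    public static void main(String[] args) throws Exception {
        TridentTopology topology = new TridentTopology();

        // On a real cluster the second argument stays null, not a
        // LocalDRPC instance; the stream then attaches to the cluster's
        // own DRPC servers started with "storm drpc".
        topology.newDRPCStream("words", null)
                .each(new Fields("args"), new Exclaim(), new Fields("result"));

        StormSubmitter.submitTopology("drpc-demo", new Config(), topology.build());
    }
}
```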

Did DRPC solve the problem for us?

No. We got the DRPC calls working on Storm and shifted the project from the Kafka-Storm setup to a DRPC Trident solution. But the end result was not satisfactory. The Kafka solution ensured that we processed all the messages, but with a delay; with DRPC, the behavior was unpredictable. Sometimes it would give a correct result, and sometimes a null response. This behavior made us drop the DRPC solution even before we started a load test on it.

And did you know Storm DRPC has an open memory leak issue?

We came across this open issue in Storm DRPC, which says:

“When a drpc request timeout, it is not removed from the queue of drpc server. This will cause memory leak.”

This will eventually lead Storm to crash.

It means that if a request times out in the Storm DRPC server at any point, the allocated resources are not freed, and that will eventually crash the daemon.
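A toy sketch of the reported behavior (this is my own illustration, not Storm's actual code): a server-side map of pending requests, where a timeout answers the client but never removes the entry.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Toy illustration of the reported DRPC leak: timed-out requests are
 * answered, but their entries stay in the server-side map forever,
 * so the map grows without bound.
 */
public class LeakyDrpcQueue {
    private final Map<Long, String> pending = new HashMap<>();
    private long nextId = 0;

    public long submit(String args) {
        long id = nextId++;
        pending.put(id, args);
        return id;
    }

    public void timeout(long id) {
        // The bug being illustrated: the client gets a timeout, but the
        // entry stays in 'pending'. The fix would be pending.remove(id).
    }

    public int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        LeakyDrpcQueue q = new LeakyDrpcQueue();
        for (int i = 0; i < 1000; i++) {
            q.timeout(q.submit("req-" + i));
        }
        // every one of the 1000 timed-out requests is still held in memory
        System.out.println(q.pendingCount()); // prints 1000
    }
}
```

Each timed-out request leaves its entry behind, so under enough timeouts the daemon eventually runs out of memory.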

Anything wrong with this reasoning? We should discuss it.

These are beginner-level conclusions that I arrived at with my minimal experience with Storm and even with Java. So if you think any of the above facts are wrong, or even right, feel free to comment below so that we can get more clarity on them. I put all of this together so that a new person who is going to try their luck with DRPC knows these things in advance.


Storm and Kafka – parallelism is not magic

This came up when we were trying to get the maximum out of the parallelism factor in our Storm topology. While going through the docs and understanding Storm, I had it in mind that we should increase the parallelism factor to get more throughput out of Storm.

We had a sample Storm topology which was using a Kafka spout for its input feed. But after increasing the parallelism factor beyond 1, we didn't get much of a gain in throughput from our Storm execution.

This led me to this storm group discussion:

which quotes Nathan Marz saying

The maximum parallelism you can have on a KafkaSpout is the number of partitions.

And any spout instances beyond the number of Kafka partitions for the topic we are subscribing to won't read any data.

So if you are trying to get the maximum out of Storm's parallelism factor, be sure to have that many partitions on the Kafka topic you are subscribing to. 🙂
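The rule reduces to a one-liner; the class and method names below are mine, just to state it as code.

```java
public class KafkaSpoutParallelism {

    /**
     * A KafkaSpout cannot usefully run more executors than the topic has
     * partitions: the extra executors get no partition assigned and read
     * nothing. So the effective parallelism is capped by the partition count.
     */
    public static int effectiveParallelism(int parallelismHint, int topicPartitions) {
        return Math.min(parallelismHint, topicPartitions);
    }

    public static void main(String[] args) {
        // 8 executors on a 4-partition topic: only 4 ever read data
        System.out.println(effectiveParallelism(8, 4)); // prints 4
    }
}
```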

Building a mesh topology with twitter storm

Storm is a super exciting framework that comes up when we start searching for solutions for real-time stream processing or even complex data manipulation. It is an open source project from Twitter and has a really helpful wiki to get people started building their logic on Storm.

Along with the wiki, Nathan Marz has also open sourced a bundle of examples which helps a lot in understanding the base concepts of Storm, like topology, spout, bolt, tuples and so on. If you would really like to start learning it, I would strongly recommend you go through the Storm wiki and the storm-starter code available on github.

Here we discuss a sample implementation of a demo Storm topology that forms a mesh, which is different from the linear topology representations we see in the storm-starter samples.

What we try to implement is a mesh structured topology which would look similar to the diagram below.

[mesh topology diagram]

We can demo the concept using a topology in which each message from the spout travels through its own path, as specified inside the message itself. When the message reaches a bolt, the bolt checks the path it contains and sends it on to the next bolt defined in that path. The components of the demo topology are explained below with the relevant code, which implements a mesh topology of 5 sub bolts interconnected with each other.

1. Mesh Topology

In the topology, random paths get generated by the path spout and emitted to a main decider bolt. The main decider takes the path from the message, figures out the next bolt in the path, and emits to it. The sub bolts continue the same process of emitting to the next bolt until the path array is completely traversed. Once the path is completely traversed, the sub bolt emits to the output bolt for termination, which may write some output of the operation performed.


/**
* Build the topology
*/
TopologyBuilder objTopologyBuilder = new TopologyBuilder();

/**
* Generate random paths and feed the topology
*/
objTopologyBuilder.setSpout("PathSpout", new PathSpout());

/**
* Main decider bolt which maps to first sub bolt in path.
*/
objTopologyBuilder.setBolt("MainDecider", new BoltDecider(),1)
.shuffleGrouping("PathSpout");

/**
* Each sub bolt subscribes to all other sub bolts plus the main decider
*/
objTopologyBuilder.setBolt("Bolt1", new Bolt1(),1)
.shuffleGrouping("Bolt2","StreamBolt1")
.shuffleGrouping("Bolt3","StreamBolt1")
.shuffleGrouping("Bolt4","StreamBolt1")
.shuffleGrouping("Bolt5","StreamBolt1")
.shuffleGrouping("MainDecider","StreamBolt1");

objTopologyBuilder.setBolt("Bolt2", new Bolt2(),1)
.shuffleGrouping("Bolt1","StreamBolt2")
.shuffleGrouping("Bolt3","StreamBolt2")
.shuffleGrouping("Bolt4","StreamBolt2")
.shuffleGrouping("Bolt5","StreamBolt2")
.shuffleGrouping("MainDecider","StreamBolt2");

/**
* this has to repeat for each bolt till Bolt5
*.............................................
*/

/** 
* Output writer bolt subscribes to all the sub bolts 
*/
objTopologyBuilder.setBolt("OutputWriter", new BoltFinalOutput(),1)
.shuffleGrouping("Bolt1","StreamDbWriter")
.shuffleGrouping("Bolt2","StreamDbWriter")
.shuffleGrouping("Bolt3","StreamDbWriter")
.shuffleGrouping("Bolt4","StreamDbWriter")
.shuffleGrouping("Bolt5","StreamDbWriter");

/**
* setup the Storm configuration object
*/
Config objStormConfig = new Config();
objStormConfig.setDebug(false);

/**
* Submit to local cluster - development mode
*/
LocalCluster objLocalCluster = new LocalCluster();
objLocalCluster.submitTopology("MeshTopology", objStormConfig, objTopologyBuilder.createTopology());

2. Random path Spout

The spout implementation here emits a random path containing the possible bolts to follow through the topology. An example path message would look like “[1,2,3]”.

The emitted message goes to the main decider bolt.

@Override
public void nextTuple() {
     Utils.sleep(100);
      String[] sentences = new String[] {
           "[1,2,3,4,5]",
           "[1,3,2,4,5]",
           "[1,5,3,4,2]",
           "[1,2,5,4,3]",
           "[1,4,3,2,5]",
           "[2,1,4,3,5]",
           "[2,1,5,3,4]",
           "[3,1,4,2,5]",
           "[3,1,5,2,4]",
           "[4,1,3,2,5]",
           "[4,1,5,2,3]",
           "[5,4,3,2,1]",
           "[5,4,1,2,3]"
      };
      String sentence = sentences[_rand.nextInt(sentences.length)];
      _collector.emit(new Values(sentence));
}

3. Main Decider

The main decider bolt receives the message from the spout, pops the last element from the path message received, and emits the rest to the bolt corresponding to the popped element. So in the case of “[1,2,3]”, “3” is popped from the message and the remaining message “[1,2]” is sent to Bolt3.

@Override
public void execute(Tuple input) {	

	JSONArray data;	
	try {		
	     data = new JSONArray(input.getString(0));
	} catch (JSONException e) {
	     return;
	}
	
	/**
	* Next bolt to which we need to emit, taken from the end of the path
	*/
	String nextBolt = data.optString(data.length()-1);
	
	/**
	* remove the popped element from the end of the array
	*/
	data.remove(data.length()-1);
	System.err.println(data.toString());
	this.objCollector.emit("StreamBolt"+nextBolt,new Values(data.toString()));			

}

4. Sub bolts

Sub bolts receive the message from the main decider (or from another sub bolt) and check whether the received path message is empty. If it is empty, they emit to the output bolt, as the path is complete. If elements still remain in the path array, the last element is popped again and the rest of the message is sent to the corresponding bolt.

@Override
public void execute(Tuple tuple) {

	JSONArray data;
	try {
		data = new JSONArray(tuple.getString(0));
	} catch (JSONException e) {
		return;
	}

	if (data.length() == 0) {

		/**
		* Path completed, so go to the output bolt and terminate
		*/
		this.objCollector.emit("StreamDbWriter", new Values("output"));

	} else {

		/**
		* pop the next bolt from the end of the path and emit the
		* remaining path to it
		*/
		String nextBolt = data.optString(data.length()-1);
		data.remove(data.length()-1);
		this.objCollector.emit("StreamBolt"+nextBolt, new Values(data.toString()));

	}

}

5. Output Bolt

Once the path is traversed on the mesh and the path array becomes empty, the flow reaches the output bolt, which is the end of the topology.
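The output bolt's code is not listed above. A minimal sketch of what BoltFinalOutput could look like, assuming it extends BaseBasicBolt and just logs the result (an assumption on my part; the shared repo has the real version):

```java
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;

/**
 * Terminal bolt of the mesh: it subscribes to "StreamDbWriter" from every
 * sub bolt, declares no output stream of its own, and just logs the result.
 */
public class BoltFinalOutput extends BaseBasicBolt {

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        // End of the line: log (or persist) instead of emitting further.
        System.out.println("path done: " + tuple.getString(0));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // nothing to declare, this bolt terminates the flow
    }
}
```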

The sample code for all of the above is shared in this github repository.

Why should we use restify?

This is just to share an experience I had while trying out the restify framework in node.js. I had a requirement to build a REST API in node.js and thought a framework lighter than express.js should be chosen to achieve better performance. That was when we came across restify, which was advocating exactly those advantages over express.js.

In fact, the “Why use restify and not express?” section on their home page gives more than one reason why everyone should choose restify if the intention is to build a RESTful API. The exposed methods of the framework closely resemble those of express.js, which leads one to believe it was made by refining express.js into a custom tool for RESTful services.

Just to get more assurance, I decided to test the performance of both under a specified load and made two separate node apps with a simple POST route.

The result that came out is worth mentioning.

When I applied a sample load of 200 POST requests per second to the app running on restify, the app processed the requests for a few seconds and then went non-responsive. When I tested the same load with the app on express, it worked fine even at loads above 2000 requests per second.

It is true that there are various places on the web where the performance flaws of restify are discussed, but those are somewhat old, which can mislead people into thinking the framework is flawless now and the issues are sorted out. Consider the graphs on this page as an example.

The author himself quotes : “note, this chart is out of date since Restify performance issues were resolved”

But after testing and comparing the performance, I felt the issues are still there. I have put this up as a Stack Overflow question so that it is open for discussion. The node.js daemon code I used for applying the load is also posted along with the question.

Maybe I am wrong somewhere, but I would suggest a proper performance comparison before you make your choice of restify.