Real-time Analytics with Storm and Cassandra

Anchoring and acking

We have talked about the DAG that is created for the execution of a Storm topology. Now, when you design your topologies for reliability, there are two pieces of information that need to be conveyed to Storm:

  • Whenever a new link, that is, a new stream, is added to the DAG, it is called anchoring
  • When a tuple is processed in its entirety, it is called acking

Once Storm knows these facts, it can track each tuple during processing and either acknowledge or fail it, depending on whether it was completely processed.
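To make the mechanics concrete, here is a minimal, framework-free sketch (our illustration, not Storm internals; the class and method names are ours): every tuple a bolt emits is anchored to a root spout tuple, and the root counts as fully processed only once every tuple in its tree has been acked.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified model of a tuple tree: each root spout tuple tracks how many
// tuples anchored to it are still outstanding.
class TupleTreeTracker {
    private final Map<String, Integer> pending = new HashMap<>();

    // The spout emits a root tuple with a message id.
    void emitRoot(String rootId) { pending.put(rootId, 1); }

    // A bolt emits a new tuple anchored to the root: the tree grows.
    void anchor(String rootId) { pending.merge(rootId, 1, Integer::sum); }

    // A bolt acks one tuple belonging to the tree.
    void ack(String rootId) { pending.merge(rootId, -1, Integer::sum); }

    // The root is fully processed once no tuples are outstanding.
    boolean fullyProcessed(String rootId) {
        return pending.getOrDefault(rootId, 0) == 0;
    }
}

public class AnchoringDemo {
    public static void main(String[] args) {
        TupleTreeTracker tracker = new TupleTreeTracker();
        tracker.emitRoot("sentence-1");   // spout emits "the cat"
        tracker.anchor("sentence-1");     // split bolt emits "the"
        tracker.anchor("sentence-1");     // split bolt emits "cat"
        tracker.ack("sentence-1");        // split bolt acks the sentence
        tracker.ack("sentence-1");        // count bolt acks "the"
        System.out.println(tracker.fullyProcessed("sentence-1")); // "cat" pending
        tracker.ack("sentence-1");        // count bolt acks "cat"
        System.out.println(tracker.fullyProcessed("sentence-1"));
    }
}
```

Only when every tuple in the tree is acked can Storm acknowledge the root tuple back to the spout; a failure (or timeout) anywhere in the tree causes the root to be replayed.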

Let's take a look at the following WordCount topology bolts to understand the Storm API anchoring and acking better:

  • SplitSentenceBolt: The purpose of this bolt is to split the sentence into individual words and emit them. Now let's examine the output declarer and the execute methods of this bolt in detail (especially the highlighted sections), as shown in the following code:
      public void execute(Tuple tuple) {
          String sentence = tuple.getString(0);
          for (String word : sentence.split(" ")) {
              _collector.emit(tuple, new Values(word)); //1
          }
          _collector.ack(tuple); //2
      }

      public void declareOutputFields(OutputFieldsDeclarer declarer) {
          declarer.declare(new Fields("word")); //3
      }

The output declarer functionality of the preceding code is elaborated as follows:

  • _collector.emit: Here, each tuple being emitted by the bolt on the word stream (the second argument of the method) is anchored using the first argument of the method (the input tuple). In this arrangement, if a failure occurs, the tuple anchored at the root of the tuple tree is replayed by the spout.
  • _collector.ack: Here we are informing Storm that the tuple has been processed successfully by this bolt. In the event of a failure, the programmer can explicitly call the fail method, or Storm calls it internally (as in the case of timeout events), so that the tuple can be replayed.
  • declarer.declare: This method specifies the stream on which successfully processed tuples are emitted. Notice that we use the same word field in the _collector.emit method. Similarly, if you look at the WordCount topology's builder code, you'll find another piece of the overall wiring of the word stream, which is as follows:
      builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));
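The fieldsGrouping call ensures that all tuples carrying the same word value reach the same counting task. The following framework-free sketch (our illustration of the routing idea, not Storm's actual internals; the modulo-hash scheme and names are ours) shows why this matters for counting:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrates the effect of fieldsGrouping("split", new Fields("word")):
// tuples are routed to a counting task by hashing the "word" field, so
// every occurrence of the same word lands on the same task.
public class FieldsGroupingDemo {
    static int taskFor(String word, int numTasks) {
        return Math.floorMod(word.hashCode(), numTasks);
    }

    public static void main(String[] args) {
        int numTasks = 12; // matches the parallelism hint in setBolt(..., 12)
        List<Map<String, Integer>> counts = new ArrayList<>();
        for (int i = 0; i < numTasks; i++) counts.add(new HashMap<>());

        for (String word : "the cat sat on the mat".split(" ")) {
            counts.get(taskFor(word, numTasks)).merge(word, 1, Integer::sum);
        }

        // Both occurrences of "the" were counted by a single task.
        int t = taskFor("the", numTasks);
        System.out.println("task " + t + " counted 'the' " 
            + counts.get(t).get("the") + " times");
    }
}
```

Had the tuples been shuffled randomly instead, occurrences of the same word could land on different tasks, and no single task would hold the correct total.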

The unreliable topology

Now let's look at the unreliable version of the same topology. Here, if a tuple fails to be processed in its entirety, it is not replayed by the framework. The emit call we used previously would look like this in an unreliable topology:

      _collector.emit(new Values(word));

Thus, an un-anchored tuple is emitted by the bolt. Sometimes, to suit their programming needs, developers deliberately create unreliable topologies.
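The trade-off between the two emit forms can be simulated without the framework. In this sketch (our illustration, not Storm code; the names and queue model are ours), a failed anchored tuple is re-queued for replay, while a failed un-anchored tuple is simply lost:

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Contrasts the reliable and unreliable emit paths: on failure, an anchored
// tuple is replayed from the root, an un-anchored one is dropped.
public class ReplayDemo {
    // Returns the tuples still eligible for processing after one failed attempt.
    static Deque<String> processWithFailure(boolean anchored) {
        Deque<String> queue = new ArrayDeque<>();
        queue.add("word-tuple");
        String tuple = queue.poll(); // the bolt receives the tuple...
        // ...and processing fails. Only an anchored tuple can be replayed.
        if (anchored) {
            queue.addFirst(tuple);   // fail() -> the spout replays it
        }                            // un-anchored: the tuple is lost
        return queue;
    }

    public static void main(String[] args) {
        System.out.println("anchored survivors:    " + processWithFailure(true));
        System.out.println("un-anchored survivors: " + processWithFailure(false));
    }
}
```

The unreliable path avoids the bookkeeping overhead of tracking tuple trees, which is why it is chosen when occasional tuple loss is acceptable.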