I have been trying to process a graph in a simple cyclic Storm topology,
defined as follows (in Scala):
builder.setSpout("edges", graphspout)
builder.setBolt("walk", new WalkBolt)
  .fieldsGrouping("edges", new Fields("edge-from"))
  .fieldsGrouping("walk", "step", new Fields("source-vertex"))
The spout emits one edge at a time as a 2-tuple (vertex-from, vertex-to),
and is supposed to emit a "stop edge" when it is done.
The WalkBolt emits a vertex in a walk on the graph until it reaches a
termination condition. If I test this on a minimal graph with 4 edges,
it works fine. Once all edges are emitted, the "stop edge" is continuously
re-emitted. But if I send in a graph with 60k edges, Storm seems to hang
after emitting 2-3k edges. I do ack all the tuples. Increasing the WalkBolt
parallelism "solves" the problem, but that is clearly not the robustness I
was hoping for.
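To make the spout's intended emission order concrete, here is a minimal
plain-Scala sketch of it. It does not use the Storm API and is not the actual
spout code: the names EdgeStream, Edge, StopEdge and the ("stop", "stop")
sentinel value are placeholders chosen for illustration only.

```scala
// Hypothetical model of the spout's emission order (no Storm dependency).
object EdgeStream {
  // An edge is a 2-tuple (vertex-from, vertex-to).
  type Edge = (String, String)

  // Assumed sentinel for the "stop edge"; the real value may differ.
  val StopEdge: Edge = ("stop", "stop")

  // Emits every edge once, in order, then repeats the stop edge forever.
  def emissions(edges: Seq[Edge]): Iterator[Edge] =
    edges.iterator ++ Iterator.continually(StopEdge)
}
```

For example, EdgeStream.emissions(Seq(("a", "b"), ("b", "c"))).take(4) yields
the two edges followed by two stop edges. The iterator is lazy, so the endless
tail of stop edges is only produced as it is consumed.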
After the spouts stop emitting, I get this message:
11570 [Thread-2] INFO backtype.storm.daemon.nimbus - Setting new
assignment for topology id pr-1-1355165178:
#backtype.storm.daemon.common.Assignment{:master-code-dir
"/var/folders/5h/d2dz7qpx7pd7mr5fk6tzl7640000gp/T//324cd991-8c2f-4817-a329-46514fef57cb/nimbus/stormdist/pr-1-1355165178",
:node->host {"9de60e2a-d1d1-49d0-83c2-f334b1544800" "km.cl.intra"},
:executor->node+port {[3 3] ["9de60e2a-d1d1-49d0-83c2-f334b1544800" 1], [4
4] ["9de60e2a-d1d1-49d0-83c2-f334b1544800" 1], [2 2]
["9de60e2a-d1d1-49d0-83c2-f334b1544800" 1], [1 1]
["9de60e2a-d1d1-49d0-83c2-f334b1544800" 1]}, :executor->start-time-secs {[4
4] 1355165187, [1 1] 1355165187, [2 2] 1355165187, [3 3] 1355165187}}
After that, Storm seems to continue its infinite loop and burn CPU, but
the only messages I get are these:
32849 [Thread-22] INFO backtype.storm.daemon.executor - Processing
received message source: __system:-1, stream: __tick, id: {}, [30]
I am currently testing this in local mode. Any hints or help would be much
appreciated.
Justin Kaeser