Hi Oleg,
I'm a total newbie when it comes to Trident, but the example below seems to
do what you are asking for.
The line "Open MySpout instance" is printed 5 times, whereas "Prepare Inc
instance" is printed 2 times, which I believe means that 5 spout instances
and 2 bolt instances are created.
static class MySpout extends BaseRichSpout {
    private SpoutOutputCollector collector;

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("value"));
    }

    @Override
    public void open(Map map, TopologyContext topologyContext,
                     SpoutOutputCollector spoutOutputCollector) {
        this.collector = spoutOutputCollector;
        System.out.println("Open MySpout instance");
    }

    @Override
    public void nextTuple() {
        Utils.sleep(1000);
        collector.emit(new Values(1));
    }
}

static class Inc implements Function {
    @Override
    public void execute(TridentTuple tuple, TridentCollector collector) {
        Integer x = tuple.getInteger(0);
        collector.emit(new Values(x + 1));
    }

    @Override
    public void prepare(Map map, TridentOperationContext tridentOperationContext) {
        System.out.println("Prepare Inc instance");
    }

    @Override
    public void cleanup() {
    }
}

public static void main(String[] args) {
    TridentTopology topology = new TridentTopology();
    topology.newStream("stream", new MySpout())
            .parallelismHint(5)   // hint for the spout
            .shuffle()            // repartition before the function
            .each(new Fields("value"), new Inc(), new Fields("incrementedValue"))
            .parallelismHint(2);  // hint for Inc

    LocalCluster cluster = new LocalCluster();
    cluster.submitTopology("topology", new Config(), topology.build());
    Utils.sleep(5000);
    cluster.shutdown();
}

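In case it helps to see why the counts come out as 5 and 2: as far as I
understand it, a parallelismHint covers the operations before it, back to
the previous repartitioning (shuffle() here). The snippet below is plain
Java, not Storm code, and only a simplified model of that rule. The names
ParallelismSketch, Step and resolve are made up for illustration, and the
default of 1 for a segment without a hint is my assumption.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model, not part of Storm: resolves which parallelism each
// operation in a linear pipeline ends up with.
public class ParallelismSketch {
    public enum Kind { OPERATION, REPARTITION, HINT }

    public static final class Step {
        final Kind kind;
        final String name;
        final int hint;

        private Step(Kind kind, String name, int hint) {
            this.kind = kind;
            this.name = name;
            this.hint = hint;
        }

        public static Step op(String name) { return new Step(Kind.OPERATION, name, 0); }
        public static Step repartition() { return new Step(Kind.REPARTITION, null, 0); }
        public static Step hint(int n) { return new Step(Kind.HINT, null, n); }
    }

    // Operations between two repartitionings form one segment; a hint inside
    // a segment sets the parallelism for every operation in that segment.
    public static Map<String, Integer> resolve(List<Step> steps) {
        Map<String, Integer> result = new LinkedHashMap<>();
        List<String> segment = new ArrayList<>();
        int parallelism = 1; // assumed default when no hint is given
        for (Step step : steps) {
            switch (step.kind) {
                case OPERATION:
                    segment.add(step.name);
                    break;
                case HINT:
                    parallelism = step.hint;
                    break;
                case REPARTITION:
                    flush(segment, parallelism, result);
                    parallelism = 1;
                    break;
            }
        }
        flush(segment, parallelism, result);
        return result;
    }

    private static void flush(List<String> segment, int parallelism,
                              Map<String, Integer> result) {
        for (String name : segment) {
            result.put(name, parallelism);
        }
        segment.clear();
    }

    public static void main(String[] args) {
        // Mirrors the topology above: spout, hint 5, shuffle, Inc, hint 2.
        Map<String, Integer> resolved = resolve(Arrays.asList(
                Step.op("MySpout"), Step.hint(5), Step.repartition(),
                Step.op("Inc"), Step.hint(2)));
        System.out.println(resolved); // prints {MySpout=5, Inc=2}
    }
}
```

This matches the 5 "Open MySpout instance" lines and 2 "Prepare Inc
instance" lines I see, but the real scheduling in Trident is more involved
than this model.
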
On Friday, December 7, 2012 7:07:11 PM UTC+2, Oleg Migrin wrote:
Hi developers,
I have a Trident topology:

JmsSpout jmsSpout = new JmsSpout();
TridentTopology topology = new TridentTopology();
TridentState state = topology.newStream("stream_name", jmsSpout)
        .each(new Fields(...), new FirstEmitter(), new Fields(...))
        .each(new Fields(...), new SecondEmitter(), new Fields(...))
        ...
        .parallelismHint(3);

As far as I can see, the parallelism hint can be applied only to functions
(bolts), but what about the spout?
How do I correctly parallelize the spout in my case, or will it be
distributed to all workers on the cluster?
-------
Thanks!
Oleg M.