Hi developers,


I have a Trident topology:

JmsSpout jmsSpout = new JmsSpout();
TridentTopology topology = new TridentTopology();

TridentState state = topology.newStream("stream_name", jmsSpout)
    .each(new Fields(...), new FirstEmitter(), new Fields(...))
    .each(new Fields(...), new SecondEmitter(), new Fields(...))
    ...
    .parallelismHint(3);

As far as I can see, a parallelism hint can be applied only to functions
(bolts), but what about the spout?
How do I correctly parallelize the spout in my case, or will it be distributed
to all workers on the cluster?

-------
Thanks!
Oleg M.


  • Gleb Alexeyev at Dec 13, 2012 at 5:44 am
    Hi Oleg,

    I'm a total newbie when it comes to Trident, but the example below seems to
    do what you're asking for.
    The line "Open MySpout instance" is printed 5 times, whereas "Prepare Inc
    instance" is printed 2 times, which means, I believe, that 5 instances
    of the spout and 2 instances of the bolt are created.

    static class MySpout extends BaseRichSpout {
        private SpoutOutputCollector collector;

        @Override
        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
            outputFieldsDeclarer.declare(new Fields("value"));
        }

        @Override
        public void open(Map map, TopologyContext topologyContext,
                         SpoutOutputCollector spoutOutputCollector) {
            this.collector = spoutOutputCollector;
            // Printed once per spout instance
            System.out.println("Open MySpout instance");
        }

        @Override
        public void nextTuple() {
            Utils.sleep(1000);
            collector.emit(new Values(1));
        }
    }

    static class Inc implements Function {

        @Override
        public void execute(TridentTuple tuple, TridentCollector collector) {
            Integer x = tuple.getInteger(0);
            collector.emit(new Values(x + 1));
        }

        @Override
        public void prepare(Map map, TridentOperationContext tridentOperationContext) {
            // Printed once per function instance
            System.out.println("Prepare Inc instance");
        }

        @Override
        public void cleanup() {
        }
    }

    public static void main(String[] args) {
        TridentTopology topology = new TridentTopology();

        topology.newStream("stream", new MySpout())
                .parallelismHint(5)   // hint placed right after the spout
                .shuffle()
                .each(new Fields("value"), new Inc(), new Fields("incrementedValue"))
                .parallelismHint(2);  // hint placed after the Inc function

        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("topology", new Config(), topology.build());

        Utils.sleep(5000);
        cluster.shutdown();
    }

    (In reply to Oleg Migrin's post of Friday, December 7, 2012, 7:07:11 PM UTC+2.)
  • Oleg Migrin at Dec 14, 2012 at 9:15 am
    Thanks Gleb!
    This approach works for me.

    Yeah, the Trident API is quite poorly documented, so it's sometimes
    difficult to figure out (without JavaDoc) how it should work.

    ------
    Oleg M.
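
The counting argument in Gleb's reply (one printed line per instance, so 5 lines means 5 spout instances and 2 lines means 2 function instances) can be illustrated with a small model. This is only a sketch in plain Java with no Storm dependency: the class ParallelismHintModel, the nested Instance class, and the launch method are hypothetical names made up for illustration and are not part of the Storm or Trident API. It assumes that a component with a hint of N ends up as N independent instances, which is what the printed output in the reply suggests.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model only; no Storm classes are involved.
public class ParallelismHintModel {

    // Hypothetical stand-in for a spout or function instance: it records
    // one lifecycle line when it is created, as open()/prepare() would print.
    static class Instance {
        Instance(String lifecycleLine, List<String> log) {
            log.add(lifecycleLine);
        }
    }

    // Creates `parallelism` instances and returns the lines they logged.
    public static List<String> launch(String lifecycleLine, int parallelism) {
        List<String> log = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            new Instance(lifecycleLine, log);
        }
        return log;
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        log.addAll(launch("Open MySpout instance", 5)); // spout with hint 5
        log.addAll(launch("Prepare Inc instance", 2));  // function with hint 2
        log.forEach(System.out::println);
    }
}
```

Counting the lines per component in the output gives back the two hints, which is the same check Gleb applied to the real topology's console output.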

Discussion Overview
group: storm-user @
posted: Dec 10, '12 at 7:23a
active: Dec 14, '12 at 9:15a
posts: 3
users: 2
website: storm-project.net
irc: #storm-user
