I have a transactional topology with a Kafka spout. Here is how I
define the topology:
String topicName = cmd.getOptionValue(topic.getOpt());
TridentKafkaConfig kafkaConf = new TridentKafkaConfig(
        new KafkaConfig.ZkHosts(cmd.getOptionValue(kafkaHosts.getOpt()), "/brokers"),
        topicName);
kafkaConf.scheme = new RawScheme();
if (cmd.hasOption(startOffset.getOpt())) {
    // offset times (e.g. millisecond timestamps) can exceed int range
    kafkaConf.forceStartOffsetTime(Long.parseLong(cmd.getOptionValue(startOffset.getOpt())));
}
TransactionalTridentKafkaSpout spout = new TransactionalTridentKafkaSpout(kafkaConf);
TridentTopology topology = new TridentTopology();
topology.newStream("impressionEvents", spout)
        .each(new Fields("bytes"), new ImpressionLogLineParser(),
              new Fields(ImpressionLogLineParser.getFieldNames()))
        .each(new Fields("date"), new RoundDownTimeStamp(Calendar.HOUR_OF_DAY),
              new Fields("roundHour"))
        .each(new Fields("referrer"), new ExtractDomain(), new Fields("domain"))
        .groupBy(new Fields("roundHour", "domain"))
        .persistentAggregate(new ImpressionsByDomainTopology.Factory(tds), new Count(),
              new Fields("count"))
        .parallelismHint(Integer.parseInt(cmd.getOptionValue(parallelismOpt.getOpt(), "1")));
The topology runs in local mode without any problem, but the worker
fails to start when the topology is submitted to the Storm cluster. I am
using storm-0.9.0-wip3 both on the cluster and to compile the topology.
The only information I can find is in supervisor.log:
2012-12-05 00:42:45 b.s.d.supervisor [INFO]
b8aa99ea-7a6a-48a4-b43c-688eab1bb8db still hasn't started
2012-12-05 00:42:45 b.s.d.supervisor [INFO]
b8aa99ea-7a6a-48a4-b43c-688eab1bb8db still hasn't started
2012-12-05 00:42:46 b.s.d.supervisor [INFO]
b8aa99ea-7a6a-48a4-b43c-688eab1bb8db still hasn't started
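If it helps, this is where I would have expected per-worker logs to show up on the supervisor node, but I see none (the log directory path below is my assumption for a default install; adjust it for your layout):

```shell
# Assumed default layout: Storm writes each worker JVM's output to
# <storm-home>/logs/worker-<port>.log on the supervisor machine.
STORM_LOG_DIR="${STORM_LOG_DIR:-/opt/storm/logs}"

# List any worker logs; nothing appears, which suggests the worker JVM
# dies before (or without) ever opening its log file.
ls "$STORM_LOG_DIR"/worker-*.log 2>/dev/null || echo "no worker logs found"
```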
Is there any other log file I can look at to find the error that
prevents the worker from starting successfully?
Thanks