Hi folks,
I'm having a problem with a Pig job I wrote: it throws exceptions
in the map phase. I'm using the latest Pig from SVN, compiled against
the Hadoop15 jar included in SVN. My cluster is running Hadoop 0.15.1
on Java 1.6.0_03. Here's the Pig job (which I ran through grunt):
A = LOAD 'netflix/netflix.csv' USING PigStorage(',') AS
    (movie,user,rating,date);
B = GROUP A BY movie;
C = FOREACH B GENERATE group, COUNT(A.user) as ratingcount,
    AVG(A.rating) as averagerating;
D = ORDER C BY averagerating;
STORE D INTO 'output/output.tsv';
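To make the intent of the script concrete, here is a rough Python sketch of the same aggregation: group by movie, count the ratings, average them, and sort by the average. It assumes the four-column comma-separated layout from the LOAD statement. The function name movie_stats and the sample rows are made up for illustration and are not taken from the real data set.

```python
import csv
from collections import defaultdict

def movie_stats(lines):
    """Return (movie, rating_count, average_rating) tuples sorted by average.

    Hypothetical helper: mirrors GROUP BY movie, COUNT, AVG and ORDER BY
    from the Pig script, for cross-checking results on a small sample.
    """
    counts = defaultdict(int)
    totals = defaultdict(float)
    # Each row is assumed to be: movie,user,rating,date
    for movie, user, rating, date in csv.reader(lines):
        counts[movie] += 1
        totals[movie] += float(rating)
    rows = [(m, counts[m], totals[m] / counts[m]) for m in counts]
    # ORDER C BY averagerating (ascending)
    return sorted(rows, key=lambda r: r[2])

# Made-up sample rows, not real Netflix data.
sample = [
    "8,1001,4,2005-01-02",
    "8,1002,2,2005-01-03",
    "9,1001,5,2005-01-04",
]
for row in movie_stats(sample):
    print(row)
```

Running something like this over a small slice of the input gives a reference result to compare against whatever output the Pig job does produce.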
A large number of jobs fail (though not all; some succeed) with the
following exception:
error: Error message from task (map) tip_200712051644_0002_m_000003
java.lang.RuntimeException: Unexpected data while reading tuple from binary file
at org.apache.pig.impl.io.DataBagFileReader$myIterator.next(DataBagFileReader.java:81)
at org.apache.pig.impl.io.DataBagFileReader$myIterator.next(DataBagFileReader.java:41)
at org.apache.pig.impl.eval.collector.DataCollector.addToSuccessor(DataCollector.java:89)
at org.apache.pig.impl.eval.SimpleEvalSpec$1.add(SimpleEvalSpec.java:35)
at org.apache.pig.impl.eval.GenerateSpec$CrossProductItem.exec(GenerateSpec.java:273)
at org.apache.pig.impl.eval.GenerateSpec$1.add(GenerateSpec.java:86)
at org.apache.pig.impl.eval.EvalSpec.simpleEval(EvalSpec.java:216)
at org.apache.pig.impl.eval.FuncEvalSpec$1.add(FuncEvalSpec.java:105)
at org.apache.pig.impl.eval.GenerateSpec$CrossProductItem.<init>(GenerateSpec.java:77)
at org.apache.pig.impl.mapreduceExec.PigCombine.reduce(PigCombine.java:101)
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.combineAndSpill(MapTask.java:439)
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.sortAndSpillToDisk(MapTask.java:418)
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:364)
at org.apache.pig.impl.mapreduceExec.PigMapReduce$MapDataOutputCollector.add(PigMapReduce.java:309)
at org.apache.pig.impl.eval.collector.UnflattenCollector.add(UnflattenCollector.java:56)
at org.apache.pig.impl.eval.GenerateSpec$CrossProductItem.add(GenerateSpec.java:242)
at org.apache.pig.impl.eval.collector.UnflattenCollector.add(UnflattenCollector.java:56)
at org.apache.pig.impl.eval.collector.DataCollector.addToSuccessor(DataCollector.java:93)
at org.apache.pig.impl.eval.SimpleEvalSpec$1.add(SimpleEvalSpec.java:35)
at org.apache.pig.impl.eval.GenerateSpec$CrossProductItem.exec(GenerateSpec.java:273)
at org.apache.pig.impl.eval.GenerateSpec$1.add(GenerateSpec.java:86)
at org.apache.pig.impl.eval.collector.UnflattenCollector.add(UnflattenCollector.java:56)
at org.apache.pig.impl.mapreduceExec.PigMapReduce.run(PigMapReduce.java:113)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:192)
at org.apache.hadoop.mapred.TaskTracker$Child.main(TaskTracker.java:1760)
For comparison, the following job runs successfully:
A = LOAD 'netflix/netflix.csv' USING PigStorage(',') AS
    (movie,user,rating,date);
B = FILTER A BY movie == '8';
C = GROUP B BY movie;
D = FOREACH C GENERATE group, COUNT(B.user) as ratingcount,
    AVG(B.rating) as averagerating;
DUMP D;
Any help in tracking this down would be greatly appreciated. So far,
Pig is looking really slick and I'd love to write more advanced
programs with it.
Thanks,
Andrew Hitchcock