Doh, I see the silly mistake I made. Thanks guys.
I have now run into a different issue. I modified this to add a
count for each word in the tweets, but when I run it I get a weird
error:
cascading.pipe.OperatorException: [2ffb298f-bf99-4aea-ba4...][sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)] operator Each failed executing operation
    at cascading.pipe.Each$EachHandler.operate(Each.java:486)
    at cascading.flow.stack.EachReducerStackElement.operateEach(EachReducerStackElement.java:97)
    at cascading.flow.stack.EachReducerStackElement.collect(EachReducerStackElement.java:84)
    at cascading.pipe.Each$EachFunctionHandler$1.collect(Each.java:532)
    at cascading.tuple.TupleEntryCollector.add(TupleEntryCollector.java:71)
    at cascading.tuple.TupleEntryCollector.add(TupleEntryCollector.java:55)
    at cascading.operation.Identity.operate(Identity.java:99)
    at cascading.pipe.Each.applyFunction(Each.java:380)
    at cascading.pipe.Each.access$200(Each.java:53)
    at cascading.pipe.Each$EachFunctionHandler.handle(Each.java:543)
    at cascading.pipe.Each$EachHandler.operate(Each.java:478)
    at cascading.flow.stack.EachReducerStackElement.operateEach(EachReducerStackElement.java:97)
cascading.flow.FlowException: step failed: (1/2) ...Hfs["SequenceFile[['?word', '!__gen1']]"][2ffb298f-bf99-4aea-ba4a-2/60550/], with job id: job_local_0001, please see cluster logs for failure messages (NO_SOURCE_FILE:0)
user=>
    at cascading.flow.stack.EachReducerStackElement.collect(EachReducerStackElement.java:84)
    at cascading.pipe.Each.applyFilter(Each.java:375)
    at cascading.pipe.Each.access$300(Each.java:53)
    at cascading.pipe.Each$EachFilterHandler.handle(Each.java:558)
    at cascading.pipe.Each$EachHandler.operate(Each.java:478)
    at cascading.flow.stack.EachReducerStackElement.operateEach(EachReducerStackElement.java:97)
    at cascading.flow.stack.EachReducerStackElement.collect(EachReducerStackElement.java:84)
    at cascading.pipe.Each$EachFunctionHandler$1.collect(Each.java:532)
    at cascading.tuple.TupleEntryCollector.add(TupleEntryCollector.java:71)
    at cascalog.ClojureMapcat.operate(ClojureMapcat.java:40)
    at cascading.pipe.Each.applyFunction(Each.java:380)
    at cascading.pipe.Each.access$200(Each.java:53)
    at cascading.pipe.Each$EachFunctionHandler.handle(Each.java:543)
    at cascading.pipe.Each$EachHandler.operate(Each.java:478)
    at cascading.flow.stack.EachReducerStackElement.operateEach(EachReducerStackElement.java:97)
    at cascading.flow.stack.EachReducerStackElement.collect(EachReducerStackElement.java:84)
    at cascading.pipe.Each$EachFunctionHandler$1.collect(Each.java:532)
    at cascading.tuple.TupleEntryCollector.add(TupleEntryCollector.java:71)
    at cascading.tuple.TupleEntryCollector.add(TupleEntryCollector.java:55)
    at cascading.operation.Identity.operate(Identity.java:99)
    at cascading.pipe.Each.applyFunction(Each.java:380)
    at cascading.pipe.Each.access$200(Each.java:53)
    at cascading.pipe.Each$EachFunctionHandler.handle(Each.java:543)
    at cascading.pipe.Each$EachHandler.operate(Each.java:478)
    at cascading.flow.stack.EachReducerStackElement.operateEach(EachReducerStackElement.java:97)
    at cascading.flow.stack.EachReducerStackElement.collect(EachReducerStackElement.java:84)
    at cascading.pipe.Each.applyFilter(Each.java:375)
    at cascading.pipe.Each.access$300(Each.java:53)
    at cascading.pipe.Each$EachFilterHandler.handle(Each.java:558)
    at cascading.pipe.Each$EachHandler.operate(Each.java:478)
    at cascading.flow.stack.EachReducerStackElement.operateEach(EachReducerStackElement.java:97)
    at cascading.flow.stack.EachReducerStackElement.collect(EachReducerStackElement.java:84)
    at cascading.pipe.Each$EachFunctionHandler$1.collect(Each.java:532)
    at cascading.tuple.TupleEntryCollector.add(TupleEntryCollector.java:71)
    at cascading.tuple.TupleEntryCollector.add(TupleEntryCollector.java:55)
    at cascading.operation.Identity.operate(Identity.java:99)
    at cascading.pipe.Each.applyFunction(Each.java:380)
    at cascading.pipe.Each.access$200(Each.java:53)
    at cascading.pipe.Each$EachFunctionHandler.handle(Each.java:543)
    at cascading.pipe.Each$EachHandler.operate(Each.java:478)
    at cascading.flow.stack.EachReducerStackElement.operateEach(EachReducerStackElement.java:97)
    at cascading.flow.stack.EachReducerStackElement.collect(EachReducerStackElement.java:84)
    at cascading.pipe.Each$EachFunctionHandler$1.collect(Each.java:532)
    at cascading.tuple.TupleEntryCollector.add(TupleEntryCollector.java:71)
    at cascading.tuple.TupleEntryCollector.add(TupleEntryCollector.java:55)
    at cascading.operation.Identity.operate(Identity.java:99)
    at cascading.pipe.Each.applyFunction(Each.java:380)
    at cascading.pipe.Each.access$200(Each.java:53)
    at cascading.pipe.Each$EachFunctionHandler.handle(Each.java:543)
    at cascading.pipe.Each$EachHandler.operate(Each.java:478)
    at cascading.flow.stack.EachReducerStackElement.operateEach(EachReducerStackElement.java:97)
    at cascading.flow.stack.EachReducerStackElement.collect(EachReducerStackElement.java:84)
    at cascading.pipe.Every$EveryBufferHandler$1.collect(Every.java:482)
    at cascading.tuple.TupleEntryCollector.add(TupleEntryCollector.java:71)
    at cascading.tuple.TupleEntryCollector.add(TupleEntryCollector.java:55)
    at cascalog.FastFirst.operate(FastFirst.java:36)
    at cascading.pipe.Every$EveryBufferHandler.operate(Every.java:534)
    at cascading.flow.stack.EveryBufferReducerStackElement.collect(EveryBufferReducerStackElement.java:89)
    at cascading.flow.stack.GroupReducerStackElement.operateGroup(GroupReducerStackElement.java:74)
    at cascading.flow.stack.GroupReducerStackElement.collect(GroupReducerStackElement.java:58)
    at cascading.flow.stack.FlowReducerStack.reduce(FlowReducerStack.java:169)
    at cascading.flow.FlowReducer.reduce(FlowReducer.java:75)
    at org.apache.hadoop.mapred.ReduceTask.runOldReducer(ReduceTask.java:463)
    at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:411)
    at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:215)
Caused by: java.lang.RuntimeException: java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Number
    at cascalog.ClojureCombinerBase.operate(ClojureCombinerBase.java:124)
    at cascading.pipe.Each.applyFunction(Each.java:380)
    at cascading.pipe.Each.access$200(Each.java:53)
    at cascading.pipe.Each$EachFunctionHandler.handle(Each.java:543)
    at cascading.pipe.Each$EachHandler.operate(Each.java:478)
    ... 76 more
Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Number
    at clojure.lang.Numbers.add(Numbers.java:123)
    at clojure.core$_PLUS_.invoke(core.clj:816)
    at clojure.lang.AFn.applyToHelper(AFn.java:163)
    at clojure.lang.RestFn.applyTo(RestFn.java:132)
    at cascalog.ClojureCombinerBase.operate(ClojureCombinerBase.java:120)
    ... 80 more
Here is my code:
(defn tweet_other_mentions
  []
  (let [tweets (tweets_data)]
    (?<- (lfs-textline "results/tweet_mentions") [?word ?sum]
         (tweets ?a_id ?data)
         (split ?data :> ?word)
         (c/sum ?word :> ?sum))))
If I take out the sum method it works fine:
(defn tweet_other_mentions
  []
  (let [tweets (tweets_data)]
    (?<- (lfs-textline "results/tweet_mentions") [?word]
         (tweets ?a_id ?data)
         (split ?data :> ?word))))
Here is my split function:
(defmapcatop split [#^String words]
  (seq (.split words "\\s+")))
Why is the sum method trying to sum Strings with Numbers, and how
should I go about fixing this?
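The innermost frames of the trace (clojure.core/+ called from ClojureCombinerBase and failing in Numbers.add) can be reproduced in plain Clojure, without Cascalog. This is only a sketch: it assumes the sum aggregator folds the values bound to ?word with +, which is what the trace suggests, and the word list is made-up sample data. `frequencies` merely stands in for a per-word count here; in the query itself that would more likely be a counting aggregator (for example c/count from cascalog.ops) rather than c/sum.

```clojure
;; Hypothetical sample data: the kind of values `split` binds to ?word.
(def words ["hello" "world" "hello"])

;; Adding strings with + fails the same way as the innermost "Caused by".
(def sum-result
  (try
    (reduce + words)
    (catch ClassCastException e :class-cast-exception)))

;; Counting occurrences per word works fine on strings.
(def word-counts (frequencies words))
```

Here sum-result ends up as the :class-cast-exception marker, while word-counts holds a count per distinct word.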
On Apr 5, 4:57 pm, Sam Ritchie wrote:
Nailed it. I recommend either using tweets_data directly, or converting it
to a function:
(defn tweets_data []
  (let [source (lfs-textline "tweets.json")]
    (<- [?a_id ?data]
        (source ?line)
        (tweets_parser ?line :> ?id ?a_id ?data ?retweet_count ?at))))
On Thu, Apr 5, 2012 at 1:35 PM, Marc Limotte wrote:
Hi Jason.
I think your problem is here:
(let [tweets (tweets_data) ]
tweets_data shouldn't be wrapped in (). It's not a function... it's a Cascalog
query (which is really a map), so when you wrap it in parens, Clojure tries to
call the map as a fn with no args... hence your exception.
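A plain-Clojure sketch of the same failure, with no Cascalog involved. The names query-value, query-fn and call-with-no-args are hypothetical, and an ordinary literal map stands in for the query (the original error names PersistentStructMap; a literal map is a different class, but it is rejected in the same way when called with no arguments).

```clojure
;; A value bound with def: stands in for tweets_data as originally defined.
(def query-value {:kind :query})

;; A zero-argument function returning the same value, as defn would give.
(defn query-fn [] {:kind :query})

;; Call f with no arguments and report whether the call was rejected.
(defn call-with-no-args [f]
  (try
    (f)
    (catch IllegalArgumentException e :wrong-number-of-args)))

(def value-result (call-with-no-args query-value)) ; map called with 0 args
(def fn-result (call-with-no-args query-fn))       ; ordinary function call
```

The map can be called with a key, but not with zero arguments, so value-result is the :wrong-number-of-args marker while fn-result is the map itself.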
Also, you might try this out in the REPL, starting with the tweets_parser
function in particular, and then move one step up once that works. Also, just
to verify: I haven't used that particular JSON parser, but some
parsers automatically convert string keys to keywords, so maybe you
need [:id :twitter_user_id ...] instead of ["id" ...].
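To illustrate the key-type point, here is a small sketch that uses hand-written maps in place of real parser output. The sample values and the helper name pick are made up, and whether clj-json actually returns string or keyword keys is exactly the thing to check at the REPL. Note that looking up keys of the wrong type does not throw; it quietly yields nil.

```clojure
;; Hypothetical parser outputs for the same JSON object.
(def string-keyed  {"id" 1 "twitter_user_id" 42 "data" "hello world"})
(def keyword-keyed {:id 1 :twitter_user_id 42 :data "hello world"})

;; Same shape as tweets_parser: use the map as a function over a key list.
(defn pick [m ks] (map m ks))

(def ok     (pick string-keyed  ["id" "twitter_user_id" "data"]))
(def missed (pick keyword-keyed ["id" "twitter_user_id" "data"]))
(def ok-kw  (pick keyword-keyed [:id :twitter_user_id :data]))
```

If the fields come back as nil inside the query, a key-type mismatch like the one in `missed` is one thing worth ruling out first.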
Marc
On Thu, Apr 5, 2012 at 4:04 PM, Jason Toy wrote:
Could someone tell me what I'm doing wrong here?
When I run this command:
(tweet_other_mentions)
I get this error:
java.lang.IllegalArgumentException: Wrong number of args (0) passed
to: PersistentStructMap (NO_SOURCE_FILE:0)
Secondly, is there a cleaner way to write this? I'm new to both
Clojure and Cascalog, so I realize my code is probably a mess. It
seems like the way I parse the JSON files could be simplified.
(ns test.core
  (:use cascalog.api)
  (:require [cascalog [vars :as v]])
  (:require [clojure-csv [core :as csv]])
  (:require [clj-json [core :as json]])
  (:require [clojure [string]])
  (:require [cascalog [ops :as c]]))

(defn tweets_parser
  [line]
  (map (json/parse-string line)
       ["id" "twitter_user_id" "data" "retweet_count" "at"]))

(def tweets_data
  (let [source (lfs-textline "tweets.json")]
    (<- [?a_id ?data]
        (source ?line)
        (tweets_parser ?line :> ?id ?a_id ?data ?retweet_count ?at))))

(defn tweet_other_mentions
  []
  (let [tweets (tweets_data)]
    (?<- (lfs-textline "results/tweet_mentions") [?word ?sum]
         (tweets ?a_id ?data)
         (split ?data :> ?word)
         (c/sum ?word :> ?sum))))
--
Sam Ritchie, Twitter Inc
703.662.1337
@sritchie09
(Too brief? Here's why! http://emailcharter.org)