FAQ
Hi, I have a beginner's problem: I'm having trouble loading JSON data.
Could someone explain why neither of my test queries works:

(ns test.core
  (:use cascalog.api)
  (:require [clojure-csv [core :as csv]])
  (:require [clj-json [core :as json]])
  (:require [cascalog [ops :as c]]))

(defmacro bootstrap []
  '(do
     (use (quote cascalog.api))
     (require (quote [clojure-csv [core :as csv]]))
     (require (quote [clj-json [core :as json]]))
     (require (quote [cascalog [ops :as c]]))))

(defn parse_json_file [file]
  (let [source (lfs-textline file)]
    (<- [?record]
        (source ?line)
        (json/parse-string ?line :> ?record))))

(defn followers []
  (let [source (lfs-textline "f.json")]
    (<- [?a_id ?b_id]
        (source ?line)
        (json/parse-string ?line :> ?a_id ?b_id))))

(defn first_test_query
  "this is a test"
  []
  (?<- (stdout) [?record]
       ((parse_json_file "f.json") ?record)))

(defn second_test_query
  "this is a test2"
  []
  (let [follows (followers)]
    (?<- (stdout) [?a_id ?b_id]
         (follows ?a_id ?b_id))))

and my f.json file is just:
{"a_id":"1081","b_id":"12870592"}
{"a_id":"1081","b_id":"1366401"}
{"a_id":"1081","b_id":"61233"}
{"a_id":"1081","b_id":"16259976"}
{"a_id":"1081","b_id":"1479681"}
{"a_id":"1081","b_id":"771648"}
{"a_id":"1081","b_id":"622543"}
{"a_id":"1081","b_id":"3861"}
{"a_id":"1081","b_id":"14945059"}
{"a_id":"1081","b_id":"17588833"}

  • Paul Lam at Mar 16, 2012 at 2:45 pm
    What's the error message?

  • Jason Toy at Mar 16, 2012 at 3:16 pm
    Here is the trace I get:

    12/03/16 18:16:39 INFO util.Util: using default application jar, may cause class not found exceptions on the cluster
    12/03/16 18:16:39 INFO flow.MultiMapReducePlanner: using application jar: /Users/jtoy/sandbox/cascalog/lib/cascading-core-1.2.4.jar
    12/03/16 18:16:39 INFO flow.Flow: [] starting
    12/03/16 18:16:39 INFO flow.Flow: [] source: Lfs["TextLine[['line']->[ALL]]"]["f.json"]"]
    12/03/16 18:16:39 INFO flow.Flow: [] sink: StdoutTap["SequenceFile[[UNKNOWN]->[ALL]]"]["/var/folders/kq/vx7c22fx0lj6fw_cq95q9_jr0000gn/T/temp68506658704566355991331892999841429000"]"]
    12/03/16 18:16:39 INFO flow.Flow: [] parallel execution is enabled: false
    12/03/16 18:16:39 INFO flow.Flow: [] starting jobs: 2
    12/03/16 18:16:39 INFO flow.Flow: [] allocating threads: 1
    12/03/16 18:16:39 INFO flow.FlowStep: [] starting step: (1/2) TempHfs["SequenceFile[['?record']]"][6990c1a4-b41a-4cac-9473-7/37584/]
    12/03/16 18:16:39 INFO jvm.JvmMetrics: Cannot initialize JVM Metrics with processName=JobTracker, sessionId= - already initialized
    12/03/16 18:16:39 INFO mapred.FileInputFormat: Total input paths to process : 1
    12/03/16 18:16:40 INFO mapred.FileInputFormat: Total input paths to process : 1
    12/03/16 18:16:40 INFO hadoop.MultiInputSplit: current split input path: file:/Users/jtoy/sandbox/cascalog/f.json
    12/03/16 18:16:40 INFO mapred.MapTask: numReduceTasks: 1
    12/03/16 18:16:40 INFO mapred.MapTask: io.sort.mb = 100
    12/03/16 18:16:40 INFO mapred.MapTask: data buffer = 79691776/99614720
    12/03/16 18:16:40 INFO mapred.MapTask: record buffer = 262144/327680
    12/03/16 18:16:40 WARN mapred.LocalJobRunner: job_local_0002
    cascading.CascadingException: unable to load serializer for: clojure.lang.PersistentArrayMap from: org.apache.hadoop.io.serializer.SerializationFactory
        at cascading.tuple.hadoop.TupleSerialization.getNewSerializer(TupleSerialization.java:309)
        at cascading.tuple.hadoop.SerializationElementWriter.write(SerializationElementWriter.java:75)
        at cascading.tuple.TupleOutputStream.write(TupleOutputStream.java:221)
        at cascading.tuple.TupleOutputStream.writeTuple(TupleOutputStream.java:179)
        at cascading.tuple.hadoop.TupleSerializer.serialize(TupleSerializer.java:37)
        at cascading.tuple.hadoop.TupleSerializer.serialize(TupleSerializer.java:28)
        at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:854)
        at org.apache.hadoop.mapred.MapTask$OldOutputCollector.collect(MapTask.java:466)
        at cascading.pipe.Group.collectReduceGrouping(Group.java:961)
        at cascading.flow.stack.GroupMapperStackElement.operateGroup(GroupMapperStackElement.java:82)
        at cascading.flow.stack.GroupMapperStackElement.collect(GroupMapperStackElement.java:70)
        at cascading.pipe.Each$EachFunctionHandler$1.collect(Each.java:532)
        at cascading.tuple.TupleEntryCollector.add(TupleEntryCollector.java:71)
        at cascading.tuple.TupleEntryCollector.add(TupleEntryCollector.java:55)
        at cascading.operation.Identity.operate(Identity.java:99)
        at cascading.pipe.Each.applyFunction(Each.java:380)
        at cascading.pipe.Each.access$200(Each.java:53)
        at cascading.pipe.Each$EachFunctionHandler.handle(Each.java:543)
        at cascading.pipe.Each$EachHandler.operate(Each.java:478)
        at cascading.flow.stack.EachMapperStackElement.operateEach(EachMapperStackElement.java:94)
        at cascading.flow.stack.EachMapperStackElement.collect(EachMapperStackElement.java:82)
        at cascading.pipe.Each.applyFilter(Each.java:375)
        at cascading.pipe.Each.access$300(Each.java:53)
        at cascading.pipe.Each$EachFilterHandler.handle(Each.java:558)
        at cascading.pipe.Each$EachHandler.operate(Each.java:478)
        at cascading.flow.stack.EachMapperStackElement.operateEach(EachMapperStackElement.java:94)
        at cascading.flow.stack.EachMapperStackElement.collect(EachMapperStackElement.java:82)
        at cascading.pipe.Each$EachFunctionHandler$1.collect(Each.java:532)
        at cascading.tuple.TupleEntryCollector.add(TupleEntryCollector.java:71)
        at cascalog.ClojureMap.operate(ClojureMap.java:35)
        at cascading.pipe.Each.applyFunction(Each.java:380)
        at cascading.pipe.Each.access$200(Each.java:53)
        at cascading.pipe.Each$EachFunctionHandler.handle(Each.java:543)
        at cascading.pipe.Each$EachHandler.operate(Each.java:478)
        at cascading.flow.stack.EachMapperStackElement.operateEach(EachMapperStackElement.java:94)
        at cascading.flow.stack.EachMapperStackElement.collect(EachMapperStackElement.java:82)
        at cascading.pipe.Each.applyFilter(Each.java:375)
        at cascading.pipe.Each.access$300(Each.java:53)
        at cascading.pipe.Each$EachFilterHandler.handle(Each.java:558)
        at cascading.pipe.Each$EachHandler.operate(Each.java:478)
        at cascading.flow.stack.EachMapperStackElement.operateEach(EachMapperStackElement.java:94)
        at cascading.flow.stack.EachMapperStackElement.collect(EachMapperStackElement.java:82)
        at cascading.pipe.Each$EachFunctionHandler$1.collect(Each.java:532)
        at cascading.tuple.TupleEntryCollector.add(TupleEntryCollector.java:71)
        at cascading.tuple.TupleEntryCollector.add(TupleEntryCollector.java:55)
        at cascading.operation.Identity.operate(Identity.java:99)
        at cascading.pipe.Each.applyFunction(Each.java:380)
        at cascading.pipe.Each.access$200(Each.java:53)
        at cascading.pipe.Each$EachFunctionHandler.handle(Each.java:543)
        at cascading.pipe.Each$EachHandler.operate(Each.java:478)
        at cascading.flow.stack.EachMapperStackElement.operateEach(EachMapperStackElement.java:94)
        at cascading.flow.stack.EachMapperStackElement.collect(EachMapperStackElement.java:82)
        at cascading.flow.stack.FlowMapperStack.map(FlowMapperStack.java:220)
        at cascading.flow.FlowMapper.map(FlowMapper.java:75)
        at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:50)
        at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:358)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:307)
        at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:176)
    12/03/16 18:16:40 WARN flow.FlowStep: [] task completion events identify failed tasks
    12/03/16 18:16:40 WARN flow.FlowStep: [] task completion events count:
    12/03/16 18:16:40 WARN flow.FlowStep: [] abandoning step: (2/2) ... 22fx0lj6fw_cq95q9_jr0000gn/T/temp68506658704566355991331892999841429000"]"], predecessor failed: (1/2) TempHfs["SequenceFile[['?record']]"][6990c1a4-b41a-4cac-9473-7/37584/]
    12/03/16 18:16:40 INFO flow.FlowStep: [] stopping: (2/2) ... 22fx0lj6fw_cq95q9_jr0000gn/T/temp68506658704566355991331892999841429000"]"]
    12/03/16 18:16:40 WARN flow.Flow: stopping jobs
    12/03/16 18:16:40 INFO flow.FlowStep: [] stopping: (2/2) ... 22fx0lj6fw_cq95q9_jr0000gn/T/temp68506658704566355991331892999841429000"]"]
    12/03/16 18:16:40 INFO flow.FlowStep: [] stopping: (1/2) TempHfs["SequenceFile[['?record']]"][6990c1a4-b41a-4cac-9473-7/37584/]
    12/03/16 18:16:40 WARN flow.Flow: stopped jobs
    12/03/16 18:16:40 WARN flow.Flow: shutting down job executor
    12/03/16 18:16:40 WARN flow.Flow: shutdown complete
    RESULTS
    -----------------------
    -----------------------
    cascading.flow.FlowException: step failed: (1/2) TempHfs["SequenceFile[['?record']]"][6990c1a4-b41a-4cac-9473-7/37584/], with job id: job_local_0002, please see cluster logs for failure messages (NO_SOURCE_FILE:0)
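
    The key line is the CascadingException near the top: Hadoop's
    SerializationFactory has no serializer registered for
    clojure.lang.PersistentArrayMap, which is the type json/parse-string
    emits for each line. A quick REPL check (a sketch, assuming clj-json's
    default string keys) shows the value Cascading is being asked to
    serialize:

    (require '[clj-json.core :as json])

    (json/parse-string "{\"a_id\":\"1081\",\"b_id\":\"12870592\"}")
    ;; => {"a_id" "1081", "b_id" "12870592"}
    ;; a clojure.lang.PersistentArrayMap, which Hadoop cannot serialize here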

  • Paul Lam at Mar 16, 2012 at 5:17 pm
    Are you using 1.8.6 or later?


  • Paul Lam at Mar 16, 2012 at 5:33 pm
    Never mind, I see it now. The problem is with (json/parse-string ...),
    which returns a map. You need to return a vector.
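
    A minimal sketch of that fix (parse-follow is a made-up helper name, and
    it assumes clj-json's default string keys): pull the two values out of
    the parsed map and return them as a flat vector, so each output variable
    binds to a plain string:

    ;; Custom map operation: parse a line of JSON and emit the two id fields
    ;; as separate output values instead of one un-serializable map.
    (defmapop parse-follow [line]
      (let [m (json/parse-string line)]
        [(get m "a_id") (get m "b_id")]))

    (defn followers []
      (let [source (lfs-textline "f.json")]
        (<- [?a_id ?b_id]
            (source ?line)
            (parse-follow ?line :> ?a_id ?b_id))))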

  • Jason Toy at Mar 17, 2012 at 1:55 am
    Ok, I'll try that out. I'm actually learning both Clojure and Cascalog
    at the same time.
    Is this the recommended way to convert from a map to a vector:
    (into [] '(1 2 3 4))
    I saw that from:
    http://stackoverflow.com/questions/5088803/in-clojure-is-there-an-easy-way-to-convert-between-list-types
  • Sam Ritchie at Mar 19, 2012 at 5:12 pm
    Hey Jason,

    You can think of a Clojure map as a sequence of its key-value pairs. For example,

    (seq {:a "key" :b "key2"})
    => ([:a "key"] [:b "key2"])

    I think of "into" as a function that pours its second argument into its first. Pouring a map into a vector would return a vector of key-value pairs:

    (into [] {:a "key" :b "key2"})
    => [[:a "key"] [:b "key2"]]


    If you wanted to convert this map into a vector without the nesting, you'd do:

    (into [] (apply concat {:a "key" :b "key2"}))
    => [:a "key" :b "key2"]


    The problem with doing this is that maps don't guarantee order, so you might get the following:

    (into [] (apply concat {:a "key" :b "key2"}))
    => [:b "key2" :a "key"]


    Cheers,
    Sam
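
    For the f.json records in this thread, though, relying on map order is
    avoidable: pulling the values out by key is order-safe (a sketch,
    assuming clj-json's default string keys):

    (let [m (json/parse-string "{\"a_id\":\"1081\",\"b_id\":\"12870592\"}")]
      [(get m "a_id") (get m "b_id")])
    ;; => ["1081" "12870592"]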



  • Jason Toy at Mar 20, 2012 at 9:03 am
    Sam, thanks for the explanation. I'm learning Clojure and Cascalog at
    the same time.
    On Mar 20, 1:11 am, Sam Ritchie wrote:
    Hey Jason,

    You can think of a Clojure map as a sequence of its key-value pairs. For example,

    (seq {:a "key" :b "key2"})
    => ([:a "key"] [:b "key2"])

    I think of "into" as a function that pours its second argument into its first. Pouring a map into a vector would return a vector of key-value pairs:

    (into [] {:a "key" :b "key2"})
    => [[:a "key"] [:b "key2"]]

    If you wanted to convert this map into a vector without the nesting, you'd do:

    (into [] (apply concat {:a "key" :b "key2"}))
    => [:a "key" :b "key2"]

    The problem with doing this is that maps don't guarantee order, so you might get the following:

    (into [] (apply concat {:a "key" :b "key2"}))
    => [:b "key2" :a "key"]

    Cheers,
    Sam

    --
    Sam Ritchie
    Sent with Sparrow (http://www.sparrowmailapp.com/?sig)






    On Friday, March 16, 2012 at 6:55 PM, Jason Toy wrote:
    Ok, I'll try that out, I'm actually learning both clojure and cascalog
    at the same time.
    Is this the recommended way to convert from a map to a vector:
    (into [] '(1 2 3 4))
    I saw that from:
    http://stackoverflow.com/questions/5088803/in-clojure-is-there-an-eas...
    On Mar 17, 1:33 am, Paul Lam (http://forward.co.uk)> wrote:
    nevermind, I see it now. The problem is with (json/parse-string ..), which
    is returning a map. You need to return a vector.
    On Friday, 16 March 2012 17:16:57 UTC, Paul Lam wrote:

    are you using 1.8.6 or later?
  • Jason Toy at Mar 17, 2012 at 4:12 am
    That fixed it, thanks. I had one more question. I modified my source
    as:
    (defn follow_parser
      [line]
      ;; a Clojure map is a function of its keys, so this returns the two
      ;; field values in order as a sequence (a serializable tuple)
      (map (json/parse-string line) ["a_id" "b_id"]))

    (defn followers
      []
      (let [source (lfs-textline "f.json")]
        (<- [?a_id ?b_id]
            (source ?line)
            (follow_parser ?line :> ?a_id ?b_id))))

    (defn second_test_query
      "this is a test2"
      []
      (let [follows (followers)]
        (?<- (stdout) [?a_id ?b_id] (follows ?a_id ?b_id))))



    How can I access followers directly in my query, like in the cascalog
    examples (instead of using let), so I can write something like:

    (?<- (stdout) [?a_id ?b_id] (followers ?a_id ?b_id))

    That gives me an error: java.lang.IllegalArgumentException: Unable
    to join predicates together (NO_SOURCE_FILE:0)



  • Paul Lam at Mar 17, 2012 at 2:31 pm
    try (def followers ....) instead of defn
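
    With defn, followers is a function, so Cascalog treats (followers ?a_id ?b_id)
    as an operation rather than a generator; with no generator supplying the
    vars, the planner can't join the predicates. Binding the query itself with
    def lets it be used directly as a generator. A minimal sketch, reusing
    follow_parser from above:

    (def followers
      (let [source (lfs-textline "f.json")]
        (<- [?a_id ?b_id]
            (source ?line)
            (follow_parser ?line :> ?a_id ?b_id))))

    ;; now this works directly:
    ;; (?<- (stdout) [?a_id ?b_id] (followers ?a_id ?b_id))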
