Hi,

I want to read and write Avro files from and to HDFS using Cascalog.
I have written a simple Lfs Avro tap using cascading-avro 2.1.0 <https://github.com/bixolabs/cascading.avro>.
The tap looks like this:

(defn lfs-avro
  ([in-out-path scheme-path fields]
     (let [url (get-url scheme-path)
           schema (-> (Schema$Parser.)
                      (.parse (.openStream url)))
           avro-scheme (AvroScheme. schema)]
       (.setSourceFields avro-scheme (Fields. (into-array fields)))
       (Lfs. avro-scheme in-out-path)))
  ([in-out-path scheme-path sink? fields & opts]
     (let [url (get-url scheme-path)
           schema (-> (Schema$Parser.)
                      (.parse (.openStream url)))
           avro-scheme (AvroScheme. schema)]
       (when (= sink? true)
         (.setSinkFields avro-scheme (Fields. (into-array fields))))
       (apply tap/lfs-tap avro-scheme in-out-path opts))))
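
The get-url helper called above is not shown in this post. A minimal sketch of what it might look like, assuming the schema path names a resource on the classpath (the body below is hypothetical and only illustrates one way to obtain a java.net.URL):

```clojure
(require '[clojure.java.io :as io]
         '[clojure.string :as string])

(defn get-url
  "Hypothetical helper: resolve a classpath resource such as
  \"/avro/user-reading-book.avsc\" to a java.net.URL, or nil if it is missing."
  [path]
  ;; ClassLoader lookups are relative to the classpath root,
  ;; so a leading slash is dropped before the lookup.
  (io/resource (string/replace-first path #"^/" "")))
```

Since the result is nil for a missing resource, calling .openStream on it would fail, so checking the returned URL before parsing the schema may be worthwhile.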

To write Clojure struct tuples to Lfs through the lfs-avro tap:

(defmapop struct2vec
  "convert struct to vector"
  [s]
  [(:user-id s) (:isbn s) (:user-review-id s)])

(deftest write-avro
  (?<- (lfs-avro "/tmp/avro-result" "/avro/user-reading-book.avsc"
                 true ["?user-id" "?isbn" "?user-review-id"])
       [?user-id ?isbn ?user-review-id]
       (user-reading-book-struct-list ?user-reading-book)
       (struct2vec :< ?user-reading-book :> ?user-id ?isbn ?user-review-id)))

With the sink fields set, writing Avro to Lfs works fine.

The problem occurs when reading Avro from Lfs:

(deftest avro-read-test
  (let [avro-src (lfs-avro "/tmp/avro-result"
                           "/avro/user-reading-book.avsc"
                           ["?user-id" "?isbn" "?user-review-id"])
        avro-result (<- [?user-id ?isbn]
                        (avro-src :> ?user-id ?isbn ?user-review-id))]
    (?<- (stdout)
         [?user-id ?count]
         (avro-result ?user-id ?isbn)
         (c/!count ?isbn :> ?count)))) ; HERE, Aggregation does not work!!!


Reading Avro from Lfs works fine (the subquery 'avro-result' here),
but as soon as an aggregation function is called, here (c/!count ..),
the following exception is thrown:

java.lang.NullPointerException
    at java.io.StringReader.<init>(StringReader.java:50)
    at org.apache.avro.Schema$Parser.parse(Schema.java:971)
    at org.apache.avro.Schema.parse(Schema.java:1020)
    at org.apache.avro.mapred.AvroJob.getMapOutputSchema(AvroJob.java:78)
    at org.apache.avro.mapred.AvroKeyComparator.setConf(AvroKeyComparator.java:39)
    at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:62)
    at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:117)
Has anybody run into this when using cascading-avro this way?

- Kidong Lee.

--
You received this message because you are subscribed to the Google Groups "cascalog-user" group.
To unsubscribe from this group and stop receiving emails from it, send an email to cascalog-user+unsubscribe@googlegroups.com.
For more options, visit https://groups.google.com/groups/opt_out.

  • Kidong Lee at Feb 21, 2013 at 2:23 am
    I asked the same question on the cascading-avro mailing list.
    The answer was to use the 2.1 bugfix version from this branch:
    https://github.com/bixolabs/cascading.avro/tree/2.1-bugfix

    With the cascading-avro 2.1 bugfix version, it works fine!

    Sorry for the noise.

    - Kidong.



    2013/2/20 Kidong Lee <mykidong@gmail.com>
