Hello, guys,

I'm seeing a very strange blow-up in map output data that I find hard to
explain: the intermediate map output bytes are about 40x the map input bytes
for an extremely simple query:

(?<- output [?pair] (input _ ?pair))

Both output and input files are hfs-seqfile.

The input has two fields: java.lang.Long and clojure.lang.PersistentVector
of java.lang.Long (always of size 2), for example:
1 [ 2, 3 ]

The query simply seeks to throw out the first field and dedupe the second.
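
For reference, the whole thing looks roughly like the sketch below (paths are
placeholders; both taps are plain hfs-seqfile taps as mentioned above):

(use 'cascalog.api)

;; Input tuples: a java.lang.Long plus a two-element vector of longs.
;; Output: just the distinct ?pair values; Cascalog deduplicates by
;; default when a query has no aggregators.
(let [input  (hfs-seqfile "/placeholder/input")
      output (hfs-seqfile "/placeholder/output")]
  (?<- output [?pair]
       (input _ ?pair)))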

Compression is turned on using the LZO codec. Map output compression is turned
on as well.
Serializers specified: cascading.tuple.hadoop.BytesSerialization,
cascading.tuple.hadoop.TupleSerialization,
org.apache.hadoop.io.serializer.WritableSerialization,
org.apache.hadoop.io.serializer.JavaSerialization

Libraries used:
ls -1 lib|grep casca
cascading-core-1.2.4.jar
cascading.kryo-0.1.5.jar
cascalog-1.8.5-20120102.070300-13.jar

Please take a look at the attached screenshot:
- 175M input records at ~2 GB, which is ~10 bytes per record. Sounds
reasonable.
- The map output record count is the same (correct), but look at the map
output bytes: 90 GB (!)

How can that be? The map output should be just ?pair, while the map input is a
long plus ?pair. I am really baffled. Even if the records were converted to
their decimal ASCII representation, it wouldn't be 40x. Could this be a
Cascading issue?
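
A quick back-of-the-envelope check on those counters:

;; ~2 GB over 175M input records:
(/ 2.0e9 175e6)   ;; => ~11 bytes per input record, matching the ~10-byte estimate
;; ~90 GB over the same 175M map output records:
(/ 90.0e9 175e6)  ;; => ~514 bytes per output record, for what is just a pair of longs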

Thank you so much for your help!

Artem.


  • Artem Boytsov at Jan 7, 2012 at 8:20 am
    I realized that map output bytes are probably measured before compression,
    though a 40x compression ratio for this data still doesn't seem plausible.

    I dumped the data in a readable format (e.g. "-2147160804 [-1860497744
    681993524]"), and even that file is only about 3x larger than the original
    binary, compressed version.

    Artem.
  • Artem Boytsov at Jan 7, 2012 at 10:30 am
    Amazingly, if I replace the vector of longs with its textual representation,
    simply via (str/join " " vec), the ratio of "map output bytes" to "map input
    bytes" drops to only about 3x (instead of 40x!), and the whole MapReduce job
    finishes about 10 times faster.
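
    Roughly, the substitution looks like this (a sketch assuming Cascalog's
    defmapop; vec->str is a made-up name, and the paths are placeholders):

    (require '[clojure.string :as str])
    (use 'cascalog.api)

    ;; Hypothetical helper: turn [2 3] into the string "2 3" inside the map phase.
    (defmapop vec->str [v]
      (str/join " " v))

    (?<- (hfs-seqfile "/placeholder/output") [?pair-str]
         ((hfs-seqfile "/placeholder/input") _ ?pair)
         (vec->str ?pair :> ?pair-str))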

    Looking at my screenshot: when using a vector, 2 GB of map input becomes
    90 GB of map output. When I substitute the vector with a string, 15 GB of
    map input becomes 48 GB of map output.

    I am at a loss for words.

    Any ideas?

    Artem.
  • Sam Ritchie at Jan 7, 2012 at 3:11 pm
    Ah, that's it -- can you remove JavaSerialization from your job
    configuration? I recently added support for Kryo, which is far more
    efficient than JavaSerialization and effectively replaces it.

    It looks like the explicit value of "io.serializations" in your cluster
    configuration is overriding the value I set up from within Cascalog. Try
    changing your list of serializations to this:

    cascading.tuple.hadoop.BytesSerialization,
    cascading.tuple.hadoop.TupleSerialization,
    org.apache.hadoop.io.serializer.WritableSerialization,
    cascalog.hadoop.ClojureKryoSerialization

    You'll see big improvements, in this case and in any other case where
    JavaSerialization was grabbing tuples.
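
    If editing the cluster config isn't convenient, a per-job override along
    these lines should have the same effect (a sketch assuming Cascalog's
    with-job-conf; note that a cluster-side "final" setting would still win,
    and the paths are placeholders):

    (use 'cascalog.api)

    ;; Sketch: the same serialization list as above, with JavaSerialization
    ;; swapped out for the Kryo-based serializer, applied to one job only.
    (with-job-conf
      {"io.serializations"
       (str "cascading.tuple.hadoop.BytesSerialization,"
            "cascading.tuple.hadoop.TupleSerialization,"
            "org.apache.hadoop.io.serializer.WritableSerialization,"
            "cascalog.hadoop.ClojureKryoSerialization")}
      (?<- (hfs-seqfile "/placeholder/output") [?pair]
           ((hfs-seqfile "/placeholder/input") _ ?pair)))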

    --
    Sam Ritchie, Twitter Inc
    703.662.1337
    @sritchie09

    (Too brief? Here's why! http://emailcharter.org)
  • Sam Ritchie at Jan 10, 2012 at 10:18 pm
    Artem, did this end up fixing your issue? I wonder if, on your particular
    cluster, io.serializations is marked as final. That would prevent the
    job-specific settings that Cascalog creates from taking effect and force
    all Clojure data structures through JavaSerialization.
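
    One quick client-side check (a sketch; it shows what value the site files on
    the classpath provide, though not whether the property is marked final):

    (import '[org.apache.hadoop.mapred JobConf])

    ;; A fresh JobConf loads core-site.xml / mapred-site.xml from the classpath,
    ;; so this prints the serialization list those files configure
    ;; (or Hadoop's default if they're absent).
    (println (.get (JobConf.) "io.serializations"))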

    --
    Sam Ritchie, Twitter Inc
    703.662.1337
    @sritchie09

    (Too brief? Here's why! http://emailcharter.org)
  • Artem Boytsov at Jan 11, 2012 at 4:32 am
    Hello, Sam,

    I apologize for the delayed response. I can't thank you enough for the
    suggestion - I tried it on Sunday, as soon as you replied, and it worked,
    so your quick response was much appreciated. I've been in a big rush,
    though, which is why it took me a while to write back.

    Now the intermediate data is only about 7x the input (as opposed to 40x).
    That still seems a little excessive, and I'm not sure where it's coming
    from, but it's certainly much better.

    See the attached screenshot.

    Regards,
    Artem.

Discussion Overview
group: cascalog-user
categories: clojure, hadoop
posted: Jan 7, '12 at 8:08a
active: Jan 11, '12 at 4:32a
posts: 6
users: 2 (Artem Boytsov: 4 posts, Sam Ritchie: 2 posts)
website: clojure.org
irc: #clojure
