I'm just not understanding how Cascalog queries get assigned to mappers, I
think. I have a generator that generates a list of directories that looks
like this:

[[ "dir1" "/data/dir1"] ["dir2" "/data/dir2"] ["dir3" "/data/dir3"]]

and a defmapcatop operation defined that reads the files in the directory,
extracts data from them and emits a sequence of tuples something like this:

(["field1" "field2" "field3"] ["field1" "field2" "field3"])

and the sink is a TextDelimited sink.
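
Roughly, the shape of the query is something like this (a simplified sketch;
extract-records and the read-records-from helper are stand-ins for my real code,
and hfs-textline stands in for the TextDelimited sink):

(use 'cascalog.api)

;; Stand-in for my real operation: read the files under dir-path and emit
;; one tuple per record. read-records-from is a hypothetical helper.
(defmapcatop extract-records [dir-path]
  (read-records-from dir-path))

(let [dirs [["dir1" "/data/dir1"] ["dir2" "/data/dir2"] ["dir3" "/data/dir3"]]]
  (?<- (hfs-textline "/output/path")
       [?field1 ?field2 ?field3]
       (dirs ?name ?path)
       (extract-records ?path :> ?field1 ?field2 ?field3)))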

It's all working perfectly when I run it locally, but when I try to run on
our Hadoop cluster it only runs on one mapper. I thought that it would
create a mapper for each tuple coming off the generator (up to the max
number of tuples).

What am I misunderstanding about Hadoop, Cascading or Cascalog?

  • David Kincaid at Sep 30, 2012 at 2:28 am
    After a good deal of reading this evening I think I now understand what is
    going on. I'm pretty new to Hadoop and have never even written a basic
    MapReduce job (I've always used Cascading or Cascalog), so I didn't understand
    InputFormats and InputSplits. Now that I do, I see why I only get
    one mapper when using a Clojure seq as a generator: Cascalog is using
    MemorySourceTap from com.twitter.maple, which only ever uses a single split
    on the tuple stream.

    So I'm going to extend MemorySourceTap and create my own
    SplittableMemorySourceTap that will create splits on the tuple list.
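
    The core idea is just to carve the in-memory tuple list into N chunks, one per
    split, instead of handing the whole thing to a single mapper. Conceptually it
    is something like this (a rough sketch only; the real tap has to go through
    Hadoop's InputFormat/InputSplit machinery, and the names are made up):

    (defn partition-tuples
      "Split the tuple list into at most n roughly equal chunks, one per split."
      [tuples n]
      (let [chunk-size (max 1 (int (Math/ceil (/ (count tuples) (double n)))))]
        (partition-all chunk-size tuples)))

    ;; e.g. 700 directory tuples with 20 requested splits -> 20 chunks of ~35
    ;; tuples, so each chunk can be handed to its own mapper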

    It seems like never-ending learning in this Hadoop world... That's what
    makes it so fun, right?

    - Dave
  • Bertrand Dechoux at Sep 30, 2012 at 10:16 am
    I am a bit curious, but why are you using a Clojure seq/MemorySourceTap
    on a cluster?
    That's indeed great for testing, but on a cluster it would mean that
    you are trying to process a small amount of data (i.e. not a significant
    multiple of 64 MB). As a consequence, using the cluster would be
    overkill, and it could even be slower than executing everything locally
    in memory. (By the way, does Cascalog handle the Cascading local mode
    now? I haven't checked.)

    Regards

    Bertrand
  • David Kincaid at Sep 30, 2012 at 6:31 pm
    I have a directory of 700 binary-formatted databases (each database
    consists of multiple files in its own directory). What I need to do is run
    an extraction function on each directory and generate records from the
    database. The result of the extraction should be a pipe-delimited file with
    about 10 fields in each row. Each extraction takes 10-15 minutes to run,
    and it spits out hundreds of thousands of records from each database. I
    wanted to parallelize the processing, since each database extraction is
    independent of all the others, and Hadoop seemed like the sensible way to do
    that so I didn't have to write my own distributed queue, worker
    coordination, etc.

    So I have a generator function that takes a directory path and returns a
    list of 1-tuples of all the subdirectories, with one directory name per tuple.
    Then I have a defmapcatop operator that takes a directory name and performs
    the extraction from the database in that directory, emitting a 10-tuple for
    each record extracted from the database.
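
    The generator is roughly like this (illustrative names, not my actual code):

    (require '[clojure.java.io :as io])

    (defn subdir-tuples
      "Return a seq of 1-tuples, one per subdirectory of root."
      [root]
      (for [f (.listFiles (io/file root))
            :when (.isDirectory f)]
        [(.getPath f)]))

    ;; e.g. (subdir-tuples "/data") => (["/data/db0001"] ["/data/db0002"] ...)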

    I just finally got everything working about 10 minutes ago. I ended up
    deeper in the guts of Hadoop than I wanted while implementing my
    SplittableMemorySourceTap. It splits the tuple list into N pieces and provides
    a RecordReader over each split. It was a great learning experience, for sure.


  • Bertrand Dechoux at Oct 1, 2012 at 6:12 am
    Are all your 'databases' inside HDFS? Wouldn't it be possible to
    provide Hadoop with only the paths to your files and let it handle splits
    and locality?
    Even if the setup is complex, you could do your directory scan before
    running Hadoop and provide it with only the programmatically filtered
    list of files that you want to process.
    Reading your description of SplittableMemorySourceTap, it seems like you
    are reinventing the wheel, and not necessarily the fastest one, mainly
    because you are writing custom code and your method seems to disregard
    data locality.

    "Data is conceptually record-oriented in the Hadoop programming framework.
    Individual input files are broken into lines or into other formats specific
    to the application logic. Each process running on a node in the cluster then
    processes a subset of these records. The Hadoop framework then schedules
    these processes in proximity to the location of data/records using knowledge
    from the distributed file system. Since files are spread across the
    distributed file system as chunks, each compute process running on a node
    operates on a subset of the data. Which data operated on by a node is chosen
    based on its locality to the node: most data is read from the local disk
    straight into the CPU, alleviating strain on network bandwidth and preventing
    unnecessary network transfers. This strategy of moving computation to the
    data, instead of moving the data to the computation, allows Hadoop to achieve
    high data locality which in turn results in high performance."
    http://developer.yahoo.com/hadoop/tutorial/module1.html
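
    For instance, you could build the list of input paths up front and hand only
    those to the job, something along these lines (an untested sketch, assuming
    the data sits in HDFS; names are illustrative):

    (import '[org.apache.hadoop.fs FileSystem Path]
            '[org.apache.hadoop.conf Configuration])

    (defn database-dirs
      "List the subdirectory paths under root in HDFS."
      [root]
      (let [fs (FileSystem/get (Configuration.))]
        (->> (.listStatus fs (Path. root))
             (filter #(.isDir %))
             (map #(str (.getPath %))))))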

    Regards

    Bertrand

  • David Kincaid at Oct 1, 2012 at 12:31 pm
    The databases are not in HDFS right now, but they certainly could be. These
    are truly databases, and each one consists of several binary files. Think of
    taking the binary data files from Oracle or MySQL, except that these are in a
    custom format. We have a library that we wrote that can read these files
    (as a group) and produce records, but it only operates on whole files and
    not just pieces of them. So I don't see how I can let Hadoop split those
    files into chunks and produce records from one of those chunks, especially
    since there is no delimiter between records in these files; they are just
    fixed-length records. The other catch is that each file can contain multiple
    record types (for example, the account file has payments, transactions and
    invoices), and the record types have to be taken together to make sense.

    So unless I'm missing something basic, we would still have to write our own
    InputFormat, RecordReader and InputSplit for these files if we wanted to
    break them into chunks and let each mapper process just pieces of each
    database. And those would be much more complicated than the way I did it
    this weekend. I understand it's not the most efficient way to do it, but
    it's working and doesn't seem terribly outside of Hadoop's purpose.

    Thanks for the feedback and suggestions. I'm still processing it and maybe
    there is something I'm not seeing right now.

    Dave


