Grokbase Groups: Pig user, January 2010
Hi Mridul,
Thanks, your approach works fine. This is what my current Pig script looks
like:

DEFINE CMD `s3fetch.py` SHIP('/root/s3fetch.py');
r1 = LOAD '/ip/s3fetch_input_files' AS (filename:chararray);
grp_r1 = GROUP r1 BY filename PARALLEL 5;
r2 = FOREACH grp_r1 GENERATE FLATTEN(r1);
r3 = STREAM r2 THROUGH CMD;
STORE r3 INTO '/op/s3fetch_debug_log';

And here is my s3fetch.py:

#!/usr/bin/env python
import os
import sys

for word in sys.stdin:
    word = word.rstrip()
    cmd = ('/usr/local/hadoop-0.20.0/bin/hadoop fs -cp '
           's3n://<s3-credentials>@bucket/dir-name/' + word + ' /ip/data/.')
    sys.stdout.write('\n\n' + word + ':\t' + cmd + '\n')
    # run the copy and capture its combined stdout/stderr
    (child_stdin, out_err) = os.popen4(cmd)
    for line in out_err.readlines():
        sys.stdout.write('\t' + word + '::\t' + line)



So the job starts fine, and I see that my Hadoop directory (/ip/data/.)
starts filling up with S3 files. But after some time it gets stuck: I see
lots of failed/restarted jobs in the JobTracker, and the number of files in
/ip/data doesn't increase.

Could this be happening because the parallel HDFS writes (via hadoop fs -cp
<> <>) are making the primary NameNode a blocking server?

Any help is greatly appreciated.

-Thanks,
Prasen

On Mon, Jan 25, 2010 at 8:58 AM, Mridul Muralidharan wrote:
If each line from your file has to be processed by a different mapper, then
other than writing a custom Slicer, a very dirty hack would be to:
a) create N files with one line each (see the sketch below), or
b) do something like:
input_lines = LOAD 'my_s3_list_file' AS (location_line:chararray);
grp_op = GROUP input_lines BY location_line PARALLEL $NUM_MAPPERS_REQUIRED;
actual_result = FOREACH grp_op GENERATE MY_S3_UDF(group);


The preferred way, as Dmitriy mentioned, would of course be to use a custom
Slicer!

Regards,
Mridul
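
For option (a) above, a minimal sketch (the file names are hypothetical): it
writes each S3 path from the list file into its own one-line input file, so
that each path can be handled by a separate mapper.

# Sketch of option (a): one input file per S3 path; file names are hypothetical.
src = open('my_s3_list_file')
for i, line in enumerate(src):
    line = line.strip()
    if not line:
        continue
    out = open('s3fetch_input_%03d' % i, 'w')
    out.write(line + '\n')
    out.close()
src.close()

Loading the directory that holds these one-line files should then give each
path its own map task, which is the effect option (a) is after.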


prasenjit mukherjee wrote:
I want to use Pig to parallelize processing of a number of requests. There
are ~300 requests which need to be processed. Each one consists of the
following steps:
1. Fetch the file from S3 to the local disk
2. Do some preprocessing
3. Put it into HDFS

My input is a small file with 300 lines. The problem is that Pig always
seems to create a single mapper, because of which the load is not properly
distributed. Is there any way I can enforce splitting of small input files
as well? Below is the Pig output, which seems to indicate that there is
only one mapper. Let me know if my understanding is wrong.

2010-01-24 05:31:53,148 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MultiQueryOptimizer - MR plan size before optimization: 1
2010-01-24 05:31:53,148 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MultiQueryOptimizer - MR plan size after optimization: 1
2010-01-24 05:31:55,006 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler - Setting up single store job

Thanks
-Prasen.


  • Dmitriy Ryaboy at Jan 26, 2010 at 4:25 pm
    Do you know why the jobs are failing? Take a look at the logs. I
    suspect it may be due to s3, not hadoop.

    -D

  • Prasenjit mukherjee at Jan 28, 2010 at 5:23 am
Now I see. The tasks are failing with the following error message:

*Task attempt_201001272359_0001_r_000000_0 failed to report status for 600
seconds. Killing!*

Looks like Hadoop kills/restarts tasks which take more than 600 seconds. Is
there any way I can increase that to some very high number?

    -Thanks,
    Prasenjit


  • Amogh Vasekar at Jan 28, 2010 at 5:28 am
Yes, the parameter is mapred.task.timeout, in ms.
You can also update status / write output to stdout in periodic chunks to avoid this :)

    Amogh
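
A minimal sketch of Amogh's second suggestion as it could be applied to a
script like s3fetch.py, assuming extra heartbeat lines in the debug-log
output are acceptable (the helper name and the 60-second interval are
hypothetical):

import os
import sys
import threading
import time

def run_with_heartbeat(cmd, interval=60):
    # Shared flag: the list stays empty until the copy finishes.
    finished = []

    def heartbeat():
        while not finished:
            time.sleep(interval)
            if not finished:
                sys.stdout.write('still running: ' + cmd + '\n')
                sys.stdout.flush()

    t = threading.Thread(target=heartbeat)
    t.setDaemon(True)                  # never keep the task alive by itself
    t.start()
    (child_stdin, out_err) = os.popen4(cmd)
    output = out_err.readlines()       # blocks until the hadoop fs -cp exits
    finished.append(True)
    return output

Each hadoop fs -cp invocation in the script could then go through
run_with_heartbeat(cmd) instead of calling os.popen4 and readlines()
directly, so the streaming task keeps producing output while a long copy is
in flight and is not killed for inactivity.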


  • Prasenjit mukherjee at Jan 28, 2010 at 5:34 am
Thanks Amogh for your quick response. Will changing this property only in
the master's hadoop-site.xml do, or do I need to change it on all the slaves
as well?

Is there any way I can do this from Pig (or I guess I am asking too much
here :) )?
  • Amogh Vasekar at Jan 28, 2010 at 5:38 am
Hi,
You should be able to pass this as a command-line argument using -D ... If you want to change it for all jobs on your own cluster, it would go in mapred-site.xml.

    Amogh
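
For the cluster-wide route, a sketch of the mapred-site.xml entry (the
30-minute value is only a hypothetical choice; the property is in
milliseconds, and a value of 0 disables the timeout altogether):

<property>
  <name>mapred.task.timeout</name>
  <!-- 30 minutes, in milliseconds; a value of 0 disables the timeout -->
  <value>1800000</value>
</property>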


  • Prasenjit mukherjee at Jan 28, 2010 at 5:56 am
Not sure I understand. Are you saying that Pig takes -D<> parameters
directly? Will the following work:

"pig -Dmapred.task.timeout=0 -f myfile.pig"

  • Prasenjit mukherjee at Jan 28, 2010 at 9:39 am
So I changed the property in hadoop-site.xml and it works great. Thanks to
you guys for your help. The s3 -> hdfs put time was reduced by over 90%.
  • Ashutosh Chauhan at Jan 30, 2010 at 4:30 am
You can set it through Pig as well, as you have mentioned. The advantage is
that instead of setting it permanently to a high value through
hadoop-site.xml (which would then affect all subsequent Hadoop jobs on your
cluster), through Pig you can set it on a per-job basis.

    Ashutosh
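
For example, a per-job invocation along the lines already shown above, with
a hypothetical finite value instead of 0 (which disables the timeout
entirely):

pig -Dmapred.task.timeout=1800000 -f myfile.pig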


Discussion Overview
group: user
categories: pig, hadoop
posted: Jan 26, '10 at 3:58p
active: Jan 30, '10 at 4:30a
posts: 9
users: 4
website: pig.apache.org
