Grokbase Groups: Pig user, March 2010
Thread: PigServer memory leak
hi,

I've got a long-running daemon application that periodically kicks off Pig
jobs via Quartz (Pig version 0.4.0). It uses a wrapper class that initializes
an instance of PigServer before parsing and executing a Pig script. As
implemented, the app would leak memory, and after a while jobs would fail to
run with messages like this appearing in the logs:

[Low Memory Detector] [INFO] SpillableMemoryManager.java:143 low memory
handler called

To fix the issue, I created an instance of PigServer at application
initialization and I re-use that instance for all jobs for the life of the
daemon. Problem solved.

So my question is: is this a bug in PigServer, that it leaks memory when
multiple instances are created, or is that just improper use of the class?

thanks,
Bill
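
As a rough sketch of the pattern being described (not code from the thread; the class, method, and alias names are illustrative assumptions):

    import org.apache.pig.ExecType;
    import org.apache.pig.PigServer;

    // Hypothetical wrapper along the lines described above: a new PigServer is
    // created for each scheduled job, the (token-substituted) Pig Latin is
    // registered, and the final alias is stored to that run's output location.
    public class PigJobWrapper {

        public void run(Iterable<String> pigLatinStatements, String outputLocation) throws Exception {
            PigServer pig = new PigServer(ExecType.MAPREDUCE); // fresh instance per job
            try {
                for (String statement : pigLatinStatements) {
                    pig.registerQuery(statement);              // parse/register each statement
                }
                pig.store("ordered", outputLocation);          // "ordered" is the final alias of the first script below
            } finally {
                pig.shutdown();
            }
        }
    }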


  • Bill Graham at Mar 9, 2010 at 7:43 pm
    Actually, upon closer investigation, re-using PigServer isn't working as
    well as I thought. I'm digging into the issue.

    To step back a bit though, I want to pose a different question: what is the
    intended usage of PigServer and PigContext with respect to their scope? Should a new
    instance of each be used for every job, or is one or the other intended for
    re-use throughout the lifecycle of the VM instance?

    Digging into the code of PigServer, it seems like it's intended to be used
    for a single script's execution only, but it's not entirely clear whether that's
    the case.


  • Ashutosh Chauhan at Mar 10, 2010 at 5:14 am
    PigServer maintains some static state, and in the current implementation it
    is not safe to reuse it across different queries. You should create a
    new instance for every query.

    As for the memory leak: are you running exactly the same query over the same
    dataset repeatedly? If yes, and you run out of memory, then there is a
    memory leak somewhere. But I doubt that's what you are doing. More
    likely, the message you are seeing has nothing to do with PigServer
    and is because of the query and/or dataset; that is, your query may not
    be taking advantage of optimizations in Pig. When you see that
    message, run the same query using grunt or bin/pig and you should see the same
    messages. Send the query you are firing; there might be a way to
    optimize it and to avoid those messages.

    Hope that helps,
    Ashutosh
  • Ashutosh Chauhan at Mar 10, 2010 at 6:16 pm
    Posting for Bill.


    ---------- Forwarded message ----------
    From: Bill Graham <billgraham@gmail.com>
    Date: Wed, Mar 10, 2010 at 10:11
    Subject: Re: PigServer memory leak
    To: ashutosh.chauhan@gmail.com


    Thanks for the reply, Ashutosh.

    [hadoop.apache.org keeps flagging my reply as spam, so I'm replying
    directly to you. Feel free to push this conversation back onto the
    list, if you can. :)]

    I'm running the same two scripts, one after the other, every 5
    minutes. The scripts have dynamic tokens substituted to change the
    input and output directories. Besides that, they have the same logic.
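
    (To make that concrete: a sketch of the kind of token substitution meant here, assuming plain
    string replacement of the @...@ placeholders that appear in the scripts below; the helper is hypothetical.)

    // Hypothetical helper: swap the @...@ placeholders used in the scripts below
    // for the current run's input/output directories and time period.
    static String substituteTokens(String rawScript, String inputDir, String outputDir, String timePeriod) {
        return rawScript
            .replace("@HADOOP_INPUT_LOCATION@", inputDir)
            .replace("@HADOOP_OUTPUT_LOCATION@", outputDir)
            .replace("@TIME_PERIOD@", timePeriod);
    }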

    I will try to execute the script from grunt next time it happens, but
    I don't see how a lack of Pig MR optimizations could cause a memory
    issue on the client. If I bounce my daemon, the next jobs to run
    execute without a problem upon start, so I would also expect a script
    run through grunt at that time to run without a problem as well.

    I reverted back to re-initializing PigServer for every run. I have
    other places in my scheduled workflow where I interact with HDFS, which
    I've now modified to re-use an instance of Hadoop's Configuration
    object for the life of the VM. I was re-initializing that many times
    per run. Looking at the Configuration code, it seems to re-parse the
    XML configs into a DOM every time it's called, so this certainly looks
    like a place for a potential leak. If nothing else it should give me
    an optimization. Configuration seems to be stateless and read-only
    after initialization, so this seems safe.
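
    (A minimal sketch of that reuse, assuming stock Hadoop client APIs; the holder class is hypothetical.)

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;

    import java.io.IOException;

    // Hypothetical holder: build one Configuration at startup and reuse it for the
    // life of the VM, rather than re-parsing the XML config files on every run.
    public final class HadoopConfHolder {
        private static final Configuration CONF = new Configuration();

        private HadoopConfHolder() {}

        public static Configuration get() {
            return CONF;
        }

        public static FileSystem fs() throws IOException {
            return FileSystem.get(CONF); // FileSystem.get also caches instances internally
        }
    }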

    Anyway, here are my two scripts. The first generates summaries, the
    second makes a report from the summaries and they run in separate
    PigServer instances via registerQuery(..). Let me know if you see
    anything that seems off:


    define chukwaLoader org.apache.hadoop.chukwa.ChukwaStorage();
    define tokenize     com.foo.hadoop.mapreduce.pig.udf.TOKENIZE();
    define regexMatch   com.foo.hadoop.mapreduce.pig.udf.REGEX_MATCH();
    define timePeriod   org.apache.hadoop.chukwa.TimePartition('@TIME_PERIOD@');

    raw = LOAD '@HADOOP_INPUT_LOCATION@' USING chukwaLoader AS (ts: long, fields);
    bodies = FOREACH raw GENERATE tokenize((chararray)fields#'body') as tokens,
        timePeriod(ts) as time;

    -- pull values out of the URL
    tokens1 = FOREACH bodies GENERATE
        (int)regexMatch($0.token4, '(?:[?&])ptId=([^&]*)', 1) as pageTypeId,
        (int)regexMatch($0.token4, '(?:[?&])sId=([^&]*)', 1) as siteId,
        (int)regexMatch($0.token4, '(?:[?&])aId=([^&]*)', 1) as assetId,
        time,
        regexMatch($0.token4, '(?:[?&])tag=([^&]*)', 1) as tagValue;

    -- filter out entries without an assetId
    tokens2 = FILTER tokens1 BY
        (assetId is not null) AND (pageTypeId is not null) AND (siteId is not null);

    -- group by tagValue, time, assetId and flatten to get counts
    grouped = GROUP tokens2 BY (tagValue, time, assetId, pageTypeId, siteId);
    flattened = FOREACH grouped GENERATE
        FLATTEN(group) as (tagValue, time, assetId, pageTypeId, siteId),
        COUNT(tokens2) as count;

    shifted = FOREACH flattened GENERATE time, count, assetId, pageTypeId, siteId, tagValue;

    -- order and store
    ordered = ORDER shifted BY tagValue ASC, count DESC, assetId DESC,
        pageTypeId ASC, siteId ASC, time DESC;
    STORE ordered INTO '@HADOOP_OUTPUT_LOCATION@';





    raw = LOAD '@HADOOP_INPUT_LOCATION@' USING PigStorage('\t') AS
        (ts: long, count: int, assetId: int, pageTypeId: chararray,
         siteId: int, tagValue: chararray);

    -- now store most popular overall - filtered by pageTypeId
    most_popular_filtered = FILTER raw BY
        (siteId == 162) AND (pageTypeId matches '(2100)|(1606)|(1801)|(2300)|(2718)');
    most_popular = GROUP most_popular_filtered BY (ts, assetId, pageTypeId);
    most_popular_flattened = FOREACH most_popular GENERATE
        FLATTEN(group) as (ts, assetId, pageTypeId),
        SUM(most_popular_filtered.count) as count;
    most_popular_shifted = FOREACH most_popular_flattened
        GENERATE ts, count, assetId, (int)pageTypeId;

    most_popular_ordered = ORDER most_popular_shifted
        BY ts DESC, count DESC, assetId ASC, pageTypeId ASC;
    STORE most_popular_ordered INTO '@HADOOP_OUTPUT_LOCATION@/most_popular_overall/';

    -- now store most popular by pagetype - filtered by pageTypeId
    STORE most_popular_ordered INTO '@HADOOP_OUTPUT_LOCATION@/most_popular_by_pagetype_temp' USING
        com.foo.hadoop.mapreduce.pig.storage.MultiStorage(
            '@HADOOP_OUTPUT_LOCATION@/most_popular_by_pagetype/', '3', 'none', '\t');

    -- now store most popular by tags - not filtered by pageTypeId
    most_popular_by_tag_filtered = FILTER raw BY
        (siteId == 162) AND (tagValue is not null);
    most_popular_by_tag = GROUP most_popular_by_tag_filtered BY (ts, assetId, pageTypeId, tagValue);
    most_popular_flattened = FOREACH most_popular_by_tag
        GENERATE FLATTEN(group) as (ts, assetId, pageTypeId, tagValue),
        SUM(most_popular_by_tag_filtered.count) as count;
    most_popular_by_tag_shifted = FOREACH most_popular_flattened
        GENERATE ts, count, assetId, (int)pageTypeId, tagValue;
    most_popular_by_tag_ordered = ORDER most_popular_by_tag_shifted
        BY ts DESC, tagValue ASC, count DESC, assetId ASC, pageTypeId ASC;
    STORE most_popular_by_tag_ordered INTO '@HADOOP_OUTPUT_LOCATION@/most_popular_by_tag_temp' USING
        com.foo.hadoop.mapreduce.pig.storage.MultiStorage(
            '@HADOOP_OUTPUT_LOCATION@/most_popular_by_tag/', '4', 'none', '\t');

  • Ashutosh Chauhan at Mar 10, 2010 at 6:56 pm
    [Low Memory Detector] [INFO] SpillableMemoryManager.java:143 low memory
    handler called

    Are you seeing this warning on the client side, in the Pig logs? If so,
    are you sure your job is actually running on a real Hadoop cluster?
    These messages should appear in the task-tracker logs, not in the client
    logs, which may imply that your job is getting executed in local
    mode and not actually submitted to the cluster. Look for the very first
    lines in the client logs, where Pig tries to connect to the cluster,
    and see if it is successful in doing so.
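
    (One way to sanity-check this from the client, not mentioned in the thread: with 0.20-era Hadoop,
    a job tracker setting of "local" means jobs run in local mode instead of being submitted to a cluster.)

    import org.apache.hadoop.conf.Configuration;

    public class JobTrackerCheck {
        public static void main(String[] args) {
            // "local" (the default) means local-mode execution; a host:port value
            // means jobs are submitted to that JobTracker.
            Configuration conf = new Configuration();
            System.out.println("mapred.job.tracker = " + conf.get("mapred.job.tracker", "local"));
        }
    }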



  • Bill Graham at Mar 10, 2010 at 7:15 pm
    Yes, these errors appear in the Pig client and the jobs are definitely being
    executed on the cluster. I can see the data in HDFS and the jobs in the
    JobTracker UI of the cluster.
  • Bill Graham at Mar 19, 2010 at 5:34 pm
    I believe I've found the cause of my Pig memory leak so I wanted to report
    back. I profiled my app after letting it run for a couple of days and found
    that the static toDelete Stack in the FileLocalizer object was growing over
    time without getting flushed. I had thousands of HFile objects in that
    stack. This produced a memory leak both in my app and in HDFS.

    The fix seems straightforward enough in my app. I suspect calling
    FileLocalizer.deleteTempFiles() after each use of PigServer for a given
    script execution will do the trick.
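
    (A sketch of that workaround, assuming FileLocalizer is org.apache.pig.impl.io.FileLocalizer as in the
    Pig source; the wrapper shape mirrors the earlier sketch and the names are illustrative.)

    import org.apache.pig.ExecType;
    import org.apache.pig.PigServer;
    import org.apache.pig.impl.io.FileLocalizer;

    public class LeakAwarePigRunner {
        public void run(Iterable<String> pigLatinStatements, String outputLocation) throws Exception {
            PigServer pig = new PigServer(ExecType.MAPREDUCE);
            try {
                for (String statement : pigLatinStatements) {
                    pig.registerQuery(statement);
                }
                pig.store("ordered", outputLocation); // "ordered" is an assumed final alias
            } finally {
                pig.shutdown();
                // The workaround described above: flush Pig's static toDelete stack so
                // temp-file references (and HDFS temp files) don't accumulate between runs.
                FileLocalizer.deleteTempFiles();
            }
        }
    }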

    This seems to be a major gotcha though that will likely burn others. I
    suggest we add FileLocalizer.deleteTempFiles() to the shutdown() method of
    PigServer. Thoughts?

    Currently shutdown() isn't doing much:

    public void shutdown() {
        // clean-up activities
        // TODO: reclaim scope to free up resources. Currently
        // this is not implemented and throws an exception
        // hence, for now, we won't call it.
        //
        // pigContext.getExecutionEngine().reclaimScope(this.scope);
    }
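
    (And roughly what the suggested change would look like; this is a sketch, not actual Pig code.)

    public void shutdown() {
        // clean-up activities
        // Suggested addition: flush the static toDelete stack so temp files
        // registered by this VM are deleted and their references released.
        FileLocalizer.deleteTempFiles();
        // (existing TODO about reclaiming scope left as-is)
    }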

    thanks,
    Bill


  • Ashutosh Chauhan at Mar 19, 2010 at 8:06 pm
    Great work, Bill, in chasing this down. Can you open a JIRA with all
    this info plus any additional info you have on this issue? If you can
    also include the script and steps to reproduce the memory leak, that will be
    awesome.

    Ashutosh
  • Daniel Dai at Mar 22, 2010 at 4:16 pm
    Thanks for digging into it. Can you open a Jira for that?

    Daniel

    --------------------------------------------------
    From: "Bill Graham" <billgraham@gmail.com>
    Sent: Friday, March 19, 2010 10:34 AM
    To: <pig-user@hadoop.apache.org>
    Subject: Re: PigServer memory leak
    I believe I've found the cause of my Pig memory leak so I wanted to report
    back. I profiled my app after letting it run for a couple of days and
    found
    that the static toDelete Stack in the FileLocalizer object was growing
    over
    time without getting flushed. I had thousands of HFile objects in that
    stack. This produced a memory leak both in my app and in HDFS.

    The fix seems straightforward enough in my app. I suspect calling
    FileLocalizer.deleteTempFiles() after each usage of PigServer for a given
    execution of a given pig script will do the trick.

    This seems to be a major gotcha though that will likely burn others. I
    suggest we add FileLocalizer.deleteTempFiles() to the shutdown() method of
    PigServer. Thoughts?

    Currently shutdown isn't doing much:

    public void shutdown() {
    // clean-up activities
    // TODO: reclaim scope to free up resources. Currently
    // this is not implemented and throws an exception
    // hence, for now, we won't call it.
    //
    // pigContext.getExecutionEngine().reclaimScope(this.scope);
    }

    thanks,
    Bill


    On Wed, Mar 10, 2010 at 12:15 PM, Bill Graham wrote:

    Yes, these errors appear in the Pig client and the jobs are definitely
    being executed on the cluster. I can see the data in HDFS and the jobs in
    the JobTracker UI of the cluster.


    On Wed, Mar 10, 2010 at 10:54 AM, Ashutosh Chauhan <
    ashutosh.chauhan@gmail.com> wrote:
    [Low Memory Detector] [INFO] SpillableMemoryManager.java:143 low memory
    handler called

    Are you seeing this warning on client side, in pig logs? If so, then
    are you sure your job is actually running on real hadoop cluster.
    Because these logs should appear in task-tracker logs not in client
    logs. This may imply that you job is getting executed locally in local
    mode and not actually submitted to cluster. Look for the very first
    lines in the client logs, where Pig tries to connect to the cluster.
    See, if its successful in doing so.



    On Wed, Mar 10, 2010 at 10:15, Ashutosh Chauhan
    wrote:
    Posting for Bill.


    ---------- Forwarded message ----------
    From: Bill Graham <billgraham@gmail.com>
    Date: Wed, Mar 10, 2010 at 10:11
    Subject: Re: PigServer memory leak
    To: ashutosh.chauhan@gmail.com


    Thanks for the reply, Ashutosh.

    [hadoop.apache.org keeps flagging my reply as spam, so I'm replying
    directly to you. Feel free to push this conversation back onto the
    list, if you can. :)]

    I'm running the same two scripts, one after the other, every 5
    minutes. The scripts have dynamic tokens substituted to change the
    input and output directories. Besides that, they have the same logic.

    I will try to execute the script from grunt next time it happens, but
    I don't see how a lack of pig MR optimizations could cause a memory
    issue on the client? If I bounce my daemon, the next jobs to run
    executes without a problem upon start, so I would also expect a script
    run through grunt at that time to run without a problem as well.

    I reverted back to re-initializing PigServer for every run. I have
    other places in my scheduled workflow where I interact with HDFS which
    I've now modified to re-use an instance of Hadoop's Configuration
    object for the life of the VM. I was re-initializing that many times
    per run. Looking at the Configuration code it seems to re-parse the
    XML configs into a DOM every time it's called, so this certainly looks
    like a place for a potential leak. If nothing else it should give me
    an optimization. Configuration seems to be stateless and read-only
    after initiation so this seems safe.

    Anyway, here are my two scripts. The first generates summaries, the
    second makes a report from the summaries and they run in separate
    PigServer instances via registerQuery(..). Let me know if you see
    anything that seems off:


    define chukwaLoader org.apache.hadoop.chukwa.ChukwaStorage();
    define tokenize com.foo.hadoop.mapreduce.pig.udf.TOKENIZE();
    define regexMatch com.foo.hadoop.mapreduce.pig.udf.REGEX_MATCH();
    define timePeriod org.apache.hadoop.chukwa.TimePartition('@TIME_PERIOD@');

    raw = LOAD '@HADOOP_INPUT_LOCATION@' USING chukwaLoader AS (ts: long, fields);
    bodies = FOREACH raw GENERATE
        tokenize((chararray)fields#'body') as tokens,
        timePeriod(ts) as time;

    -- pull values out of the URL
    tokens1 = FOREACH bodies GENERATE
        (int)regexMatch($0.token4, '(?:[?&])ptId=([^&]*)', 1) as pageTypeId,
        (int)regexMatch($0.token4, '(?:[?&])sId=([^&]*)', 1) as siteId,
        (int)regexMatch($0.token4, '(?:[?&])aId=([^&]*)', 1) as assetId,
        time,
        regexMatch($0.token4, '(?:[?&])tag=([^&]*)', 1) as tagValue;

    -- filter out entries without an assetId
    tokens2 = FILTER tokens1 BY
        (assetId is not null) AND (pageTypeId is not null) AND (siteId is not null);

    -- group by tagValue, time, assetId and flatten to get counts
    grouped = GROUP tokens2 BY (tagValue, time, assetId, pageTypeId, siteId);
    flattened = FOREACH grouped GENERATE
        FLATTEN(group) as (tagValue, time, assetId, pageTypeId, siteId),
        COUNT(tokens2) as count;

    shifted = FOREACH flattened GENERATE time, count, assetId, pageTypeId, siteId, tagValue;

    -- order and store
    ordered = ORDER shifted BY tagValue ASC, count DESC, assetId DESC,
        pageTypeId ASC, siteId ASC, time DESC;
    STORE ordered INTO '@HADOOP_OUTPUT_LOCATION@';





    raw = LOAD '@HADOOP_INPUT_LOCATION@' USING PigStorage('\t') AS
        (ts: long, count: int, assetId: int, pageTypeId: chararray,
         siteId: int, tagValue: chararray);

    -- now store most popular overall - filtered by pageTypeId
    most_popular_filtered = FILTER raw BY
        (siteId == 162) AND (pageTypeId matches '(2100)|(1606)|(1801)|(2300)|(2718)');
    most_popular = GROUP most_popular_filtered BY (ts, assetId, pageTypeId);
    most_popular_flattened = FOREACH most_popular GENERATE
        FLATTEN(group) as (ts, assetId, pageTypeId),
        SUM(most_popular_filtered.count) as count;
    most_popular_shifted = FOREACH most_popular_flattened
        GENERATE ts, count, assetId, (int)pageTypeId;

    most_popular_ordered = ORDER most_popular_shifted
        BY ts DESC, count DESC, assetId ASC, pageTypeId ASC;
    STORE most_popular_ordered INTO '@HADOOP_OUTPUT_LOCATION@/most_popular_overall/';

    -- now store most popular by pagetype - filtered by pageTypeId
    STORE most_popular_ordered INTO '@HADOOP_OUTPUT_LOCATION@/most_popular_by_pagetype_temp'
        USING com.foo.hadoop.mapreduce.pig.storage.MultiStorage(
            '@HADOOP_OUTPUT_LOCATION@/most_popular_by_pagetype/', '3', 'none', '\t');

    -- now store most popular by tags - not filtered by pageTypeId
    most_popular_by_tag_filtered = FILTER raw BY
        (siteId == 162) AND (tagValue is not null);
    most_popular_by_tag = GROUP most_popular_by_tag_filtered BY
        (ts, assetId, pageTypeId, tagValue);
    most_popular_flattened = FOREACH most_popular_by_tag
        GENERATE FLATTEN(group) as (ts, assetId, pageTypeId, tagValue),
        SUM(most_popular_by_tag_filtered.count) as count;
    most_popular_by_tag_shifted = FOREACH most_popular_flattened
        GENERATE ts, count, assetId, (int)pageTypeId, tagValue;
    most_popular_by_tag_ordered = ORDER most_popular_by_tag_shifted
        BY ts DESC, tagValue ASC, count DESC, assetId ASC, pageTypeId ASC;
    STORE most_popular_by_tag_ordered INTO '@HADOOP_OUTPUT_LOCATION@/most_popular_by_tag_temp'
        USING com.foo.hadoop.mapreduce.pig.storage.MultiStorage(
            '@HADOOP_OUTPUT_LOCATION@/most_popular_by_tag/', '4', 'none', '\t');
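
    For context, a rough sketch of the kind of per-run driver described
    above (the class shape, the naive split on ';', and the token handling
    are all simplifications for illustration, not the real wrapper class):

    import org.apache.pig.ExecType;
    import org.apache.pig.PigServer;

    public class PigScriptRunner {
        public static void run(String scriptText, String input, String output) throws Exception {
            // Substitute the dynamic tokens used in the scripts above.
            String resolved = scriptText
                    .replace("@HADOOP_INPUT_LOCATION@", input)
                    .replace("@HADOOP_OUTPUT_LOCATION@", output);

            // Fresh PigServer per run, per the advice earlier in the thread.
            PigServer pig = new PigServer(ExecType.MAPREDUCE);
            try {
                // Naive statement split; fine as long as no ';' appears inside literals.
                for (String stmt : resolved.split(";")) {
                    if (stmt.trim().length() > 0) {
                        pig.registerQuery(stmt.trim() + ";"); // the STORE statements trigger execution
                    }
                }
            } finally {
                pig.shutdown();
            }
        }
    }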
  • Bill Graham at Mar 22, 2010 at 5:14 pm
    Sure, here it is:
    https://issues.apache.org/jira/browse/PIG-1313

    I've written a test script to try to reproduce the problem, but it's not
    working as expected. This could be because my dummy script is too simple
    to properly reproduce it, or maybe because it runs against local HDFS.

    Either way, I started calling FileLocalizer.deleteTempFiles() in my server
    application and my memory usage has been stable over the weekend, so I'm
    confident that there is in fact a leak. I think this approach is flawed
    though (see discussion in the JIRA).

    thanks,
    Bill
    On Sun, Mar 21, 2010 at 10:54 PM, Daniel Dai wrote:

    Thanks for digging into it. Can you open a Jira for that?

    Daniel

    --------------------------------------------------

    From: "Bill Graham" <billgraham@gmail.com>
    Sent: Friday, March 19, 2010 10:34 AM
    To: <pig-user@hadoop.apache.org>

    Subject: Re: PigServer memory leak

    I believe I've found the cause of my Pig memory leak, so I wanted to report
    back. I profiled my app after letting it run for a couple of days and found
    that the static toDelete stack in FileLocalizer was growing over time
    without ever getting flushed; I had thousands of HFile objects in that
    stack. This produced a memory leak in my app and a corresponding build-up
    of temp files in HDFS.

    The fix seems straightforward enough in my app: I suspect calling
    FileLocalizer.deleteTempFiles() after each execution of a Pig script
    through PigServer will do the trick.
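
    A rough sketch of that workaround (the wrapper shape is invented for
    illustration; only the FileLocalizer.deleteTempFiles() call and the
    PigServer usage come from this thread):

    import org.apache.pig.ExecType;
    import org.apache.pig.PigServer;
    import org.apache.pig.impl.io.FileLocalizer;

    public class PigJob {
        public void execute(Iterable<String> statements) throws Exception {
            PigServer pig = new PigServer(ExecType.MAPREDUCE);
            try {
                for (String statement : statements) {
                    pig.registerQuery(statement);
                }
            } finally {
                pig.shutdown();
                // Flush the static toDelete stack so temp-file entries do not
                // accumulate across scheduled runs in a long-lived VM.
                FileLocalizer.deleteTempFiles();
            }
        }
    }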

    This seems to be a major gotcha though that will likely burn others. I
    suggest we add FileLocalizer.deleteTempFiles() to the shutdown() method of
    PigServer. Thoughts?

    Currently shutdown isn't doing much:

    public void shutdown() {
        // clean-up activities
        // TODO: reclaim scope to free up resources. Currently
        // this is not implemented and throws an exception
        // hence, for now, we won't call it.
        //
        // pigContext.getExecutionEngine().reclaimScope(this.scope);
    }
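
    For comparison, a sketch of what the suggested change might look like --
    just the one added call, otherwise the method exactly as shown above (this
    is the proposal, not an actual patch):

    public void shutdown() {
        // clean-up activities
        // Proposed: flush the static toDelete stack so long-lived clients that
        // create many PigServer instances don't accumulate temp-file entries.
        FileLocalizer.deleteTempFiles();

        // TODO: reclaim scope to free up resources. Currently
        // this is not implemented and throws an exception
        // hence, for now, we won't call it.
        //
        // pigContext.getExecutionEngine().reclaimScope(this.scope);
    }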

    thanks,
    Bill



    On Wed, Mar 10, 2010 at 12:15 PM, Bill Graham <billgraham@gmail.com>
    wrote:

    Yes, these errors appear in the Pig client and the jobs are definitely
    being executed on the cluster. I can see the data in HDFS and the jobs in
    the JobTracker UI of the cluster.


