I searched through the topics but could not find an example of how to
pull from a Source when data arrives in bursts. I built a custom Source
that reads from an external source. Data arrives in bursts and the flow
can stop for a while, so I need to keep pulling the Source in such a way
that I keep reading data (the actual read is done in the old polling
fashion). I guess I need something like a KeepAlive mechanism for the
data stream.

BTW: Fantastic job with Akka, Akka Stream & Akka HTTP!

Thanks
Claudio


--
---
You received this message because you are subscribed to the Google Groups "Akka User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscribe@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.


  • Endre Varga at Feb 3, 2016 at 8:17 am
    Hi Claudio,

    I don't really understand your question here; a Source is alive until you
    close its output port. Can you show a simplified code example of what you
    mean?

    -Endre
  • Viktor Klang at Feb 3, 2016 at 8:33 am
    Perhaps a Source with a buffer attached with the appropriate dropping
    strategy?

  • John Vieten at Feb 3, 2016 at 8:55 am
    I've done something similar.
    I adapted this JobManager
    <http://doc.akka.io/docs/akka-stream-and-http-experimental/2.0.3/java/stream-integrations.html>.
    When no data is available (for example, when it receives a Request(16)
    message), it starts a "polling Actor" which polls an external database for
    more data.
    Does this help?

  • Clca at Feb 3, 2016 at 7:49 pm
    Yes this is close to what I need to do.
    The code in the onPull method is something like:

    val m = ReadFromService(...)
    if (m != null)
        push(out, m)

    In a traditional app, reading would be done in a loop:

    while (true) {
        val m = ReadFromService(...)
        // do something with the new message
    }

    So I'll add a mechanism in the Source to keep polling the external service.

    Thanks!
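
One way the "keep polling" mechanism described above could look is a GraphStage that retries on a timer after an empty read. This is only a sketch under assumptions: `PollingSource`, `read` and `retryAfter` are hypothetical names, `read` stands in for ReadFromService(...) and returns None instead of null, and the read is assumed to be cheap and non-blocking.

```scala
import akka.stream.{Attributes, Outlet, SourceShape}
import akka.stream.stage.{GraphStage, GraphStageLogic, OutHandler, TimerGraphStageLogic}
import scala.concurrent.duration.FiniteDuration

// Hypothetical polling source: `read` stands in for ReadFromService(...) and
// returns None when the external service currently has nothing to deliver.
final class PollingSource[T](read: () => Option[T], retryAfter: FiniteDuration)
    extends GraphStage[SourceShape[T]] {

  val out: Outlet[T] = Outlet("PollingSource.out")
  override val shape: SourceShape[T] = SourceShape(out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new TimerGraphStageLogic(shape) {

      // Try one read: push on success, otherwise schedule a retry.
      // The stage never completes just because a read came back empty.
      private def poll(): Unit = read() match {
        case Some(m) => push(out, m)
        case None    => scheduleOnce("poll", retryAfter)
      }

      setHandler(out, new OutHandler {
        override def onPull(): Unit = poll()
      })

      // Fires after an empty read; the downstream pull is still pending,
      // so it is safe to try again and push if data has arrived.
      override protected def onTimer(timerKey: Any): Unit = poll()
    }
}
```

It would be wrapped with `Source.fromGraph(new PollingSource(...))`. Reads happen only while downstream demand is outstanding, so a quiet period simply means the timer keeps firing until data shows up.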


  • Viktor Klang at Feb 3, 2016 at 7:51 pm
    I don't see why you'd need to write a custom GraphStage for this.

    unfold/unfoldAsync paired with a buffer with an appropriate dropping policy
    should work, no?
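
A rough sketch of what the unfoldAsync-plus-buffer suggestion might look like follows. The names `UnfoldPolling`, `read` and `bufferSize` are assumptions made for illustration, and `read` is a hypothetical asynchronous stand-in for ReadFromService(...) that completes with None when nothing is available.

```scala
import akka.NotUsed
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.Source
import scala.concurrent.{ExecutionContext, Future}

object UnfoldPolling {
  // `read` is a hypothetical stand-in for ReadFromService(...): it completes
  // with None when the external service has nothing to deliver right now.
  def source[T](read: () => Future[Option[T]], bufferSize: Int)(
      implicit ec: ExecutionContext): Source[T, NotUsed] =
    Source
      // One read per pull. Always answering Some(...) keeps the stream open;
      // answering None at this level would complete it.
      .unfoldAsync[Unit, Option[T]](())(_ => read().map(result => Some(((), result))))
      // Discard the empty reads.
      .collect { case Some(m) => m }
      // Absorb bursts; when the buffer is full the oldest element is dropped.
      .buffer(bufferSize, OverflowStrategy.dropHead)
}
```

Because a dropping buffer never backpressures, it pulls from upstream continuously, so the service is read even when the downstream is slow. That also means empty reads are retried immediately; this sketch assumes `read` itself waits for a while before reporting that nothing is available.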
  • John Vieten at Feb 3, 2016 at 10:06 pm
    Where do I find unfold/unfoldAsync? I looked at
    http://doc.akka.io/docs/akka-stream-and-http-experimental/2.0.3/stages-overview.html
    Many Greetings
    John


  • Viktor Klang at Feb 3, 2016 at 10:10 pm
    http://doc.akka.io/api/akka-stream-and-http-experimental/2.0.3/?_ga=1.45749860.1579561034.1353497989#akka.stream.scaladsl.Source$
  • John Vieten at Feb 7, 2016 at 11:52 am
    Hi Viktor,
    I know you love simple, elegant code, but I looked at unfoldAsync and I
    don't see how it solves the use case I have in mind.

    I'll try to explain why, because I am not sure whether I am overlooking
    something obvious.

    An external system writes events into a log table. Since this is a
    non-reactive SQL database, the consumer needs to poll the log table (for
    example, every 2 seconds) for new events.

    To convert this scenario into a streaming source using unfoldAsync, I
    would need to implement a polling loop inside the Future so that the
    Future only completes when new events have been inserted into the log
    table.

    If I don't use polling, the Future would send 0 events upstream and the
    stream would come to an end?

    That's why, instead of unfoldAsync, I like using an Actor like JobManager
    <http://doc.akka.io/docs/akka-stream-and-http-experimental/2.0.3/java/stream-integrations.html>
    as a Source. Within the Actor I can be more fine-grained and use the
    scheduler to implement the polling logic.

    Many Greetings
    John
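
On the worry that an empty poll would end the stream: with unfoldAsync, only a None result completes the stream, while Some with an empty batch keeps it open. The sketch below illustrates this under assumptions: `LogTablePolling`, `fetch` and `idleDelay` are hypothetical names, and `fetch` stands in for a query that returns the events newer than the last id seen, as (id, payload) pairs.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.pattern.after
import akka.stream.scaladsl.Source
import scala.collection.immutable
import scala.concurrent.Future
import scala.concurrent.duration.FiniteDuration

object LogTablePolling {
  // `fetch` is a hypothetical query against the log table: given the last id
  // seen, it returns the newer events as (id, payload) pairs, possibly none.
  def events(fetch: Long => Future[immutable.Seq[(Long, String)]], idleDelay: FiniteDuration)(
      implicit system: ActorSystem): Source[String, NotUsed] = {
    import system.dispatcher

    Source
      .unfoldAsync[Long, immutable.Seq[(Long, String)]](0L) { lastId =>
        fetch(lastId).flatMap {
          case batch if batch.nonEmpty =>
            // Remember the highest id so the next query starts after it.
            Future.successful(Some((batch.last._1, batch)))
          case empty =>
            // Nothing new: wait, then emit an empty batch. Some(...) keeps the
            // stream running; only None would complete it.
            after(idleDelay, system.scheduler)(Future.successful(Some((lastId, empty))))
        }
      }
      // Flatten batches into single events; empty batches emit nothing.
      .mapConcat(batch => batch.map(_._2))
  }
}
```

The table is queried only when the downstream asks for more, and the delay is applied only after an empty result, so no explicit polling loop is needed inside the Future.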











  • Viktor Klang at Feb 7, 2016 at 12:05 pm
    Hi John!

    I think I can help you, but I have some follow-on questions :)

    On Sun, Feb 7, 2016 at 12:52 PM, John wrote:

    > Hi Victor,
    > I know you love simple elegant code but
    > I looked at unfoldAsync and I don't see that it solves the usecase I have
    > in mind.
    >
    > I'll try to explain why because I am not sure if I am overlooking
    > something obvious?
    >
    > An external system writes events into a log table. Since this is a
    > non-reactive-sql-database the consumer needs to poll the log table (for
    > example every 2 seconds) for new events.

    (Why) does it need to poll the log table if there is no demand for events?

    > To convert this scenario into a streaming source using unfoldAsync
    > I need to implement a polling loop inside the future so that
    > Future.success gets only called when new events are inserted in the log
    > table.

    This seems much like a technical aspect rather than a requirement.

    > If I don't use polling the future would send 0 Events upstream and the
    > stream would come to an end?

    Events go downstream, or did I misunderstand something?

    > That's why I like using instead of unfoldAsync an Actor like JobManager
    > <http://doc.akka.io/docs/akka-stream-and-http-experimental/2.0.3/java/stream-integrations.html>.
    > as an Source. Within the Actor I can be more fine grained and use the
    > scheduler to implement the polling logic.

    Let's take a step back, what are the actual requirements?

    1. You have a source of "events" and the only way you know if there are any
       events, is if you ask for events
    2. You want to consume events from this source
    3. You only need to consume events from the source if there is demand from
       "downstream"
    4. Does the source of events ever end, and how do you know?
    5. can you ask for a specified number of events or does polling imply
       reading all available events?
  • John Vieten at Feb 7, 2016 at 1:17 pm
    Hi Victor,
    thank you for your reply and here are my remarks :

    Point 1. You have a source of "events" and the only way you know if there
    are any events, is if you ask for events

    exactly

    Point 2.You want to consume events from this source

    Yes

      Point 3) You only need to consume events from the source if there is
    demand from "downstream"

    It is the other way round. The source logically pushes the events
    downstream.
    Ideally as soon as theses Events arrive I would like them to be pushed
    downstream. So its more a push scenario.
    The source pushes as many Events as the downstream processes can handle.
    (But I have to limit myself for polling the database at a rate of 2-5
    seconds because of hardware reasons)

      Point 4) Does the source of events ever end, and how do you know?

    No the source never ends. External Systems always insert new Event records
    into the database

       Point 5. can you ask for a specified number of events or does polling
    imply reading all available events?

    No I can ask for a specified number of events.

    Here is a concrete business case:
    1) External systems want email messages to be sent to users.
    2) They insert email events (records with email address, content,
    user_id, etc.) into an Email table (log table).
    3) Email-Send-Service sends these emails over the wire.
    4) Acknowledge-Service logs the email-send event and cleans up the Email
    table (log table).

    So I can set up a stream: Email-Source -> Email-Send-Service ->
    Acknowledge-Service.

    I am now considering two ways of doing it and am hoping for your advice (-:
    1) Create the following stream
         Email-Source (select * from Email limit 1000) -> Email-Send-Service
    -> Acknowledge-Service
    and materialize it every two seconds.

    2) Set up a custom source which polls forever:
         Polling-Neverending-Email-Source (select * from Email limit 1000 every
    two seconds inside the Actor) -> Email-Send-Service ->
    Acknowledge-Service


    Background: I am using Akka Streams to build an in-house toolkit which
    deals with many typical ESB processes.
    The above use case benefits not so much from throughput (the users
    could actually live with some delay) but more from
    reusing stream components and having a nice, clear Flow DSL.

    Many Greetings
    John
  • Viktor Klang at Feb 7, 2016 at 8:49 pm

    On Sun, Feb 7, 2016 at 2:17 PM, wrote:

    Point 3) You only need to consume events from the source if there is
    demand from "downstream"

    It is the other way round. The source logically pushes the events
    downstream.
    Ideally, as soon as these events arrive I would like them to be pushed
    downstream, so it's more of a push scenario.
    The source pushes as many events as the downstream stages can handle.
    (But I have to limit polling of the database to once every 2-5
    seconds for hardware reasons.)

    You only want to send data if the downstream can handle it, and the source
    knows that by keeping track of downstream demand.

    I am now considering two ways of doing it and am hoping for your advice
    (-:
    1) Create the following stream
    Email-Source (select * from Email limit 1000) -> Email-Send-Service
    -> Acknowledge-Service
    and materialize it every two seconds.

    2) Set up a custom source which polls forever:
    Polling-Neverending-Email-Source (select * from Email limit 1000
    every two seconds inside the Actor) -> Email-Send-Service ->
    Acknowledge-Service

    Sounds like you should be able to construct your source as something like
    this:

    <poller> + <throttle> + <mapConcat(identity)> + <buffer>

    This means that the polling gets throttled; the mapConcat is there in case you
    read more than 1 event per poll; and the buffer is there to make sure that
    the demand that gets to the poller is > 1, to increase throughput.
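
    A minimal sketch of that shape in Scala, assuming Akka 2.6 or later (where
    an implicit ActorSystem supplies the materializer). EmailEvent and
    pollDatabase are hypothetical stand-ins for the Email table query and are
    not part of the thread; the fake poller simply invents rows. Note that an
    empty batch does not end the stream, because only a None from the poller
    would complete it.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{OverflowStrategy, ThrottleMode}
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object PollingEmailSource {
  // Hypothetical record type standing in for a row of the Email table.
  final case class EmailEvent(id: Long, address: String)

  // Hypothetical stand-in for "select * from Email limit <limit>".
  // A real implementation would query the database and may return an empty list.
  def pollDatabase(afterId: Long, limit: Int): Future[List[EmailEvent]] =
    Future.successful(
      (afterId + 1 to afterId + limit).map(i => EmailEvent(i, s"user$i@example.com")).toList)

  def emailSource(pollEvery: FiniteDuration, batchSize: Int): Source[EmailEvent, NotUsed] =
    Source
      // <poller>: the state is the last id seen. Always returning Some(...)
      // keeps the source alive even when a poll finds nothing new.
      .unfoldAsync(0L) { lastId =>
        pollDatabase(lastId, batchSize).map { batch =>
          val nextId = batch.lastOption.fold(lastId)(_.id)
          Option((nextId, batch))
        }
      }
      // <throttle>: let at most one batch (one poll result) through per interval.
      .throttle(1, pollEvery, 1, ThrottleMode.Shaping)
      // <mapConcat(identity)>: flatten each batch into individual events.
      .mapConcat(identity)
      // <buffer>: hold up to one batch of events and backpressure when full.
      .buffer(batchSize, OverflowStrategy.backpressure)

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("polling-sketch")
    emailSource(2.seconds, 1000)
      .take(5)
      .runWith(Sink.foreach(println))
      .onComplete(_ => system.terminate())
  }
}
```

    In a real pipeline, Email-Send-Service and Acknowledge-Service would be
    attached after emailSource in place of the take/println used here.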
  • Endre Varga at Feb 8, 2016 at 10:52 am
    On the other hand, buffer detaches the upstream, so the poller will be
    called even when there is no downstream demand, causing it to return
    potentially stale data from the buffer if there are long pauses in
    downstream consumption. So this is a tradeoff, not necessarily a bad one,
    but one to be aware of.

    -Endre
  • Viktor Klang at Feb 8, 2016 at 10:53 am
    If the Buffer has Backpressure as overflow strategy I don't see how it
    decouples upstream from downstream.
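
    One way to look at this point is a small experiment, sketched here under
    the assumption of Akka 2.6 or later. A counting poller feeds a buffer with
    OverflowStrategy.backpressure, and the downstream is stalled by a throttle
    of one element per hour. The names BufferPrefetchSketch and
    pollsWhileDownstreamStalls are made up for illustration. The expectation
    is that the poller runs ahead of demand until the buffer is full and then
    stops, so the count stays close to the buffer size.

```scala
import java.util.concurrent.atomic.AtomicInteger

import akka.actor.ActorSystem
import akka.stream.{OverflowStrategy, ThrottleMode}
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.duration._

object BufferPrefetchSketch {
  // Starts the stream, waits briefly, and reports how often the poller was
  // called while the downstream was effectively not consuming.
  def pollsWhileDownstreamStalls(bufferSize: Int)(implicit system: ActorSystem): Int = {
    val polls = new AtomicInteger(0)
    Source
      .unfold(0) { n =>
        polls.incrementAndGet() // count every call made to the "poller"
        Some((n + 1, n))
      }
      .buffer(bufferSize, OverflowStrategy.backpressure)
      // Stalled downstream: lets one element through, then waits an hour.
      .throttle(1, 1.hour, 1, ThrottleMode.Shaping)
      .runWith(Sink.ignore)
    Thread.sleep(500) // give the stream time to fill the buffer
    polls.get()
  }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("buffer-sketch")
    println(s"polls with a stalled downstream: ${pollsWhileDownstreamStalls(4)}")
    system.terminate()
  }
}
```

    If the count stays bounded, the buffer prefetches up to its capacity (so
    buffered data can go stale during long pauses) but does not poll without
    limit once it is full.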
    On Mon, Feb 8, 2016 at 11:52 AM, Endre Varga wrote:


    On Sun, Feb 7, 2016 at 9:49 PM, Viktor Klang wrote:


    On Sun, Feb 7, 2016 at 2:17 PM, wrote:

    Hi Victor,
    thank you for your reply and here are my remarks :

    Point 1. You have a source of "events" and the only way you know if
    there are any events, is if you ask for events

    exactly

    Point 2.You want to consume events from this source

    Yes

    Point 3) You only need to consume events from the source if there is
    demand from "downstream"

    It is the other way round. The source logically pushes the events
    downstream.
    Ideally as soon as theses Events arrive I would like them to be pushed
    downstream. So its more a push scenario.
    The source pushes as many Events as the downstream processes can handle.
    (But I have to limit myself for polling the database at a rate of 2-5
    seconds because of hardware reasons)
    You only want to send data if the downstream can handle it, and the
    source knows that by keeping track of downstream demand.

    Point 4) Does the source of events ever end, and how do you know?

    No the source never ends. External Systems always insert new Event
    records into the database

    Point 5. can you ask for a specified number of events or does polling
    imply reading all available events?

    No I can ask for a specified number of events.

    Here is a concrete business case:
    1) External systems want Email Messages to be emailed to Users.
    2) They insert Email Events (Records with email-address, content,
    user_id,etc) into a Email Table (log-table)
    3) Email-Send-Service sends these Emails over the wire
    4) Acknowledge-Service logs the Email-Send Event and cleans up the
    Email Table (log-table).

    So I can set up a Stream: Email-Source -> Email-Send-Service -
    Acknowledge-Service .

    I am now considering two ways of doing it and are hoping for your advice
    (-:
    1) create the following
    Email-Source (select * from Email limit 1000) ->
    Email-Send-Service -> Acknowledge-Service
    and materialize this stream every two seconds

    2) Set up a custom source which polls forever
    Polling-Neverending-Email-Source (select * from Email limit 1000
    every two seconds inside the Actor) -> Email-Send-Service ->
    Acknowledge-Service
    Sounds like you should be able to construct your source as something
    like this:

    <poller> + <throttle> + <mapConcat(identity)> + <buffer>

    This means that the polling gets throttled; the mapConcat is there in
    case you read more than one event per poll; and the buffer makes sure
    that the demand that reaches the poller is > 1, to increase throughput.
    On the other hand, buffer detaches the upstream, so the poller will be
    called even when there is no downstream demand, which means the buffer
    can end up returning potentially stale data if there are long pauses in
    downstream consumption. So this is a tradeoff, not necessarily a bad one,
    but one to be aware of.

    -Endre
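
A minimal sketch of the composition described above, assuming akka-stream 2.6.x on the classpath (the thread itself dates from the 2.0.3 API, so details may differ). `Email`, `pollEmails`, the 100 ms interval and the buffer size are hypothetical stand-ins chosen so the example terminates quickly; the thread talks about polling every two seconds with a limit of 1000.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{OverflowStrategy, ThrottleMode}
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

implicit val system: ActorSystem = ActorSystem("email-poller")
import system.dispatcher

final case class Email(id: Long, address: String)

// Hypothetical stand-in for "select * from Email limit N": returns `limit`
// rows with ids greater than `afterId`.
def pollEmails(afterId: Long, limit: Int): Future[List[Email]] =
  Future(List.tabulate(limit) { i =>
    val id = afterId + i + 1
    Email(id, s"user$id@example.com")
  })

val emails: Source[Email, NotUsed] =
  Source
    // <poller>: the state is the id of the last row seen so far
    .unfoldAsync[Long, List[Email]](0L) { lastId =>
      pollEmails(lastId, limit = 3).map { batch =>
        val next = batch.lastOption.fold(lastId)(_.id)
        Option((next, batch))
      }
    }
    // <throttle>: at most one poll result per interval
    .throttle(1, 100.millis, 1, ThrottleMode.Shaping)
    // <mapConcat(identity)>: flatten each batch into single events
    .mapConcat(identity)
    // <buffer>: lets demand run ahead of the consumer
    .buffer(16, OverflowStrategy.backpressure)

val firstSeven: Seq[Long] =
  Await.result(emails.map(_.id).take(7).runWith(Sink.seq), 5.seconds)

system.terminate()
```

With `OverflowStrategy.backpressure` nothing is dropped; the buffer only allows up to 16 events to be fetched ahead of downstream demand, which is where the staleness tradeoff mentioned above comes from.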



    Background: I am building an in-house toolkit with Akka Streams which
    deals with many typical ESB processes.
    The above use case benefits less from throughput (the users could
    actually live with some delay) than from
    reusing stream components and having a nice, clear Flow DSL.

    Many Greetings
    John

    On Sunday, February 7, 2016 at 13:05:59 UTC+1, √ wrote:
    Hi John!

    I think I can help you, but I have some follow-on questions :)
    On Sun, Feb 7, 2016 at 12:52 PM, wrote:

    Hi Viktor,
    I know you love simple, elegant code, but
    I looked at unfoldAsync and I don't see that it solves the use case I
    have in mind.

    I'll try to explain why, because I am not sure whether I am overlooking
    something obvious.

    An external system writes events into a log table. Since this is a
    non-reactive SQL database, the consumer needs to poll the log table (for
    example every 2 seconds) for new events.
    (Why) does it need to poll the log table if there is no demand for
    events?

    To convert this scenario into a streaming source using unfoldAsync,
    I need to implement a polling loop inside the future so that
    the future only completes when new events are inserted into the log
    table.
    This seems much like a technical aspect rather than a requirement.

    If I don't use polling the future would send 0 Events upstream and the
    stream would come to an end?
    Events go downstream, or did I misunderstand something?
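
On the worry that a poll returning zero events would end the stream: `unfoldAsync` only completes when the function yields `None`, so an empty batch wrapped in `Some` keeps the stream alive. A minimal sketch, assuming akka-stream 2.6.x; `pollResults` is a hypothetical in-memory stand-in for successive reads of the log table.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

implicit val system: ActorSystem = ActorSystem("empty-polls")

// Hypothetical results of successive polls: the second and third polls
// find no new rows.
val pollResults: Vector[List[String]] =
  Vector(List("e1", "e2"), Nil, Nil, List("e3"))

// The state is the index of the next poll. Some(...) with an empty batch
// continues the stream; only None would complete it.
val events: Source[String, NotUsed] =
  Source
    .unfoldAsync[Int, List[String]](0) { i =>
      Future.successful(Option((i + 1, pollResults(i % pollResults.size))))
    }
    .mapConcat(identity) // an empty batch emits nothing downstream

val firstThree: Seq[String] =
  Await.result(events.take(3).runWith(Sink.seq), 5.seconds)

system.terminate()
```

In a real poller the empty polls would need to be spaced out (for example with a throttle), otherwise this loop queries the table as fast as it can while nothing is available.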

    That's why, instead of unfoldAsync, I like using an Actor like
    JobManager
    <http://doc.akka.io/docs/akka-stream-and-http-experimental/2.0.3/java/stream-integrations.html>
    as a Source. Within the Actor I can be more fine-grained and use the
    scheduler to implement the polling logic.
    Let's take a step back: what are the actual requirements?

    1. You have a source of "events" and the only way you know if there are
    any events, is if you ask for events
    2. You want to consume events from this source
    3. You only need to consume events from the source if there is demand
    from "downstream"
    4. Does the source of events ever end, and how do you know?
    5. Can you ask for a specified number of events, or does polling imply
    reading all available events?

    Many Greetings
    John

    On Wednesday, February 3, 2016 at 23:10:15 UTC+1, √ wrote:

    http://doc.akka.io/api/akka-stream-and-http-experimental/2.0.3/?_ga=1.45749860.1579561034.1353497989#akka.stream.scaladsl.Source$
    On Wed, Feb 3, 2016 at 11:06 PM, wrote:

    Where do I find unfold/unfoldAsync? I looked at
    http://doc.akka.io/docs/akka-stream-and-http-experimental/2.0.3/stages-overview.html
    Many Greetings
    John


    On Wednesday, February 3, 2016 at 20:51:17 UTC+1, √ wrote:
    I don't see why you'd need to write a custom GraphStage for this.

    unfold/unfoldAsync paired with a buffer with an appropriate
    dropping policy should work, no?
    On Wed, Feb 3, 2016 at 8:47 PM, clca wrote:

    Yes, this is close to what I need to do.
    The code in the onPull method is something like:

        val m = ReadFromService(...)
        if (m != null)
          push(out, m)

    In a traditional app, reading would be done in a loop:

        while (true) {
          val m = ReadFromService(...)
          // do something with the new message
        }

    So I'll add a mechanism in the Source to keep polling the
    external service.
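
One possible shape for such a mechanism is a custom source stage that retries on a timer when a pull finds nothing to read. This is only a sketch, assuming akka-stream 2.6.x; `PollingSource`, `read` and `retryAfter` are hypothetical names, and `read` stands in for a non-blocking `ReadFromService(...)` that returns `None` instead of `null` when no message is available.

```scala
import akka.actor.ActorSystem
import akka.stream.{Attributes, Outlet, SourceShape}
import akka.stream.scaladsl.{Sink, Source}
import akka.stream.stage.{GraphStage, GraphStageLogic, OutHandler, TimerGraphStageLogic}
import scala.concurrent.Await
import scala.concurrent.duration._

final class PollingSource[T](read: () => Option[T], retryAfter: FiniteDuration)
    extends GraphStage[SourceShape[T]] {

  val out: Outlet[T] = Outlet("PollingSource.out")
  override val shape: SourceShape[T] = SourceShape(out)

  override def createLogic(attrs: Attributes): GraphStageLogic =
    new TimerGraphStageLogic(shape) {
      // Push a message if one is available, otherwise try again later.
      private def poll(): Unit =
        read() match {
          case Some(m) => push(out, m)
          case None    => scheduleOnce("poll", retryAfter)
        }

      // A retry is only scheduled while a downstream pull is outstanding,
      // so the service is never polled without demand.
      override protected def onTimer(timerKey: Any): Unit = poll()

      setHandler(out, new OutHandler {
        override def onPull(): Unit = poll()
      })
    }
}

implicit val system: ActorSystem = ActorSystem("polling-stage")

// Simulated bursty service: every third read finds nothing.
var reads = 0
def readFromService(): Option[Int] = {
  reads += 1
  if (reads % 3 == 0) None else Some(reads)
}

val received: Seq[Int] = Await.result(
  Source
    .fromGraph(new PollingSource(() => readFromService(), 50.millis))
    .take(4)
    .runWith(Sink.seq),
  5.seconds)

system.terminate()
```

The stage stays alive across quiet periods because it simply does not push until a read succeeds; no separate keep-alive element is needed.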

    Thanks!


    On Wednesday, February 3, 2016 at 12:55:16 AM UTC-8,
    john....@gmail.com wrote:
    I've done something similar.
    I adapted this JobManager
    <http://doc.akka.io/docs/akka-stream-and-http-experimental/2.0.3/java/stream-integrations.html>.
    When no data is available (for example when it receives a Request(16)
    message), it starts a "polling Actor" which polls an external database
    for more data.
    Does this help?

  • Endre Varga at Feb 8, 2016 at 10:56 am

    On Mon, Feb 8, 2016 at 11:53 AM, Viktor Klang wrote:

    If the Buffer has Backpressure as overflow strategy I don't see how it
    decouples upstream from downstream.

    That is a dangerous move though: since you put the buffer *after* the
    mapConcat, dropping will be independent of the batch boundaries,
    i.e. the buffer might hold one full batch and half of the next, dropping
    the other half. I think in this case it is better to use a more flexible
    tool, and "batcher" seems to be the right one (it is a conflate-like
    op with capacity, so it can implement "smart" buffers). Or mapConcat
    should not be used here. I guess it depends on the use case.

    -Endre
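
To illustrate keeping batch boundaries intact, here is a sketch that aggregates whole poll results before flattening them. It assumes akka-stream 2.6.x and that the "batcher" mentioned above corresponds to the `batch` operator; `wholeBatches` and the capacity of 4 are hypothetical. Note that this variant backpressures once the capacity is reached rather than dropping anything.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

implicit val system: ActorSystem = ActorSystem("whole-batches")

// Collect up to 4 poll results while downstream is busy, then flatten.
// Aggregation happens before flattening, so a poll result is always handled
// as a whole; once 4 are pending, upstream is backpressured.
def wholeBatches[T]: Flow[List[T], T, NotUsed] =
  Flow[List[T]]
    .batch(max = 4, seed = (first: List[T]) => Vector(first))(_ :+ _)
    .mapConcat(_.flatten)

val polls = List(List(1, 2), List(3), List(4, 5, 6))

val flattened: Seq[Int] =
  Await.result(Source(polls).via(wholeBatches[Int]).runWith(Sink.seq), 5.seconds)

system.terminate()
```

If dropping is actually wanted, a `buffer` with a dropping strategy placed before `mapConcat` would drop whole poll results instead of cutting one in half.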

    On Mon, Feb 8, 2016 at 11:52 AM, Endre Varga wrote:



    On Sun, Feb 7, 2016 at 9:49 PM, Viktor Klang <viktor.klang@gmail.com>
    wrote:
    On Sun, Feb 7, 2016 at 2:17 PM, wrote:

    Hi Viktor,
    thank you for your reply. Here are my remarks:

    Point 1) You have a source of "events" and the only way you know if
    there are any events, is if you ask for events

    Exactly.

    Point 2) You want to consume events from this source

    Yes.

    Point 3) You only need to consume events from the source if there is
    demand from "downstream"

    It is the other way round. The source logically pushes the events
    downstream.
    Ideally, I would like these events to be pushed downstream as soon as
    they arrive, so it's more of a push scenario.
    The source pushes as many events as the downstream processes can
    handle.
    (But I have to limit myself to polling the database every 2-5
    seconds for hardware reasons.)
    You only want to send data if the downstream can handle it, and the
    source knows that by keeping track of downstream demand.

  • Viktor Klang at Feb 8, 2016 at 11:06 am
    What? How would anything be dropped if buffer has Backpressure as
    OverflowStrategy?
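
A small sketch of the point being made here, assuming akka-stream 2.6.x: with `OverflowStrategy.backpressure` a full buffer stops pulling from upstream instead of discarding elements, so a slow consumer still receives everything. The buffer size and rates are arbitrary.

```scala
import akka.actor.ActorSystem
import akka.stream.{OverflowStrategy, ThrottleMode}
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

implicit val system: ActorSystem = ActorSystem("backpressure-buffer")

val received: Seq[Int] = Await.result(
  Source(1 to 50)
    // When the 4 slots are full, upstream is simply not pulled.
    .buffer(4, OverflowStrategy.backpressure)
    // Deliberately slow consumer: one element every 10 ms.
    .throttle(1, 10.millis, 1, ThrottleMode.Shaping)
    .runWith(Sink.seq),
  10.seconds)

system.terminate()
```

Swapping in a dropping strategy such as `OverflowStrategy.dropHead` is what would make the batch-boundary concern relevant.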
    --
    Read the docs: http://akka.io/docs/
    Check the FAQ:
    http://doc.akka.io/docs/akka/current/additional/faq.html
    Search the archives:
    https://groups.google.com/group/akka-user
    ---
    You received this message because you are subscribed to the Google
    Groups "Akka User List" group.
    To unsubscribe from this group and stop receiving emails from it, send
    an email to akka-user+unsubscribe@googlegroups.com.
    To post to this group, send email to akka-user@googlegroups.com.
    Visit this group at https://groups.google.com/group/akka-user.
    For more options, visit https://groups.google.com/d/optout.


    --
    Cheers,


    --
    Read the docs: http://akka.io/docs/
    Check the FAQ:
    http://doc.akka.io/docs/akka/current/additional/faq.html
    ---
    You received this message because you are subscribed to the Google Groups
    "Akka User List" group.
    To unsubscribe from this group and stop receiving emails from it, send an
    email to akka-user+unsubscribe@googlegroups.com.
    To post to this group, send email to akka-user@googlegroups.com.
    Visit this group at https://groups.google.com/group/akka-user.
    For more options, visit https://groups.google.com/d/optout.
    --
    Read the docs: http://akka.io/docs/
    Check the FAQ:
    http://doc.akka.io/docs/akka/current/additional/faq.html
    ---
    You received this message because you are subscribed to the Google Groups
    "Akka User List" group.
    To unsubscribe from this group and stop receiving emails from it, send an
    email to akka-user+unsubscribe@googlegroups.com.
    To post to this group, send email to akka-user@googlegroups.com.
    Visit this group at https://groups.google.com/group/akka-user.
    For more options, visit https://groups.google.com/d/optout.


    --
    Cheers,


    --
    ---
    You received this message because you are subscribed to the Google Groups "Akka User List" group.
    To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscribe@googlegroups.com.
    To post to this group, send email to akka-user@googlegroups.com.
    Visit this group at https://groups.google.com/group/akka-user.
    For more options, visit https://groups.google.com/d/optout.
  • Endre Varga at Feb 8, 2016 at 11:20 am
    Ah, you meant Backpressure. But then what I originally said is true. Just
    think about it:

      - the buffer requests elements from upstream
      - the buffer is large enough to hold the next batch emitted by mapConcat
      - the buffer hence stores some results
      - downstream asks for the next batch, but only 3 hours later
      - the emitted result is now 3 hours old

    -Endre
    On Mon, Feb 8, 2016 at 12:05 PM, Viktor Klang wrote:

    What? How would anything be dropped if buffer has Backpressure as
    OverflowStrategy?
    On Mon, Feb 8, 2016 at 11:56 AM, Endre Varga wrote:



    On Mon, Feb 8, 2016 at 11:53 AM, Viktor Klang <viktor.klang@gmail.com>
    wrote:
    If the Buffer has Backpressure as overflow strategy I don't see how it
    decouples upstream from downstream.
    That is a dangerous move though, since you put the buffer *after* the
    mapConcat, so this means that dropping will be independent of the batch
    boundaries. I.e. the buffer might hold one full batch and a half, dropping
    the other half. I think in this case it is better to use a more flexible
    tool there and "batcher" seems to be the right one (it is a conflate-like
    op with capacity, so can implement "smart" buffers). Or, mapConcat should
    not be used here. I guess it depends on the use case.

    -Endre

    On Mon, Feb 8, 2016 at 11:52 AM, Endre Varga <endre.varga@typesafe.com>
    wrote:

    On Sun, Feb 7, 2016 at 9:49 PM, Viktor Klang <viktor.klang@gmail.com>
    wrote:
    On Sun, Feb 7, 2016 at 2:17 PM, wrote:

    Hi Viktor,
    thank you for your reply. Here are my remarks:

    Point 1. You have a source of "events" and the only way you know if
    there are any events, is if you ask for events

    Exactly.

    Point 2. You want to consume events from this source

    Yes.

    Point 3. You only need to consume events from the source if there is
    demand from "downstream"

    It is the other way round. The source logically pushes the events
    downstream.
    Ideally, I would like these events to be pushed downstream as soon as
    they arrive, so it's more of a push scenario.
    The source pushes as many events as the downstream processes can
    handle.
    (But I have to limit myself to polling the database every 2-5
    seconds for hardware reasons.)
    You only want to send data if the downstream can handle it, and the
    source knows that by keeping track of downstream demand.

    Point 4. Does the source of events ever end, and how do you know?

    No, the source never ends. External systems always insert new event
    records into the database.

    Point 5. Can you ask for a specified number of events or does
    polling imply reading all available events?

    No, I can ask for a specified number of events.

    Here is a concrete business case:
    1) External systems want email messages to be sent to users.
    2) They insert email events (records with email address, content,
    user_id, etc.) into an Email table (log table).
    3) Email-Send-Service sends these emails over the wire.
    4) Acknowledge-Service logs the email-send event and cleans up the
    Email table (log table).

    So I can set up a stream: Email-Source -> Email-Send-Service ->
    Acknowledge-Service.

    I am now considering two ways of doing it and am hoping for your
    advice (-:
    1) Create the following
    Email-Source (select * from Email limit 1000) ->
    Email-Send-Service -> Acknowledge-Service
    and materialize this stream every two seconds.

    2) Set up a custom source which polls forever:
    Polling-Neverending-Email-Source (select * from Email limit 1000
    every two seconds inside the Actor) -> Email-Send-Service ->
    Acknowledge-Service
    Sounds like you should be able to construct your source as something
    like this:

    <poller> + <throttle> + <mapConcat(identity)> + <buffer>

    This means that the polling gets throttled, and the mapConcat is if
    you read more than 1 event per poll and the buffer is there to make sure
    that the demand that gets to the poller is > 1 to increase throughput.
    On the other hand buffer detaches the upstream, so the poller will be
    called even when there is no downstream demand, causing it to return
    potentially stale data from the buffer if there are long pauses in
    downstream consumption. So this is a tradeoff, not necessarily a bad one,
    but one to be aware of.

    -Endre
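
    The pipeline suggested above could be sketched roughly as follows. This is
    a minimal sketch, not code from the thread: it assumes a recent Akka
    Streams release (2.6-style API) rather than the 2.0.3 experimental version
    discussed here, and EmailEvent, poll and pollingSource are hypothetical
    names standing in for the real database query.

```scala
import akka.NotUsed
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.Source
import scala.concurrent.Future
import scala.concurrent.duration.FiniteDuration

object PollingSource {
  // Hypothetical event type; the real one would mirror a row of the Email table.
  final case class EmailEvent(id: Long, address: String)

  // poll is an assumed function that runs one query and returns a (possibly empty) batch.
  def pollingSource(poll: () => Future[List[EmailEvent]],
                    interval: FiniteDuration,
                    bufferSize: Int): Source[EmailEvent, NotUsed] =
    Source
      .repeat(NotUsed)              // <poller>: one element per poll attempt
      .mapAsync(1)(_ => poll())     // run at most one query at a time
      .throttle(1, interval)        // <throttle>: let through at most one batch per interval
      .mapConcat(identity)          // flatten each batch into individual events
      .buffer(bufferSize, OverflowStrategy.backpressure) // <buffer>: prefetch up to bufferSize events
}
```

    An empty batch simply produces no elements from mapConcat, so the stream
    stays alive between bursts. With OverflowStrategy.backpressure nothing is
    dropped, but the buffer may hold events that were fetched well before
    downstream asks for them, which is the staleness tradeoff described above.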



    Background: I am using Akka Streams to build an in-house toolkit
    that deals with many typical ESB processes.
    The above use case profits less from throughput (the users could
    actually live with some delay) than from
    reusing stream components and having a nice, clear Flow DSL.

    Many Greetings
    John

    On Sunday, February 7, 2016 at 13:05:59 UTC+1, √ wrote:
    Hi John!

    I think I can help you, but I have some follow-on questions :)
    On Sun, Feb 7, 2016 at 12:52 PM, wrote:

    Hi Viktor,
    I know you love simple, elegant code, but
    I looked at unfoldAsync and I don't see how it solves the use case
    I have in mind.

    I'll try to explain why, because I am not sure whether I am overlooking
    something obvious.

    An external system writes events into a log table. Since this is a
    non-reactive SQL database, the consumer needs to poll the log table (for
    example every 2 seconds) for new events.
    (Why) does it need to poll the log table if there is no demand for
    events?

    To convert this scenario into a streaming source using unfoldAsync,
    I need to implement a polling loop inside the future so that
    Future.success only gets called when new events are inserted in the log
    table.
    This seems much like a technical aspect rather than a requirement.

    If I don't use polling, the future would send 0 events upstream and
    the stream would come to an end?
    Events go downstream, or did I misunderstand something?

    That's why, instead of unfoldAsync, I like using an Actor like
    JobManager
    <http://doc.akka.io/docs/akka-stream-and-http-experimental/2.0.3/java/stream-integrations.html>
    as a Source. Within the Actor I can be more fine-grained and use the
    scheduler to implement the polling logic.
    Let's take a step back, what are the actual requirements?

    1. You have a source of "events" and the only way you know if there
    are any events, is if you ask for events
    2. You want to consume events from this source
    3. You only need to consume events from the source if there is
    demand from "downstream"
    4. Does the source of events ever end, and how do you know?
    5. Can you ask for a specified number of events or does polling
    imply reading all available events?

    Many Greetings
    John

    On Wednesday, February 3, 2016 at 23:10:15 UTC+1, √ wrote:

    http://doc.akka.io/api/akka-stream-and-http-experimental/2.0.3/?_ga=1.45749860.1579561034.1353497989#akka.stream.scaladsl.Source$
    On Wed, Feb 3, 2016 at 11:06 PM, wrote:

    Where do I find unfold/unfoldAsync? I looked at
    http://doc.akka.io/docs/akka-stream-and-http-experimental/2.0.3/stages-overview.html
    Many Greetings
    John


    On Wednesday, February 3, 2016 at 20:51:17 UTC+1, √ wrote:
    I don't see why you'd need to write a custom GraphStage for this.

    unfold/unfoldAsync paired with a buffer with an appropriate
    dropping policy should work, no?
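
    For illustration, the unfoldAsync-plus-buffer idea might look like the
    sketch below. It assumes a recent Akka Streams API (not the 2.0.3
    experimental one), and readBatch, messages and the (id, payload) tuple
    shape are hypothetical stand-ins for the external service; dropHead is
    just one possible dropping policy.

```scala
import akka.NotUsed
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.Source
import scala.concurrent.{ExecutionContext, Future}

object UnfoldPolling {
  // readBatch is assumed: given the last id seen, it returns the newer
  // messages as (id, payload) pairs, or an empty list if there are none yet.
  def messages(readBatch: Long => Future[List[(Long, String)]], bufferSize: Int)
              (implicit ec: ExecutionContext): Source[(Long, String), NotUsed] =
    Source
      .unfoldAsync[Long, List[(Long, String)]](0L) { lastId =>
        readBatch(lastId).map { batch =>
          // Keep the old position when the batch is empty.
          val nextId = batch.lastOption.map(_._1).getOrElse(lastId)
          // Some(...) continues the stream; only None would complete it.
          Some((nextId, batch))
        }
      }
      .mapConcat(identity)                            // one element per message
      .buffer(bufferSize, OverflowStrategy.dropHead)  // when full, drop the oldest buffered message
}
```

    Returning Some even for an empty batch keeps the stream running through
    quiet periods. Note that a dropping buffer never backpressures, so
    readBatch is called again as soon as the previous call completes; some
    rate limit in front of the buffer is probably needed in practice.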

    On Wed, Feb 3, 2016 at 8:47 PM, clca <clau...@outlook.com>
    wrote:
    Yes this is close to what I need to do.
    The code in the onPull method is something like:

        val m = ReadFromService(...)
        if (m != null)
          push(out, m)

    In a traditional app, reading would be done in a loop:

        while (true) {
          val m = ReadFromService(...)
          // do something with the new message
        }

    So I'll add a mechanism in the Source to keep polling the
    external service.
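
    One way such a "keep polling" mechanism could look inside a custom Source
    is a timer-based GraphStage, sketched below. This is only an assumption
    about the approach, not the poster's code: it uses the current Akka
    Streams GraphStage API, and PollingStage, read and retryAfter are
    hypothetical names (read returns None where the original returned null).

```scala
import akka.stream.{Attributes, Outlet, SourceShape}
import akka.stream.stage.{GraphStage, GraphStageLogic, OutHandler, TimerGraphStageLogic}
import scala.concurrent.duration.FiniteDuration

// read is assumed to return Some(message) when data is available, None otherwise.
final class PollingStage[T](read: () => Option[T], retryAfter: FiniteDuration)
    extends GraphStage[SourceShape[T]] {

  val out: Outlet[T] = Outlet("PollingStage.out")
  override val shape: SourceShape[T] = SourceShape(out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new TimerGraphStageLogic(shape) with OutHandler {
      // Try one read: push on success, otherwise schedule another attempt.
      // The downstream pull stays pending until an element is pushed.
      private def tryRead(): Unit = read() match {
        case Some(m) => push(out, m)
        case None    => scheduleOnce("poll", retryAfter)
      }

      override def onPull(): Unit = tryRead()

      override protected def onTimer(timerKey: Any): Unit = tryRead()

      setHandler(out, this)
    }
}
```

    It would be used as Source.fromGraph(new PollingStage(read, retryAfter)).
    Since read runs inside the stage, a blocking call there would block the
    stream's dispatcher, so a dedicated dispatcher may be worth considering.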

    Thanks!


    On Wednesday, February 3, 2016 at 12:55:16 AM UTC-8,
    john....@gmail.com wrote:
    I've done something similar.
    I adapted this JobManager
    <http://doc.akka.io/docs/akka-stream-and-http-experimental/2.0.3/java/stream-integrations.html>.
    When no data is available (for example when it receives a Request(16)
    message), it starts a "polling Actor" which polls an external database for
    more data.
    Does this help?

    On Wednesday, February 3, 2016 at 08:33:16 UTC+1, clca wrote:
    I searched through the topics but I could not find any
    example on how to pull from a Source in the case where data is coming in
    bursts. I built a customized Source that read from an external source. Data
    is coming in bursts, the flow can stop for a while so I need to keep
    pulling the Source in such a way I can keep reading data (the actual read
    process is done in the old poll fashion). I guess I need something like a
    KeepAlive type of mechanism for the data stream.

    BTW: Fantastic job with Akka, Akka Stream & Akka HTTP!

    Thanks
    Claudio
  • Viktor Klang at Feb 8, 2016 at 11:29 am
    How is that a problem? OP only wanted to make sure that the external system
    wasn't polled too often.
    On Mon, Feb 8, 2016 at 12:20 PM, Endre Varga wrote:

    Ah, you meant Backpressure. But then what I originally said is true. Just
    think about it:

    - the buffer requests elements from upstream
    - the buffer is large enough to hold the next batch emitted by mapConcat
    - the buffer hence stores some results
    - downstream asks for the next batch, but only 3 hours later
    - the emitted result is now 3 hours old

    -Endre
    Check the FAQ:
    http://doc.akka.io/docs/akka/current/additional/faq.html
    ---
    You received this message because you are subscribed to the Google Groups
    "Akka User List" group.
    To unsubscribe from this group and stop receiving emails from it, send an
    email to akka-user+unsubscribe@googlegroups.com.
    To post to this group, send email to akka-user@googlegroups.com.
    Visit this group at https://groups.google.com/group/akka-user.
    For more options, visit https://groups.google.com/d/optout.
    --
    Read the docs: http://akka.io/docs/
    Check the FAQ:
    http://doc.akka.io/docs/akka/current/additional/faq.html
    ---
    You received this message because you are subscribed to the Google Groups
    "Akka User List" group.
    To unsubscribe from this group and stop receiving emails from it, send an
    email to akka-user+unsubscribe@googlegroups.com.
    To post to this group, send email to akka-user@googlegroups.com.
    Visit this group at https://groups.google.com/group/akka-user.
    For more options, visit https://groups.google.com/d/optout.


    --
    Cheers,


    --
    ---
    You received this message because you are subscribed to the Google Groups "Akka User List" group.
    To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscribe@googlegroups.com.
    To post to this group, send email to akka-user@googlegroups.com.
    Visit this group at https://groups.google.com/group/akka-user.
    For more options, visit https://groups.google.com/d/optout.
  • Endre Varga at Feb 8, 2016 at 11:35 am
    It might or might not be a problem. I am just pointing out the effects of
    the buffer. There are at least three different approaches possible here:

      - no buffers at all, keep the pipeline fused: polling only happens on
        demand, and always returns fresh data:

          poller.throttle.mapConcat(identity)

      - want prefetch for better throughput, paying the price of potentially
        stale data:

          poller.throttle.mapConcat.buffer // can use .viaAsync to make consumer stages concurrent

      - want to poll independently of downstream demand, but not too often:

          poller.throttle.conflate(<keep newest>).mapConcat(identity)

    Any of these work, with different operational semantics.

    -Endre
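
The difference between the three variants can be made concrete with a toy model. The sketch below is plain Java with a logical clock in seconds, not Akka Streams code; the class and method names are made up for illustration. It assumes the prefetching variant polled once at time 0 and that the conflating variant polls at every multiple of the interval and keeps only the newest result. Each method returns how old the data is when downstream finally receives it.

```java
/**
 * Toy model (logical clock in seconds), not Akka Streams code.
 * Each method returns the age of the data at the moment downstream receives it.
 */
final class PollSemantics {

    /** Fused pipeline, no buffer: the poll is triggered by the request itself. */
    static long ageFused(long requestTime) {
        long polledAt = requestTime;
        return requestTime - polledAt; // always 0: data is read when it is asked for
    }

    /** Prefetch buffer: assumed to have polled eagerly at time 0 and then held the result. */
    static long agePrefetched(long requestTime) {
        long polledAt = 0;
        return requestTime - polledAt; // grows with the length of the downstream pause
    }

    /** Throttled polling at 0, interval, 2*interval, ... with only the newest result kept. */
    static long ageConflated(long requestTime, long pollInterval) {
        long polledAt = (requestTime / pollInterval) * pollInterval; // latest poll at or before the request
        return requestTime - polledAt; // always below pollInterval
    }

    public static void main(String[] args) {
        long pause = 3 * 60 * 60 + 1; // downstream asks again after a bit more than 3 hours
        System.out.println("fused:      " + ageFused(pause) + " s");
        System.out.println("prefetched: " + agePrefetched(pause) + " s");
        System.out.println("conflated:  " + ageConflated(pause, 2) + " s");
    }
}
```

In this model only the prefetching variant hands out data whose age depends on how long downstream was idle; the conflating variant bounds the age by the poll interval at the cost of polling without demand.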
    On Mon, Feb 8, 2016 at 12:29 PM, Viktor Klang wrote:

    How is that a problem? OP only wanted to make sure that the external
    system wasn't polled too often.
    On Mon, Feb 8, 2016 at 12:20 PM, Endre Varga wrote:

    Ah, you meant Backpressure. But then what I originally said is true. Just
    think about it:

    - buffer requests
    - buffer is enough to contain the next batch emitted by mapConcat
    - buffer hence stores some result
    - downstream now asks for next batch, but 3 hours later
    - emitted result is now 3 hours old

    -Endre

    On Mon, Feb 8, 2016 at 12:05 PM, Viktor Klang <viktor.klang@gmail.com>
    wrote:
    What? How would anything be dropped if buffer has Backpressure as
    OverflowStrategy?

    On Mon, Feb 8, 2016 at 11:56 AM, Endre Varga <endre.varga@typesafe.com>
    wrote:

    On Mon, Feb 8, 2016 at 11:53 AM, Viktor Klang <viktor.klang@gmail.com>
    wrote:
    If the Buffer has Backpressure as overflow strategy I don't see how it
    decouples upstream from downstream.
    That is a dangerous move though, since you put the buffer *after* the
    mapConcat, so this means that dropping will be independent of the batch
    boundaries. I.e. the buffer might hold one full batch and half of the next,
    dropping the other half. I think in this case it is better to use a more flexible
    tool there and "batcher" seems to be the right one (it is a conflate-like
    op with capacity, so can implement "smart" buffers). Or, mapConcat should
    not be used here. I guess it depends on the use case.

    -Endre

    On Mon, Feb 8, 2016 at 11:52 AM, Endre Varga <endre.varga@typesafe.com>
    wrote:

    On Sun, Feb 7, 2016 at 9:49 PM, Viktor Klang <viktor.klang@gmail.com>
    wrote:
    On Sun, Feb 7, 2016 at 2:17 PM, wrote:

    Hi Viktor,
    thank you for your reply. Here are my remarks:

    Point 1. You have a source of "events" and the only way you know if
    there are any events, is if you ask for events

    exactly

    Point 2. You want to consume events from this source

    Yes

    Point 3) You only need to consume events from the source if there
    is demand from "downstream"

    It is the other way round. The source logically pushes the events
    downstream.
    Ideally, as soon as these events arrive I would like them to be
    pushed downstream. So it's more of a push scenario.
    The source pushes as many events as the downstream processes can
    handle.
    (But I have to limit myself to polling the database every
    2-5 seconds for hardware reasons.)
    You only want to send data if the downstream can handle it, and the
    source knows that by keeping track of downstream demand.

    Point 4) Does the source of events ever end, and how do you know?

    No, the source never ends. External systems keep inserting new event
    records into the database.

    Point 5. can you ask for a specified number of events or does
    polling imply reading all available events?

    No, I can ask for a specified number of events.

    Here is a concrete business case:
    1) External systems want email messages to be sent to users.
    2) They insert email events (records with email address, content,
    user_id, etc.) into an email table (log table).
    3) Email-Send-Service sends these emails over the wire.
    4) Acknowledge-Service logs the email-send event and cleans up the
    email table (log table).

    So I can set up a stream: Email-Source -> Email-Send-Service ->
    Acknowledge-Service.

    I am now considering two ways of doing it and am hoping for your
    advice (-:
    1) Create the following
    Email-Source (select * from Email limit 1000) ->
    Email-Send-Service -> Acknowledge-Service
    and materialize this stream every two seconds.

    2) Set up a custom source which polls forever:
    Polling-Neverending-Email-Source (select * from Email limit
    1000 every two seconds inside the Actor) -> Email-Send-Service ->
    Acknowledge-Service
    Sounds like you should be able to construct your source as
    something like this:

    <poller> + <throttle> + <mapConcat(identity)> + <buffer>

    This means that the polling gets throttled; the mapConcat is there in case
    you read more than 1 event per poll; and the buffer is there to make sure
    that the demand that gets to the poller is > 1, to increase throughput.

    On the other hand, the buffer detaches the upstream, so the poller will be
    called even when there is no downstream demand, causing it to return
    potentially stale data from the buffer if there are long pauses in
    downstream consumption. So this is a tradeoff, not necessarily a bad one,
    but one to be aware of.

    -Endre
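
As a rough illustration of the buffer-free, demand-driven shape of that pipeline (poll, throttle, flatten), here is a minimal sketch in plain Java. It is not Akka Streams code: `ThrottledPoller`, its `poll` supplier and the `mail-N` values are hypothetical stand-ins, and it blocks the calling thread with `Thread.sleep`, which a real stream stage should not do. It only models the semantics: the external system is asked when the consumer wants an element, never more often than the given interval, and each batch is flattened into single elements.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.function.Supplier;

/** Pull-driven poller: the external system is only asked when the consumer wants an element. */
final class ThrottledPoller<T> implements Iterator<T> {
    private final Supplier<List<T>> poll;   // hypothetical batch read; may return an empty list
    private final long minIntervalMillis;   // "throttle": minimum gap between two polls
    private final Queue<T> pending = new ArrayDeque<>();
    private boolean polledBefore = false;
    private long lastPollNanos;

    ThrottledPoller(Supplier<List<T>> poll, long minIntervalMillis) {
        this.poll = poll;
        this.minIntervalMillis = minIntervalMillis;
    }

    @Override
    public boolean hasNext() {
        return true; // the source described in the thread never ends
    }

    @Override
    public T next() {
        while (pending.isEmpty()) {      // an empty poll result just means "try again later"
            waitForNextSlot();
            pending.addAll(poll.get());  // flatten the batch, like mapConcat(identity)
        }
        return pending.remove();
    }

    /** Sleeps until at least minIntervalMillis have passed since the previous poll. */
    private void waitForNextSlot() {
        if (polledBefore) {
            long elapsedMillis = (System.nanoTime() - lastPollNanos) / 1_000_000;
            long waitMillis = minIntervalMillis - elapsedMillis;
            if (waitMillis > 0) {
                try {
                    Thread.sleep(waitMillis);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("interrupted while waiting to poll", e);
                }
            }
        }
        polledBefore = true;
        lastPollNanos = System.nanoTime();
    }

    public static void main(String[] args) {
        // Stand-in for the database: the first poll finds nothing, later polls find rows.
        Iterator<List<String>> batches = Arrays.asList(
                Collections.<String>emptyList(),
                Arrays.asList("mail-1", "mail-2"),
                Arrays.asList("mail-3")).iterator();
        ThrottledPoller<String> source = new ThrottledPoller<>(batches::next, 100);
        for (int i = 0; i < 3; i++) {
            System.out.println(source.next());
        }
    }
}
```

Because nothing is fetched ahead of demand, every element handed out comes from the most recent poll; adding a prefetching buffer in front of the consumer would trade that freshness for throughput.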



    Background: I am using Akka Streams to build an in-house toolkit
    that deals with many typical ESB processes.
    The above use case benefits not so much from throughput (the users
    could actually live with some delay) but more from
    reusing stream components and having a nice, clear Flow DSL.

    Many greetings,
    John

    On Sunday, February 7, 2016 at 13:05:59 UTC+1, √ wrote:
    Hi John!

    I think I can help you, but I have some follow-on questions :)
    On Sun, Feb 7, 2016 at 12:52 PM, wrote:

    Hi Viktor,
    I know you love simple, elegant code, but
    I looked at unfoldAsync and I don't see how it solves the
    use case I have in mind.

    I'll try to explain why, because I am not sure whether I am overlooking
    something obvious.

    An external system writes events into a log table. Since this is
    a non-reactive-sql-database the consumer needs to poll the log table (for
    example every 2 seconds) for new events.
    (Why) does it need to poll the log table if there is no demand for
    events?

    To convert this scenario into a streaming source using
    unfoldAsync,
    I need to implement a polling loop inside the future so that
    Future.success only gets called once new events have been inserted into
    the log table.
    This seems much like a technical aspect rather than a requirement.

    If I don't use polling the future would send 0 Events upstream
    and the stream would come to an end?
    Events go downstream, or did I misunderstand something?

    That's why, instead of unfoldAsync, I like using an Actor like
    JobManager
    <http://doc.akka.io/docs/akka-stream-and-http-experimental/2.0.3/java/stream-integrations.html>
    as a Source. Within the Actor I can be more fine-grained and use the
    scheduler to implement the polling logic.
    Let's take a step back, what are the actual requirements?

    1. You have a source of "events" and the only way you know if
    there are any events, is if you ask for events
    2. You want to consume events from this source
    3. You only need to consume events from the source if there is
    demand from "downstream"
    4. Does the source of events ever end, and how do you know?
    5. Can you ask for a specified number of events or does polling
    imply reading all available events?

    Many greetings,
    John

    On Wednesday, February 3, 2016 at 23:10:15 UTC+1, √ wrote:

    http://doc.akka.io/api/akka-stream-and-http-experimental/2.0.3/?_ga=1.45749860.1579561034.1353497989#akka.stream.scaladsl.Source$
    On Wed, Feb 3, 2016 at 11:06 PM, wrote:

    Where do I find unfold/unfoldAsync? I looked at
    http://doc.akka.io/docs/akka-stream-and-http-experimental/2.0.3/stages-overview.html
    Many greetings,
    John


    On Wednesday, February 3, 2016 at 20:51:17 UTC+1, √ wrote:
    I don't see why you'd need to write a custom GraphStage for
    this.

    unfold/unfoldAsync paired with a buffer with an appropriate
    dropping policy should work, no?

    On Wed, Feb 3, 2016 at 8:47 PM, clca <clau...@outlook.com>
    wrote:
    Yes, this is close to what I need to do.
    The code in the onPull method is something like:

        val m = ReadFromService(...)
        if (m != null)
          push(out, m)

    In a traditional app, reading would be done in a loop:

        while (true) {
          val m = ReadFromService(...)
          // do something with the new message
        }

    So I'll add a mechanism in the Source to keep polling the
    external service.

    Thanks!
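
In the `onPull` snippet above nothing is pushed when the read returns null, so some mechanism has to try again later. A minimal sketch of that retry idea, in plain Java rather than Akka: `RetryingPoll`, `pollUntilAvailable` and the `readFromService` supplier are hypothetical names, and a `ScheduledExecutorService` stands in for whatever timer the stream stage would use. An empty read schedules another attempt instead of giving up, and the first non-null value completes the future.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

final class RetryingPoll {

    /**
     * Calls read.get() until it returns a non-null value, waiting intervalMillis
     * between empty reads, and completes the returned future with that value.
     */
    static <T> CompletableFuture<T> pollUntilAvailable(
            Supplier<T> read, long intervalMillis, ScheduledExecutorService scheduler) {
        CompletableFuture<T> result = new CompletableFuture<>();
        attempt(read, intervalMillis, scheduler, result);
        return result;
    }

    private static <T> void attempt(Supplier<T> read, long intervalMillis,
            ScheduledExecutorService scheduler, CompletableFuture<T> result) {
        try {
            T m = read.get();
            if (m != null) {
                result.complete(m); // data arrived: hand it downstream
            } else {
                // Nothing available yet: schedule another read instead of stalling.
                scheduler.schedule(() -> attempt(read, intervalMillis, scheduler, result),
                        intervalMillis, TimeUnit.MILLISECONDS);
            }
        } catch (RuntimeException e) {
            result.completeExceptionally(e); // a failing read fails the future
        }
    }

    public static void main(String[] args) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger reads = new AtomicInteger();
        // Hypothetical service: empty for the first two reads, then delivers a message.
        Supplier<String> readFromService = () -> reads.incrementAndGet() < 3 ? null : "message";
        String m = pollUntilAvailable(readFromService, 50, scheduler).join();
        System.out.println(m + " after " + reads.get() + " reads"); // prints "message after 3 reads"
        scheduler.shutdown();
    }
}
```

Unlike the `while(true)` loop, no thread is blocked between attempts; the waiting is done by the scheduler.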


    On Wednesday, February 3, 2016 at 12:55:16 AM UTC-8,
    john....@gmail.com wrote:
    I've done something similar.
    I adapted this JobManager
    <http://doc.akka.io/docs/akka-stream-and-http-experimental/2.0.3/java/stream-integrations.html>.
    When no data is available (for example, when it receives a Request(16)
    message), it starts a "polling actor" which polls an external database for
    more data.
    Does this help?

    On Wednesday, February 3, 2016 at 08:33:16 UTC+1, clca wrote:
    I searched through the topics but I could not find any
    example on how to pull from a Source in the case where data is coming in
    bursts. I built a customized Source that read from an external source. Data
    is coming in bursts, the flow can stop for a while so I need to keep
    pulling the Source in such a way I can keep reading data (the actual read
    process is done in the old poll fashion). I guess I need something like a
    KeepAlive type of mechanism for the data stream.

    BTW: Fantastic job with Akka, Akka Stream & Akka HTTP!

    Thanks
    Claudio
  • John Vieten at Feb 10, 2016 at 1:43 pm
    Thanks, Endre and Viktor.
    My code is much cleaner now: just 8 lines of Java plus a
    throttler.
    One question: I had to write my own throttler. Is there one in
    core?

    final static class Tick {}

    // Lets one element through per tick by zipping the input with a tick source.
    // Note: the parameter is named throttleSecs, but the interval below is built in "millis".
    public static <T> Flow<T, T, BoxedUnit> createFlow(final long throttleSecs) {
        Flow<T, T, BoxedUnit> sourceflow = Flow.<T>create();

        Flow<T, Pair<T, Tick>, BoxedUnit> flow = Flow.fromGraph(
            GraphDSL.create(builder -> {
                final FlowShape<T, T> source = builder.add(sourceflow);
                Source<Tick, Cancellable> tickSource = Source.tick(
                    FiniteDuration.apply(0, "millis"),
                    FiniteDuration.apply(throttleSecs, "millis"),
                    new Tick());
                final FanInShape2<T, Tick, Pair<T, Tick>> zipper = builder.add(Zip.create());
                SourceShape<Tick> tickSourceShape = builder.add(tickSource);

                // Each element has to wait for a matching tick before it is emitted.
                builder.from(source).toInlet(zipper.in0());
                builder.from(tickSourceShape).toInlet(zipper.in1());
                return FlowShape.of(source.in(), zipper.out());
            }));
        // Drop the tick and keep only the original element.
        return flow.map(Pair::first);
    }




  • Endre Varga at Feb 10, 2016 at 1:52 pm
    There is one:
    http://doc.akka.io/japi/akka/2.4.2-RC2/akka/stream/javadsl/Flow.html#throttle-int-scala.concurrent.duration.FiniteDuration-int-akka.stream.ThrottleMode-
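
    Judging by the anchor in the link, the built-in operator takes an element count, a duration, a maximum burst and a ThrottleMode. The following is only a plain-Java model of the shaping idea (space emissions at least a fixed interval apart), not Akka's implementation; the class SimpleThrottle and its methods are hypothetical, and burst handling is left out.

```java
// Hypothetical model of rate shaping, not Akka's implementation.
final class SimpleThrottle {
    private final long minIntervalMillis;
    private long nextAllowedAt = 0L;

    // Allows `elements` emissions per `perMillis`; elements is assumed to be positive.
    SimpleThrottle(int elements, long perMillis) {
        this.minIntervalMillis = perMillis / elements;
    }

    // Returns how long the caller must wait at time `now` before emitting,
    // and reserves that emission slot.
    long delayFor(long now) {
        long emitAt = Math.max(now, nextAllowedAt);
        nextAllowedAt = emitAt + minIntervalMillis;
        return emitAt - now;
    }

    // Demo with explicit timestamps instead of a real clock.
    static String demo() {
        SimpleThrottle throttle = new SimpleThrottle(1, 2000); // one poll per 2 seconds
        long first = throttle.delayFor(0);      // nothing emitted yet: no wait
        long second = throttle.delayFor(500);   // too early: wait until t=2000
        long third = throttle.delayFor(5000);   // long quiet period: no wait
        return first + "," + second + "," + third;
    }
}
```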

    -Endre
    On Wed, Feb 10, 2016 at 2:43 PM, wrote:

    Thanks, Endre and Viktor.
    My code is much cleaner now, actually just 8 lines of Java and a
    throttler.
    I was just wondering: do I have to code my own throttler, or is there one in
    core?

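    import akka.actor.Cancellable;
    import akka.japi.Pair;
    import akka.stream.FanInShape2;
    import akka.stream.FlowShape;
    import akka.stream.SourceShape;
    import akka.stream.javadsl.Flow;
    import akka.stream.javadsl.GraphDSL;
    import akka.stream.javadsl.Source;
    import akka.stream.javadsl.Zip;
    import scala.concurrent.duration.FiniteDuration;
    import scala.runtime.BoxedUnit;

    final static class Tick {}

    // Lets one element through per tick by zipping the input with a tick source.
    public static <T> Flow<T, T, BoxedUnit> createFlow(final long throttleMillis) {
        Flow<T, T, BoxedUnit> sourceflow = Flow.<T>create();

        Flow<T, Pair<T, Tick>, BoxedUnit> flow = Flow.fromGraph(
            GraphDSL.create(builder -> {
                final FlowShape<T, T> source = builder.add(sourceflow);
                // First tick fires immediately, then one tick every throttleMillis.
                Source<Tick, Cancellable> tickSource = Source.tick(FiniteDuration.apply(0, "millis"), FiniteDuration.apply(throttleMillis, "millis"), new Tick());
                final FanInShape2<T, Tick, Pair<T, Tick>> zipper = builder.add(Zip.create());
                SourceShape<Tick> tickSourceShape = builder.add(tickSource);

                builder.from(source).toInlet(zipper.in0());
                builder.from(tickSourceShape).toInlet(zipper.in1());
                return FlowShape.of(source.in(), zipper.out());
            }));
        return flow.map(Pair::first); // drop the tick, keep the element
    }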




    On Monday, February 8, 2016 at 12:35:35 UTC+1, drewhk wrote:
    It might or might not be a problem. I am just pointing out the effects of
    buffer. There are at least three different approaches possible here:

    - no buffers at all, keep the pipeline fused: polling only happens on
    need, and always returns fresh data:

    poller.throttle.mapConcat(identity)

    - want prefetch for better throughput, paying the price of potentially
    stale data:

    poller.throttle.mapConcat.buffer // can use .viaAsync to make consumer
    stages concurrent

    - want to poll independently of downstream demand, but not too often

    poller.throttle.conflate(<keep newest>).mapConcat(identity)

    Any of these work, with different operational semantics.
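
    To make the third variant a little more concrete, here is a plain-Java model of "keep newest" conflation: upstream can always hand over a new poll result, and a slow downstream only ever sees the latest one. NewestOnly is a hypothetical class for illustration, not the Akka conflate operator. Note that this only suits cases where a newer poll result supersedes an older one.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical single-slot "keep newest" holder, modelling conflate-like behaviour.
final class NewestOnly<T> {
    private T latest;          // null means nothing is waiting
    private int overwritten;   // how many unconsumed values were replaced

    // Producer side: never blocks; a newer value replaces an unconsumed older one.
    void offer(T value) {
        if (latest != null) {
            overwritten++;
        }
        latest = value;
    }

    // Consumer side: takes the newest value, or null if nothing arrived since the last take.
    T take() {
        T value = latest;
        latest = null;
        return value;
    }

    int overwrittenCount() {
        return overwritten;
    }

    // Demo: three polls arrive while downstream is busy; only the last one is consumed.
    static String demo() {
        NewestOnly<List<String>> slot = new NewestOnly<List<String>>();
        slot.offer(Arrays.asList("a1", "a2"));
        slot.offer(Arrays.asList("b1"));
        slot.offer(Arrays.asList("c1", "c2"));
        return slot.take() + " overwritten=" + slot.overwrittenCount();
    }
}
```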

    -Endre

    On Mon, Feb 8, 2016 at 12:29 PM, Viktor Klang <viktor...@gmail.com>
    wrote:
    How is that a problem? OP only wanted to make sure that the external
    system wasn't polled too often.

    On Mon, Feb 8, 2016 at 12:20 PM, Endre Varga <endre...@typesafe.com>
    wrote:
    Ah, you meant Backpressure. But then what I originally said is true.
    Just think about it:

    - buffer requests
    - buffer is enough to contain the next batch emitted by mapConcat
    - buffer hence stores some result
    - downstream now asks for next batch, but 3 hours later
    - emitted result is now 3 hours old
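
    The scenario in the list above can be modelled in a few lines of plain Java. PrefetchBuffer is a hypothetical class and the timestamps are passed in by hand instead of read from a clock; it is meant only to show where the staleness comes from, not how Akka's buffer is implemented.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical model: a prefetch buffer whose entries remember when they were fetched.
final class PrefetchBuffer {
    private static final class Stamped {
        final String value;
        final long fetchedAtMillis;

        Stamped(String value, long fetchedAtMillis) {
            this.value = value;
            this.fetchedAtMillis = fetchedAtMillis;
        }
    }

    private final Deque<Stamped> queue = new ArrayDeque<Stamped>();
    private final int capacity;

    PrefetchBuffer(int capacity) {
        this.capacity = capacity;
    }

    // Upstream side: accepts elements until full; false models backpressure.
    boolean offer(String value, long now) {
        if (queue.size() >= capacity) {
            return false;
        }
        queue.addLast(new Stamped(value, now));
        return true;
    }

    // Downstream side: hands out the oldest element and reports its age in millis.
    long pollAge(long now) {
        Stamped oldest = queue.removeFirst();
        return now - oldest.fetchedAtMillis;
    }

    // Demo: the buffer fills at t=0, downstream asks three hours later.
    static long demoAgeHours() {
        long hourMillis = 60L * 60L * 1000L;
        PrefetchBuffer buffer = new PrefetchBuffer(2);
        buffer.offer("row-1", 0L);
        buffer.offer("row-2", 0L);
        boolean accepted = buffer.offer("row-3", 0L); // full: upstream is backpressured
        long age = buffer.pollAge(3L * hourMillis);
        return accepted ? -1L : age / hourMillis;
    }
}
```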

    -Endre

    On Mon, Feb 8, 2016 at 12:05 PM, Viktor Klang <viktor...@gmail.com>
    wrote:
    What? How would anything be dropped if buffer has Backpressure as
    OverflowStrategy?

    On Mon, Feb 8, 2016 at 11:56 AM, Endre Varga <endre...@typesafe.com>
    wrote:

    On Mon, Feb 8, 2016 at 11:53 AM, Viktor Klang <viktor...@gmail.com>
    wrote:
    If the Buffer has Backpressure as overflow strategy I don't see how
    it decouples upstream from downstream.
    That is a dangerous move though, since you put the buffer *after* the
    mapConcat, so this means that dropping will be independent of the batch
    boundaries. I.e. the buffer might hold one full batch plus half of the next,
    dropping the other half. I think in this case it is better to use a more flexible
    tool there and "batcher" seems to be the right one (it is a conflate-like
    op with capacity, so can implement "smart" buffers). Or, mapConcat should
    not be used here. I guess it depends on the use case.
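
    A small plain-Java illustration of the batch-boundary problem, assuming a buffer that drops new elements when full. BatchBoundaryDemo and both methods are hypothetical; the second method is only a stand-in for a batch-aware buffer, not the "batcher" op mentioned above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical comparison of an element-level buffer with a batch-aware one.
final class BatchBoundaryDemo {

    // Element-level buffer: batches are flattened first, then whatever does not fit is dropped.
    static List<String> elementLevel(List<List<String>> batches, int capacity) {
        List<String> buffer = new ArrayList<String>();
        for (List<String> batch : batches) {
            for (String element : batch) {
                if (buffer.size() < capacity) {
                    buffer.add(element);
                }
                // else: this single element is dropped, regardless of its batch
            }
        }
        return buffer;
    }

    // Batch-aware buffer: a batch is admitted whole or rejected whole.
    static List<String> batchLevel(List<List<String>> batches, int capacity) {
        List<String> buffer = new ArrayList<String>();
        for (List<String> batch : batches) {
            if (buffer.size() + batch.size() <= capacity) {
                buffer.addAll(batch);
            }
        }
        return buffer;
    }

    // Demo: two batches of two elements, room for three elements.
    static String demo() {
        List<List<String>> polled = Arrays.asList(
            Arrays.asList("a1", "a2"),
            Arrays.asList("b1", "b2"));
        return elementLevel(polled, 3) + " vs " + batchLevel(polled, 3);
    }
}
```

    In the element-level case b1 survives while b2 is lost, which is the "half a batch" situation; the batch-aware variant never splits a batch.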

    -Endre

    On Mon, Feb 8, 2016 at 11:52 AM, Endre Varga <endre...@typesafe.com>
    wrote:

    On Sun, Feb 7, 2016 at 9:49 PM, Viktor Klang <viktor...@gmail.com>
    wrote:

    On Sun, Feb 7, 2016 at 2:17 PM, wrote:

    Hi Viktor,
    thank you for your reply; here are my remarks:

    Point 1. You have a source of "events" and the only way you know
    if there are any events, is if you ask for events

    exactly

    Point 2. You want to consume events from this source

    Yes

    Point 3) You only need to consume events from the source if
    there is demand from "downstream"

    It is the other way round. The source logically pushes the events
    downstream.
    Ideally, as soon as these events arrive I would like them to be
    pushed downstream. So it's more of a push scenario.
    The source pushes as many events as the downstream processes can
    handle.
    (But I have to limit myself to polling the database once every
    2-5 seconds for hardware reasons.)
    You only want to send data if the downstream can handle it, and
    the source knows that by keeping track of downstream demand.

    Point 4) Does the source of events ever end, and how do you know?

    No the source never ends. External Systems always insert new
    Event records into the database

    Point 5. can you ask for a specified number of events or does
    polling imply reading all available events?

    No I can ask for a specified number of events.

    Here is a concrete business case:
    1) External systems want Email Messages to be emailed to Users.
    2) They insert Email Events (Records with email-address, content,
    user_id,etc) into a Email Table (log-table)
    3) Email-Send-Service sends these Emails over the wire
    4) Acknowledge-Service logs the Email-Send Event and cleans up
    the Email Table (log-table).

    So I can set up a Stream: Email-Source -> Email-Send-Service ->
    Acknowledge-Service.

    I am now considering two ways of doing it and am hoping for your
    advice (-:
    1) create the following
    Email-Source (select * from Email limit 1000) ->
    Email-Send-Service -> Acknowledge-Service
    and materialize this stream every two seconds

    2) Set up a custom source which polls forever
    Polling-Neverending-Email-Source (select * from Email limit
    1000 every two seconds inside the Actor) -> Email-Send-Service ->
    Acknowledge-Service
    Sounds like you should be able to construct your source as
    something like this:

    <poller> + <throttle> + <mapConcat(identity)> + <buffer>

    This means that the polling gets throttled; the mapConcat is there
    in case you read more than 1 event per poll, and the buffer is there to make sure
    that the demand that gets to the poller is > 1 to increase throughput.
    On the other hand buffer detaches the upstream, so the poller will
    be called even when there is no downstream demand, causing it to return
    potentially stale data from the buffer if there are long pauses in
    downstream consumption. So this is a tradeoff, not necessarily a bad one,
    but one to be aware of.
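
    For comparison, a plain-Java model of the poller plus mapConcat part without any buffer: a batch is fetched only when downstream asks for an element and the previous batch is used up. FlatteningPoller is a hypothetical class, the batch supplier stands in for something like "select * from Email limit 1000", and throttling is left out (in the real pipeline it would pace the polls).

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical pull-based model of <poller> + <mapConcat(identity)> without a buffer.
final class FlatteningPoller<T> implements Iterator<T> {
    private final Supplier<List<T>> pollBatch;
    private final Deque<T> pending = new ArrayDeque<T>();
    private int polls = 0;

    FlatteningPoller(Supplier<List<T>> pollBatch) {
        this.pollBatch = pollBatch;
    }

    // The source in this thread never ends, so there is always potentially more.
    @Override
    public boolean hasNext() {
        return true;
    }

    // Polls only when demand arrives and the previous batch is used up.
    @Override
    public T next() {
        while (pending.isEmpty()) {
            polls++;
            pending.addAll(pollBatch.get()); // an empty batch simply leads to another poll
        }
        return pending.removeFirst();
    }

    int pollCount() {
        return polls;
    }

    // Demo: every poll returns three events; downstream asks for four.
    static String demo() {
        final int[] batchNo = {0};
        FlatteningPoller<String> source = new FlatteningPoller<String>(() -> {
            int n = ++batchNo[0];
            return Arrays.asList("e" + n + ".1", "e" + n + ".2", "e" + n + ".3");
        });
        StringBuilder out = new StringBuilder();
        for (int i = 0; i < 4; i++) {
            out.append(source.next()).append(' ');
        }
        return out.toString().trim() + " polls=" + source.pollCount();
    }
}
```

    Four requested elements cost two polls here; adding a buffer after this would let polls run ahead of demand, which is the tradeoff described above.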

    -Endre



    Background: I am building an in-house toolkit with Akka Streams
    which deals with many typical ESB processes.
    The above use case benefits not so much from throughput (actually
    the users could live with some delay) but more from
    reusing stream components and having a nice, clear Flow DSL.

    Many Greetings
    John

    On Sunday, February 7, 2016 at 13:05:59 UTC+1, √ wrote:
    Hi John!

    I think I can help you, but I have some follow-on questions :)
    On Sun, Feb 7, 2016 at 12:52 PM, wrote:

    Hi Viktor,
    I know you love simple, elegant code, but
    I looked at unfoldAsync and I don't see how it solves the
    use case I have in mind.

    I'll try to explain why, because I am not sure whether I am
    overlooking something obvious.

    An external system writes events into a log table. Since this
    is a non-reactive-sql-database the consumer needs to poll the log table
    (for example every 2 seconds) for new events.
    (Why) does it need to poll the log table if there is no demand
    for events?

    To convert this scenario into a streaming source using
    unfoldAsync
    I need to implement a polling loop inside the future so that
    Future.success gets only called when new events are inserted in the log
    table.
    This seems much like a technical aspect rather than a
    requirement.

    If I don't use polling the future would send 0 Events upstream
    and the stream would come to an end?
    Events go downstream, or did I misunderstand something?
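
    On the worry that a poll returning 0 events ends the stream: as far as I understand unfold, the stream completes only when the step function returns "no next step", not when the emitted element happens to be an empty batch. A plain-Java model of that distinction follows; the Unfold class, Step and run are hypothetical and are not the Akka API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

// Hypothetical plain-Java unfold, modelled on the idea behind unfold/unfoldAsync.
final class Unfold {

    // One step: the next state plus the element to emit.
    static final class Step<S, E> {
        final S nextState;
        final E element;

        Step(S nextState, E element) {
            this.nextState = nextState;
            this.element = element;
        }
    }

    // Emits elements until the step function returns Optional.empty().
    static <S, E> List<E> run(S initial, Function<S, Optional<Step<S, E>>> step) {
        List<E> emitted = new ArrayList<E>();
        Optional<Step<S, E>> next = step.apply(initial);
        while (next.isPresent()) {
            emitted.add(next.get().element);
            next = step.apply(next.get().nextState);
        }
        return emitted;
    }

    // Demo: the state is the poll number; poll 2 finds no rows, poll 4 ends the demo.
    static String demo() {
        Function<Integer, Optional<Step<Integer, List<String>>>> pollStep = poll -> {
            if (poll == 4) {
                return Optional.empty();                     // completion
            }
            List<String> rows = poll == 2
                ? Collections.<String>emptyList()            // empty batch, the stream continues
                : Arrays.asList("row-" + poll);
            return Optional.of(new Step<Integer, List<String>>(poll + 1, rows));
        };
        return run(1, pollStep).toString();
    }
}
```

    An empty batch would then pass through a mapConcat(identity) without emitting anything, so no polling loop inside the future is needed for the stream to stay alive.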

    That's why, instead of unfoldAsync, I like using an Actor like
    JobManager
    <http://doc.akka.io/docs/akka-stream-and-http-experimental/2.0.3/java/stream-integrations.html>
    as a Source. Within the Actor I can be more fine-grained and use the
    scheduler to implement the polling logic.
    Let's take a step back, what are the actual requirements?

    1. You have a source of "events" and the only way you know if
    there are any events, is if you ask for events
    2. You want to consume events from this source
    3. You only need to consume events from the source if there is
    demand from "downstream"
    4. Does the source of events ever end, and how do you know?
    5. can you ask for a specified number of events or does polling
    imply reading all available events?

    Many Greetings
    John

Discussion Overview
group: akka-user
categories: scala
posted: Feb 3, '16 at 7:33a
active: Feb 10, '16 at 1:52p
posts: 21
users: 4
website: akka.io
irc: #akka
