Hi all,

How do we make a non-blocking receive with librabbitmq?
amqp_simple_wait_frame() blocks if the queue doesn't contain any messages.

Should we call amqp_data_in_buffer() and, based on the result, call
amqp_simple_wait_frame(), or implement a solution like this?

http://lists.rabbitmq.com/pipermail/rabbitmq-discuss/2011-September/014868.html

Thanks in advance.

- Arun


  • Alan Antonuk at Feb 16, 2012 at 5:32 am
    Arun;

    I agree with the method used in the gist linked in your email:

    First, check amqp_frames_enqueued(). If this returns true, there are decoded
    frames ready and amqp_simple_wait_frame() will not block.

    Second, check amqp_data_in_buffer(). If this returns true, there is data
    that has already been received (with recv()) but not yet decoded. It is
    likely that amqp_simple_wait_frame() will not block. However, if the data
    in the buffer doesn't complete a frame, recv() will be called and may block.

    Third, call select() or poll() on the socket associated with the connection
    (you can use amqp_get_sockfd() to get the socket descriptor). If this
    system call shows that the socket can be read from, amqp_simple_wait_frame()
    will call recv() and likely won't block, assuming a full frame is received.

    As you may notice, the last two steps don't give you a "correct all the
    time" answer to whether amqp_simple_wait_frame() will block. In
    practice I've found that the above works well enough most of the time
    for RPC-style AMQP messaging, for the following two reasons:
    1. When the broker sends data, it typically sends an entire frame, so
    if your select() call reports that the socket is readable, you likely
    already have, or soon will have, an entire frame ready to be read by recv().
    2. AMQP is intended to be run on a low-latency, high-bandwidth LAN, so if
    you do get a partial frame when recv() is called, you will receive the
    rest of the frame within a short time. Otherwise it is likely that
    something serious has happened that will cause the whole connection to die
    in the near future (possibly by timing out, which I grant can block for a
    lengthy amount of time).

    There is definitely room for improvement in how the rabbitmq-c library
    handles non-blocking behavior.

    HTH
    -Alan

    On Feb 15, 2012, at 2:28 PM, Brett Cameron wrote:

    Arun,

    I'd look at implementing something along the lines of what Alex describes
    in the link, using select() or poll(). I have cc'd Alan for his thoughts
    on how this might best be done. The approach outlined by Alex in the
    link is okay, but you could still find yourself hanging on a
    blocked read if something bad happened between the initial check for
    available data and the read operation. An alternative might be to add a
    timeout parameter to the wait_frame call, or to create a variant of this
    function that includes a timeout...

    Regards,
    Brett
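
The "variant with a timeout" idea can be illustrated on a plain file descriptor using poll(). This is only a sketch of the pattern, under the assumption that a single read follows a successful poll: `read_with_timeout()` is a hypothetical name and is not part of rabbitmq-c. It also still has the weakness Brett describes, since it bounds the wait for the first byte, not for a complete frame. Newer rabbitmq-c releases appear to provide amqp_simple_wait_frame_noblock(), which takes a struct timeval timeout, so it is worth checking your headers before writing your own wrapper.

```c
#include <poll.h>
#include <unistd.h>

/* Hypothetical helper: read up to len bytes from fd, waiting at most
 * timeout_ms milliseconds for data to arrive.
 * Returns the number of bytes read (0 means end of file),
 * -1 on error (errno is set), or -2 if the wait timed out. */
static long read_with_timeout(int fd, void *buf, size_t len, int timeout_ms)
{
    struct pollfd pfd;
    int ready;

    pfd.fd = fd;
    pfd.events = POLLIN;
    pfd.revents = 0;

    ready = poll(&pfd, 1, timeout_ms);
    if (ready == 0)
        return -2;              /* nothing arrived in time */
    if (ready < 0)
        return -1;              /* poll() itself failed */

    /* Readable, hung up, or in error: read() returns promptly in each case. */
    return (long)read(fd, buf, len);
}
```

Using a distinct return value for the timeout case lets the caller tell "no data yet" apart from a genuine error or a closed connection.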
    _______________________________________________
    rabbitmq-discuss mailing list
    rabbitmq-discuss at lists.rabbitmq.com
    https://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss
  • Arun Chandrasekaran at Feb 16, 2012 at 5:14 pm
    Thanks for the reply, Alan.

    I modified the amqp_consumer.c sample code to simulate the non-blocking
    behaviour as you mentioned.

    /* if (!amqp_frames_enqueued(conn) && !amqp_data_in_buffer(conn)) { */
    if (1) {
        int sock = amqp_get_sockfd(conn);
        printf("socket: %d\n", sock);

        /* Watch socket fd to see when it has input. */
        fd_set read_flags;
        FD_ZERO(&read_flags);
        FD_SET(sock), &read_flags);
        int ret = 0;
        do {
            struct timeval timeout;

            /* Wait up to a second. */
            timeout.tv_sec = 1;
            timeout.tv_usec = 0;

            ret = select(sock+1, &read_flags, NULL, NULL, &timeout);
            if (ret == -1)
                printf("select: %s\n", strerror(errno));
            else if (ret == 0)
                printf("select timedout\n");
            if (FD_ISSET(sock, &read_flags)) {
                printf("Flag is set\n");
            }
        } while (ret == 0);
    }

    But this always results in a timeout. Any idea where I might be going
    wrong? I have commented out the first two checks that you mentioned, just
    for the sake of clarity on select().

    -Arun
  • Brett Cameron at Feb 16, 2012 at 6:08 pm
    Arun,

    A couple of random early in the morning pre-caffeine thoughts: is sock set
    to non-blocking? What's the errno value after the select() returns?

    Brett
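
For anyone wanting to answer the first question, the blocking mode of a descriptor can be inspected with fcntl(). The sketch below uses hypothetical helper names (`is_nonblocking()`, `set_nonblocking()`) and works on any descriptor. Two points are worth keeping in mind: select() reports readiness whether or not the socket is non-blocking, and errno is only meaningful when select() returns -1, not after a timeout. Changing the mode of a socket owned by librabbitmq is an assumption to test carefully, since the library's blocking calls may not expect it.

```c
#include <fcntl.h>

/* Hypothetical helper: report the blocking mode of fd.
 * Returns 1 if O_NONBLOCK is set, 0 if not, -1 on error. */
static int is_nonblocking(int fd)
{
    int flags = fcntl(fd, F_GETFL, 0);

    if (flags == -1)
        return -1;
    return (flags & O_NONBLOCK) ? 1 : 0;
}

/* Hypothetical helper: switch O_NONBLOCK on (enable != 0) or off.
 * Returns 0 on success, -1 on error. */
static int set_nonblocking(int fd, int enable)
{
    int flags = fcntl(fd, F_GETFL, 0);

    if (flags == -1)
        return -1;
    flags = enable ? (flags | O_NONBLOCK) : (flags & ~O_NONBLOCK);
    return fcntl(fd, F_SETFL, flags) == -1 ? -1 : 0;
}
```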

  • Alan Antonuk at Feb 17, 2012 at 1:37 am
    From a brief glance at the code, it looks good to me, except for the
    FD_SET(...) line, which seems to have an extra ) after the sock variable.

    I would uncomment the line you have commented out. It's likely that if you
    run this program after the amqp_producer.c program, a frame or two is
    cached and amqp_frames_enqueued() is returning true.

    Also check with the management plugin to see whether there is a message in
    the queue ready to be consumed.

    -Alan
  • Arun Chandrasekaran at Feb 17, 2012 at 6:06 am
    Hi Alan,

    > Brief glance at the code - it looks good to me, except for the FD_SET(...)
    > line which seems to have an extra ) after the sock variable.
    >
    > I would uncomment the line you have commented. It's likely that if you run
    > this program after amqp_producer.c program, that a frame or two is cached
    > and that the result of amqp_frames_enqueued() is returning true

    Thanks, I've done that. But I may be missing something here, so I've
    attached the hg diff to make sure the changes are in the right place for
    the non-blocking consumer.

    > Also check with the management plugin see if there is a message in the
    > queue ready to be consumed.

    Currently I run on RHEL5, so I'll have to install the latest Erlang; I'll
    do that today. Alternatively, I have made sure that the message is ready to
    be consumed by starting the consumer ahead of the producer. Will that
    suffice?

    Thanks,
    Arun
    -------------- next part --------------
    A non-text attachment was scrubbed...
    Name: non-block-consume.diff
    Type: application/octet-stream
    Size: 1795 bytes
    Desc: not available
    URL: <http://lists.rabbitmq.com/pipermail/rabbitmq-discuss/attachments/20120217/1d32ddec/attachment.obj>
  • Alan Antonuk at Feb 18, 2012 at 12:44 am
    You need to have the FD_ZERO() and FD_SET() inside the do loop. select()
    modifies the fd_set in place, leaving only the descriptors that are ready,
    so after a timeout the set is empty and has to be rebuilt before the next
    call.

    -Alan
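
A minimal sketch of the retry loop with that fix applied is shown below. It is written against a plain descriptor so it can be tried without a broker; `wait_for_input()`, its `timeout_ms` parameter, and the `max_attempts` bound are hypothetical additions for illustration (the loop in the original post retried forever with a one-second timeout). The descriptor is assumed to be valid and below FD_SETSIZE.

```c
#include <errno.h>
#include <stdio.h>
#include <string.h>
#include <sys/select.h>
#include <sys/time.h>

/* Hypothetical helper: wait for input on fd, retrying after each timeout,
 * for at most max_attempts rounds of timeout_ms milliseconds each.
 * Returns 1 once fd is readable, 0 if every attempt timed out, -1 on error. */
static int wait_for_input(int fd, long timeout_ms, int max_attempts)
{
    int attempt;

    for (attempt = 0; attempt < max_attempts; attempt++) {
        fd_set read_flags;
        struct timeval timeout;
        int ret;

        /* Rebuild both arguments on every pass: select() overwrites the
         * fd_set with the ready descriptors and may modify the timeout. */
        FD_ZERO(&read_flags);
        FD_SET(fd, &read_flags);
        timeout.tv_sec = timeout_ms / 1000;
        timeout.tv_usec = (timeout_ms % 1000) * 1000;

        ret = select(fd + 1, &read_flags, NULL, NULL, &timeout);
        if (ret == -1) {
            fprintf(stderr, "select: %s\n", strerror(errno));
            return -1;
        }
        if (ret > 0 && FD_ISSET(fd, &read_flags))
            return 1;           /* input is waiting */
        /* ret == 0: this attempt timed out, so go around again */
    }
    return 0;
}
```

In the consumer from the thread, the descriptor would be the value returned by amqp_get_sockfd(conn), and a return of 1 would be followed by the call to amqp_simple_wait_frame().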
  • Arun Chandrasekaran at Feb 26, 2012 at 9:02 am
    Hi Alan,

    I was out of the country, so I was not able to reply.

    Thanks, that made it work. I was actually trying with the sample consumer
    (amqp_consumer.c), which operates on unnamed queues. The messages were
    actually being dropped by the broker.

    -Arun
