Hello all,

I'm working on a little project using RabbitMQ with .net C#.
I want to demonstrate that if I send a message to a queue and there is no
consumer, the message stays stored in the queue.

To keep things simple, I decided to demonstrate this with the
"Hello World" example from the RabbitMQ tutorials.

1) I built a Producer form, with a TextBox and a button that sends the message
and, on the first click, also opens a consumer form:

using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Data;
using System.Drawing;
using System.Linq;
using System.Text;
using System.Windows.Forms;
using HelloWorldRabbitMQ;

namespace HelloWorldRabbitMQ2
{
/// <summary>
/// Producer window. Each click on <c>button1</c> publishes the UTF-8 bytes of
/// <c>textBox1.Text</c> to the queue; the very first click also opens a
/// <see cref="ConsumerForm"/>.
/// </summary>
public partial class ProducerForm : Form
{
    /// <summary>Number of times the send button has been clicked.</summary>
    public int count = 0;

    /// <summary>Host name handed to the <see cref="Producer"/>.</summary>
    public string HOST_NAME = "localhost";

    /// <summary>Name of the queue the messages are published to.</summary>
    public string QUEUE_NAME = "helloWorld";

    private Producer producer;

    /// <summary>
    /// Builds the form, creates the producer and arranges for the producer to be
    /// disposed when the form is closed.
    /// </summary>
    public ProducerForm()
    {
        InitializeComponent();

        //create the producer
        producer = new Producer(HOST_NAME, QUEUE_NAME);

        // The producer is IDisposable and holds a channel and a connection.
        // Release them when the window goes away instead of leaking them
        // until the process exits.
        FormClosed += delegate { DisposeProducer(); };
    }

    /// <summary>
    /// Sends the message on click. A consumer window is opened on the first
    /// click only, so closing it afterwards leaves the queue without a consumer.
    /// </summary>
    private void button1_Click(object sender, EventArgs e)
    {
        count++;
        if (count < 2)
        {
            ConsumerForm cf = new ConsumerForm();
            cf.Show();
        }

        producer.SendMessage(System.Text.Encoding.UTF8.GetBytes(textBox1.Text));
    }

    /// <summary>Best-effort release of the producer while the form is closing.</summary>
    private void DisposeProducer()
    {
        if (producer == null)
        {
            return;
        }

        try
        {
            producer.Dispose();
        }
        catch (Exception ex)
        {
            // A failure while tearing the connection down must not prevent the
            // window from closing; record it instead of letting it escape.
            System.Diagnostics.Trace.TraceWarning("Producer cleanup failed: " + ex);
        }

        producer = null;
    }
}
}

2)Here is the Producer class:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using RabbitMQ.Client;

namespace HelloWorldRabbitMQ
{
/// <summary>
/// Publishes raw byte messages to a single named queue through the default
/// exchange. Owns one connection and one channel, released by <see cref="Dispose"/>.
/// </summary>
class Producer:IDisposable
{
    /// <summary>Channel used to declare the queue and publish.</summary>
    protected IModel Model;

    /// <summary>Connection that owns <see cref="Model"/>.</summary>
    protected IConnection Connection;

    /// <summary>Queue name, also used as the routing key when publishing.</summary>
    protected string QueueName;

    /// <summary>
    /// Connects to <paramref name="hostName"/>, opens a channel and declares
    /// <paramref name="queueName"/>.
    /// </summary>
    /// <param name="hostName">Host name of the RabbitMQ broker.</param>
    /// <param name="queueName">Name of the queue to declare and publish to.</param>
    public Producer(string hostName, string queueName)
    {
        QueueName = queueName;

        var connectionFactory = new ConnectionFactory();
        connectionFactory.HostName = hostName;

        Connection = connectionFactory.CreateConnection();
        try
        {
            Model = Connection.CreateModel();

            // Flags are durable, exclusive, autoDelete (all false); no extra arguments.
            Model.QueueDeclare(QueueName, false, false, false, null);
        }
        catch
        {
            // Do not leave an open connection behind when the channel or the
            // queue declaration fails; the caller never receives the instance
            // and therefore could not dispose it.
            Dispose();
            throw;
        }
    }

    /// <summary>
    /// Publishes <paramref name="message"/> to the default exchange ("") with
    /// the queue name as routing key.
    /// </summary>
    /// <param name="message">Message body.</param>
    /// <exception cref="ArgumentNullException"><paramref name="message"/> is null.</exception>
    public void SendMessage(byte[] message)
    {
        if (message == null)
        {
            throw new ArgumentNullException("message");
        }

        IBasicProperties basicProperties = Model.CreateBasicProperties();
        Model.BasicPublish("", QueueName, basicProperties, message);
    }

    /// <summary>
    /// Closes the channel and then the connection. Safe to call more than once
    /// and does not throw when the connection is already gone.
    /// </summary>
    public void Dispose()
    {
        // Channel first: it belongs to the connection, so closing the
        // connection first would leave nothing for the channel to talk to.
        if (Model != null)
        {
            Model.Abort();
        }

        if (Connection != null)
        {
            try
            {
                Connection.Close();
            }
            catch (RabbitMQ.Client.Exceptions.AlreadyClosedException)
            {
                // Already closed (second Dispose call or broker-side shutdown).
            }
            catch (System.IO.IOException)
            {
                // The socket failed while closing; nothing left to release.
            }
        }
    }
}
}
3)The consumer form has only a RichTextBox:

using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Data;
using System.Drawing;
using System.Linq;
using System.Text;
using System.Windows.Forms;
using HelloWorldRabbitMQ;

namespace HelloWorldRabbitMQ2
{
/// <summary>
/// Consumer window: appends every message received from the queue to
/// <c>richTextBox1</c>. The underlying <see cref="Consumer"/> is started when
/// the form loads and released when the form is closed.
/// </summary>
public partial class ConsumerForm : Form
{
    /// <summary>Host name handed to the <see cref="Consumer"/>.</summary>
    public string HOST_NAME = "localhost";

    /// <summary>Name of the queue to consume from.</summary>
    public string QUEUE_NAME = "helloWorld";

    // Not read or written anywhere in this class; kept because they are public.
    public bool ConsumerUp;
    public bool firstLoad;

    private Consumer consumer;

    /// <summary>
    /// Builds the form, creates the consumer and wires its lifetime to the form.
    /// </summary>
    public ConsumerForm()
    {
        InitializeComponent();

        //create the consumer
        consumer = new Consumer(HOST_NAME, QUEUE_NAME);
        consumer.onMessageReceived += handleMessage;

        // Start consuming only once the form is loaded. handleMessage uses
        // Invoke, which needs the window handle; starting in the constructor
        // lets a message that is already waiting in the queue arrive before
        // the handle exists, which makes Invoke throw and ends the consume loop.
        Load += delegate { consumer.StartConsuming(); };

        // Release the channel and connection as soon as the window is closed.
        // Otherwise the consumer stays registered on an open channel and the
        // broker keeps handing it messages that nobody displays or acknowledges
        // until the whole program exits.
        FormClosed += delegate { DisposeConsumer(); };
    }

    //delegate to post to UI thread
    private delegate void showMessageDelegate(string message);

    /// <summary>
    /// Callback for message receive. Decodes the body as UTF-8 and appends it,
    /// followed by a new line, to the text box on the UI thread.
    /// </summary>
    /// <param name="message">Raw message body.</param>
    public void handleMessage(byte[] message)
    {
        string text = System.Text.Encoding.UTF8.GetString(message) + Environment.NewLine;
        showMessageDelegate append = new showMessageDelegate(richTextBox1.AppendText);

        // Invoke (not BeginInvoke) so the call returns only after the text has
        // been appended; a failure here propagates to the caller.
        this.Invoke(append, text);
    }

    /// <summary>Best-effort release of the consumer while the form is closing.</summary>
    private void DisposeConsumer()
    {
        if (consumer == null)
        {
            return;
        }

        try
        {
            consumer.Dispose();
        }
        catch (Exception ex)
        {
            // A failure while tearing the connection down must not prevent the
            // window from closing; record it instead of letting it escape.
            System.Diagnostics.Trace.TraceWarning("Consumer cleanup failed: " + ex);
        }

        consumer = null;
    }
}
}
4)This is the Consumer class:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Exceptions;

namespace HelloWorldRabbitMQ
{
/// <summary>
/// Consumes messages from a single named queue on a background call and hands
/// each body to the subscribers of <see cref="onMessageReceived"/>. A message
/// is acknowledged only after the subscribers returned without throwing.
/// </summary>
class Consumer : IDisposable
{
    /// <summary>Channel used to declare the queue, consume and acknowledge.</summary>
    protected IModel Model;

    /// <summary>Connection that owns <see cref="Model"/>.</summary>
    protected IConnection Connection;

    /// <summary>Name of the queue being consumed.</summary>
    protected string QueueName;

    // Written by StartConsuming/Dispose and read by the consume loop on another
    // thread, hence volatile.
    protected volatile bool isConsuming;

    // Set by Dispose so that StartConsuming can refuse to run on a closed channel.
    private volatile bool disposed;

    // used to pass messages back to UI for processing
    public delegate void onReceiveMessage(byte[] message);
    public event onReceiveMessage onMessageReceived;

    /// <summary>
    /// Connects to <paramref name="hostName"/>, opens a channel and declares
    /// <paramref name="queueName"/>.
    /// </summary>
    /// <param name="hostName">Host name of the RabbitMQ broker.</param>
    /// <param name="queueName">Name of the queue to declare and consume from.</param>
    public Consumer(string hostName, string queueName)
    {
        QueueName = queueName;
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.HostName = hostName;
        Connection = connectionFactory.CreateConnection();
        try
        {
            Model = Connection.CreateModel();

            // Flags are durable, exclusive, autoDelete (all false); no extra arguments.
            Model.QueueDeclare(QueueName, false, false, false, null);
        }
        catch
        {
            // Do not leave an open connection behind when setup fails.
            Dispose();
            throw;
        }
    }

    //internal delegate to run the queue consumer on a separate thread
    private delegate void ConsumeDelegate();

    /// <summary>
    /// Starts <see cref="Consume"/> asynchronously. Does nothing when consuming
    /// has already been started.
    /// </summary>
    /// <exception cref="ObjectDisposedException">The consumer was disposed.</exception>
    public void StartConsuming()
    {
        if (disposed)
        {
            throw new ObjectDisposedException("Consumer");
        }

        if (isConsuming)
        {
            // A second loop would register a second consumer on the same
            // channel and use the channel from two threads at once.
            return;
        }

        isConsuming = true;
        ConsumeDelegate c = new ConsumeDelegate(Consume);
        c.BeginInvoke(null, null);
    }

    /// <summary>
    /// Registers a queueing consumer with explicit acknowledgements and
    /// processes deliveries until <see cref="Dispose"/> is called, the channel
    /// closes, no subscriber is attached, or a subscriber throws.
    /// </summary>
    public void Consume()
    {
        QueueingBasicConsumer consumer = new QueueingBasicConsumer(Model);

        // Second argument false: deliveries must be acknowledged explicitly.
        Model.BasicConsume(QueueName, false, consumer);

        while (isConsuming)
        {
            try
            {
                RabbitMQ.Client.Events.BasicDeliverEventArgs delivery =
                    (RabbitMQ.Client.Events.BasicDeliverEventArgs)consumer.Queue.Dequeue();

                // Copy the event to a local so a concurrent unsubscribe cannot
                // turn it into null between the check and the call.
                onReceiveMessage handler = onMessageReceived;
                if (handler == null)
                {
                    // Nobody to hand the message to: stop without acknowledging
                    // so the message is not lost.
                    break;
                }

                // ... process the message
                handler(delivery.Body);
                Model.BasicAck(delivery.DeliveryTag, false);
            }
            catch (System.IO.EndOfStreamException)
            {
                // The consumer's delivery queue was closed, i.e. the channel
                // or connection went away (for instance through Dispose).
                break;
            }
            catch (OperationInterruptedException)
            {
                // The consumer was removed, either through
                // channel or connection closure, or through the
                // action of IModel.BasicCancel().
                break;
            }
            catch (Exception ex)
            {
                // A subscriber failed (for example its window is gone). The
                // message stays unacknowledged; stop, but leave a trace
                // instead of hiding the failure.
                System.Diagnostics.Trace.TraceError("Consumer loop stopped: " + ex);
                break;
            }
        }
    }

    /// <summary>
    /// Stops the consume loop and closes the channel and then the connection.
    /// Safe to call more than once and does not throw when the connection is
    /// already gone.
    /// </summary>
    public void Dispose()
    {
        disposed = true;
        isConsuming = false;

        // Channel first: it belongs to the connection. Closing it also
        // unblocks a pending Dequeue in the consume loop.
        if (Model != null)
        {
            Model.Abort();
        }

        if (Connection != null)
        {
            try
            {
                Connection.Close();
            }
            catch (AlreadyClosedException)
            {
                // Already closed (second Dispose call or broker-side shutdown).
            }
            catch (System.IO.IOException)
            {
                // The socket failed while closing; nothing left to release.
            }
        }
    }
}
}
Using this program, I can send messages to a single consumer and see the
received messages on the consumer side.
Now, if I close my consumer form and go on sending messages, the messages will
be stored in the queue.
If I close the whole program, restart it and send a new message, I will
receive on my consumer side all the non-consumed messages plus the newly
sent message. That's what I want to demonstrate, but without closing the whole
program.

My purpose is to show that if I close my connection to the queue (on the
consumer's side) and afterwards open a new consumer, I will receive all the
messages that were sent to the queue during the time there was no consumer.

Does anyone have an idea what I should add to my code?

--
View this message in context: http://old.nabble.com/Shuting-down-and-reopening-the-consumer%27s-side-tp33329194p33329194.html
Sent from the RabbitMQ mailing list archive at Nabble.com.

Search Discussions

  • Emile Joubert at Feb 16, 2012 at 11:49 am
    Hi,
    On 15/02/12 14:35, Bamboula wrote:
    > My purpose is to show that if I close my connection to the queue (on the
    > consumer's side) and afterwards open a new consumer, I will receive all the
    > messages that were sent to the queue during the time there was no consumer.
    If you have a fixed set of consumers then the publisher can set up
    queues and bindings for all of them before starting to publish. The
    publisher and consumers will need to establish a convention for queue
    naming beforehand. Each consumer can then subscribe to its own queue and
    read the messages that were published during its absence.

    Also take a look at this exchange plugin:
    https://github.com/squaremo/rabbitmq-lvc-plugin



    -Emile

Related Discussions

Discussion Navigation
viewthread | post
Discussion Overview
grouprabbitmq-discuss @
categoriesrabbitmq
postedFeb 15, '12 at 2:35p
activeFeb 16, '12 at 11:49a
posts2
users2
websiterabbitmq.com
irc#rabbitmq

2 users in discussion

Emile Joubert: 1 post Bamboula: 1 post

People

Translate

site design / logo © 2022 Grokbase