Deelo | 29 Jun 2012 10:22
Picon

RabbitMQ Subscriber Timeout When Waiting for Delivery

My subscriber appears to time out and never processes any messages if the publisher takes a long time to start (e.g. I start the publisher 30 minutes after the subscriber).
Is there a way to change this?

Appreciate any help as this is causing all kinds of problems!


public class RabbitMqSubscriber<T extends Serializable> implements Subscriber<T> {
private static final Logger log = LoggerFactory.getLogger(RabbitMqSubscriber.class);
private QueueingConsumer consumer;
private MessageListener<T> listener;
private String exchange;
private String topic;
public RabbitMqSubscriber(String host,String exchange,String topic) throws IOException {
this.exchange=exchange;
this.topic=topic;
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(host);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(exchange, "topic");
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, exchange, topic);
consumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, true, consumer);
}

public void run() {
log.info("Starting subscribing to messages on exchange: "+exchange + " with topic: "+topic);
while (true) {
QueueingConsumer.Delivery delivery;
try {
delivery = consumer.nextDelivery();
Object o=SerializationUtils.deserialize(delivery.getBody());
log.info("Subscriber received object: "+o);
listener.receive((T)o);
} catch (ShutdownSignalException | ConsumerCancelledException | InterruptedException e) {
throw new RuntimeException(e);
}
}
}

<at> Override
public void setListener(MessageListener<T> listener) {
this.listener=listener;
}
}

public class RabbitMqPublisher<T extends Serializable> implements Publisher<T>{
private String topic;
private Channel channel;
private String exchange;
private static final Logger log = LoggerFactory.getLogger(RabbitMqPublisher.class);
public RabbitMqPublisher(String host,String exchange,String topic) throws IOException {
super();
this.topic = topic;
this.exchange=exchange;
   ConnectionFactory factory = new ConnectionFactory();
   factory.setHost(host);
   Connection connection = factory.newConnection();
   channel = connection.createChannel();
        channel.exchangeDeclare(exchange, "topic");
log.info("Publishing to messages on exchange: "+exchange + " with topic: "+topic);
}
<at> Override
public void publish(T t) {
   try {
    log.info("Publishing object: "+t);
       channel.basicPublish(exchange, topic, null, SerializationUtils.serialize(t));
} catch (IOException e) {
throw new RuntimeException(e);
}

}
}

public class TimeoutTest {
private static final Logger log = LoggerFactory.getLogger(TimeoutTest.class);
public static void main(String[] args) throws Exception{
final RabbitMqPublisher<String> publisher=new RabbitMqPublisher<String>("31.222.170.242","EXCHANGE","TEST");
final RabbitMqSubscriber<String> subscriber=new RabbitMqSubscriber<String>("31.222.170.242","EXCHANGE", "TEST");
subscriber.setListener(new MessageListener<String>() {
<at> Override
public void receive(String s) {
System.out.println("Received: "+s); //NEVER GETS CALLED!!!
}
});
Executors.newSingleThreadExecutor().execute(new Runnable() {
public void run() {
subscriber.run();
}
});
//start subscriber then wait 30 minutes before publishing first message 
Thread.sleep(1000*1800);
publisher.publish("MESSAGE PUBLISHED");
}
}

<div>
<div>My subscriber appears to time out and never processes any messages if the publisher takes a long time to start (e.g. I start the publisher 30 minutes after the subscriber).</div>
<div>Is there a way to change this?</div>
<div><br></div>
<div>Appreciate any help as this is causing all kinds of problems!</div>
<div><br></div>
<div><br></div>
<div>public class RabbitMqSubscriber&lt;T extends Serializable&gt; implements Subscriber&lt;T&gt; {</div>
<div>
<span class="Apple-tab-span">	</span>private static final Logger log = LoggerFactory.getLogger(RabbitMqSubscriber.class);</div>
<div>
<span class="Apple-tab-span">	</span>private QueueingConsumer consumer;</div>
<div>
<span class="Apple-tab-span">	</span>private MessageListener&lt;T&gt; listener;</div>
<div>
<span class="Apple-tab-span">	</span>private String exchange;</div>
<div>
<span class="Apple-tab-span">	</span>private String topic;</div>
<div>
<span class="Apple-tab-span">	</span>public RabbitMqSubscriber(String host,String exchange,String topic) throws IOException {</div>
<div>
<span class="Apple-tab-span">		</span>this.exchange=exchange;</div>
<div>
<span class="Apple-tab-span">		</span>this.topic=topic;</div>
<div>
<span class="Apple-tab-span">		</span>ConnectionFactory factory = new ConnectionFactory();</div>
<div>
<span class="Apple-tab-span">		</span>factory.setHost(host);</div>
<div>
<span class="Apple-tab-span">		</span>Connection connection = factory.newConnection();</div>
<div>
<span class="Apple-tab-span">		</span>Channel channel = connection.createChannel();</div>
<div>
<span class="Apple-tab-span">		</span>channel.exchangeDeclare(exchange, "topic");</div>
<div>
<span class="Apple-tab-span">		</span>String queueName = channel.queueDeclare().getQueue();</div>
<div>
<span class="Apple-tab-span">		</span>channel.queueBind(queueName, exchange, topic);</div>
<div>
<span class="Apple-tab-span">		</span>consumer = new QueueingConsumer(channel);</div>
<div>
<span class="Apple-tab-span">		</span>channel.basicConsume(queueName, true, consumer);</div>
<div>
<span class="Apple-tab-span">	</span>}</div>
<div><br></div>
<div>
<span class="Apple-tab-span">	</span>public void run() {</div>
<div>
<span class="Apple-tab-span">		</span>log.info("Starting subscribing to messages on exchange: "+exchange + " with topic: "+topic);</div>
<div>
<span class="Apple-tab-span">		</span>while (true) {</div>
<div>
<span class="Apple-tab-span">			</span>QueueingConsumer.Delivery delivery;</div>
<div>
<span class="Apple-tab-span">			</span>try {</div>
<div>
<span class="Apple-tab-span">				</span>delivery = consumer.nextDelivery();</div>
<div>
<span class="Apple-tab-span">				</span>Object o=SerializationUtils.deserialize(delivery.getBody());</div>
<div>
<span class="Apple-tab-span">				</span>log.info("Subscriber received object: "+o);</div>
<div>
<span class="Apple-tab-span">				</span>listener.receive((T)o);</div>
<div>
<span class="Apple-tab-span">			</span>} catch (ShutdownSignalException | ConsumerCancelledException | InterruptedException e) {</div>
<div>
<span class="Apple-tab-span">				</span>throw new RuntimeException(e);</div>
<div>
<span class="Apple-tab-span">			</span>}</div>
<div>
<span class="Apple-tab-span">		</span>}</div>
<div>
<span class="Apple-tab-span">	</span>}</div>
<div><br></div>
<div>
<span class="Apple-tab-span">	</span>@Override</div>
<div>
<span class="Apple-tab-span">	</span>public void setListener(MessageListener&lt;T&gt; listener) {</div>
<div>
<span class="Apple-tab-span">		</span>this.listener=listener;</div>
<div>
<span class="Apple-tab-span">	</span>}</div>
<div>}</div>
<div><br></div>
<div>
<div>public class RabbitMqPublisher&lt;T extends Serializable&gt; implements Publisher&lt;T&gt;{</div>
<div>
<span class="Apple-tab-span">	</span>private String topic;</div>
<div>
<span class="Apple-tab-span">	</span>private Channel channel;</div>
<div>
<span class="Apple-tab-span">	</span>private String exchange;</div>
<div>
<span class="Apple-tab-span">	</span>private static final Logger log = LoggerFactory.getLogger(RabbitMqPublisher.class);</div>
<div>
<span class="Apple-tab-span">	</span>public RabbitMqPublisher(String host,String exchange,String topic) throws IOException {</div>
<div>
<span class="Apple-tab-span">		</span>super();</div>
<div>
<span class="Apple-tab-span">		</span>this.topic = topic;</div>
<div>
<span class="Apple-tab-span">		</span>this.exchange=exchange;</div>
<div>
<span class="Apple-tab-span">	</span> &nbsp; &nbsp;ConnectionFactory factory = new ConnectionFactory();</div>
<div>
<span class="Apple-tab-span">	</span> &nbsp; &nbsp;factory.setHost(host);</div>
<div>
<span class="Apple-tab-span">	</span> &nbsp; &nbsp;Connection connection = factory.newConnection();</div>
<div>
<span class="Apple-tab-span">	</span> &nbsp; &nbsp;channel = connection.createChannel();</div>
<div>&nbsp; &nbsp; &nbsp; &nbsp; channel.exchangeDeclare(exchange, "topic");</div>
<div>
<span class="Apple-tab-span">		</span>log.info("Publishing to messages on exchange: "+exchange + " with topic: "+topic);</div>
<div>
<span class="Apple-tab-span">	</span>}</div>
<div>
<span class="Apple-tab-span">	</span>@Override</div>
<div>
<span class="Apple-tab-span">	</span>public void publish(T t) {</div>
<div>
<span class="Apple-tab-span">	</span> &nbsp; &nbsp;try {</div>
<div>
<span class="Apple-tab-span">	</span> &nbsp; &nbsp;<span class="Apple-tab-span">	</span>log.info("Publishing object: "+t);</div>
<div>
<span class="Apple-tab-span">	</span> &nbsp; &nbsp; &nbsp; &nbsp;channel.basicPublish(exchange, topic, null, SerializationUtils.serialize(t));</div>
<div>
<span class="Apple-tab-span">		</span>} catch (IOException e) {</div>
<div>
<span class="Apple-tab-span">			</span>throw new RuntimeException(e);</div>
<div>
<span class="Apple-tab-span">		</span>}</div>
<div><br></div>
<div>
<span class="Apple-tab-span">	</span>}</div>
<div>}</div>
</div>
<div><br></div>
<div>
<div>public class TimeoutTest {</div>
<div>
<span class="Apple-tab-span">	</span>private static final Logger log = LoggerFactory.getLogger(TimeoutTest.class);</div>
<div>
<span class="Apple-tab-span">	</span>public static void main(String[] args) throws Exception{</div>
<div>
<span class="Apple-tab-span">		</span>final RabbitMqPublisher&lt;String&gt; publisher=new RabbitMqPublisher&lt;String&gt;("31.222.170.242","EXCHANGE","TEST");</div>
<div>
<span class="Apple-tab-span">		</span>final RabbitMqSubscriber&lt;String&gt; subscriber=new RabbitMqSubscriber&lt;String&gt;("31.222.170.242","EXCHANGE", "TEST");</div>
<div>
<span class="Apple-tab-span">		</span>subscriber.setListener(new MessageListener&lt;String&gt;() {</div>
<div><span class="Apple-tab-span">			</span></div>
<div>
<span class="Apple-tab-span">			</span>@Override</div>
<div>
<span class="Apple-tab-span">			</span>public void receive(String s) {</div>
<div>
<span class="Apple-tab-span">				</span>System.out.println("Received: "+s); //NEVER GETS CALLED!!!</div>
<div><span class="Apple-tab-span">				</span></div>
<div>
<span class="Apple-tab-span">			</span>}</div>
<div>
<span class="Apple-tab-span">		</span>});</div>
<div>
<span class="Apple-tab-span">		</span>Executors.newSingleThreadExecutor().execute(new Runnable() {</div>
<div>
<span class="Apple-tab-span">			</span>public void run() {</div>
<div>
<span class="Apple-tab-span">				</span>subscriber.run();</div>
<div>
<span class="Apple-tab-span">			</span>}</div>
<div>
<span class="Apple-tab-span">		</span>});</div>
<div><span class="Apple-tab-span">		</span></div>
<div>
<span class="Apple-tab-span">		</span>//start subscriber then wait 30 minutes before publishing first message&nbsp;</div>
<div>
<span class="Apple-tab-span">		</span>Thread.sleep(1000*1800);</div>
<div><span class="Apple-tab-span">		</span></div>
<div>
<span class="Apple-tab-span">		</span>publisher.publish("MESSAGE PUBLISHED");</div>
<div>
<span class="Apple-tab-span">	</span>}</div>
<div><span class="Apple-tab-span">	</span></div>
<div>}</div>
</div>
<div><br></div>
</div>

Gmane