ActiveMQ message producer and consumer with durable subscriber example

When I first heard about JMS, I pictured it as three parts: a message producer, a message broker (think of it as the medium that carries the messages) and a message consumer. The basic idea behind the JMS specification really is that simple. The figure illustrates this flow.


JMS is only a specification. The implementations come from providers such as WebLogic JMS, ActiveMQ, HornetQ, RabbitMQ (through its JMS client) and many more.

Here I used Apache ActiveMQ. It has many features, which are listed on the official website.
I want to implement a message producer that publishes messages to a topic, and a durable subscriber that consumes them. A durable subscription ensures that every message published after the subscription is created reaches the subscriber. If the subscriber goes offline, the broker stores the messages and delivers them when the subscriber comes back online. For message persistence, ActiveMQ uses KahaDB (a file-based persistence store) by default.
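Before the ActiveMQ code, the store-and-deliver idea can be modelled in a few lines of plain Java. This is only a toy sketch to show the behaviour, not how ActiveMQ or KahaDB work internally; the class `DurableTopicSketch` and its methods are hypothetical names invented for this illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

/** Toy in-memory model of a topic with durable subscriptions (hypothetical, not ActiveMQ code). */
public class DurableTopicSketch {
    // One backlog per durable subscription, keyed by "clientId:subscriptionName".
    private final Map<String, Queue<String>> backlogs = new HashMap<>();

    /** Registers the subscription if it does not exist yet; an existing backlog is kept. */
    public void subscribe(String clientId, String name) {
        backlogs.computeIfAbsent(clientId + ":" + name, k -> new ArrayDeque<>());
    }

    /** Stores a copy of the message for every registered subscription, online or not. */
    public void publish(String message) {
        for (Queue<String> backlog : backlogs.values()) {
            backlog.add(message);
        }
    }

    /** Called when the subscriber is online again: drains and returns what was stored for it. */
    public List<String> receiveAll(String clientId, String name) {
        Queue<String> backlog = backlogs.get(clientId + ":" + name);
        List<String> delivered = new ArrayList<>();
        if (backlog != null) {
            while (!backlog.isEmpty()) {
                delivered.add(backlog.poll());
            }
        }
        return delivered;
    }

    public static void main(String[] args) {
        DurableTopicSketch topic = new DurableTopicSketch();
        // No subscription is registered yet, so nothing is stored for anyone.
        topic.publish("sent before any subscription exists");
        topic.subscribe("clientId", "Test_Durable_Subscriber");
        // The subscriber is "offline" here, but its backlog keeps the message.
        topic.publish("sent while the subscriber is offline");
        System.out.println(topic.receiveAll("clientId", "Test_Durable_Subscriber"));
    }
}
```

Running `main` prints `[sent while the subscriber is offline]`. The first message is missing because the subscription did not exist when it was published, which is the same reason a durable subscriber has to connect once before the producer sends anything.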
Now let's dive into the code.

I used Maven, so here is the dependency:

 <dependency>  
     <groupId>org.apache.activemq</groupId>  
     <artifactId>activemq-all</artifactId>  
     <version>5.14.5</version>  
 </dependency>  

MessageProducer:

 package com.tolet.messageserver;

 import javax.jms.Connection;
 import javax.jms.JMSException;
 import javax.jms.MapMessage;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
 import javax.jms.Topic;
 import org.apache.activemq.ActiveMQConnectionFactory;
 /**
  *
  * @author Ataur Rahman
  */
 public class LogMessageProducer {
   // Shared with LogMessageConsumer through static imports.
   public static String topicName = "topic";
   public static String clientId = "clientId";
   public static void main(String[] args) throws JMSException {
     ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
         "tcp://localhost:2020");
     // Only affects ObjectMessage deserialization; a MapMessage does not need it.
     connectionFactory.setTrustAllPackages(true);
     Connection connection = connectionFactory.createConnection();
     try {
       Session session = connection.createSession(false,
           Session.AUTO_ACKNOWLEDGE);
       Topic topic = session.createTopic(topicName);
       String payload = "this is sent text..............................";
       MapMessage m = session.createMapMessage();
       m.setString("id", "5");
       m.setString("message", payload);
       // Producers default to PERSISTENT delivery, so the broker can store the message.
       MessageProducer producer = session.createProducer(topic);
       producer.send(m);
       session.close();
     } finally {
       // Closing the connection also closes its sessions and producers.
       connection.close();
     }
   }
 }

MessageBroker:

 import java.net.URI;
 import org.apache.activemq.broker.BrokerFactory;
 import org.apache.activemq.broker.BrokerService;
 /**
  *
  * @author Ataur Rahman
  */
 public class MessageBroker {
   public static void main(String[] args) throws Exception {
     // Embedded broker listening on the address the producer and consumer connect to.
     BrokerService broker = BrokerFactory.createBroker(new URI(
         "broker:(tcp://localhost:2020)"));
     broker.start();
     // Keep the main thread alive until the broker is stopped.
     broker.waitUntilStopped();
   }
 }

MessageConsumer:

 package com.tolet.messageserver;

 import static com.tolet.messageserver.LogMessageProducer.topicName;
 import static com.tolet.messageserver.LogMessageProducer.clientId;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 import javax.jms.Connection;
 import javax.jms.JMSException;
 import javax.jms.MapMessage;
 import javax.jms.Session;
 import javax.jms.Topic;
 import javax.jms.TopicSubscriber;
 import org.apache.activemq.ActiveMQConnectionFactory;
 /**
  *
  * @author Ataur Rahman
  */
 public class LogMessageConsumer extends Thread {
   @Override
   public void run() {
     try {
       ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
           "tcp://localhost:2020");
       connectionFactory.setTrustAllPackages(true);
       connectionFactory.setDispatchAsync(true);
       Connection connection = connectionFactory.createConnection();
       try {
         // The client ID together with the subscription name identifies the durable subscription.
         connection.setClientID(clientId);
         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
         Topic topic = session.createTopic(topicName);
         TopicSubscriber consumer = session.createDurableSubscriber(topic, "Test_Durable_Subscriber");
         connection.start();
         while (true) {
           System.out.println("Waiting for message ==== ");
           // receive() blocks until a message arrives and returns null once the consumer is closed.
           MapMessage msg = (MapMessage) consumer.receive();
           if (msg == null) {
             break;
           }
           System.out.println(msg);
           try {
             Thread.sleep(10000);
           } catch (InterruptedException ex) {
             Logger.getLogger(LogMessageConsumer.class.getName()).log(Level.SEVERE, null, ex);
           }
           System.out.println("$$$$$$$$$$ id = " + msg.getString("id"));
           System.out.println("$$$$$$$$$$ message = " + msg.getString("message"));
         }
       } catch (JMSException ex) {
         Logger.getLogger(LogMessageConsumer.class.getName()).log(Level.SEVERE, null, ex);
       } finally {
         // Closing the connection also closes the session and the subscriber.
         connection.close();
       }
     } catch (JMSException ex) {
       Logger.getLogger(LogMessageConsumer.class.getName()).log(Level.SEVERE, null, ex);
     }
   }
   public static void main(String[] args) throws JMSException {
     LogMessageConsumer l = new LogMessageConsumer();
     l.start();
   }
 }

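I use MapMessage here because an ObjectMessage carries a serialized Java object. If the producer lives in a different program, the consumer can only read that object when the same class is on its own classpath (and, in ActiveMQ, in a trusted package); otherwise it cannot cast the message body back to the object. A MapMessage holds only named values of standard types, so I chose it as the safer way to send messages.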
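As a rough sketch of that idea, the plain-Java example below flattens an object into named string fields, which is the shape a MapMessage carries. The `LogEntry` class and the `toFields` helper are hypothetical and exist only for this illustration; in the real producer each entry would be written with `m.setString(key, value)`, and the consumer would read it with `msg.getString(key)` without ever needing the `LogEntry` class.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper: turns a domain object into the key/value pairs a MapMessage would carry. */
public class LogEntryMapping {

    /** Hypothetical producer-side class; the consumer never has to load it. */
    public static final class LogEntry {
        public final String id;
        public final String message;

        public LogEntry(String id, String message) {
            this.id = id;
            this.message = message;
        }
    }

    /** Flattens the entry into named string fields, using the same keys as the producer above. */
    public static Map<String, String> toFields(LogEntry entry) {
        Map<String, String> fields = new LinkedHashMap<>();
        fields.put("id", entry.id);
        fields.put("message", entry.message);
        return fields;
    }

    public static void main(String[] args) {
        Map<String, String> fields = toFields(new LogEntry("5", "this is sent text"));
        // The consumer side only needs the agreed key names, not the LogEntry class.
        System.out.println("id = " + fields.get("id"));
        System.out.println("message = " + fields.get("message"));
    }
}
```

The only contract between the two programs is the set of key names ("id" and "message"), so either side can change its own classes without breaking the other.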

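To run the program:

  1. Run the MessageBroker class.
  2. Run the consumer class once, so that the durable subscription is registered with the broker. Messages published before the subscription exists are not stored for it.
  3. Run the producer class.
Now you will see the message in the consumer's console. From then on you can stop the consumer, run the producer, and start the consumer again: the stored message is delivered when it reconnects.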
Thanks and Cheers.
