Sunday, February 6, 2011

Java + Active Mq + JMS

Core Java + Apache Active Mq
Steps to follow:
1) Download  apache-activemq-5.3.0-bin
2) Unzip on C drive and bin/apache.exe to check if the apache is working fine.
3) In eclipse Include the jar file activemq-all-5.3.0 from the unzipped folder.
4)
----------------------------------------------------------------
java --MyFirstActiveMqQueue
package jms;

import javax.jms.JMSException;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;

/**
 *
 * @author gisbert.amm
 */
public class MyFirstActiveMqQueue {

    private static ActiveMQConnectionFactory connectionFactory;
    private static Connection connection;
    private static Session session;
    private static Destination destination;
    private static boolean transacted = false;

    public static void main(String[] args) throws Exception {
        BrokerService broker = new BrokerService();
        broker.setUseJmx(true);
        broker.addConnector("tcp://localhost:61616");
        broker.start();

        setUp();
        createProducerAndSendAMessage();
        System.out.println("Simulating a huge network delay :)");
        Thread.sleep(4000);
        createConsumerAndReceiveAMessage();

        //TODO: Find out how to get rid of the exceptions thrown when stopping the broker
        broker.stop();
    }

    private static void setUp() throws JMSException {
        connectionFactory = new ActiveMQConnectionFactory(
                ActiveMQConnection.DEFAULT_USER,
                ActiveMQConnection.DEFAULT_PASSWORD,
                ActiveMQConnection.DEFAULT_BROKER_URL);
        connection = connectionFactory.createConnection();
        connection.start();
        session = connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE);
        destination = session.createQueue("mmy first active mq queue");
    }

    private static void createProducerAndSendAMessage() throws JMSException {
        MessageProducer producer = session.createProducer(destination);
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        TextMessage message = session.createTextMessage("Hello World!");
        System.out.println("Sending message: " + message.getText());
        producer.send(message);
    }

    private static void createConsumerAndReceiveAMessage() throws JMSException, InterruptedException {
        connection = connectionFactory.createConnection();
        connection.start();
        MessageConsumer consumer = session.createConsumer(destination);
        MyConsumer myConsumer = new MyConsumer();
        connection.setExceptionListener(myConsumer);
        consumer.setMessageListener(myConsumer);
    }
}
--------------------------------------------------------------------------------------------------------------------------------------------
package jms;

import javax.jms.JMSException;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;

public class MyConsumer implements MessageListener, ExceptionListener {

    synchronized public void onException(JMSException ex) {
        System.out.println("JMS Exception occured.  Shutting down client.");
        System.exit(1);
    }

    public void onMessage(Message message) {
        if (message instanceof TextMessage) {
            TextMessage textMessage = (TextMessage) message;
            try {
                System.out
                        .println("Received message: " + textMessage.getText());
            } catch (JMSException ex) {
                System.out.println("Error reading message: " + ex);
            }
        } else {
            System.out.println("Received: " + message);
        }
    }
}
---------------------------------------------------------------------
http://localhost:8161/admin
To monitor go to jdk/bin/jconsole
>jconsole broker:(tcp://localhost:61616)?useJmx=true

No comments:

Post a Comment