Thursday, September 22, 2011

Load test for Durable Subscribers

Here is the Java program to create dynamically multiple Durable Subscribers

package durablesubscribers;

import java.util.Hashtable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import javax.jms.Message;

import javax.jms.TopicConnectionFactory;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import javax.naming.Context;
import javax.naming.InitialContext;

/**
 *
 * @author Pierre
 */
public class DurableSubscriberFactory implements Runnable {
    public static final int NUMBER_OF_THREADS = 50;
    public static final int WAIT_BETWEEN_RECEIVE = 10000;
    
    private String clientId = null;

    public static void main(String[] args) throws Exception {
         ExecutorService service = Executors.newFixedThreadPool(NUMBER_OF_THREADS);
         for (int i = 0; i < NUMBER_OF_THREADS; i++) {
             DurableSubscriberFactory runnable = new DurableSubscriberFactory();
             runnable.setClientId("PV_CLIENTID_" + i);
             service.submit(runnable);
             Thread.sleep(200);
             
         }
    }

    public void run() {
        TopicConnection topicConnection = null;
        try {
            System.out.println("starting");
            // get the initial context, refer to your app server docs for this
            Hashtable<String, String> ht = new Hashtable<String, String>();
            ht.put(Context.INITIAL_CONTEXT_FACTORY, "weblogic.jndi.WLInitialContextFactory");
            ht.put(Context.PROVIDER_URL, "t3://myserver:7011");
            Context ctx = new InitialContext(ht);

            System.out.println("looking up");
            // get the connection factory, and open a connection
            TopicConnectionFactory qcf = (TopicConnectionFactory) ctx.lookup("myConnectionFactory");
            System.out.println("createTopicConnection");
            topicConnection = qcf.createTopicConnection();
            // must specify, otherwise you get an IllegalStateException
            topicConnection.setClientID(getClientId());
            // start connection to receive messages
            topicConnection.start();

            // create topic session off the connection
            System.out.println("createTopicSession");
            TopicSession topicSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);

            // get handle on topic, create a durable subscriber and consume messages
            System.out.println("lookup topic");

            // you can only create a Durable Subscriber to a individual member, not to a Distributed Destination
            Topic topic = (Topic) ctx.lookup("JMS_SERVER_01@TOPICNAME");
            System.out.println("create durable subscriber");
            TopicSubscriber topicSubscriber = topicSession.createDurableSubscriber(topic, "PVTEST");
            while (true) {
                System.out.println("receiving....");
                Message message = topicSubscriber.receive();
                System.out.println("received message " + message);
                Thread.sleep(WAIT_BETWEEN_RECEIVE);
            }

        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // close the queue connection
            if (topicConnection != null) {
                try {
                    topicConnection.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }

            }
        }
    }

    /**
     * @return the clientId
     */
    public String getClientId() {
        return clientId;
    }

    /**
     * @param clientId the clientId to set
     */
    public void setClientId(String clientId) {
        this.clientId = clientId;
    }
}






It works really great.... but when you exit the application, the Durable Subscribers will still appear in the Consumers for the Topic.
If you want to remove them, do this:

package durablesubscribers;

import java.util.Hashtable;

import javax.jms.TopicConnectionFactory;
import javax.jms.Session;
import javax.jms.TopicConnection;
import javax.jms.TopicSession;
import javax.naming.Context;
import javax.naming.InitialContext;

/**
 *
 * @author vernetto
 */
public class UnsubscribeAll {
    public static final int NUMBER_OF_THREADS = 50;

    private String clientId = null;

    public static void main(String[] args) throws Exception {
         for (int i = 0; i < NUMBER_OF_THREADS; i++) {
             UnsubscribeAll runnable = new UnsubscribeAll();
             runnable.setClientId("PV_CLIENTID_" + i);
             runnable.unsubscribe();
             Thread.sleep(200);
         }
    }

    /**
     * @param args the command line arguments
     */
    public void unsubscribe() {
        TopicConnection topicConnection = null;
        try {
            System.out.println("starting");
            // get the initial context, refer to your app server docs for this
            Hashtable<String, String> ht = new Hashtable<String, String>();
            ht.put(Context.INITIAL_CONTEXT_FACTORY, "weblogic.jndi.WLInitialContextFactory");
            ht.put(Context.PROVIDER_URL, "t3://myserver:7011");
            Context ctx = new InitialContext(ht);

            System.out.println("looking up");
            // get the connection factory, and open a connection
            TopicConnectionFactory qcf = (TopicConnectionFactory) ctx.lookup("myConnectionFactory");
            System.out.println("createTopicConnection");
            topicConnection = qcf.createTopicConnection();
            // must specify, otherwise you get an IllegalStateException
            topicConnection.setClientID(getClientId());

            // create topic session off the connection
            System.out.println("createTopicSession");
            TopicSession topicSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);

            topicSession.unsubscribe("PVTEST");

        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // close the queue connection
            if (topicConnection != null) {
                try {
                    topicConnection.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }

            }
        }
    }

    /**
     * @return the clientId
     */
    public String getClientId() {
        return clientId;
    }

    /**
     * @param clientId the clientId to set
     */
    public void setClientId(String clientId) {
        this.clientId = clientId;
    }
}





No comments: