package durablesubscribers; import java.util.Hashtable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import javax.jms.Message; import javax.jms.TopicConnectionFactory; import javax.jms.Session; import javax.jms.Topic; import javax.jms.TopicConnection; import javax.jms.TopicSession; import javax.jms.TopicSubscriber; import javax.naming.Context; import javax.naming.InitialContext; /** * * @author Pierre */ public class DurableSubscriberFactory implements Runnable { public static final int NUMBER_OF_THREADS = 50; public static final int WAIT_BETWEEN_RECEIVE = 10000; private String clientId = null; public static void main(String[] args) throws Exception { ExecutorService service = Executors.newFixedThreadPool(NUMBER_OF_THREADS); for (int i = 0; i < NUMBER_OF_THREADS; i++) { DurableSubscriberFactory runnable = new DurableSubscriberFactory(); runnable.setClientId("PV_CLIENTID_" + i); service.submit(runnable); Thread.sleep(200); } } public void run() { TopicConnection topicConnection = null; try { System.out.println("starting"); // get the initial context, refer to your app server docs for this Hashtable<String, String> ht = new Hashtable<String, String>(); ht.put(Context.INITIAL_CONTEXT_FACTORY, "weblogic.jndi.WLInitialContextFactory"); ht.put(Context.PROVIDER_URL, "t3://myserver:7011"); Context ctx = new InitialContext(ht); System.out.println("looking up"); // get the connection factory, and open a connection TopicConnectionFactory qcf = (TopicConnectionFactory) ctx.lookup("myConnectionFactory"); System.out.println("createTopicConnection"); topicConnection = qcf.createTopicConnection(); // must specify, otherwise you get an IllegalStateException topicConnection.setClientID(getClientId()); // start connection to receive messages topicConnection.start(); // create topic session off the connection System.out.println("createTopicSession"); TopicSession topicSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); // get handle on topic, create a durable subscriber and consume messages System.out.println("lookup topic"); // you can only create a Durable Subscriber to a individual member, not to a Distributed Destination Topic topic = (Topic) ctx.lookup("JMS_SERVER_01@TOPICNAME"); System.out.println("create durable subscriber"); TopicSubscriber topicSubscriber = topicSession.createDurableSubscriber(topic, "PVTEST"); while (true) { System.out.println("receiving...."); Message message = topicSubscriber.receive(); System.out.println("received message " + message); Thread.sleep(WAIT_BETWEEN_RECEIVE); } } catch (Exception e) { e.printStackTrace(); } finally { // close the queue connection if (topicConnection != null) { try { topicConnection.close(); } catch (Exception e) { e.printStackTrace(); } } } } /** * @return the clientId */ public String getClientId() { return clientId; } /** * @param clientId the clientId to set */ public void setClientId(String clientId) { this.clientId = clientId; } }
It works really great.... but when you exit the application, the Durable Subscribers will still appear in the Consumers for the Topic.
If you want to remove them, do this:
package durablesubscribers; import java.util.Hashtable; import javax.jms.TopicConnectionFactory; import javax.jms.Session; import javax.jms.TopicConnection; import javax.jms.TopicSession; import javax.naming.Context; import javax.naming.InitialContext; /** * * @author vernetto */ public class UnsubscribeAll { public static final int NUMBER_OF_THREADS = 50; private String clientId = null; public static void main(String[] args) throws Exception { for (int i = 0; i < NUMBER_OF_THREADS; i++) { UnsubscribeAll runnable = new UnsubscribeAll(); runnable.setClientId("PV_CLIENTID_" + i); runnable.unsubscribe(); Thread.sleep(200); } } /** * @param args the command line arguments */ public void unsubscribe() { TopicConnection topicConnection = null; try { System.out.println("starting"); // get the initial context, refer to your app server docs for this Hashtable<String, String> ht = new Hashtable<String, String>(); ht.put(Context.INITIAL_CONTEXT_FACTORY, "weblogic.jndi.WLInitialContextFactory"); ht.put(Context.PROVIDER_URL, "t3://myserver:7011"); Context ctx = new InitialContext(ht); System.out.println("looking up"); // get the connection factory, and open a connection TopicConnectionFactory qcf = (TopicConnectionFactory) ctx.lookup("myConnectionFactory"); System.out.println("createTopicConnection"); topicConnection = qcf.createTopicConnection(); // must specify, otherwise you get an IllegalStateException topicConnection.setClientID(getClientId()); // create topic session off the connection System.out.println("createTopicSession"); TopicSession topicSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); topicSession.unsubscribe("PVTEST"); } catch (Exception e) { e.printStackTrace(); } finally { // close the queue connection if (topicConnection != null) { try { topicConnection.close(); } catch (Exception e) { e.printStackTrace(); } } } } /** * @return the clientId */ public String getClientId() { return clientId; } /** * @param clientId the clientId to set */ public void setClientId(String clientId) { this.clientId = clientId; } }
No comments:
Post a Comment