View Javadoc

1   /****************************************************************************
2    * Copyright (c) 2005, 2006, 2007, 2008, 2009 Imola Informatica.
3    * All rights reserved. This program and the accompanying materials
4    * are made available under the terms of the LGPL License v2.1
5    * which accompanies this distribution, and is available at
6    * http://www.gnu.org/licenses/lgpl.html
7    ****************************************************************************/
8   /*
9   
10   * The contents of this file are subject to the terms
11  
12   * of the Common Development and Distribution License
13  
14   * (the "License").  You may not use this file except
15  
16   * in compliance with the License.
17  
18   *
19  
20   * You can obtain a copy of the license at
21  
22   * https://open-esb.dev.java.net/public/CDDLv1.0.html.
23  
24   * See the License for the specific language governing
25  
26   * permissions and limitations under the License.
27  
28   *
29  
30   * When distributing Covered Code, include this CDDL
31  
32   * HEADER in each file and include the License file at
33  
34   * https://open-esb.dev.java.net/public/CDDLv1.0.html.
35  
36   * If applicable add the following below this CDDL HEADER,
37  
38   * with the fields enclosed by brackets "[]" replaced with
39  
40   * your own identifying information: Portions Copyright
41  
42   * [year] [name of copyright owner]
43  
44   */
45  
46  /*
47  
48   * Copyright 2004-2006 Sun Microsystems, Inc. All Rights Reserved.
49  
50   */
51  
52  /*
53  
54   * MessageExchangeReceiver.java
55  
56   *
57  
58   */
59  
60  package it.imolinfo.jbi4corba.jbi.component.runtime;
61  
62  import it.imolinfo.jbi4corba.Logger;
63  import it.imolinfo.jbi4corba.LoggerFactory;
64  
65  import java.util.concurrent.ExecutorService;
66  
67  import java.util.concurrent.Executors;
68  
69  import java.util.concurrent.TimeUnit;
70  
71  import javax.jbi.messaging.DeliveryChannel;
72  
73  import javax.jbi.messaging.MessageExchange;
74  
75  import javax.jbi.messaging.MessagingException;
76  
77  import it.imolinfo.jbi4corba.jbi.Messages;
78  
79  /**
80   * 
81   * The main purpose of this class is to manage receiving the MessageExchange
82   * object from the DeliveryChannel and process them by delegating the processing
83   * of the MessageExchange object to MessageExchangeHandlers configured for the
84   * component. It also provides the controller methods to start and stop
85   * processing the MessageExchange objects which will be used when the component
86   * lifecycle is controlled.
87   * 
88   * This class creates a single thread to receive the MessageExchange objects
89   * from the delivery channel and then finds the appropriate message exchange
90   * handler to process the message exchange. Each message exchange handler will
91   * be executed in a separate thread from a pool of handler threads.
92   * 
93   * @author <a href="mailto:mpiraccini@imolinfo.it">Marco Piraccini</a>
94   */
95  
96  public class MessageExchangeReceiver {
97  
98  	/** The logger. */
99  	private static final Logger LOG = LoggerFactory
100 			.getLogger(MessageExchangeReceiver.class);
101 	private static final Messages MESSAGES = Messages
102 			.getMessages(MessageExchangeReceiver.class);
103 
104 	/** delivery channel accept time out */
105 
106 	private final static long DC_ACCEPT_TIME_OUT = 3000; // milliseconds
107 
108 	/** receiver thread wait time before polling for messages after woke up **/
109 
110 	private final static long RECEIVER_WAIT_TIME = 2000; // milliseconds
111 
112 	/** receiver thread wait time before force shutdown */
113 
114 	private final static long RECEIVER_SHUTDOWN_WAIT_TIME = 10; // seconds
115 
116 	/** handler threads wait time before forced shutdown */
117 
118 	private final static long HANDLERS_SHUTDOWN_WAIT_TIME = 30; // seconds
119 
120 	/** handler thread pool size */
121 
122 	private static int HANDLER_THREAD_POOL_SIZE = 5;
123 
124 	/** receiver thread accept message exchange condition */
125 
126 	private Boolean mCanAccept = false;
127 
128 	/** receiver thread termination condition */
129 
130 	private Boolean mContinue = true;
131 
132 	/** receiver thread executor service */
133 
134 	private ExecutorService mReceiverThreadMgr;
135 
136 	/** handler thread executor service */
137 
138 	private ExecutorService mHandlerThreadPool;
139 
140 	/** no default constructor for extended classes */
141 
142 	public MessageExchangeReceiver() throws Exception {
143 
144 	}
145 
146 	public final void initReceiver(RuntimeConfiguration mRuntimeConfig)
147 			throws Exception {
148 
149 		if (mRuntimeConfig.getOutboundThreads() > 0) {
150 			HANDLER_THREAD_POOL_SIZE = mRuntimeConfig.getOutboundThreads();
151 		}
152 
153 		// DEBUG
154 		RuntimeHelper
155 				.logDebug("[MessageExchangeReceiver.initReceiver()] OutBoundThreads - HANDLER_THREAD_POOL_SIZE = "
156 						+ HANDLER_THREAD_POOL_SIZE);
157 
158 		this.mHandlerThreadPool = Executors
159 				.newFixedThreadPool(HANDLER_THREAD_POOL_SIZE);
160 
161 		this.mReceiverThreadMgr = Executors.newSingleThreadExecutor();
162 
163 		this.mReceiverThreadMgr.execute(new Runnable() {
164 
165 			@SuppressWarnings("static-access")
166 			public void run() {
167 
168 				Thread t = Thread.currentThread();
169 
170 				while (mContinue) {
171 
172 					if (mCanAccept) {
173 
174 						receiveAndProcessMessageExchange();
175 
176 					} else {
177 
178 						try {
179 
180 							t.sleep(RECEIVER_WAIT_TIME);
181 
182 						} catch (InterruptedException interruptException) {
183 
184 							// someone must have interrupted this thread
185 							// do nothing
186 
187 							RuntimeHelper
188 									.logDebug("Interrupted the MessageReceiverThread in Sleep");
189 
190 						}
191 
192 					}
193 
194 				}
195 
196 			}
197 
198 		});
199 
200 	}
201 
202 	public final void shutdownReceiver() throws Exception {
203 
204 		synchronized (mContinue) {
205 
206 			mContinue = false;
207 
208 		}
209 
210 		boolean terminated = false;
211 
212 		try {
213 
214 			this.mReceiverThreadMgr.shutdown();
215 
216 			terminated = this.mReceiverThreadMgr.awaitTermination(
217 
218 			RECEIVER_SHUTDOWN_WAIT_TIME, TimeUnit.SECONDS);
219 
220 		} catch (InterruptedException ex) {
221 
222 			RuntimeHelper.logDebug(ex);
223 
224 		} finally {
225 
226 			if (!terminated) {
227 
228 				RuntimeHelper
229 						.logDebug("Message Receiver not shutdown. Forcing shutdown");
230 
231 				this.mReceiverThreadMgr.shutdownNow();
232 
233 			}
234 
235 		}
236 
237 		shutdownHandlers();
238 
239 	}
240 
241 	private final void shutdownHandlers() throws Exception {
242 
243 		boolean terminated = false;
244 
245 		try {
246 
247 			this.mHandlerThreadPool.shutdown();
248 
249 			terminated = this.mHandlerThreadPool.awaitTermination(
250 
251 			HANDLERS_SHUTDOWN_WAIT_TIME, TimeUnit.SECONDS);
252 
253 		} catch (InterruptedException ex) {
254 
255 			RuntimeHelper.logDebug(ex);
256 
257 		} finally {
258 
259 			if (!terminated) {
260 
261 				RuntimeHelper
262 						.logDebug("Handler threads not shutdown. Forcing shutdown");
263 
264 				this.mHandlerThreadPool.shutdownNow();
265 
266 			}
267 
268 		}
269 
270 	}
271 
272 	public final void startProcessing() throws Exception {
273 
274 		synchronized (this.mCanAccept) {
275 
276 			this.mCanAccept = true;
277 
278 		}
279 
280 	}
281 
282 	public final void stopProcessing() throws Exception {
283 
284 		synchronized (this.mCanAccept) {
285 
286 			this.mCanAccept = false;
287 
288 		}
289 
290 	}
291 
292 	private void receiveAndProcessMessageExchange() {
293 
294 		try {
295 
296 			DeliveryChannel channel = RuntimeHelper.getDeliveryChannel();
297 
298 			MessageExchange msgExchange = null;
299 
300 			if (channel == null) {
301 
302 				RuntimeHelper
303 						.logDebug("DeliveryChannel Not Opened for receiving messages");
304 
305 				return;
306 
307 			}
308 
309 			msgExchange = channel.accept(DC_ACCEPT_TIME_OUT);
310 
311 			if (msgExchange == null) {
312 
313 				// RuntimeHelper.logDebug("DeliveryChannel returned null message exchange from accept");
314 
315 				return;
316 
317 			}
318 
319 			MessageExchangeHandler handler = findMessageExchangeHandler(msgExchange);
320 
321 			if (handler == null) {
322 
323 				String msg = MESSAGES.getString(
324 						"CRB000438_MessageExchagneHandler_NULL",
325 						new Object[] { msgExchange });
326 				LOG.error(msg);
327 
328 				return;
329 
330 			}
331 
332 			// process message exchange. This could be done in a separate
333 			// thread.
334 
335 			handler.setMessageExchange(msgExchange);
336 
337 			// handler.processMessageExchange();
338 
339 			// try using the pool
340 
341 			this.mHandlerThreadPool.execute(handler);
342 
343 		} catch (MessagingException ex) {
344 
345 			// RuntimeHelper.logWarning(ex);
346 			String msg = MESSAGES
347 					.getString("CRB000439_Warning_in_receive_and_process_message_exchange");
348 			LOG.warn(msg, ex);
349 
350 			ex.printStackTrace();
351 
352 		}
353 
354 	}
355 
356 	private MessageExchangeHandler findMessageExchangeHandler(
357 			MessageExchange msgExchange) {
358 
359 		MessageExchangeHandler handler = null;
360 
361 		handler = RuntimeContext.getInstance().newMessageExchangeHandler(
362 				msgExchange);
363 
364 		return handler;
365 
366 	}
367 
368 }