1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60 package it.imolinfo.jbi4corba.jbi.component.runtime;
61
62 import it.imolinfo.jbi4corba.Logger;
63 import it.imolinfo.jbi4corba.LoggerFactory;
64
65 import java.util.concurrent.ExecutorService;
66
67 import java.util.concurrent.Executors;
68
69 import java.util.concurrent.TimeUnit;
70
71 import javax.jbi.messaging.DeliveryChannel;
72
73 import javax.jbi.messaging.MessageExchange;
74
75 import javax.jbi.messaging.MessagingException;
76
77 import it.imolinfo.jbi4corba.jbi.Messages;
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96 public class MessageExchangeReceiver {
97
98
99 private static final Logger LOG = LoggerFactory
100 .getLogger(MessageExchangeReceiver.class);
101 private static final Messages MESSAGES = Messages
102 .getMessages(MessageExchangeReceiver.class);
103
104
105
106 private final static long DC_ACCEPT_TIME_OUT = 3000;
107
108
109
110 private final static long RECEIVER_WAIT_TIME = 2000;
111
112
113
114 private final static long RECEIVER_SHUTDOWN_WAIT_TIME = 10;
115
116
117
118 private final static long HANDLERS_SHUTDOWN_WAIT_TIME = 30;
119
120
121
122 private static int HANDLER_THREAD_POOL_SIZE = 5;
123
124
125
126 private Boolean mCanAccept = false;
127
128
129
130 private Boolean mContinue = true;
131
132
133
134 private ExecutorService mReceiverThreadMgr;
135
136
137
138 private ExecutorService mHandlerThreadPool;
139
140
141
142 public MessageExchangeReceiver() throws Exception {
143
144 }
145
146 public final void initReceiver(RuntimeConfiguration mRuntimeConfig)
147 throws Exception {
148
149 if (mRuntimeConfig.getOutboundThreads() > 0) {
150 HANDLER_THREAD_POOL_SIZE = mRuntimeConfig.getOutboundThreads();
151 }
152
153
154 RuntimeHelper
155 .logDebug("[MessageExchangeReceiver.initReceiver()] OutBoundThreads - HANDLER_THREAD_POOL_SIZE = "
156 + HANDLER_THREAD_POOL_SIZE);
157
158 this.mHandlerThreadPool = Executors
159 .newFixedThreadPool(HANDLER_THREAD_POOL_SIZE);
160
161 this.mReceiverThreadMgr = Executors.newSingleThreadExecutor();
162
163 this.mReceiverThreadMgr.execute(new Runnable() {
164
165 @SuppressWarnings("static-access")
166 public void run() {
167
168 Thread t = Thread.currentThread();
169
170 while (mContinue) {
171
172 if (mCanAccept) {
173
174 receiveAndProcessMessageExchange();
175
176 } else {
177
178 try {
179
180 t.sleep(RECEIVER_WAIT_TIME);
181
182 } catch (InterruptedException interruptException) {
183
184
185
186
187 RuntimeHelper
188 .logDebug("Interrupted the MessageReceiverThread in Sleep");
189
190 }
191
192 }
193
194 }
195
196 }
197
198 });
199
200 }
201
202 public final void shutdownReceiver() throws Exception {
203
204 synchronized (mContinue) {
205
206 mContinue = false;
207
208 }
209
210 boolean terminated = false;
211
212 try {
213
214 this.mReceiverThreadMgr.shutdown();
215
216 terminated = this.mReceiverThreadMgr.awaitTermination(
217
218 RECEIVER_SHUTDOWN_WAIT_TIME, TimeUnit.SECONDS);
219
220 } catch (InterruptedException ex) {
221
222 RuntimeHelper.logDebug(ex);
223
224 } finally {
225
226 if (!terminated) {
227
228 RuntimeHelper
229 .logDebug("Message Receiver not shutdown. Forcing shutdown");
230
231 this.mReceiverThreadMgr.shutdownNow();
232
233 }
234
235 }
236
237 shutdownHandlers();
238
239 }
240
241 private final void shutdownHandlers() throws Exception {
242
243 boolean terminated = false;
244
245 try {
246
247 this.mHandlerThreadPool.shutdown();
248
249 terminated = this.mHandlerThreadPool.awaitTermination(
250
251 HANDLERS_SHUTDOWN_WAIT_TIME, TimeUnit.SECONDS);
252
253 } catch (InterruptedException ex) {
254
255 RuntimeHelper.logDebug(ex);
256
257 } finally {
258
259 if (!terminated) {
260
261 RuntimeHelper
262 .logDebug("Handler threads not shutdown. Forcing shutdown");
263
264 this.mHandlerThreadPool.shutdownNow();
265
266 }
267
268 }
269
270 }
271
272 public final void startProcessing() throws Exception {
273
274 synchronized (this.mCanAccept) {
275
276 this.mCanAccept = true;
277
278 }
279
280 }
281
282 public final void stopProcessing() throws Exception {
283
284 synchronized (this.mCanAccept) {
285
286 this.mCanAccept = false;
287
288 }
289
290 }
291
292 private void receiveAndProcessMessageExchange() {
293
294 try {
295
296 DeliveryChannel channel = RuntimeHelper.getDeliveryChannel();
297
298 MessageExchange msgExchange = null;
299
300 if (channel == null) {
301
302 RuntimeHelper
303 .logDebug("DeliveryChannel Not Opened for receiving messages");
304
305 return;
306
307 }
308
309 msgExchange = channel.accept(DC_ACCEPT_TIME_OUT);
310
311 if (msgExchange == null) {
312
313
314
315 return;
316
317 }
318
319 MessageExchangeHandler handler = findMessageExchangeHandler(msgExchange);
320
321 if (handler == null) {
322
323 String msg = MESSAGES.getString(
324 "CRB000438_MessageExchagneHandler_NULL",
325 new Object[] { msgExchange });
326 LOG.error(msg);
327
328 return;
329
330 }
331
332
333
334
335 handler.setMessageExchange(msgExchange);
336
337
338
339
340
341 this.mHandlerThreadPool.execute(handler);
342
343 } catch (MessagingException ex) {
344
345
346 String msg = MESSAGES
347 .getString("CRB000439_Warning_in_receive_and_process_message_exchange");
348 LOG.warn(msg, ex);
349
350 ex.printStackTrace();
351
352 }
353
354 }
355
356 private MessageExchangeHandler findMessageExchangeHandler(
357 MessageExchange msgExchange) {
358
359 MessageExchangeHandler handler = null;
360
361 handler = RuntimeContext.getInstance().newMessageExchangeHandler(
362 msgExchange);
363
364 return handler;
365
366 }
367
368 }