1
2
3
4
5
6
7
8 package it.imolinfo.jbi4corba.jbi.processor;
9
10
11
12 import it.imolinfo.jbi4corba.Logger;
13 import it.imolinfo.jbi4corba.LoggerFactory;
14 import it.imolinfo.jbi4corba.exception.Jbi4CorbaException;
15 import it.imolinfo.jbi4corba.jbi.component.runtime.RuntimeHelper;
16 import it.imolinfo.jbi4corba.jbi.cxf.CXFUtils;
17 import it.imolinfo.jbi4corba.jbi.endpoint.ProviderEndpoint;
18 import it.imolinfo.jbi4corba.jbi.processor.transform.SourceTransformer;
19
20 import java.io.ByteArrayInputStream;
21 import java.io.ByteArrayOutputStream;
22 import java.io.IOException;
23 import java.io.InputStream;
24 import java.io.OutputStream;
25 import java.io.PrintWriter;
26
27 import javax.jbi.messaging.ExchangeStatus;
28 import javax.jbi.messaging.Fault;
29 import javax.jbi.messaging.InOnly;
30 import javax.jbi.messaging.InOptionalOut;
31 import javax.jbi.messaging.InOut;
32 import javax.jbi.messaging.MessageExchange;
33 import javax.jbi.messaging.NormalizedMessage;
34 import javax.xml.namespace.QName;
35 import javax.xml.parsers.ParserConfigurationException;
36 import javax.xml.stream.XMLStreamException;
37 import javax.xml.stream.XMLStreamReader;
38 import javax.xml.transform.Source;
39 import javax.xml.transform.Transformer;
40 import javax.xml.transform.TransformerConfigurationException;
41 import javax.xml.transform.TransformerException;
42 import javax.xml.transform.TransformerFactory;
43 import javax.xml.transform.dom.DOMSource;
44 import javax.xml.transform.stream.StreamResult;
45 import javax.xml.transform.stream.StreamSource;
46
47 import net.java.hulp.measure.Probe;
48
49 import org.apache.cxf.endpoint.Endpoint;
50 import org.apache.cxf.message.Exchange;
51 import org.apache.cxf.message.ExchangeImpl;
52 import org.apache.cxf.message.FaultMode;
53 import org.apache.cxf.message.Message;
54 import org.apache.cxf.message.MessageImpl;
55 import org.apache.cxf.phase.PhaseInterceptorChain;
56 import org.apache.cxf.phase.PhaseManager;
57 import org.apache.cxf.service.Service;
58 import org.apache.cxf.service.model.BindingInfo;
59 import org.apache.cxf.service.model.BindingOperationInfo;
60 import org.apache.cxf.service.model.EndpointInfo;
61 import org.apache.cxf.service.model.ServiceInfo;
62 import org.w3c.dom.Document;
63 import org.w3c.dom.Element;
64 import org.w3c.dom.Node;
65 import org.xml.sax.SAXException;
66
67
68
69
70
71 public class ProviderExchangeProcessor implements ExchangeProcessor {
72
73
74
75 private static final TransformerFactory TRANSFORMER_FACTORY = TransformerFactory.newInstance();
76
77 private static final Logger LOG = LoggerFactory.getLogger(ProviderExchangeProcessor.class);
78
79
80
81 protected ProviderEndpoint endpoint;
82
83
84
85 private SourceTransformer transformer;
86
87
88
89
90
91
92
93
94
95
96
97 private Probe mMeasurement = null;
98
99
100
101
102
103
104
105
106
107
108
109 public ProviderExchangeProcessor(ProviderEndpoint endpoint) throws Jbi4CorbaException {
110
111 this.endpoint = endpoint;
112
113 this.transformer = new SourceTransformer();
114
115
116
117
118
119 }
120
121
122
123
124
125
126
127
128
129
130
131 public void process(MessageExchange jbiExchange) {
132
133
134 Object IOR = jbiExchange.getProperty("EPR_IOR");
135
136 endpoint.getEndpointStatus().incrementReceivedRequests();
137
138
139
140 LOG.debug(">>>>> process - begin. MessageExchange=" + jbiExchange);
141
142
143
144 try {
145
146
147
148 Service cxfService = endpoint.getCXFService();
149
150 Exchange cxfExchange = new ExchangeImpl();
151
152
153
154 NormalizedMessage in = jbiExchange.getMessage("in");
155
156
157
158
159
160 if (LOG.isDebugEnabled()) {
161
162 String inMessage = "In message, before unwrapping: " + transformer.contentToString(in);
163
164 LOG.debug(inMessage);
165
166 }
167
168
169
170
171
172 String topic = new String("Denormalization");
173
174 mMeasurement = Probe.fine(getClass(), endpoint.getUniqueName(), topic);
175
176
177 MessageDenormalizer messageDenormalizer = new MessageDenormalizer();
178 JbiMessage inMsg = messageDenormalizer.denormalize(in, endpoint, jbiExchange.getOperation(), true, false);
179
180 if (LOG.isDebugEnabled()) {
181
182 String inMessage = "In message, after unwrapping: " + transformer.toString(inMsg.getMessageSource());
183
184 LOG.debug(inMessage);
185
186 }
187
188
189
190
191
192 if (jbiExchange.getOperation() != null) {
193
194
195
196
197
198 QName operation = jbiExchange.getOperation();
199
200
201
202 if (IOR != null) {
203 cxfExchange.put("EPR_IOR", IOR);
204 }
205
206
207
208 cxfExchange.put("Measure-deN", mMeasurement);
209
210
211
212
213
214 cxfExchange.put("EndpointName", endpoint.getUniqueName());
215
216
217
218
219
220 ByteArrayOutputStream cxfOut = invokeCXFOperation(cxfExchange, cxfService, operation, inMsg.getMessageSource(), isOneWay(jbiExchange));
221
222
223
224
225
226 if (isInAndOut(jbiExchange)) {
227
228
229
230
231
232 if (cxfExchange.getOutMessage().get(FaultMode.class) != null) {
233
234
235
236
237
238 FaultMode faultMode = (FaultMode) cxfExchange.getOutMessage().get(FaultMode.class);
239
240 org.apache.cxf.interceptor.Fault cxfFault = (org.apache.cxf.interceptor.Fault) cxfExchange.getOutFaultMessage().getContent(Exception.class);
241
242
243
244 if (faultMode.equals(FaultMode.CHECKED_APPLICATION_FAULT)) {
245
246
247
248
249
250 InputStream inputStream = new ByteArrayInputStream(cxfOut.toByteArray());
251
252 Source faultSource = transformer.toDOMSourceFromStream(new StreamSource(inputStream));
253
254
255
256 QName faultName = (QName) cxfExchange.getOutFaultMessage().getContent(QName.class);
257
258
259 LOG.info("CRB000801_Raised_checked_application_fault_with_code",
260 new Object[]{faultName});
261
262
263
264
265
266 Fault jbiFault = jbiExchange.createFault();
267
268 jbiFault.setContent(faultSource);
269
270
271
272
273 MessageNormalizer messageNormalizer = new MessageNormalizer();
274 messageNormalizer.normalizeFault(faultSource, jbiFault, endpoint, jbiExchange.getOperation(), faultName.getLocalPart(), inMsg.isWrapped());
275
276
277
278 if (LOG.isDebugEnabled()) {
279
280 String inMessage = "Fault message, after wrapping: " + transformer.contentToString(jbiFault);
281
282 LOG.debug(inMessage);
283
284 }
285
286
287
288
289
290 jbiExchange.setFault(jbiFault);
291
292 endpoint.getEndpointStatus().incrementSentReplies();
293
294 } else {
295
296
297
298
299
300
301 LOG.info("CRB000802_Raised_unchecked_application_fault_with_exception",
302 new Object[]{cxfFault.getCause()});
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317 endpoint.getEndpointStatus().incrementSentErrors();
318
319 if (cxfFault.getCause() instanceof Exception) {
320
321 jbiExchange.setError((Exception) cxfFault.getCause());
322
323 } else {
324
325 jbiExchange.setError(new Exception(cxfFault.getCause()));
326
327 }
328
329
330
331 OutputStream detail = new ByteArrayOutputStream();
332 PrintWriter writer = new PrintWriter(detail,true);
333 cxfFault.getCause().printStackTrace(writer);
334
335
336
337 jbiExchange.setProperty("com.sun.jbi.crl.faultcode",
338 "Server");
339 jbiExchange.setProperty("com.sun.jbi.crl.faultstring",
340 cxfFault.getCause().toString());
341 jbiExchange.setProperty("com.sun.jbi.crl.faultdetail",
342 detail.toString());
343 jbiExchange.setProperty("com.sun.jbi.crl.faultactor",
344 "jbi4corba");
345 LOG.debug(">>>>> Set Message Exchange Properties");
346 }
347
348
349
350 } else {
351
352
353
354 NormalizedMessage outMsg = jbiExchange.createMessage();
355
356
357
358
359
360 mMeasurement = (Probe) cxfExchange.get("Measure-N");
361
362 if (mMeasurement == null) {
363
364
365
366
367
368 String topicn = new String("Normalization");
369
370 String endpointName = (String) endpoint.getUniqueName();
371
372 mMeasurement = Probe.fine(getClass(), endpointName, topicn);
373
374 }
375
376
377
378
379
380 InputStream inputStream = new ByteArrayInputStream(cxfOut.toByteArray());
381
382 Source outSource = transformer.toDOMSourceFromStream(new StreamSource(inputStream));
383
384
385
386 if (LOG.isDebugEnabled()) {
387
388 String inMessage = "Out message, before wrapping: " + transformer.toString(outSource);
389
390 LOG.debug(inMessage);
391
392 }
393
394
395
396
397 MessageNormalizer messageNormalizer = new MessageNormalizer();
398 messageNormalizer.normalize(outSource, outMsg, endpoint, jbiExchange.getOperation(), inMsg.isWrapped(), true);
399
400
401
402
403
404 mMeasurement.end();
405
406
407
408 if (LOG.isDebugEnabled()) {
409
410 String inMessage = "Out message, after wrapping: " + transformer.contentToString(outMsg);
411
412 LOG.debug(inMessage);
413
414 }
415
416
417
418 jbiExchange.setMessage(outMsg, "out");
419
420 endpoint.getEndpointStatus().incrementSentReplies();
421
422 }
423
424
425
426 } else {
427
428 LOG.debug("MessageExchange - InOnly");
429
430 jbiExchange.setStatus(ExchangeStatus.DONE);
431
432 endpoint.getEndpointStatus().incrementSentDones();
433
434 }
435
436 }
437
438 LOG.debug("before - Channel.send");
439
440 RuntimeHelper.getDeliveryChannel().send(jbiExchange);
441
442 LOG.debug("after - Channel.send");
443
444
445
446 } catch (Exception ex) {
447
448 LOG.error("CRB000800_Error_in_message_exchange", new Object[]{ex.getMessage()}, ex);
449
450
451
452 jbiExchange.setError(ex);
453
454 }
455
456 }
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474 protected XMLStreamReader getXMLStreamReader(Source source) throws TransformerException, XMLStreamException {
475
476 LOG.debug("Returning stream reader");
477
478 return transformer.toXMLStreamReader(source);
479
480 }
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502 protected Document getXMLDom(Source source) throws TransformerException, ParserConfigurationException, IOException, SAXException {
503
504 LOG.debug("Returning document from source");
505
506 return transformer.toDOMDocument(source);
507
508 }
509
510
511
512
513
514
515
516
517
518
519
520
521
522 protected boolean isInAndOut(MessageExchange exchange) {
523
524 return exchange instanceof InOut || exchange instanceof InOptionalOut;
525
526 }
527
528
529
530
531
532
533
534
535
536
537
538
539
540 protected boolean isOneWay(MessageExchange exchange) {
541
542 return exchange instanceof InOnly;
543
544 }
545
546
547
548 private static InputStream convertMessageToInputStream(Source src) throws IOException,
549
550 TransformerConfigurationException, TransformerException {
551
552
553
554 final Transformer transformer = TRANSFORMER_FACTORY.newTransformer();
555
556
557
558 ByteArrayOutputStream baos = new ByteArrayOutputStream();
559
560 StreamResult result = new StreamResult(baos);
561
562 transformer.transform(src, result);
563
564
565
566 return new ByteArrayInputStream(baos.toByteArray());
567
568 }
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598 private ByteArrayOutputStream invokeCXFOperation(Exchange cxfExchange,
599
600 Service service, QName operation, Source inSource, boolean oneWay) throws IOException, TransformerConfigurationException, TransformerException {
601
602
603
604
605
606 EndpointInfo ei = CXFUtils.getEndpointInfo(service);
607 LOG.debug(">>>Interface Endpoint"+ei.getInterface().getName().getLocalPart());
608 Endpoint ep = CXFUtils.getEndpoint(service, ei);
609
610 ServiceInfo si = service.getServiceInfos().get(0);
611
612 BindingInfo bi = (BindingInfo) si.getBindings().iterator().next();
613
614 if (operation.getNamespaceURI() == null || operation.getNamespaceURI().length() == 0){
615 String namespacePortType = bi.getInterface().getName().getNamespaceURI();
616 LOG.debug(">>>>> NameSpace PortType: "+namespacePortType);
617 operation = new QName(namespacePortType, operation.getLocalPart());
618 }
619
620 LOG.debug(">>>>>> Operation "+operation.toString());
621 BindingOperationInfo bio = bi.getOperation(operation);
622
623 if (bio == null){
624 LOG.error(">>>>> BindingOperationInfo is NULL");
625 }
626
627 String parameterStyle = CXFUtils.getParameterStyle(bio);
628 String bindingStyle = CXFUtils.getBindingStyle(bio);
629
630
631
632
633 Message cxfMessage = new MessageImpl();
634
635 cxfExchange.setOneWay(oneWay);
636
637 cxfExchange.put(BindingOperationInfo.class, bio);
638
639 cxfExchange.put(Service.class, service);
640
641 cxfExchange.put(Endpoint.class, ep);
642
643 cxfMessage.setExchange(cxfExchange);
644
645 cxfExchange.setInMessage(cxfMessage);
646
647 cxfExchange.setOutMessage(new MessageImpl());
648
649 cxfExchange.setOutFaultMessage(new MessageImpl());
650
651
652
653
654
655 cxfMessage.setContent(InputStream.class, convertMessageToInputStream(inSource));
656
657
658 PhaseInterceptorChain inInterceptorChain = new PhaseInterceptorChain(CXFUtils.getBus().getExtension(PhaseManager.class).getInPhases());
659 CXFUtils.populateInInterceptorsForProvider(inInterceptorChain, parameterStyle, bindingStyle);
660 cxfMessage.setInterceptorChain(inInterceptorChain);
661 inInterceptorChain.doIntercept(cxfMessage);
662
663
664
665 org.apache.cxf.interceptor.Fault fault = (org.apache.cxf.interceptor.Fault) cxfExchange.getOutFaultMessage().getContent(Exception.class);
666
667 ByteArrayOutputStream out = new ByteArrayOutputStream();
668
669
670 if (fault != null) {
671
672
673
674 FaultMode faultMode = (FaultMode) cxfExchange.getOutMessage().get(FaultMode.class);
675
676
677
678 boolean isCheckedApplication = false;
679
680 if (faultMode.equals(FaultMode.CHECKED_APPLICATION_FAULT)) {
681
682 isCheckedApplication = true;
683
684 } else {
685
686 isCheckedApplication = false;
687
688 }
689
690
691
692 LOG.info("CRB000803_Fault_raised");
693
694
695
696
697
698
699
700 org.apache.cxf.interceptor.Fault outFault = (org.apache.cxf.interceptor.Fault) cxfMessage.getContent(Exception.class);
701
702
703
704
705
706 PhaseInterceptorChain faultInterceptorChain = new PhaseInterceptorChain(CXFUtils.getBus().getExtension(PhaseManager.class).getOutPhases());
707 CXFUtils.populateFaultInterceptors(faultInterceptorChain, parameterStyle, bindingStyle);
708
709
710
711 faultInterceptorChain.doIntercept(cxfMessage);
712
713
714
715 if (isCheckedApplication) {
716
717 Element detail = outFault.getDetail();
718 if (detail != null) {
719
720 Node exceptionDoc = detail.getFirstChild();
721
722
723 QName faultName = new QName(exceptionDoc.getNamespaceURI(), exceptionDoc.getLocalName());
724 cxfExchange.getOutFaultMessage().setContent(QName.class, faultName);
725
726
727
728 StreamResult result = new StreamResult(out);
729 transformer.toResult(new DOMSource(exceptionDoc), result);
730
731 }
732
733 if (LOG.isDebugEnabled()) {
734
735
736 LOG.debug("Fault message: " + out.toString());
737
738 }
739 } else {
740 LOG.debug("Uncheked fault, returning empty ByteArrayOutputStream");
741 }
742
743 } else {
744
745
746
747 if (oneWay) {
748 return null;
749 }
750 Message outCXFMessage = cxfExchange.getOutMessage();
751
752
753
754
755
756
757
758 PhaseInterceptorChain outInterceptorChain = new PhaseInterceptorChain(CXFUtils.getBus().getExtension(PhaseManager.class).getOutPhases());
759 CXFUtils.populateOutInterceptors(outInterceptorChain, parameterStyle, bindingStyle, true);
760
761 outCXFMessage.setInterceptorChain(outInterceptorChain);
762 outCXFMessage.setContent(OutputStream.class, out);
763
764
765 outInterceptorChain.doIntercept(outCXFMessage);
766 }
767
768 return out;
769
770 }
771
772 }
773
774