View Javadoc

1    /****************************************************************************
2    * Copyright (c) 2005, 2006, 2007, 2008, 2009 Imola Informatica.
3    * All rights reserved. This program and the accompanying materials
4    * are made available under the terms of the LGPL License v2.1
5    * which accompanies this distribution, and is available at
6    * http://www.gnu.org/licenses/lgpl.html
7    ****************************************************************************/
8   package it.imolinfo.jbi4corba.jbi.processor;
9   
10  
11  
12  import it.imolinfo.jbi4corba.Logger;
13  import it.imolinfo.jbi4corba.LoggerFactory;
14  import it.imolinfo.jbi4corba.exception.Jbi4CorbaException;
15  import it.imolinfo.jbi4corba.jbi.component.runtime.RuntimeHelper;
16  import it.imolinfo.jbi4corba.jbi.cxf.CXFUtils;
17  import it.imolinfo.jbi4corba.jbi.endpoint.ProviderEndpoint;
18  import it.imolinfo.jbi4corba.jbi.processor.transform.SourceTransformer;
19  
20  import java.io.ByteArrayInputStream;
21  import java.io.ByteArrayOutputStream;
22  import java.io.IOException;
23  import java.io.InputStream;
24  import java.io.OutputStream;
25  import java.io.PrintWriter;
26  
27  import javax.jbi.messaging.ExchangeStatus;
28  import javax.jbi.messaging.Fault;
29  import javax.jbi.messaging.InOnly;
30  import javax.jbi.messaging.InOptionalOut;
31  import javax.jbi.messaging.InOut;
32  import javax.jbi.messaging.MessageExchange;
33  import javax.jbi.messaging.NormalizedMessage;
34  import javax.xml.namespace.QName;
35  import javax.xml.parsers.ParserConfigurationException;
36  import javax.xml.stream.XMLStreamException;
37  import javax.xml.stream.XMLStreamReader;
38  import javax.xml.transform.Source;
39  import javax.xml.transform.Transformer;
40  import javax.xml.transform.TransformerConfigurationException;
41  import javax.xml.transform.TransformerException;
42  import javax.xml.transform.TransformerFactory;
43  import javax.xml.transform.dom.DOMSource;
44  import javax.xml.transform.stream.StreamResult;
45  import javax.xml.transform.stream.StreamSource;
46  
47  import net.java.hulp.measure.Probe;
48  
49  import org.apache.cxf.endpoint.Endpoint;
50  import org.apache.cxf.message.Exchange;
51  import org.apache.cxf.message.ExchangeImpl;
52  import org.apache.cxf.message.FaultMode;
53  import org.apache.cxf.message.Message;
54  import org.apache.cxf.message.MessageImpl;
55  import org.apache.cxf.phase.PhaseInterceptorChain;
56  import org.apache.cxf.phase.PhaseManager;
57  import org.apache.cxf.service.Service;
58  import org.apache.cxf.service.model.BindingInfo;
59  import org.apache.cxf.service.model.BindingOperationInfo;
60  import org.apache.cxf.service.model.EndpointInfo;
61  import org.apache.cxf.service.model.ServiceInfo;
62  import org.w3c.dom.Document;
63  import org.w3c.dom.Element;
64  import org.w3c.dom.Node;
65  import org.xml.sax.SAXException;
66  
67  
68  /**
69   * Provider Exchange processor
70   */
71  public class ProviderExchangeProcessor implements ExchangeProcessor {
72  
73  
74  
    /** Shared factory from which convertMessageToInputStream obtains its transformers. */
    private static final TransformerFactory TRANSFORMER_FACTORY = TransformerFactory.newInstance();

    /** Logger of this class. */
    private static final Logger LOG = LoggerFactory.getLogger(ProviderExchangeProcessor.class);

    /** The provider endpoint whose message exchanges are handled by this processor. */
    protected ProviderEndpoint endpoint;

    /** Helper used to convert message sources (DOM, stream, string). */
    private SourceTransformer transformer;

    /** Performance measurement probe; reassigned by process() while an exchange is handled. */
    private Probe mMeasurement = null;
98  
99  
100 
101     /**
102 
103      * The Exchange processor
104 
105      * @param endpoint  The endpoint
106 
107      */
108 
109     public ProviderExchangeProcessor(ProviderEndpoint endpoint) throws Jbi4CorbaException {
110 
111         this.endpoint = endpoint;
112 
113         this.transformer = new SourceTransformer();
114 
115         //messageDenormalizer = new MessageDenormalizer();
116 
117         //messageNormalizer = new MessageNormalizer();
118 
119     }
120 
121 
122 
123     /**
124 
125      * @param  exchange   The message exchange
126 
127      * @throws Exception  The exception
128 
129      */
130 
131     public void process(MessageExchange jbiExchange) {
132 
133         // IOR for dynamic exchanges
134         Object IOR = jbiExchange.getProperty("EPR_IOR");
135 
136         endpoint.getEndpointStatus().incrementReceivedRequests();
137 
138 
139 
140         LOG.debug(">>>>> process - begin. MessageExchange=" + jbiExchange);
141 
142 
143 
144         try {
145 
146             // 
147 
148             Service cxfService = endpoint.getCXFService();
149 
150             Exchange cxfExchange = new ExchangeImpl();
151 
152 
153 
154             NormalizedMessage in = jbiExchange.getMessage("in");
155 
156 
157 
158             // Gets the JBI message                
159 
160             if (LOG.isDebugEnabled()) {
161 
162                 String inMessage = "In message, before unwrapping: " + transformer.contentToString(in);
163 
164                 LOG.debug(inMessage);
165 
166             }
167 
168 
169 
170             // Performance Measurement Chrono begins
171 
172             String topic = new String("Denormalization");
173 
174             mMeasurement = Probe.fine(getClass(), endpoint.getUniqueName(), topic);
175 
176 
177             MessageDenormalizer messageDenormalizer = new MessageDenormalizer();
178             JbiMessage inMsg = messageDenormalizer.denormalize(in, endpoint, jbiExchange.getOperation(), true, false);
179 
180             if (LOG.isDebugEnabled()) {
181 
182                 String inMessage = "In message, after unwrapping: " + transformer.toString(inMsg.getMessageSource());
183 
184                 LOG.debug(inMessage);
185 
186             }
187 
188 
189 
190 
191 
192             if (jbiExchange.getOperation() != null) {
193 
194 
195 
196                 // Gets the operation exchange
197 
198                 QName operation = jbiExchange.getOperation();
199                 
200                 
201                 //if the exchange is a Dynamic excchange
202                 if (IOR != null) {
203                     cxfExchange.put("EPR_IOR", IOR);
204                 }
205 
206                 //Add Measurement object to Context, as it will need to be stopped in a different class                
207 
208                 cxfExchange.put("Measure-deN", mMeasurement);
209 
210 
211 
212                 //Add endpoint name also.
213 
214                 cxfExchange.put("EndpointName", endpoint.getUniqueName());
215 
216 
217 
218                 // Invokes the CXF service        
219 
220                 ByteArrayOutputStream cxfOut = invokeCXFOperation(cxfExchange, cxfService, operation, inMsg.getMessageSource(), isOneWay(jbiExchange));
221 
222 
223 
224                 // Set response or DONE status
225 
226                 if (isInAndOut(jbiExchange)) {
227 
228 
229 
230                     // If the fault mode is not null in the out message, a fault has been raised.
231 
232                     if (cxfExchange.getOutMessage().get(FaultMode.class) != null) {
233 
234 
235 
236                         // The Fault mode can be CHECKED_APPLICATION_FAULT/UNCHECKED_APPLICATION_FAULT
237 
238                         FaultMode faultMode = (FaultMode) cxfExchange.getOutMessage().get(FaultMode.class);
239 
240                         org.apache.cxf.interceptor.Fault cxfFault = (org.apache.cxf.interceptor.Fault) cxfExchange.getOutFaultMessage().getContent(Exception.class);
241 
242 
243 
244                         if (faultMode.equals(FaultMode.CHECKED_APPLICATION_FAULT)) {
245 
246 
247 
248                             // Gets the fault sorce                    
249 
250                             InputStream inputStream = new ByteArrayInputStream(cxfOut.toByteArray());
251 
252                             Source faultSource = transformer.toDOMSourceFromStream(new StreamSource(inputStream));
253 
254 
255 
256                             QName faultName = (QName) cxfExchange.getOutFaultMessage().getContent(QName.class);
257 
258 
259                             LOG.info("CRB000801_Raised_checked_application_fault_with_code",
260                                       new Object[]{faultName});
261 
262 
263 
264                             // Creates a JBI fault
265 
266                             Fault jbiFault = jbiExchange.createFault();
267 
268                             jbiFault.setContent(faultSource);
269 
270 
271 
272                             // Normalizes the message
273                             MessageNormalizer messageNormalizer = new MessageNormalizer();
274                             messageNormalizer.normalizeFault(faultSource, jbiFault, endpoint, jbiExchange.getOperation(), faultName.getLocalPart(), inMsg.isWrapped());
275 
276 
277 
278                             if (LOG.isDebugEnabled()) {
279 
280                                 String inMessage = "Fault message, after wrapping: " + transformer.contentToString(jbiFault);
281 
282                                 LOG.debug(inMessage);
283 
284                             }
285 
286 
287 
288                             // Sets the jbi fault
289                             
290                             jbiExchange.setFault(jbiFault);
291                             
292                             endpoint.getEndpointStatus().incrementSentReplies();
293 
294                         } else {
295 
296                             // Unchecked fault
297 
298 
299 
300                             
301                             LOG.info("CRB000802_Raised_unchecked_application_fault_with_exception",
302                                             new Object[]{cxfFault.getCause()});
303                             
304 
305 
306 
307                             // Sets the cause. Notice that (from the specs):
308 
309                             // Used to specify the source of a failure status.
310 
311                             // Invoking this method automatically adjusts the status of the
312 
313                             // ME to ExchangeStatus.ERROR.
314 
315                             // So, the cause can be setted only if no Fault is setted. 
316 
317                             endpoint.getEndpointStatus().incrementSentErrors();
318 
319                             if (cxfFault.getCause() instanceof Exception) {
320 
321                                 jbiExchange.setError((Exception) cxfFault.getCause());
322                                 
323                             } else {
324 
325                                 jbiExchange.setError(new Exception(cxfFault.getCause()));
326                                 
327                             }
328                             
329                             // get value of faultdetail property
330                             
331                             OutputStream detail = new ByteArrayOutputStream();
332                             PrintWriter writer = new PrintWriter(detail,true);
333                             cxfFault.getCause().printStackTrace(writer);
334 
335                             // Set Message Exchange Properties
336 
337                             jbiExchange.setProperty("com.sun.jbi.crl.faultcode", 
338                                                     "Server");
339                             jbiExchange.setProperty("com.sun.jbi.crl.faultstring", 
340                                                 cxfFault.getCause().toString());                              
341                             jbiExchange.setProperty("com.sun.jbi.crl.faultdetail", 
342                                                      detail.toString());
343                             jbiExchange.setProperty("com.sun.jbi.crl.faultactor", 
344                                                     "jbi4corba");
345                             LOG.debug(">>>>> Set Message Exchange Properties");
346                         }
347 
348 
349 
350                     } else {
351 
352 
353 
354                         NormalizedMessage outMsg = jbiExchange.createMessage();
355 
356 
357 
358                         // Retrieve the Performance Measurement object from context
359 
360                         mMeasurement = (Probe) cxfExchange.get("Measure-N");
361 
362                         if (mMeasurement == null) {
363 
364                             // this prevents error when an exception is raised in a CXF interceptor
365 
366                             //Normalization timer starts
367 
368                             String topicn = new String("Normalization");
369 
370                             String endpointName = (String) endpoint.getUniqueName();
371 
372                             mMeasurement = Probe.fine(getClass(), endpointName, topicn);
373 
374                         }
375 
376 
377 
378                         // Gets the output sorce                    
379 
380                         InputStream inputStream = new ByteArrayInputStream(cxfOut.toByteArray());
381 
382                         Source outSource = transformer.toDOMSourceFromStream(new StreamSource(inputStream));
383 
384 
385 
386                         if (LOG.isDebugEnabled()) {
387 
388                             String inMessage = "Out message, before wrapping: " + transformer.toString(outSource);
389 
390                             LOG.debug(inMessage);
391 
392                         }
393 
394 
395 
396                         // Normalize the source and sets to the output message.
397                         MessageNormalizer messageNormalizer = new MessageNormalizer();
398                         messageNormalizer.normalize(outSource, outMsg, endpoint, jbiExchange.getOperation(), inMsg.isWrapped(), true);
399 
400 
401 
402                         //Normalization process complete, timer to be stopped now.
403 
404                         mMeasurement.end();
405 
406 
407 
408                         if (LOG.isDebugEnabled()) {
409 
410                             String inMessage = "Out message, after wrapping: " + transformer.contentToString(outMsg);
411 
412                             LOG.debug(inMessage);
413 
414                         }
415 
416 
417 
418                         jbiExchange.setMessage(outMsg, "out");
419 
420                         endpoint.getEndpointStatus().incrementSentReplies();
421 
422                     }
423 
424 
425 
426                 } else {
427 
428                     LOG.debug("MessageExchange - InOnly");
429 
430                     jbiExchange.setStatus(ExchangeStatus.DONE);
431 
432                     endpoint.getEndpointStatus().incrementSentDones();
433 
434                 }
435 
436             }
437 
438             LOG.debug("before - Channel.send");
439 
440             RuntimeHelper.getDeliveryChannel().send(jbiExchange);
441             
442             LOG.debug("after - Channel.send");
443 
444             //LOG.debug(">>>>> process - end. MessageExchange=" + jbiExchange);
445 
446         } catch (Exception ex) {
447 
448             LOG.error("CRB000800_Error_in_message_exchange", new Object[]{ex.getMessage()}, ex);
449 
450             // No exception is thrown...
451 
452             jbiExchange.setError(ex);
453 
454         }
455 
456     }
457 
458 
459 
460     /**
461 
462      * 
463 
464      * @param  source                The source
465 
466      * @return                       The return
467 
468      * @throws TransformerException  The transformer exception
469 
470      * @throws XMLStreamException    The XML stream exception
471 
472      */
473 
474     protected XMLStreamReader getXMLStreamReader(Source source) throws TransformerException, XMLStreamException {
475 
476         LOG.debug("Returning stream reader");
477 
478         return transformer.toXMLStreamReader(source);
479 
480     }
481 
482 
483 
484     /**
485 
486      * 
487 
488      * @param  source                        The source
489 
490      * @return                               The return
491 
492      * @throws TransformerException          The transformer exception
493 
494      * @throws ParserConfigurationException  The parser configuration exception
495 
496      * @throws IOException                   The IO exception
497 
498      * @throws SAXException                  The SAX exception
499 
500      */
501 
502     protected Document getXMLDom(Source source) throws TransformerException, ParserConfigurationException, IOException, SAXException {
503 
504         LOG.debug("Returning document from source");
505 
506         return transformer.toDOMDocument(source);
507 
508     }
509 
510 
511 
512     /**
513 
514      * 
515 
516      * @param exchange  The exchange
517 
518      * @return          The return
519 
520      */
521 
522     protected boolean isInAndOut(MessageExchange exchange) {
523 
524         return exchange instanceof InOut || exchange instanceof InOptionalOut;
525 
526     }
527 
528 
529 
530     /**
531 
532      * True if the exchange is InOnly.
533 
534      * @param exchange  The exchange
535 
536      * @return          True if the exchange is InOnly
537 
538      */
539 
540     protected boolean isOneWay(MessageExchange exchange) {
541 
542         return exchange instanceof InOnly;
543 
544     }
545 
546 
547 
548     private static InputStream convertMessageToInputStream(Source src) throws IOException,
549 
550             TransformerConfigurationException, TransformerException {
551 
552 
553 
554         final Transformer transformer = TRANSFORMER_FACTORY.newTransformer();
555 
556 
557 
558         ByteArrayOutputStream baos = new ByteArrayOutputStream();
559 
560         StreamResult result = new StreamResult(baos);
561 
562         transformer.transform(src, result);
563 
564 
565 
566         return new ByteArrayInputStream(baos.toByteArray());
567 
568     }
569 
570 
571 
572     /**
573 
574      * Inits the CXF exchange ane invokes the operation on the CXF service. 
575 
576      * The service must have registered a invoker.
577 
578      * Returns null for the one-way operations
579 
580      * 
581 
582      * @param service
583 
584      * @param operation
585 
586      * @param inSource
587 
588      * @return the message OutputStream (in both cases, correct out and Fault)
589 
590      * @throws IOException
591 
592      * @throws TransformerConfigurationException
593 
594      * @throws TransformerException
595 
596      */
597 
598     private ByteArrayOutputStream invokeCXFOperation(Exchange cxfExchange, 
599 
600                                                      Service service, QName operation, Source inSource, boolean oneWay) throws IOException, TransformerConfigurationException, TransformerException {
601 
602         
603 
604         // Gets the CXF Endpoint
605 
606         EndpointInfo ei = CXFUtils.getEndpointInfo(service);
607         LOG.debug(">>>Interface Endpoint"+ei.getInterface().getName().getLocalPart());
608         Endpoint ep = CXFUtils.getEndpoint(service, ei);
609                    
610         ServiceInfo si = service.getServiceInfos().get(0);
611         
612         BindingInfo bi = (BindingInfo) si.getBindings().iterator().next();
613        
614         if (operation.getNamespaceURI() == null || operation.getNamespaceURI().length() == 0){
615           String namespacePortType = bi.getInterface().getName().getNamespaceURI();
616           LOG.debug(">>>>> NameSpace PortType: "+namespacePortType);        
617           operation = new QName(namespacePortType, operation.getLocalPart());
618         }
619 
620         LOG.debug(">>>>>> Operation "+operation.toString());
621         BindingOperationInfo bio = bi.getOperation(operation);    
622 
623         if (bio == null){
624           LOG.error(">>>>> BindingOperationInfo is  NULL");
625         }
626 
627         String parameterStyle = CXFUtils.getParameterStyle(bio);
628         String bindingStyle = CXFUtils.getBindingStyle(bio);
629 
630 
631         // Inits the CXF Exchange
632 
633         Message cxfMessage = new MessageImpl();
634 
635         cxfExchange.setOneWay(oneWay);
636 
637         cxfExchange.put(BindingOperationInfo.class, bio);
638 
639         cxfExchange.put(Service.class, service);
640 
641         cxfExchange.put(Endpoint.class, ep);
642 
643         cxfMessage.setExchange(cxfExchange);
644 
645         cxfExchange.setInMessage(cxfMessage);
646 
647         cxfExchange.setOutMessage(new MessageImpl());
648 
649         cxfExchange.setOutFaultMessage(new MessageImpl());        
650 
651 
652 
653         // Sets the Source as In message
654 
655         cxfMessage.setContent(InputStream.class, convertMessageToInputStream(inSource));
656         
657         // Sets the interceptor chain on the message
658         PhaseInterceptorChain inInterceptorChain = new PhaseInterceptorChain(CXFUtils.getBus().getExtension(PhaseManager.class).getInPhases());                
659         CXFUtils.populateInInterceptorsForProvider(inInterceptorChain, parameterStyle, bindingStyle);       
660         cxfMessage.setInterceptorChain(inInterceptorChain);
661         inInterceptorChain.doIntercept(cxfMessage);
662 
663         // Tests if Fault are present
664 
665         org.apache.cxf.interceptor.Fault fault = (org.apache.cxf.interceptor.Fault) cxfExchange.getOutFaultMessage().getContent(Exception.class);
666 
667         ByteArrayOutputStream out = new ByteArrayOutputStream();
668 
669 
670         if (fault != null) {
671 
672 
673 
674             FaultMode faultMode = (FaultMode) cxfExchange.getOutMessage().get(FaultMode.class);
675 
676 
677 
678             boolean isCheckedApplication = false;
679 
680             if (faultMode.equals(FaultMode.CHECKED_APPLICATION_FAULT)) {
681 
682                 isCheckedApplication = true;
683 
684             } else {
685 
686                 isCheckedApplication = false;
687 
688             }
689 
690 
691 
692             LOG.info("CRB000803_Fault_raised");
693 
694 
695 
696             // Fault
697 
698             // Gets the generated fault from the message
699 
700             org.apache.cxf.interceptor.Fault outFault = (org.apache.cxf.interceptor.Fault) cxfMessage.getContent(Exception.class);
701 
702 
703 
704             // The FaultOutInterceptor creates adnd fills the XML details from the cause Exception 
705 
706             PhaseInterceptorChain faultInterceptorChain = new PhaseInterceptorChain(CXFUtils.getBus().getExtension(PhaseManager.class).getOutPhases());
707             CXFUtils.populateFaultInterceptors(faultInterceptorChain, parameterStyle, bindingStyle);
708 
709             // Call the interceptor chain                
710 
711             faultInterceptorChain.doIntercept(cxfMessage);
712 
713             // Gets the details element as Element...it's not very optimized...
714 
715             if (isCheckedApplication) {
716 
717                 Element detail = outFault.getDetail();
718                 if (detail != null) {
719 
720                     Node exceptionDoc = detail.getFirstChild();
721 
722                     // Gets the fault QName and sets it on the OutFaultMessage.
723                     QName faultName = new QName(exceptionDoc.getNamespaceURI(), exceptionDoc.getLocalName());
724                     cxfExchange.getOutFaultMessage().setContent(QName.class, faultName);
725 
726                     // Converts the Element to a StreamSource
727 
728                     StreamResult result = new StreamResult(out);
729                     transformer.toResult(new DOMSource(exceptionDoc), result);
730 
731                 }
732 
733                 if (LOG.isDebugEnabled()) {
734 
735                     //  Converts to String to assert                
736                     LOG.debug("Fault message: " + out.toString());
737 
738                 }
739             } else {
740                 LOG.debug("Uncheked fault, returning empty ByteArrayOutputStream");
741             }
742 
743         } else {
744 
745             // If one way, return null
746 
747             if (oneWay) {
748                 return null;
749             }
750             Message outCXFMessage = cxfExchange.getOutMessage();
751 
752             /** 
753 
754              *Sets the interceptor chain
755 
756              */
757 
758             PhaseInterceptorChain outInterceptorChain = new PhaseInterceptorChain(CXFUtils.getBus().getExtension(PhaseManager.class).getOutPhases());
759             CXFUtils.populateOutInterceptors(outInterceptorChain, parameterStyle, bindingStyle, true);
760 
761             outCXFMessage.setInterceptorChain(outInterceptorChain);
762             outCXFMessage.setContent(OutputStream.class, out);
763 
764             // Call the interceptor chain
765             outInterceptorChain.doIntercept(outCXFMessage);
766         }
767 
768         return out;
769 
770     }
771 
772 }
773 
774