Asynchronous messages

Receive asynchronous message

Receive asynchronous messages and storing them in DB:

  • all essential things are solved in AsynchInMessageRoute (trace header recognition, create message, persist to DB, exception handling)
  • use AsynchRouteBuilder to implement inbound asynchronous message

 

    /**
     * Route for asynchronous <strong>asyncHello</strong> input operation.
     * <p/>
     * Prerequisite: none
     * <p/>
     * Output: {@link AsyncHelloResponse}
     */
    private void createRouteForAsyncHelloRouteIn() throws FailedToCreateRouteException {
        Namespaces ns = new Namespaces("h", SyncHelloRoute.HELLO_SERVICE_NS);
   
        // note: mandatory parameters are set already in XSD, this validation is extra
        XPathValidator validator1 = new XPathValidator("/h:asyncHelloRequest", ns, "h:name1");
 
        // note: mandatory parameters are set already in XSD, this validation is extra
        XPathValidator validator2 = new XPathValidator("/h:asyncHelloRequest", ns, "h:name2");
   
        // note: only shows using but without any influence in this case
        Expression nameExpr = xpath("/h:asyncHelloRequest/h:name").namespaces(ns).stringResult();
  
        AsynchRouteBuilder.newInstance(ServiceEnum.HELLO, OPERATION_NAME,
                getInWsUri(new QName(SyncHelloRoute.HELLO_SERVICE_NS, "asyncHelloRequest")),
                new AsynchResponseProcessor() {
                    @Override
                    protected Object setCallbackResponse(CallbackResponse callbackResponse) {
                        AsyncHelloResponse res = new AsyncHelloResponse();
                        res.setConfirmAsyncHello(callbackResponse);
                        return res;
                    }
                }, jaxb(AsyncHelloResponse.class))
                .withValidator(validator1, validator2)
                .withObjectIdExpr(nameExpr)
                .build(this);
    }
 
AsynchRouteBuilder creates uniform design of asynchronous routes (processes) with the following processing steps (low-level approach):
  • settings the name of source service and operation. This information (based name conventions) are necessary to resolving of start endpoint to process asynchronous part.
  • execution of general validation (check required values). If a validation error occurs then OpenHub framework will throw ValidationIntegrationException or org.apache.camel.ValidationException (from Apache Camel).
  • redirection to "to(AsynchConstants.URI_ASYNCH_IN_MSG)"
  • using AsynchResponseProcessor - checks if created message was successfully persisted or if some error did not occur. Then correct response will be produced.

Hereby accepting an asynchronous message ends when it is persisted in internal database for further processing and OpenHub framework sends confirmation to external system that message was adopted.

 

Asynchronous message processing:

  • firstly incoming asynchronous message is persisted and almost immediately is stored into queue for further processing. New inbound messages are not queried from database and therefore are not locked for the specific node of cluster. OpenHub framework pulls only messages in PARTLY_FAILED and POSTPONED state - messages which failed or were postponed.
  • messages are stored in SEDA queue
  • message is dynamically routed into start endpoint. Expected URI of this endpoint has to match with this format:
    "direct:SERVICE_OPERATION_OUTROUTE"

    where SERVICE is value from enum interface implementation ServiceEnumOPERATION is name of operation and OUTROUTE is const AbstractBasicRoute#OUT_ROUTE_SUFFIX. Service and operation names are values configured via AsynchRouteBuilder, see also AsynchConstants.SERVICE_HEADER and AsynchConstants.OPERATION_HEADER.

  • you can find main algorithm for processing of asynchronous messages in class AsynchMessageRoute

Implementation of asynchronous route 

Implementation of route for asynchronous message is the same as synchronous message but there are some differences which must be followed:

  • header (Camel header) AsynchConstants.MSG_HEADER contains entity of Message, body contains storable (serialized, marshalled) payload of message (e.g. for communication via SOAP it will be XML of request) of original message
  • for each external system call (or one system with multiple calls) there must be check for duplicated calls. One asynchronous message (=one correlationID) can be processed repeatedly - for example: to create subscriber in MVNO solution the Billing and MNO systems have to be called. If the Billing system was called successfully but MNO failed the message is persisted with PARTLY_FAILED status and in future will be reprocessed (redelivered). When this message will be reprocessed, successfully calls have to be skipped.
    • important: asynchronous messages are processed repeatedly (number of attempts and how often can be configured)
  • if processing of message is simulated - fake processing ("blank" = unsuccessfully processing is not error > failed count must not be incremented) you have to setup properly header AsynchConstants.NO_EFFECT_PROCESS_HEADER as true value

Exceptions

  • if call to external system is unsuccessful - either some expected error is thrown (= exception declared in operation) or another exception is thrown as error during processing message. The type of error is "business" exception (e.g. value of invoice is negatively and it does not make sense or we want to create customer in external system which already exists), second type is internal technical error in external system, where it is necessary to try to call external system for some time. But if this error is expected "business" exception then process should remember these exceptions to confirm it to the callee system (that invokes this process) at the end.
    • if some expected exception occurs then it has to be caught and stored into property with the name with suffix AsynchConstants.BUSINESS_ERROR_PROP_SUFFIX (there may be more exceptions, and each of them will be saved as a separate value in property under name with appropriate suffix)
  • it is possible to call more external systems during processing of one asynchronous message. The order of calls doesn't have to be random therefore if any call to external system fails then an error is thrown and processing of the message is stopped. After some time (configurable) OpenHub will pick the message by MessagePollExecutor from DB and the message is processed again.
  • validation exceptions (=org.apache.camel.ValidationException) are resolved as business exceptions, so if this error occurs then processing of message ends immidiatelly with status FAILED.
  • each call which defines route is extended from AbstractBasicRoute where there is implemented basic error handling mechanism, see Error handling
  • exceptions can be caught and processed immediately in the route or can be propagated to superior route, i.e. route whence the route was called - it is solved by errorHandler(noErrorHandler()). More information you can find in Apache Camel documentation - Error handling in Camel

 

Transferring state among attempts

  • sometimes is necessary to transfer stateful information among individual attempts of processing - for example information, which IDs of customer have been already processed
  • there is property AsynchConstants.CUSTOM_DATA_PROP for these purposes that can contain random string data (e.g. map of string and so on). The value of this property is at the end of message processing included in message entity: Message#getCustomData
 

Check obsolete messages in the queue

  • check for obsolete messages is solved by extcall component (only if keyType has value entity or custom)
  • the message that failed for the first time to process and changed existing data then it is necessary to check whether the data to be processed again. You can imagine that new incoming message for same operation and same entity is received but was processed before first one.
    MSG1 setCustomer(externalCustomerId=5) OK
    MSG2 setCustomer(externalCustomerId=5) PARTLY_FAILED
    MSG3 setCustomer(externalCustomerId=5) OK

    Message MSG2 doesn't have to be further processed because there is the message MSG3 which is newer (receiveTimestamp) and changes the same entity with same "object ID".

  • to make it possible to centrally check it then each entity has a specific unique identifier which we call "object ID". This identifier is mandatory and has to be set during processing inbound message via AsynchConstants.OBJECT_ID_HEADER header.
    .setHeader(AsynchConstants.OBJECT_ID_HEADER, ns.xpath("/cus:setCustomerRequest/cus:customer/cus1:externalCustomerID", String.class))
  • for accurate entity identification OpenHub framework uses the name of operation and object ID by default. But sometimes it is not sufficient because for example customer's changes are contained in several different operations (messages). To change this behaviour then use header value AsynchConstants.ENTITY_TYPE_HEADER to change entity type that will be used instead of the name of the operation.
    • Example: we have two operations setCustomerExt and createCustomerExtAll, which change customer. In this scenario the name of operation is not sufficient and therefore we use ENTITY_TYPE_HEADER.
  • if OpenHub decides the message is obsolete then message final status will be SKIPPED and hereafter already will not be processed
  • when new objects are created then checking obsolote messages is useless
 

Processing of message by splitter pattern to child (partial) messages

  • if one message is too complex to process then it is recommended (appropriate solution) to split into smaller child messages (partial messages). 
  • Main (parent) message will be successfully processed when all its child messages will be also successfully processed. Conversely, if any partial message will end in FAILED status then the main message will also end in FAILED status.
  • to split message into partial messages use org.openhubframework.openhub.api.asynch.msg.MessageSplitterCallback:
/**
 * Gets child messages for next processing.
 * Order of child messages in the list determines order of first synchronous processing.
 *
 * @param parentMsg the parent message
 * @param body the exchange body
 * @return list of child messages
 */
List<ChildMessage> getChildMessages(Message parentMsg, Object body);
  • contract for splitting messages is defined by org.openhubframework.openhub.api.asynch.msg.MsgSplitter, implemented by MessageSplitterImpl
  • child messages are at first attempt processed synchronously in respectively order. When any processing of child message fails then the order during next processing is not guaranteed. For this reason it is necessary to write implementation of child messages completely independent of the processing order.
 

Confirmation the result of processing asynchronous messages

  • when asynchronous message is processed (is in the final status) then OpenHub can transmit information about result of processing to callee system - OK, FAILED or CANCEL final status.
  • main interface is ConfirmationCallback which has two default implementations:
    • DefaultConfirmationCallback - default behaviour (rather suitable for tests), which only logs information about result
    • DelegateConfirmationCallback - based upon source system it selects properly implementation of ExternalSystemConfirmation interface that calls external system to confirm result.

Design of this functionality is flexible because:

  • not every system wants be informed about result of message processing
  • every system can have specific requirements about confirmation (confirmation via web service, database call and so on)
  • OpenHub provides own defined WSDL asynchConfirmation-v1.0.wsdl with XSD asynchConfirmationOperations-v1.0.xsd to auto-confirmation solution