Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

 

Asynchronous message processing:

  • firstly incoming asynchronous message is persisted and almost immediately is stored into queue for further processing. New inbound messages are not queried from database and therefore are not locked for the specific node of cluster. OpenHub framework pulls only messages in PARTLY_FAILED and POSTPONED state - messages which failed or were postponed.
  • messages are stored in SEDA queue. To version 0.4 included there is classic FIFO queue where messages were processed in the order in which they were inserted into the queue. Since version 0.4 this behavior is implemented by priority queue where new messages are processed earlier than postponed messages or in next attempt of processing. 
  • message is dynamically routed into start endpoint. Expected URI of this endpoint has to match with this format:
    "direct:SERVICE_OPERATION_OUTROUTE"

    where SERVICE is value from enum interface implementation ServiceEnumOPERATION is name of operation and OUTROUTE is const AbstractBasicRoute#OUT_ROUTE_SUFFIX. Service and operation names are values configured via AsynchRouteBuilder, see also AsynchConstants.SERVICE_HEADER and AsynchConstants.OPERATION_HEADER.

  • you can find main algorithm for processing of asynchronous messages in class AsynchMessageRoute

Implementation of asynchronous route 
Anchor
Implementation of asynchronous route
Implementation of asynchronous route

Implementation of route for asynchronous message is the same as synchronous message but there are some differences which must be followed:

  • header (Camel header) AsynchConstants.MSG_HEADER contains entity of Message, body contains storable (serialized, marshalled) payload of message (e.g. for communication via SOAP it will be XML of request) of original message
  • for each external system call (or one system with multiple calls) there must be check for duplicated calls. One asynchronous message (=one correlationID) can be processed repeatedly - for example: to create subscriber in MVNO solution the Billing and MNO systems have to be called. If the Billing system was called successfully but MNO failed the message is persisted with PARTLY_FAILED status and in future will be reprocessed (redelivered). When this message will be reprocessed, successfully calls have to be skipped.
    • important: asynchronous messages are processed repeatedly (number of attempts and how often can be configured)
  • if processing of message is simulated - fake processing ("blank" = unsuccessfully processing is not error > failed count must not be incremented) you have to setup properly header AsynchConstants.NO_EFFECT_PROCESS_HEADER as true value

Exceptions

  • if an external system is called unsuccessfully call to external system is unsuccessful - either some expected error is thrown (= exception declared in operation) or another exception is thrown as error during processing message. The type of error is "business" exception (e.g. value of invoice is negatively and it does not make sense or we want to create customer in external system which already exists), second type is internal technical error in external system, where it is necessary to try to call external system for some time. But if this error is expected "business" exception then process should remember these exceptions to confirm scenario it to the callee system (that invokes this process) at the end.
    • if some expected exception occurs then it has to be catched caught and stored into property with name that will end on the name with suffix AsynchConstants.BUSINESS_ERROR_PROP_SUFFIX (there may be more exceptions, and each of them will be saved as a separate value in property under name with appropriate suffix)
  • during processing of asynchronous message it is possible to call more external systemssystems during processing of one asynchronous message. The order of calling calls doesn't have to be random therefore if any calling of call to external system fails , then an error is thrown and process processing of the message is stopped. After some time (configurable) OpenHub will pick the message by MessagePollExecutor from DB and the message is processed again.
  • validation exceptions (=org.apache.camel.ValidationException) are resolved as business exceptions, so if this error occurs then processing of message ends immidiatelly with status FAILED.
  • each call which defines route is extended from AbstractBasicRoute where there is implemented basic error handling mechanism, see Error handling
  • exceptions can be caught and processed immediately in the route or can be propagated to superior route, i.e. route whence the route was called - it is solved by errorHandler(noErrorHandler()). More information you can find in Apache Camel documentation - Error handling in Camel

 

Transferring state among attempts

  • sometimes is necessary to transfer stateful information of message information among individual attempts of processing - for example information, which IDs of customer collection were have been already processed
  • for these purposes there is property AsynchConstants.CUSTOM_DATA_PROP for these purposes that can contain random string data (e.g. map of string and so on). The value of this property is at the end of message processing included in message entity: Message#getCustomData
 

Check obsolete messages in the queue

  • check for obsolete messages is solved by extcall component
  • the message that failed for the first time to process and amends changed existing data , then it is necessary to check whether the data to be processed again. You can imagine that new incoming message for same operation and same entity is received but was processed before first one.
    MSG1 setCustomer(externalCustomerId=5) OK
    MSG2 setCustomer(externalCustomerId=5) PARTLY_FAILED
    MSG3 setCustomer(externalCustomerId=5) OK

    Message MSG2 must not  doesn't have to be further processed because there exists is the message MSG3, which  which is newer (receiveTimestamp) and changes the same entity with same "object ID".

  • to make it possible to centrally check it so then each entity has a specific unique identifier which we call "object ID". This identifier is mandatory and has to be set during processing inbound message via AsynchConstants.OBJECT_ID_HEADER header.
    .setHeader(AsynchConstants.OBJECT_ID_HEADER, ns.xpath("/cus:setCustomerRequest/cus:customer/cus1:externalCustomerID", String.class))
  • for accurate entity identification OpenHub framework uses (by default) the name of operation and object ID , but by default. But sometimes it is not sufficient because for example change of customer is customer's changes are contained in several different operations (messages). To change this behaviour is necessary to then use header value AsynchConstants.ENTITY_TYPE_HEADER, which is then  to change entity type that will be used instead of the name of the operation.
    • Example: we have two operations setCustomerExt and createCustomerExtAll, which change customer. In this scenario the name of operation is not sufficient and therefore we use ENTITY_TYPE_HEADER.
  • if OpenHub evaluates that OpenHub decides the message is obsolete then will have new status message final status will be SKIPPED and hereafter already will not be processed
  • this is not necessary to check for messages where when new objects are created then checking obsolote messages is useless
 

Processing of message by splitter pattern to child (partial) messages

  • if one message is too complex to process  then then it is recommended (appropriate solution) to split into smaller child messages (partial messages). 
  • Main (parent) message will be successfully processed when all its child messages will be also successfully processed. Conversely, if any partial message will end in FAILED status then the main message will also end in FAILED status.
  • to split message into partial messages use org.openhubframework.openhub.api.asynch.msg.MessageSplitterCallback:
Code Block
/**
 * Gets child messages for next processing.
 * Order of child messages in the list determines order of first synchronous processing.
 *
 * @param parentMsg the parent message
 * @param body the exchange body
 * @return list of child messages
 */
List<ChildMessage> getChildMessages(Message parentMsg, Object body);
  • contract for splitting messages is defined by org.openhubframework.openhub.api.asynch.msg.MsgSplitter, implemented by MessageSplitterImpl
  • child messages are at first attempt processed synchronously in respectively order. When any processing of child message fails then the order during next processing is not guaranteed. For this reason it is necessary to write implementation of child messages completely independent of the processing order.

...