Within the Interlok framework, an AdaptrisMessageConsumer is responsible for receiving messages from the target system. The first decision is how the consumer will be triggered. If it should be timer based (e.g. poll a directory every few seconds), extend AdaptrisPollingConsumer, which allows for a pluggable timer mechanism. Any other type of consumer should extend AdaptrisMessageConsumerImp directly.

  • You can get access to the configured AdaptrisConnection instance by using the retrieveConnection method.
  • Optionally, call this.decode(byte[]) to decode the message with any configured AdaptrisMessageEncoder implementation.

Example

Our previous Connection example defined the connection; the consumer now needs to work with the ClientConnection interface:

public interface ClientConnection {
  String getMessage(String repository, long timeout) throws IOException, TimeoutException;
}
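
To try a consumer out without a real target system, it can help to have a stand-in for ClientConnection. The following is only a sketch: QueueBackedClient, its offer method and the wrapping QueueBackedClientDemo class are hypothetical names that are not part of Interlok, the interface is repeated (package-private and nested) so the example compiles on its own, and it assumes the timeout is expressed in milliseconds, which the interface itself does not state.

```java
import java.io.IOException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demo wrapper; none of these names come from Interlok.
class QueueBackedClientDemo {

  // Repeated from the text above so that this sketch compiles on its own.
  interface ClientConnection {
    String getMessage(String repository, long timeout) throws IOException, TimeoutException;
  }

  // Hypothetical in-memory stand-in: hands out queued messages and times out when empty.
  static class QueueBackedClient implements ClientConnection {
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();

    // Test hook: make a message available to the next getMessage() call.
    void offer(String message) {
      queue.add(message);
    }

    @Override
    public String getMessage(String repository, long timeout) throws IOException, TimeoutException {
      try {
        // Assumption: the timeout is in milliseconds.
        String message = queue.poll(timeout, TimeUnit.MILLISECONDS);
        if (message == null) {
          throw new TimeoutException("no message in " + repository + " within " + timeout + "ms");
        }
        return message;
      } catch (InterruptedException e) {
        // Restore the interrupt flag and surface the problem as an I/O failure.
        Thread.currentThread().interrupt();
        throw new IOException("interrupted while waiting for a message", e);
      }
    }
  }

  public static void main(String[] args) throws Exception {
    QueueBackedClient client = new QueueBackedClient();
    client.offer("hello");
    System.out.println(client.getMessage("any-repository", 50L));
    try {
      client.getMessage("any-repository", 50L);
    } catch (TimeoutException expected) {
      System.out.println("timed out: " + expected.getMessage());
    }
  }
}
```

The stand-in ignores the repository argument apart from using it in the timeout message; a real implementation would presumably use it to select where to read from.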

Our AdaptrisMessageConsumer implementation can use ClientConnection to receive messages. Because getMessage takes a timeout and throws TimeoutException, we can assume that the consumer should be timer based.

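import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import com.adaptris.core.AdaptrisMessage;
import com.adaptris.core.AdaptrisMessageFactory;
import com.adaptris.core.AdaptrisPollingConsumer;
import com.adaptris.core.CoreException;
import com.adaptris.util.TimeInterval;
import com.thoughtworks.xstream.annotations.XStreamAlias;

@XStreamAlias("my-client-consumer")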
public class MyClientConsumer extends AdaptrisPollingConsumer {

  private static final TimeInterval DEF_INTERVAL = new TimeInterval(2L, TimeUnit.SECONDS.name());

  private TimeInterval receiveTimeout;

  public MyClientConsumer() {
  }

  @Override
  protected int processMessages() {
    String repository = getDestination().getDestination();
    int msgCount = 0;
    AdaptrisMessageFactory factory = AdaptrisMessageFactory.defaultIfNull(getMessageFactory());
    try {
      ClientConnection conn = retrieveConnection(MyClientConnection.class).createConnection();
      // Keep reading until getMessage() times out or we are told to stop.
      do {
        String data = conn.getMessage(repository, receiveTimeoutMs());
        msgCount++;
        AdaptrisMessage msg = factory.newMessage(data);
        retrieveAdaptrisMessageListener().onAdaptrisMessage(msg);
      }
      while (continueProcessingMessages());
    }
    catch (TimeoutException e) {
      log.trace("No More Messages");
    }
    catch (IOException e) {
      log.error(e.getMessage(), e);
    }
    return msgCount;
  }

  long receiveTimeoutMs() {
    return getReceiveTimeout() != null ? getReceiveTimeout().toMilliseconds() : DEF_INTERVAL.toMilliseconds();
  }

  public TimeInterval getReceiveTimeout() {
    return receiveTimeout;
  }

  public void setReceiveTimeout(TimeInterval receiveTimeout) {
    this.receiveTimeout = receiveTimeout;
  }

  @Override
  protected void prepareConsumer() throws CoreException {}
}

In summary:

  • We can figure out where we are receiving messages from via the ConsumeDestination implementation.
  • We call retrieveConnection to find the configured connection object.
  • We do not rethrow an IOException; we log it instead.
  • We return the number of messages processed (which is logged).
  • We provide a sensible default for the timeout if it is not configured.
  • We use continueProcessingMessages() to check whether we should process the next message.
  • We use AdaptrisMessageFactory directly as we do not support AdaptrisMessageEncoder.
  • We trigger the workflow via retrieveAdaptrisMessageListener().onAdaptrisMessage().
  • An @XStreamAlias is added so that the consumer can be configured by alias: <consumer class="my-client-consumer"/> rather than the fully qualified class name.
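
The loop logic summarised above can also be exercised without Interlok on the classpath by pulling it into a plain helper. This is a minimal sketch under stated assumptions: DrainLoopSketch, drain and cannedConnection are hypothetical names, a Consumer<String> stands in for the message listener, a BooleanSupplier stands in for continueProcessingMessages(), and the ClientConnection interface is repeated (nested) so the example compiles on its own.

```java
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.TimeoutException;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;

// Hypothetical sketch; none of these names come from Interlok.
class DrainLoopSketch {

  // Repeated from the text above so that this sketch compiles on its own.
  interface ClientConnection {
    String getMessage(String repository, long timeout) throws IOException, TimeoutException;
  }

  // Mirrors the shape of processMessages(): read until the timeout fires, the caller
  // asks us to stop, or an I/O error occurs; return the number of messages read.
  static int drain(ClientConnection conn, String repository, long timeoutMs,
      Consumer<String> listener, BooleanSupplier keepGoing) {
    int count = 0;
    try {
      do {
        String data = conn.getMessage(repository, timeoutMs);
        count++;
        listener.accept(data);
      } while (keepGoing.getAsBoolean());
    } catch (TimeoutException e) {
      // Nothing left to read in this poll; not treated as an error.
    } catch (IOException e) {
      // Reported rather than rethrown, as in the consumer above.
      System.err.println("read failed: " + e.getMessage());
    }
    return count;
  }

  // Canned connection for testing: returns the given messages in order, then times out.
  static ClientConnection cannedConnection(String... messages) {
    Queue<String> pending = new ArrayDeque<>(Arrays.asList(messages));
    return (repository, timeout) -> {
      String next = pending.poll();
      if (next == null) {
        throw new TimeoutException("no more messages");
      }
      return next;
    };
  }

  public static void main(String[] args) {
    List<String> seen = new ArrayList<>();
    int count = drain(cannedConnection("a", "b", "c"), "any-repository", 2000L, seen::add, () -> true);
    System.out.println(count + " messages: " + seen);
  }
}
```

Passing `() -> false` as the last argument makes the helper stop after a single message, which is the case that continueProcessingMessages() guards in the real consumer.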