ONJava.com -- The Independent Source for Enterprise Java

Esper: Event Stream Processing and Correlation
Pages: 1, 2, 3

Events as JavaBeans

Esper can handle events represented as JavaBeans, arbitrary Java classes, java.util.Map instances, or XML documents. For simplicity, this case study uses the JavaBeans representation. Each self-service terminal publishes any of the six kinds of events listed below.

Checkin A customer started a check-in dialog
Cancelled A customer cancelled a check-in dialog
Completed A customer completed a check-in dialog
OutOfOrder A terminal detected a hardware problem (issued once until fixed by maintenance staff)
LowPaper A terminal is low on paper (issued once until fixed by maintenance staff)
Status Terminal status, published every minute regardless of activity (terminal heartbeat)

All events provide information about the terminal that published them. Since all events carry similar information, we model each event as a subtype of a base class, BaseTerminalEvent, which provides the terminal information shared by every event a terminal issues. A real-world model would of course be more complex, possibly using XML instead.

public abstract class BaseTerminalEvent {
    private final Terminal terminal;

    public BaseTerminalEvent(Terminal terminal) {
        this.terminal = terminal;
    }

    // Simple class name of the concrete event, without package information
    public String getType() {
        return this.getClass().getSimpleName();
    }

    public Terminal getTerminal() {
        return terminal;
    }
}
For the terminal information we use a simple class to hold the terminal ID:

public class Terminal {
    private String id;

    public Terminal(String id) {
        this.id = id;
    }

    public String getId() {
        return id;
    }
}

The Status event class is thus a subclass of BaseTerminalEvent:

public class Status extends BaseTerminalEvent {
    public Status(Terminal terminal) {
        super(terminal);
    }
}

Introduction to EQL and Patterns

Esper EQL is an object-oriented event stream query language. Its syntax is very similar to SQL, but it differs significantly in order to handle sliding windows over streams of data. Esper also includes a pattern language that provides stateful (state-machine) event pattern matching. EQL and patterns can be used on their own or combined to express complex temporal logic.

Let's assume we want to dispatch staff to restock the paper supply when a terminal publishes a LowPaper event. The simple EQL statement below detects such events:

select * from LowPaper

Besides looking for LowPaper events, we would also like to be notified when OutOfOrder events arrive. We could thus use two separate statements that each filter for only one type of event:

select * from LowPaper
select * from OutOfOrder

Another solution would be to use a single statement and include an event pattern combined with an or condition:

select a,b from pattern [ every a=LowPaper or every b=OutOfOrder]

We could also implement this with event polymorphism. Because all events are subclasses of BaseTerminalEvent, we can select from BaseTerminalEvent and use a where clause on the type property (the actual class name without package information, as returned by getType() in BaseTerminalEvent) to keep only the events we are interested in:

select * from BaseTerminalEvent
where type = 'LowPaper' or type = 'OutOfOrder'
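
To make the polymorphic filter concrete, here is a minimal plain-Java sketch of the same selection logic, with no Esper dependency. It is not how the engine evaluates the statement; it only shows which events the where clause keeps. It assumes LowPaper and OutOfOrder are trivial subclasses of BaseTerminalEvent in the same style as Status, and TypeFilterSketch and its select method are hypothetical names introduced for illustration only.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Trimmed copies of the article's classes so the sketch compiles on its own.
class Terminal {
    private final String id;
    Terminal(String id) { this.id = id; }
    String getId() { return id; }
}

abstract class BaseTerminalEvent {
    private final Terminal terminal;
    BaseTerminalEvent(Terminal terminal) { this.terminal = terminal; }
    // Simple class name without package information, e.g. "LowPaper"
    String getType() { return getClass().getSimpleName(); }
    Terminal getTerminal() { return terminal; }
}

// Assumption: these subclasses mirror the Status class shown earlier.
class Status extends BaseTerminalEvent { Status(Terminal t) { super(t); } }
class LowPaper extends BaseTerminalEvent { LowPaper(Terminal t) { super(t); } }
class OutOfOrder extends BaseTerminalEvent { OutOfOrder(Terminal t) { super(t); } }

// Hypothetical helper, not part of Esper.
class TypeFilterSketch {
    // Keeps events whose type is 'LowPaper' or 'OutOfOrder', in arrival order.
    static List<BaseTerminalEvent> select(List<? extends BaseTerminalEvent> events) {
        List<BaseTerminalEvent> matches = new ArrayList<>();
        for (BaseTerminalEvent e : events) {
            String type = e.getType();
            if (type.equals("LowPaper") || type.equals("OutOfOrder")) {
                matches.add(e);
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        Terminal t1 = new Terminal("T1");
        List<BaseTerminalEvent> incoming =
            Arrays.asList(new Status(t1), new LowPaper(t1), new OutOfOrder(t1));
        for (BaseTerminalEvent e : select(incoming)) {
            System.out.println(e.getType() + " from terminal " + e.getTerminal().getId());
        }
    }
}
```

Running main drops the Status heartbeat and prints one line each for the LowPaper and OutOfOrder events.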

In this simple case, choosing among these statements is purely a matter of design preference.

Registering Statements and Listeners

Esper can be configured using either a straightforward API or an XML descriptor. We will use the API here, as it is the more Java-centric approach. We first configure and obtain an engine instance, then register one or more statements, and finally attach one or more listeners to each statement. The engine lets us give event classes an alias, so that EQL statements do not have to use fully qualified class names when events are represented as JavaBeans:

// Configure engine: give aliases to event classes
Configuration config = new Configuration();
config.addEventTypeAlias("Checkin", Checkin.class);
config.addEventTypeAlias("Cancelled", Cancelled.class);
config.addEventTypeAlias("Completed", Completed.class);
config.addEventTypeAlias("Status", Status.class);
config.addEventTypeAlias("LowPaper", LowPaper.class);
config.addEventTypeAlias("OutOfOrder", OutOfOrder.class);
config.addEventTypeAlias("BaseTerminalEvent", BaseTerminalEvent.class);
// Get engine instance
EPServiceProvider epService =
    EPServiceProviderManager.getDefaultProvider(config);

// Register statement
String stmt = "select * from LowPaper";
EPStatement statement =
    epService.getEPAdministrator().createEQL(stmt);
// Attach a listener
statement.addListener(new SampleListener());

The engine invokes every listener attached to a statement as soon as new results for that statement are available. Each event is wrapped in an EventBean instance, which gives access to the event properties and to the underlying event object. The engine reports events entering a data window via newEvents and events leaving a data window via oldEvents.

Let's look at a sample listener implementation:

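public class SampleListener implements UpdateListener {
    public void update(EventBean[] newEvents, EventBean[] oldEvents) {
        // The underlying JavaBean of the first new event
        LowPaper lowPaper = (LowPaper) newEvents[0].getUnderlying();
        // The terminal property is a Terminal object (see getTerminal())
        Terminal terminal = (Terminal) newEvents[0].get("terminal");
        // Assumes LowPaper also exposes a text property (class not shown here)
        String text = (String) newEvents[0].get("text");
    }
}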
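
The newEvents and oldEvents arguments are easier to picture with a small data window. The plain-Java sketch below is not Esper code and does not claim to reproduce the engine's behavior. It is a hypothetical length-based window that keeps the last N events and tells a listener which event entered and which one, if any, was pushed out. WindowListener, LengthWindowSketch, and LengthWindowDemo are illustrative names, and lists stand in for the EventBean arrays.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical callback, loosely modelled on update(newEvents, oldEvents).
interface WindowListener<T> {
    void update(List<T> newEvents, List<T> oldEvents);
}

// Hypothetical window that retains only the most recent `size` events.
class LengthWindowSketch<T> {
    private final int size;
    private final Deque<T> window = new ArrayDeque<>();
    private final WindowListener<T> listener;

    LengthWindowSketch(int size, WindowListener<T> listener) {
        if (size < 1) {
            throw new IllegalArgumentException("size must be >= 1");
        }
        this.size = size;
        this.listener = listener;
    }

    // Adds an event, evicts the oldest one if the window overflows,
    // and reports both changes to the listener in a single call.
    void send(T event) {
        List<T> entered = new ArrayList<>();
        List<T> left = new ArrayList<>();
        window.addLast(event);
        entered.add(event);
        if (window.size() > size) {
            left.add(window.removeFirst());
        }
        listener.update(entered, left);
    }
}

class LengthWindowDemo {
    public static void main(String[] args) {
        LengthWindowSketch<String> window = new LengthWindowSketch<String>(2,
            (in, out) -> System.out.println("new=" + in + " old=" + out));
        window.send("Status#1");
        window.send("Status#2");
        window.send("Status#3"); // window is full, so Status#1 leaves
    }
}
```

With a window of size 2, the first two calls report only an entering event. The third call reports Status#3 entering and Status#1 leaving.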

An experienced reader might realize that this example could be done with JMS, perhaps with some queue and naming conventions, a few instanceof tests, and a MessageSelector. While that might be true for this example, the next sections show that as our requirements for analyzing streams become more complex, we quickly outgrow what JMS can easily do. We will see examples of causality relations, joins, and aggregate computations that, without an engine like Esper, would likely leave the development team spending most of its time on an in-house, project-specific implementation.
