ONJava.com    
 Published on ONJava.com (http://www.onjava.com/)


Esper: Event Stream Processing and Correlation

by Thomas Bernhardt and Alexandre Vasseur
03/08/2007

Esper is an Event Stream Processing (ESP) and event correlation (CEP, Complex Event Processing) engine. Targeted at real-time Event Driven Architectures (EDA), Esper can trigger custom actions written as Plain Old Java Objects (POJOs) when event conditions occur among event streams. It is designed for high-volume event correlation, where millions of incoming events would make it impossible to store them all and query them later using a classical database architecture. A tailored Event Query Language (EQL) can express rich event conditions and correlations, possibly spanning time windows, thus minimizing the development effort required to set up a system that reacts to complex situations.

Esper is a lightweight kernel written in Java which is fully embeddable into any Java process, JEE application server or Java-based Enterprise Service Bus. It enables rapid development of applications that process large volumes of incoming messages or events.

Esper is the leading open source event stream processing solution, currently available under a GPL license. This article introduces you to the main concepts of event stream processing and correlation and walks you through a sample application (source code and Ant script are available for download).

Introduction to Event Streams and Complex Events

Information is critical to making wise decisions. This is true in real life and in computing, and it is especially critical in areas such as finance, fraud detection, business intelligence, or battlefield operations. Information flows in from different sources in the form of messages or events, each giving a hint about the state at a given time, such as a stock price. That said, looking at those discrete events in isolation is usually meaningless. A trader needs to look at the stock trend over a period, possibly combined with other information, to make the best deal at the right time.

While discrete events looked at one by one may be meaningless, event streams (that is, unbounded sequences of events) considered over a sliding window and further correlated are highly meaningful, and reacting to them with minimal latency is critical for effective action and competitive advantage.

Introduction to Esper

Relational databases or message-based systems such as JMS make it hard to deal with temporal data and real-time queries. Databases require explicit querying to return meaningful data and are not suited to pushing data as it changes. JMS systems are stateless and require developers to implement the temporal and aggregation logic themselves. By contrast, the Esper engine provides a higher level of abstraction and intelligence and can be thought of as a database turned upside-down: instead of storing the data and running queries against it, Esper allows applications to store queries and run the data through them. The Esper engine responds in real time when conditions occur that match user-defined queries. The execution model is thus continuous, rather than running only when a query is submitted.
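To make the "database turned upside-down" idea concrete, here is a minimal plain-Java sketch; it is not Esper code and omits windows, joins, and aggregation entirely. Queries (a condition plus a callback) are registered before any data arrives, and each event is then pushed through every stored query. The names ContinuousQueryEngine, register, and send are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical toy engine, not Esper: queries are stored up front and
// each event is run through them as it arrives.
class ContinuousQueryEngine<E> {
    // A stored query: a condition plus the action to run when it matches.
    private static final class Query<T> {
        final Predicate<T> condition;
        final Consumer<T> listener;

        Query(Predicate<T> condition, Consumer<T> listener) {
            this.condition = condition;
            this.listener = listener;
        }
    }

    private final List<Query<E>> queries = new ArrayList<>();

    // Storing a query evaluates nothing; it simply waits for data.
    void register(Predicate<E> condition, Consumer<E> listener) {
        queries.add(new Query<>(condition, listener));
    }

    // Pushes one event through every stored query. The event itself is
    // not retained, unlike a row inserted into a database table.
    void send(E event) {
        for (Query<E> q : queries) {
            if (q.condition.test(event)) {
                q.listener.accept(event);
            }
        }
    }
}
```

Registering the query before any data arrives is the inversion described above; a real engine layers windows, joins, and aggregation on top of this loop.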

Such concepts are a key foundation of EDA and have been actively researched for more than 10 years. Awareness of the importance of such systems in real-world architectures has emerged only recently.

In Esper, a tailored EQL is used to register queries in the engine. A listener class, which is basically a POJO, is then called by the engine when the EQL condition is matched as events flow in. EQL can express complex matching conditions that include temporal windows and joins of different event streams, as well as filtering, aggregation, and sorting. Esper statements can also be combined with "followed by" conditions, thus deriving complex events from simpler events. Events can be represented as JavaBean classes, legacy Java classes, XML documents, or java.util.Map instances, which promotes reuse of existing systems acting as message publishers.

A trivial yet meaningful example is as follows: assume a trader wants to buy Google stock as soon as the price goes below some floor value, not when looking at each individual tick but when the average is computed over a sliding time window of, say, 30 seconds. Given a StockTick event bean with price and symbol properties and the EQL "select avg(price) from StockTick.win:time(30 sec) where symbol='GOOG'", a listener POJO is notified of the updated average as ticks come in and can trigger the buy order when the average drops below the floor.
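To illustrate what the engine maintains for this statement, the sketch below models a 30-second sliding time window and its running average in plain Java. It is an illustration under stated assumptions, not Esper's implementation: StockTick is reduced to a hypothetical class with symbol, price, and a caller-supplied millisecond timestamp; SlidingAverage and onTick are hypothetical names; and ticks are assumed to arrive in timestamp order.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical event class mirroring the StockTick bean described above,
// plus an explicit timestamp for this sketch.
class StockTick {
    final String symbol;
    final double price;
    final long timestampMillis;

    StockTick(String symbol, double price, long timestampMillis) {
        this.symbol = symbol;
        this.price = price;
        this.timestampMillis = timestampMillis;
    }
}

// Keeps the ticks of the last windowMillis and their running sum.
// Assumes ticks arrive in non-decreasing timestamp order.
class SlidingAverage {
    private final long windowMillis;
    private final Deque<StockTick> window = new ArrayDeque<>();
    private double sum;

    SlidingAverage(long windowMillis) {
        this.windowMillis = windowMillis;
    }

    // Adds a tick, evicts ticks that fell out of the window,
    // and returns the average price over the ticks still inside it.
    double onTick(StockTick tick) {
        window.addLast(tick);
        sum += tick.price;
        while (window.peekFirst().timestampMillis <= tick.timestampMillis - windowMillis) {
            sum -= window.pollFirst().price;
        }
        return sum / window.size();
    }
}
```

A listener built on this would ignore ticks whose symbol is not GOOG, call onTick for the rest, and place the buy order when the returned average drops below the floor.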

ESP and CEP can be used in algorithmic trading, RFID, service-level agreement monitoring, adaptive computing, fraud detection, real-time business intelligence, and customer relationship management. You will discover many more use cases as you become familiar with Esper through the next section of this article, which builds on failure detection and customer management use cases. Several use cases are available online at the Esper web site.

Case Study: A Self-Service Terminal Managing System

In this example we consider a self-service terminal system, such as those found in airports, that allows customers to check themselves in and print boarding passes. The self-service terminal managing system receives a large number of events from all the connected terminals: around 500 events per second. Some events indicate abnormal situations such as "paper low" or "terminal out of order." Other events report activity as a customer uses a terminal to check in and print her boarding pass (see Figure 1).

Figure 1. Event cloud in a terminal managing system

Our primary goal is to resolve self-service terminal or network problems before customers report them by asking for help, which means higher overall availability and greater customer satisfaction. To accomplish this, we want to be alerted when certain conditions occur that warrant human intervention: for example, a customer may be in the middle of a check-in process when the terminal detects a hardware problem or when the network goes down. Under these conditions we want to dispatch one staff member to help the customer and another to diagnose the hardware or network problem.
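The first condition is essentially a "followed by" relationship: a Checkin on a terminal, then an OutOfOrder on the same terminal before that check-in is Completed or Cancelled. To show the per-terminal state such a rule requires, here is a hand-written plain-Java sketch. It is not how Esper evaluates patterns; CheckinMonitor and onEvent are hypothetical names, events are reduced to a type string and a terminal ID, and the network-down case is left out.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical monitor: tracks which terminals have a check-in dialog in progress.
class CheckinMonitor {
    private final Set<String> inProgress = new HashSet<>();

    // Returns true when an OutOfOrder event hits a terminal whose customer
    // is in the middle of a check-in, meaning staff should be dispatched.
    boolean onEvent(String type, String terminalId) {
        switch (type) {
            case "Checkin":
                inProgress.add(terminalId);
                return false;
            case "Completed":
            case "Cancelled":
                inProgress.remove(terminalId);
                return false;
            case "OutOfOrder":
                return inProgress.contains(terminalId);
            default:
                return false; // Status, LowPaper: not relevant to this rule
        }
    }
}
```

Maintaining state machines like this by hand for every rule is the effort that EQL patterns are meant to replace.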

We also want a dashboard that summarizes activity on an ongoing basis and feeds it to a real-time interface. This lets a manager watch the system in action and spot abnormalities. The system can further compare the summarized activity to stored normal usage patterns.

Events as JavaBeans

Esper can handle events as JavaBeans, arbitrary Java classes, java.util.Map instances, or XML documents. In this case study we use the JavaBeans representation for simplicity. Each self-service terminal publishes any of the six event kinds below.

Event       Description
Checkin     A customer started a check-in dialog
Cancelled   A customer cancelled a check-in dialog
Completed   A customer completed a check-in dialog
OutOfOrder  A terminal detected a hardware problem (issued once until fixed by maintenance staff)
LowPaper    A terminal is low on paper (issued once until fixed by maintenance staff)
Status      Terminal status, published every minute regardless of activity (terminal heartbeat)

All events provide information about the terminal that published them. Since all events carry similar information, we model each event as a subtype of a base class, BaseTerminalEvent, which provides the terminal information shared by all events a terminal issues. A real-world model would of course be more complex, possibly using XML instead.

public abstract class BaseTerminalEvent {
    private final Terminal terminal;

    public BaseTerminalEvent(Terminal terminal) {
        this.terminal = terminal;
    }

    public String getType() {
        return this.getClass().getSimpleName();
    }

    public Terminal getTerminal() {
        return terminal;
    }
}

For the terminal information we use a simple class to hold the terminal ID:

public class Terminal {
    private String id;

    public Terminal(String id) {
        this.id = id;
    }

    public String getId() {
        return id;
    }
}

The Status event class is thus a subclass of BaseTerminalEvent:

public class Status extends BaseTerminalEvent {
    public Status(Terminal terminal) {
        super(terminal);
    }
}
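The remaining five event types can follow the same pattern as Status. The sketch below assumes they carry no fields beyond the terminal (the downloadable source may differ); compact copies of Terminal and BaseTerminalEvent are repeated so that it compiles on its own. It also shows that getType() returns the simple class name, which is what the EQL type property used later relies on.

```java
// Compact copies of the article's Terminal and BaseTerminalEvent classes,
// repeated here so the sketch compiles on its own. In a real project each
// class would be public and live in its own source file.
class Terminal {
    private final String id;
    Terminal(String id) { this.id = id; }
    public String getId() { return id; }
}

abstract class BaseTerminalEvent {
    private final Terminal terminal;
    BaseTerminalEvent(Terminal terminal) { this.terminal = terminal; }
    // Simple class name such as "LowPaper"; exposed to EQL as the "type" property
    public String getType() { return getClass().getSimpleName(); }
    public Terminal getTerminal() { return terminal; }
}

// Assumed shape of the remaining event types: no fields beyond the terminal.
class Checkin extends BaseTerminalEvent {
    Checkin(Terminal t) { super(t); }
}

class Cancelled extends BaseTerminalEvent {
    Cancelled(Terminal t) { super(t); }
}

class Completed extends BaseTerminalEvent {
    Completed(Terminal t) { super(t); }
}

class OutOfOrder extends BaseTerminalEvent {
    OutOfOrder(Terminal t) { super(t); }
}

class LowPaper extends BaseTerminalEvent {
    LowPaper(Terminal t) { super(t); }
}
```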

Introduction to EQL and Patterns

Esper EQL is an object-oriented event stream query language whose syntax is very similar to SQL, but which differs significantly in order to deal with sliding windows over streams of data. Esper also includes a pattern language that provides stateful (state-machine) event pattern matching. EQL and patterns can be used alone or combined to express complex temporal logic.

Let's assume we want to dispatch staff to restock the paper supply when a terminal publishes a LowPaper event. The simple EQL statement below detects such events:

select * from LowPaper

Besides looking for LowPaper events, we would also like to be notified when OutOfOrder events arrive. We could thus use two separate statements that each filter for only one type of event:

select * from LowPaper
select * from OutOfOrder

Another solution would be to use a single statement and include an event pattern combined with an or condition:

select a,b from pattern [ every a=LowPaper or every b=OutOfOrder]

We could also implement this with the help of event polymorphism and BaseTerminalEvent. As all events are subclasses of BaseTerminalEvent, we can use a where clause to select the events we are interested in by their type property (the actual class name without package information; see BaseTerminalEvent):

select * from BaseTerminalEvent
where type = 'LowPaper' or type = 'OutOfOrder'

In this simple case, choosing among these statements is merely a matter of design preference.

Registering Statements and Listeners

Esper can be configured either through a straightforward API or through an XML descriptor. We use the API here, as a Java-centric approach: we first configure and obtain an engine instance, then register one or more statements, and finally attach one or more listeners to the created statements. The engine supports aliases (nicknames) so that EQL statements need not use fully qualified class names with the JavaBeans event representation:

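import net.esper.client.*; // Configuration, EPServiceProvider, EPStatement, ... (Esper 1.x package)

// Configure the engine: give aliases (nicknames) to event classes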
Configuration config = new Configuration();
config.addEventTypeAlias("Checkin", Checkin.class);
config.addEventTypeAlias("Cancelled", Cancelled.class);
config.addEventTypeAlias("Completed", Completed.class);
config.addEventTypeAlias("Status", Status.class);
config.addEventTypeAlias("LowPaper", LowPaper.class);
config.addEventTypeAlias("OutOfOrder", OutOfOrder.class);
config.addEventTypeAlias("BaseTerminalEvent", BaseTerminalEvent.class);
// Get engine instance
EPServiceProvider epService =
    EPServiceProviderManager.getDefaultProvider(config);

// Register statement
String stmt = "select * from LowPaper";
EPStatement statement =
    epService.getEPAdministrator().createEQL(stmt);
// Attach a listener
statement.addListener(new SampleListener());

The engine calls all listeners attached to a statement as soon as new results for that statement are available. Events are encapsulated in EventBean instances, which allow querying the event properties and the underlying event object. The engine indicates events entering a data window via newEvents and events leaving a data window via oldEvents.

Let's look at a sample listener implementation:

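import net.esper.client.UpdateListener; // Esper 1.x package names
import net.esper.event.EventBean;

public class SampleListener implements UpdateListener {
    public void update(EventBean[] newEvents, EventBean[] oldEvents) {
        // The underlying object is the LowPaper JavaBean sent into the engine
        LowPaper lowPaper = (LowPaper) newEvents[0].getUnderlying();
        // Properties are read by name: "terminal" is the Terminal bean,
        // "type" is the simple class name from BaseTerminalEvent.getType()
        Terminal terminal = (Terminal) newEvents[0].get("terminal");
        String type = (String) newEvents[0].get("type");
        System.out.println(type + " reported by terminal " + terminal.getId());
    }
}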
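The listener above reads only newEvents. The newEvents/oldEvents contract is easiest to see with a data window, so here is a plain-Java sketch, not Esper code, of a window that keeps only the last N events: each arrival is reported as a new event and each eviction as an old event in the same callback. LengthWindow and WindowListener are hypothetical names; Esper itself delivers EventBean arrays to UpdateListener.update, and in this sketch an empty list stands for "no old events".

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical callback mirroring the shape of update(newEvents, oldEvents).
interface WindowListener<E> {
    void update(List<E> newEvents, List<E> oldEvents);
}

// Keeps at most `size` events; reports each arrival as a new event and
// any events pushed out of the window as old events.
class LengthWindow<E> {
    private final int size;
    private final Deque<E> events = new ArrayDeque<>();
    private final WindowListener<E> listener;

    LengthWindow(int size, WindowListener<E> listener) {
        this.size = size;
        this.listener = listener;
    }

    void add(E event) {
        events.addLast(event);
        List<E> oldEvents = new ArrayList<>();
        while (events.size() > size) {
            oldEvents.add(events.removeFirst()); // oldest event leaves the window
        }
        listener.update(List.of(event), oldEvents);
    }
}
```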

An experienced reader might realize that this example could be done with JMS, perhaps with some queue and naming conventions, a few instanceof tests, and a message selector. While that may be true for this example, we will see in the next sections that, as our requirements for analyzing streams become more complex, we outgrow what JMS can easily do. We will see examples of causality relations, joins, and aggregate computations that would likely have the development team spending most of its time on an in-house, project-specific implementation.

Detecting the Absence of Status Events

Each self-service terminal publishes a Status event every minute. The Status event indicates that the terminal is alive and online. The absence of Status events may indicate that a terminal went offline for some reason, which needs to be investigated.

Since Status events arrive at regular intervals of 60 seconds, we can use temporal pattern matching with timers to find events that did not arrive. The every operator combined with timer:interval() repeats an action every 60 seconds; we then combine this with a not operator to check for the absence of Status events. Looking for Status events over a 65-second interval allows 5 seconds for a possible delay in transmission or processing:

select 'terminal 1 is offline' from pattern
[ every timer:interval(60 sec) ->
(timer:interval(65 sec) and not Status(terminal.id = 'T1'))]
output first every 5 minutes

Since we may not want to see an alert for the same terminal every minute, we added an output first clause to indicate that we want to be alerted the first time this happens, then not be alerted for 5 minutes, and then be alerted again if it happens again.

Figure 2. The output first clause can suppress output events
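The suppression behavior of output first can be modeled in plain Java to make the timing explicit. This is a sketch of the behavior described above, not of Esper's implementation: OutputFirstThrottle and shouldOutput are hypothetical names, timestamps are caller-supplied milliseconds, and we assume the 5-minute suppression period starts at the alert that was let through.

```java
// Hypothetical helper: lets the first alert through, then suppresses
// further alerts until the suppression period has elapsed.
class OutputFirstThrottle {
    private final long periodMillis;
    private long lastOutputMillis;
    private boolean hasOutput;

    OutputFirstThrottle(long periodMillis) {
        this.periodMillis = periodMillis;
    }

    // Returns true if the alert raised at nowMillis should be shown.
    boolean shouldOutput(long nowMillis) {
        if (hasOutput && nowMillis - lastOutputMillis < periodMillis) {
            return false; // still inside the suppression period
        }
        hasOutput = true;
        lastOutputMillis = nowMillis; // a new suppression period starts here
        return true;
    }
}
```

With a one-minute heartbeat check, an offline terminal would raise an alert at every check, and only the checks at least 5 minutes after the last shown alert get through.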

Note that we hardcoded the terminal ID to 'T1' in this query for simplicity. Because we are looking for an absent event, detecting the failure of any terminal with a single query would require a subquery.
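For comparison, the same 65-second rule can be checked for every terminal at once in hand-written Java. The sketch swaps in a simpler technique than the EQL pattern, a last-seen timestamp per terminal inspected on demand, and does not reflect how Esper works internally. HeartbeatTracker, onStatus, and offlineTerminals are hypothetical names, and a terminal is only known once its first Status event arrives.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical tracker of the last Status heartbeat seen from each terminal.
class HeartbeatTracker {
    private final long toleranceMillis;
    private final Map<String, Long> lastSeen = new HashMap<>();

    HeartbeatTracker(long toleranceMillis) {
        this.toleranceMillis = toleranceMillis;
    }

    // Records a Status event from a terminal.
    void onStatus(String terminalId, long nowMillis) {
        lastSeen.put(terminalId, nowMillis);
    }

    // Lists terminals whose last Status event is older than the tolerance.
    List<String> offlineTerminals(long nowMillis) {
        List<String> offline = new ArrayList<>();
        for (Map.Entry<String, Long> e : lastSeen.entrySet()) {
            if (nowMillis - e.getValue() > toleranceMillis) {
                offline.add(e.getKey());
            }
        }
        return offline;
    }
}
```

A scheduler would call offlineTerminals periodically, for example every 60 seconds, and raise an alert for each terminal returned.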

As you read through these examples, you probably realize that the expressiveness of Esper's ESP/CEP query language lets us program declaratively, in a very loosely coupled way, thanks to the underlying Event Driven Architecture. Most of the detection logic that mirrors our business specifications is written directly in statements rather than in custom code.

Activity Summary Data

By presenting statistical information about terminal activity to our staff in real time, we enable them to monitor the system and quickly spot problems. To begin with, the real-time console should show the number of check-in processes started, in progress, cancelled, and completed within the last 10 minutes.

This first query counts the Checkin events, considering only the last 10 minutes of event data:

select count(*) from Checkin.win:time(10 minutes)

Note the use of the win:time syntax. This tells the engine to consider a time window consisting of only the last 10 minutes of the Checkin event stream.

We can improve this query to get a count per event type across all types of events (Checkin, Completed, Cancelled, Status, OutOfOrder, LowPaper) by using BaseTerminalEvent. Again, we want to look only at the last 10 minutes of events, so we use a win:time view. We further want to be notified every minute rather than at each change, so we use an output all clause:

select type, count(*)
from BaseTerminalEvent.win:time(10 minutes)
group by type
output all every 1 minutes
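What this statement computes can be modeled in plain Java: a 10-minute time window over all terminal events, grouped by type, with the counts read once per minute. This is an illustrative sketch with hypothetical names (TypeCountWindow, add, snapshot), events reduced to their type string, caller-supplied millisecond timestamps in arrival order, and groups that simply disappear when their count reaches zero; in Esper, the engine maintains this state and the output all clause drives the once-per-minute delivery.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of: select type, count(*)
// from BaseTerminalEvent.win:time(10 minutes) group by type
class TypeCountWindow {
    private static final class Entry {
        final String type;
        final long timeMillis;

        Entry(String type, long timeMillis) {
            this.type = type;
            this.timeMillis = timeMillis;
        }
    }

    private final long windowMillis;
    private final Deque<Entry> window = new ArrayDeque<>();
    private final Map<String, Integer> counts = new TreeMap<>();

    TypeCountWindow(long windowMillis) {
        this.windowMillis = windowMillis;
    }

    // Adds one event; its group's count is incremented immediately.
    void add(String type, long timeMillis) {
        window.addLast(new Entry(type, timeMillis));
        counts.merge(type, 1, Integer::sum);
    }

    // Evicts expired events and returns a copy of the per-type counts;
    // meant to be called once per output interval (every minute here).
    Map<String, Integer> snapshot(long nowMillis) {
        while (!window.isEmpty()
                && window.peekFirst().timeMillis <= nowMillis - windowMillis) {
            Entry old = window.removeFirst();
            // Decrement the group, removing it when its count reaches zero
            counts.computeIfPresent(old.type, (k, c) -> c == 1 ? null : c - 1);
        }
        return new TreeMap<>(counts);
    }
}
```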

Running the Application

The source code is provided in the download that accompanies the article.

Resources

Thomas Bernhardt is founder and project lead of the Esper open-source project. During the day, Thomas works as an architect at a major financial institution, implementing event-driven software systems.

Alexandre Vasseur works on Event-Driven Architectures and Java middleware and co-leads the Esper open source ESP/CEP project.



Copyright © 2009 O'Reilly Media, Inc.