ONJava.com -- The Independent Source for Enterprise Java

J2EE Without the Application Server

Step 5: Coding the MessageDrivenBank

In this step, we add the JMS processing logic. To do this, we only need to implement the JMS MessageListener interface. We also add a public setBank method so that Spring's dependency injection can supply the Bank. The source code is below.

package jms;

import jdbc.Bank;
import javax.jms.Message;
import javax.jms.MapMessage;
import javax.jms.MessageListener;

public class MessageDrivenBank
implements MessageListener
{
    private Bank bank;

    public void setBank ( Bank bank )
    {
        this.bank = bank;
    }

    //this method can be private
    //since it is only needed within
    //this class 
    private Bank getBank()
    {
        return this.bank;
    }

    public void onMessage ( Message msg )
    {
        try {
          MapMessage m = ( MapMessage ) msg;
          //account and amount are read from the message
          //properties (headers), not from the map body
          int account = m.getIntProperty ( "account" );
          int amount = m.getIntProperty ( "amount" );
          bank.withdraw ( account , amount );
          System.out.println ( "Withdraw of " + 
          amount + " from account " + account );
        }
        catch ( Exception e ) {
          e.printStackTrace();
            
          //force rollback; keep the original
          //exception as the cause
          throw new RuntimeException ( 
          e.getMessage() , e );
        }
    }
    
}

Step 6: Configuring the MessageDrivenBank

Here we configure our MessageDrivenBank to listen on a transactional (JTA-aware) QueueReceiverSessionPool. This gives us the same message guarantees as EJB (neither message loss nor duplicate messages), but with simple POJOs instead. When a MessageListener is plugged into the pool, the pool makes sure that messages are received within a JTA/XA transaction. Combined with a JTA/XA-capable JDBC datasource, this gives us reliable messaging. The resulting Spring configuration is shown below:

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE beans PUBLIC "-//SPRING//DTD BEAN//EN"
    "http://www.springframework.org/dtd/spring-beans.dtd">

<!-- 
        NOTE: no explicit transaction manager bean 
        is necessary
        because the QueueReceiverSessionPool will
        start transactions by itself.
-->
<beans>
    <bean id="datasource" 
        class="com.atomikos.jdbc.nonxa.NonXADataSourceBean">
        <property name="user">
            <value>sa</value>
        </property>
        <property name="url">
            <value>jdbc:hsqldb:SpringNonXADB</value>
        </property>
        <property name="driverClassName">
            <value>org.hsqldb.jdbcDriver</value>
        </property>
        <property name="poolSize">
            <value>1</value>
        </property>
        <property name="connectionTimeout">
            <value>60</value>
        </property>
    </bean>
    <bean id="xaFactory" 
        class="org.activemq.ActiveMQXAConnectionFactory">
        <property name="brokerURL">
            <value>tcp://localhost:61616</value>
        </property>
    </bean>
    <bean id="queue" 
        class="org.activemq.message.ActiveMQQueue">
        <property name="physicalName">
            <value>BANK_QUEUE</value>
        </property>
    </bean>
    <bean id="bank" class="jdbc.Bank">
        <property name="dataSource">
            <ref bean="datasource"/>
        </property>
    </bean>
    <bean id="messageDrivenBank" 
        class="jms.MessageDrivenBank">
        <property name="bank">
            <ref bean="bank"/>
        </property>
    </bean>
    <bean id="queueConnectionFactoryBean" 
        class="com.atomikos.jms.QueueConnectionFactoryBean">
        <property name="resourceName">
            <value>QUEUE_BROKER</value>
        </property>
        <property name="xaQueueConnectionFactory">
            <ref bean="xaFactory"/>
        </property>
    </bean>
    <bean id="queueReceiverSessionPool" 
        class="com.atomikos.jms.QueueReceiverSessionPool" 
        init-method="start">
        
        <property name="queueConnectionFactoryBean">
            <ref bean="queueConnectionFactoryBean"/>
        </property>
        <property name="transactionTimeout">
            <value>120</value>
        </property>
        <!-- 
        default license allows only limited 
        concurrency so keep pool small 
        -->
        <property name="poolSize">
            <value>1</value>
        </property>
        <property name="queue">
            <ref bean="queue"/>
        </property>
        <property name="messageListener">
            <ref bean="messageDrivenBank"/>
        </property>
    </bean>
</beans>

Because this article needs a JMS service that is easy to install, we use ActiveMQ here. If you are using another JMS implementation, you should still be able to apply the techniques outlined in this section. In addition to the familiar datasource and bank objects, the following object definitions have been added:

  • The xaFactory: A connection factory for establishing JMS connections.
  • The queue: This represents the JMS Queue we will use, configured the way ActiveMQ requires.
  • The queueConnectionFactoryBean: A JMS connector that is JTA-aware.
  • The queueReceiverSessionPool: Provides JTA-enabled message consumption. Note that we also specify an initialization method (start) to be called; this is again a Spring feature. The start method is defined in the session pool class and is referenced through the init-method attribute of the bean element in the Spring configuration file.
  • The messageDrivenBank is responsible for processing the messages.

You may ask yourself where the transaction management has gone. Indeed, the objects that were added in the previous section have disappeared again. Why? Because we now use the QueueReceiverSessionPool to receive messages from JMS, and this class also starts a JTA transaction for each receive. We could have left the JTA configuration as it was and merely added the JMS elements, but that would have made the XML file a little longer. The session pool class now assumes the role of the transaction management objects that were configured earlier. It works similarly to the proxy approach, except that this class needs a JMS MessageListener to which it adds transactions. With this configuration, a new transaction is started before each message is consumed, and this transaction commits whenever our onMessage implementation returns normally. If there is a RuntimeException, the transaction is rolled back. The architecture is shown in Figure 5 (some of the JMS objects have been omitted for clarity).

Figure 5. Architecture for message-driven applications in Spring
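
The commit-or-rollback rule described above can be illustrated with a small, self-contained sketch. It is not the QueueReceiverSessionPool implementation: the Listener interface, the RecordingTransaction class, and the deliver method are hypothetical stand-ins invented for this example, and the "transaction" only records which calls were made. Assuming Java 8 or later, it shows one transaction per delivered message, committed on normal return and rolled back on a RuntimeException:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for illustration only;
// these are NOT the Atomikos or JMS types.
final class TransactionalDispatchSketch {

    /** Plays the role of a MessageListener: handles one message. */
    interface Listener {
        void onMessage(String message);
    }

    /** A pretend transaction that only records the calls made on it. */
    static final class RecordingTransaction {
        final List<String> log = new ArrayList<>();
        void begin()    { log.add("begin"); }
        void commit()   { log.add("commit"); }
        void rollback() { log.add("rollback"); }
    }

    /**
     * Delivers one message inside its own transaction:
     * commit on normal return, rollback on RuntimeException.
     * Returns the list of calls recorded by the transaction.
     */
    static List<String> deliver(String message, Listener listener) {
        RecordingTransaction tx = new RecordingTransaction();
        tx.begin();
        try {
            listener.onMessage(message);
            tx.commit();
        } catch (RuntimeException e) {
            // The listener signalled failure, so undo the work.
            tx.rollback();
        }
        return tx.log;
    }

    public static void main(String[] args) {
        // A listener that succeeds, then one that fails.
        System.out.println(deliver("withdraw 10", m -> { }));
        System.out.println(deliver("withdraw 99", m -> {
            throw new RuntimeException("insufficient funds");
        }));
    }
}
```

Running main prints [begin, commit] for the first delivery and [begin, rollback] for the second, which mirrors why onMessage in MessageDrivenBank rethrows failures as a RuntimeException.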

The architecture now works as follows:

  1. The application retrieves the bank object and initializes the database tables if necessary.
  2. The application retrieves the queueReceiverSessionPool, thereby triggering a call of the start method to begin listening for incoming messages.
  3. The queueReceiverSessionPool detects a new message on the queue.
  4. The queueReceiverSessionPool starts a new transaction and registers with it.
  5. The queueReceiverSessionPool calls the registered MessageListener (the messageDrivenBank, in our case).
  6. This triggers a call on the bank.
  7. The bank uses the datasource to access the database.
  8. The datasource registers with the transaction.
  9. The database is accessed via JDBC.
  10. When the processing is done, the queueReceiverSessionPool terminates the transaction. Unless there is a RuntimeException, the desired outcome is commit.
  11. The transaction manager initiates two-phase commit with the message queue.
  12. The transaction manager initiates two-phase commit with the database.
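
Steps 10 through 12 can be sketched as follows. This is a simplified illustration of the two-phase commit idea, not the transaction manager's actual code: the Participant interface and the NamedParticipant class are hypothetical, and a real JTA/XA implementation also handles timeouts, recovery, and heuristic outcomes, all of which are omitted here. The sketch first asks every participant (the message queue and the database) to prepare, and commits only if all of them vote yes:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simplified two-phase commit; all types here are hypothetical.
final class TwoPhaseCommitSketch {

    /** A resource taking part in the transaction. */
    interface Participant {
        boolean prepare();  // phase 1: vote yes (true) or no (false)
        void commit();      // phase 2 when every vote was yes
        void rollback();    // phase 2 otherwise
    }

    /** Participant with a fixed vote that records its calls in a shared log. */
    static final class NamedParticipant implements Participant {
        private final String name;
        private final boolean vote;
        private final List<String> log;

        NamedParticipant(String name, boolean vote, List<String> log) {
            this.name = name;
            this.vote = vote;
            this.log = log;
        }

        public boolean prepare() { log.add(name + ":prepare"); return vote; }
        public void commit()     { log.add(name + ":commit"); }
        public void rollback()   { log.add(name + ":rollback"); }
    }

    /** Runs both phases; returns true if the transaction committed. */
    static boolean complete(List<? extends Participant> participants) {
        boolean allYes = true;
        // Phase 1: collect votes, stopping at the first "no".
        for (Participant p : participants) {
            if (!p.prepare()) {
                allYes = false;
                break;
            }
        }
        // Phase 2: apply the same outcome to every participant.
        for (Participant p : participants) {
            if (allYes) {
                p.commit();
            } else {
                p.rollback();
            }
        }
        return allYes;
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        boolean committed = complete(Arrays.asList(
                new NamedParticipant("queue", true, log),
                new NamedParticipant("database", true, log)));
        System.out.println(committed + " " + log);
    }
}
```

The point of the two phases is that the message queue and the database always reach the same outcome: if either one cannot prepare, both are rolled back, so a message is never consumed without its withdrawal being stored, or the other way around.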
