Transactional Bank implementation using XA

This example illustrates how to implement a transactional object that uses a database for storage. It also illustrates a distributed transaction across multiple processes. The example uses two server processes that provide "Bank" objects, and a client process that transfers funds from an account in one bank into an account in the other bank.

The source files are:

BankImpl        // Implementation of the Bank object.
AccountImpl     // Implementation of the Account object.
BankServerA     // First Bank Server program.
BankServerB     // Second Bank Server program.
BankClient      // Client program that transfers funds.

Compiling and Executing

Use the supplied ANT script to compile and execute the example programs. To compile run:

   ant

To execute the test, you need to start the two Bank servers, and when the two Bank server programs are ready to receive requests you can run the Bank client program.

To start the servers, in separate console windows run:

   ant servera
   ant serverb

When each server program prints out a message like "Bank ... open for business", start the client program by running:

   ant client

Bank Implementation

This implementation uses a database table to store the activity for each account. If there is a transaction associated with the current thread, then the access to the database is via an XA connection, which allows the creator of the transaction to control the outcome of the transaction. If there is no transaction associated with the current thread, then the database access is via a normal JDBC connection, where each update is automatically committed.

package txBank;
                                                                           
import java.io.File;
import java.util.Date;
import java.util.Hashtable;
import java.text.SimpleDateFormat;
import java.sql.*;
import javax.sql.*;
import javax.transaction.*;
import javax.transaction.xa.*;
import com.sssw.jts.api.TransactionService;
                                                                           
import org.omg.CORBA.ORB;
import org.omg.CORBA.Policy;
                                                                           
import org.omg.PortableServer.POA;
import org.omg.PortableServer.Servant;
                                                                           
import txBank.BankPackage.noSuchAccount;
import txBank.BankPackage.AccountCreationError;
                                                                           
import common.Utils;
import common.TestDataSource;
import common.ResourceMapper;
                                                                           
public class BankImpl extends BankPOA
{
    ORB                 _orb;
    TransactionManager  _tm;
    String              _tableName;
    ResourceMapper      _resmapper;
    TestDataSource      _ds;
    POA                 _acctPOA;
    Hashtable           _accounts;
    int                 _nextAccnt;
                                                                           
    private String      _sqlCreate;
    private String      _sqlUpdate;
    private String      _sqlInsert;
    private String      _sqlSelect;
    private String      _sqlSelAll;
                                                                           
    static SimpleDateFormat     _datefmt =
        new SimpleDateFormat("dd.MM.yyyy HH:mm:ss.SSS");
                                                                           
    BankImpl(ORB orb, POA bankPOA, String db, String tabname,
        ResourceMapper resmapper) throws Exception
    {
    |   TransactionService ts = (TransactionService)
    |       orb.resolve_initial_references("TransactionService");
    |                                                                      
    |                                                                      
    |   _orb = orb;
    |   _tm = ts.getTransactionManager();
    |   _resmapper = resmapper;
    |   _tableName = tabname;
    |   _nextAccnt = 1;
    |   _acctPOA = bankPOA;
    |   _accounts = new Hashtable();
    |   _ds = new TestDataSource(db);
    |                                                                      
    |   _sqlCreate = "CREATE TABLE " + _tableName
    |              + "(ACCNTID INTEGER"
    |              + ",ACTIVITY NUMERIC(5,2)"
    |              + ",DESCRIPTION VARCHAR(200)"
    |              + ")";
    |   _sqlInsert = "INSERT INTO " + _tableName
    |              + " (ACCNTID, ACTIVITY, DESCRIPTION) VALUES (?,?,?)";
    |   _sqlSelect = "SELECT ACTIVITY FROM " + _tableName
    |              + " WHERE ACCNTID=?";
    |   _sqlSelAll = "SELECT ACCNTID, ACTIVITY FROM " + _tableName;
    |                                                                      
    |   if (!loadTable()) {
    |   |   createTable();
    |   }
    }
                                                                           
    public int closedown()
    {
    |   try {
    |   |   _ds.shutdown();
    |   |   _orb.shutdown(false);
    |   } catch (Throwable T) {
    |   |   DbgOut("Shutdown problems ... "+T);
    |   }
    |   return 0;
    }
                                                                           
    public String bankname()
    {
    |   return _tableName;
    }
                                                                           
    public Account getAccount(int acctNum) throws noSuchAccount
    {
    |   Account account = null;
    |   DbgOut("Lookup account "+acctNum);
    |   try {
    |   |   account = (Account) _accounts.get(new Integer(acctNum));
    |   } catch (Throwable T) {
    |   }
    |   if (account == null)
    |       throw new noSuchAccount();
    |   return account;
    }
                                                                           
    public synchronized int newAccount() throws AccountCreationError
    {
    |   int id = _nextAccnt;
    |   try {
    |   |   createAccnt(id);
    |   } catch (Exception e) {
    |   |   throw new AccountCreationError();
    |   }
    |   try {
    |   |   txAddActivity(id, 0);
    |   } catch (Exception e) {
    |   }
    |   _nextAccnt = id + 1;
    |                                                                      
    |   DbgOut("Created new account  "+id);
    |   return id;
    }
                                                                           
    private void createAccnt(int accntid)
        throws Exception
    {
    |   byte[] objectId  = new byte[1];
    |   objectId[0] = (byte) accntid;
    |                                                                      
    |   Servant servant = new AccountImpl(this, accntid, 0);
    |   _acctPOA.activate_object_with_id(objectId, servant);
    |   Account account = (Account) _acctPOA.servant_to_reference(servant);
    |   _accounts.put(new Integer(accntid), account);
    }
                                                                           
    void txAddActivity(int id, float amount) throws Exception
    {
    |   Transaction  tx = _tm.getTransaction();
    |   XAConnection xaconn = null;
    |   XAResource   xares  = null;
    |   Connection   conn   = null;
    |   String       descr  = _datefmt.format(new Date());
    |   try {
    |   |   if (tx != null) {
    |   |   |   xaconn = _ds.getXAConnection();
    |   |   |   xares  = xaconn.getXAResource();
    |   |   |   conn   = xaconn.getConnection();
    |   |   |   tx.enlistResource(xares);
    |   |   |   _resmapper.addResource(xares);
    |   |   |   String txname = tx.toString();
    |   |   |   if (txname.length() > 80) {
    |   |   |   |   txname = txname.substring(0, 80);
    |   |   |   }
    |   |   |   descr += " TX: "+txname;
    |   |   } else {
    |   |   |   conn = _ds.getConnection();
    |   |   |   descr += " <no-transaction>";
    |   |   }
    |   |   try {
    |   |   |   PreparedStatement stmt = conn.prepareStatement(_sqlInsert);
    |   |   |   try {
    |   |   |   |   stmt.setInt(1, id);
    |   |   |   |   stmt.setFloat(2, amount);
    |   |   |   |   stmt.setString(3, descr);
    |   |   |   |   int upd = stmt.executeUpdate();
    |   |   |   |   DbgOut("Add activity: ("+id+", "+amount+", "+descr);
    |   |   |   } finally {
    |   |   |   |   stmt.close();
    |   |   |   }
    |   |   } finally {
    |   |   |   conn.close();
    |   |   }
    |   } catch (SQLException ex) {
    |   |   Utils.displayError("ERROR ACCESING TABLE : "+ex, ex);
    |   |   throw ex;
    |   } finally {
    |   |   if (tx != null) {
    |   |   |   if (xares != null) {
    |   |   |   |   try {
    |   |   |   |   |   tx.delistResource(xares, xares.TMSUSPEND);
    |   |   |   |   } catch (Exception ex) {
    |   |   |   |   |   Utils.displayError("ERROR : "+ex, ex);
    |   |   |   |   |   throw ex;
    |   |   |   |   }
    |   |   |   }
    |   |   }
    |   }
    }
                                                                           
    float txReadBalance(int id) throws Exception
    {
    |   float balance = 0;
    |   Transaction  tx = _tm.getTransaction();
    |   XAConnection xaconn = null;
    |   XAResource   xares  = null;
    |   Connection   conn   = null;
    |   try {
    |   |   if (tx != null) {
    |   |   |   xaconn = _ds.getXAConnection();
    |   |   |   xares  = xaconn.getXAResource();
    |   |   |   conn   = xaconn.getConnection();
    |   |   |   tx.enlistResource(xares);
    |   |   |   _resmapper.addResource(xares);
    |   |   } else {
    |   |   |   conn = _ds.getConnection();
    |   |   }
    |   |   try {
    |   |   |   PreparedStatement stmt = conn.prepareStatement(_sqlSelect);
    |   |   |   ResultSet rset = null;
    |   |   |   try {
    |   |   |   |   stmt.setInt(1, id);
    |   |   |   |   rset = stmt.executeQuery();
    |   |   |   |   while (rset.next()) {
    |   |   |   |   |   balance = balance + rset.getFloat(1); 
    |   |   |   |   }
    |   |   |   |   DbgOut("read acct("+id+") balance("+balance+")");
    |   |   |   |   rset.close();
    |   |   |   } finally {
    |   |   |   |   stmt.close();
    |   |   |   }
    |   |   } finally {
    |   |   |   conn.close();
    |   |   }
    |   } catch (SQLException ex) {
    |   |   Utils.displayError("ERROR READING FROM TABLE : "+ex, ex);
    |   |   throw ex;
    |   } finally {
    |   |   if (tx != null) {
    |   |   |   if (xares != null) {
    |   |   |   |   try {
    |   |   |   |   |   tx.delistResource(xares, xares.TMSUSPEND);
    |   |   |   |   } catch (Exception ex) {
    |   |   |   |   |   Utils.displayError("ERROR : "+ex, ex);
    |   |   |   |   |   throw ex;
    |   |   |   |   }
    |   |   |   }
    |   |   }
    |   }
    |   return balance;
    }
                                                                           
    private boolean loadTable()
    {
    |   boolean retv = false;
    |   try {
    |   |   Connection conn = _ds.getConnection();
    |   |   try {
    |   |   |   Statement stmt = conn.createStatement();
    |   |   |   ResultSet rset = null;
    |   |   |   try {
    |   |   |   |   try {
    |   |   |   |   |   rset = stmt.executeQuery(_sqlSelAll);
    |   |   |   |   |   retv = true;
    |   |   |   |   } catch (SQLException ex) {
    |   |   |   |   }
    |   |   |   |   if (rset != null) {
    |   |   |   |   |   DbgOut("Loading account table "+_tableName);
    |   |   |   |   |   while(rset.next()) {
    |   |   |   |   |   |                                                  
    |   |   |   |   |   |   int   accntid = rset.getInt(1);
    |   |   |   |   |   |   float amount  = rset.getFloat(2); 
    |   |   |   |   |   |   Integer accntkey = new Integer(accntid);
    |   |   |   |   |   |   if (_accounts.get(accntkey) == null) {
    |   |   |   |   |   |   |   createAccnt(accntid);
    |   |   |   |   |   |   |   DbgOut("Account : "+accntid+" : "+amount);
    |   |   |   |   |   |   |   if (_nextAccnt <= accntid) {
    |   |   |   |   |   |   |   |   _nextAccnt = accntid+1;
    |   |   |   |   |   |   |   }
    |   |   |   |   |   |   } else {
    |   |   |   |   |   |   |   DbgOut("Activity: "+accntid+" : "+amount);
    |   |   |   |   |   |   }
    |   |   |   |   |   }
    |   |   |   |   |   DbgOut("Table Loaded");
    |   |   |   |   |   rset.close();
    |   |   |   |   }
    |   |   |   } finally {
    |   |   |   |   stmt.close();
    |   |   |   }
    |   |   } finally {
    |   |   |   conn.close();
    |   |   }
    |   } catch (Exception ex) {
    |   |   Utils.displayError("ERROR LOADING TABLE : "+ex, ex);
    |   }
    |   return retv;
    }
                                                                           
    private boolean createTable()
    {
    |   boolean retv = false;
    |   try {
    |   |   Connection conn = _ds.getConnection();
    |   |   try {
    |   |   |   Statement stmt = conn.createStatement();
    |   |   |   try {
    |   |   |   |   stmt.execute(_sqlCreate);
    |   |   |   |   DbgOut("Created accounts table "+_tableName);
    |   |   |   |   retv = true;
    |   |   |   } finally {
    |   |   |   |   stmt.close();
    |   |   |   }
    |   |   } finally {
    |   |   |   conn.close();
    |   |   }
    |   } catch (SQLException ex) {
    |   |   Utils.displayError("ERROR CREATING TABLE : "+ex, ex);
    |   }
    |   return retv;
    }
                                                                           
    private static void DbgOut(String s)
    {
    |   System.out.println("+ "+s);
    }
}

Account Implementation

This is a simple implementation that allows deposit and withdrawal of funds from an account. It checks for sufficient funds before withdrawing funds. Each activity, deposit or withdrawal, is recorded separately. The balance is always computed as a sum of all activities on an account.

package txBank;
                                                                           
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
                                                                           
import org.omg.CORBA.PERSIST_STORE;
                                                                           
import txBank.AccountPackage.CannotDeposit;
import txBank.AccountPackage.CannotWithdraw;
import txBank.AccountPackage.InsufficientFunds;
import txBank.AccountPackage.BalanceNotAvailable;
                                                                           
import common.TestDataSource;
                                                                           
public class AccountImpl extends AccountPOA
{
    int             _id;
    BankImpl        _bank;
                                                                           
    public AccountImpl(BankImpl bank, int id, float balance)
    {
    |   _id = id;
    |   _bank = bank;
    }
                                                                           
    public String accountname()
    {
    |   return _bank.bankname()+".#."+_id;
    }
                                                                           
    public synchronized float deposit(float sum)
        throws CannotDeposit
    {
    |   try {
    |   |   writeActivity(sum);
    |   |   return balance();
    |   } catch (Exception ex) {
    |   |   throw new CannotDeposit();
    |   }
    }
                                                                           
    public synchronized float withdraw(float sum)
        throws InsufficientFunds, CannotWithdraw
    {
    |   try {
    |   |   float balance = balance() - sum;
    |   |   if (balance < 0)
    |   |       throw new InsufficientFunds();
    |   |                                                                  
    |   |   writeActivity(0-sum);
    |   |   return balance();
    |   } catch (InsufficientFunds ex) {
    |   |   throw ex;
    |   } catch (Exception ex) {
    |   |   throw new CannotWithdraw();
    |   }
    }
                                                                           
    public synchronized float balance()
        throws BalanceNotAvailable
    {
    |   try {
    |   |   return readBalance();
    |   } catch (Exception ex) {
    |   |   throw new BalanceNotAvailable();
    |   }
    }
                                                                           
    void writeActivity(float amount) throws Exception
    {
    |   _bank.txAddActivity(_id, amount);
    }
                                                                           
    float readBalance() throws Exception
    {
    |   return _bank.txReadBalance(_id);
    }
}

BankServerA Implementation

Server program that creates and activates a Bank object. It demonstrates creating a POA with OTSPolicy.

package txBank;
                                                                           
import java.util.Properties;
                                                                           
import org.omg.CORBA.Any;
import org.omg.CORBA.ORB;
import org.omg.CORBA.Object;
import org.omg.CORBA.Policy;
                                                                           
import org.omg.PortableServer.POA;
import org.omg.PortableServer.Servant;
import org.omg.PortableServer.LifespanPolicyValue;
import org.omg.PortableServer.IdAssignmentPolicyValue;
import org.omg.PortableServer.ImplicitActivationPolicyValue;
                                                                           
import org.omg.CosTransactions.ADAPTS;
import org.omg.CosTransactions.SHARED;
import org.omg.CosTransactions.OTS_POLICY_TYPE;
import org.omg.CosTransactions.INVOCATION_POLICY_TYPE;
                                                                           
import com.sssw.jts.api.TransactionService;
import com.sssw.jbroker.api.transaction.TSIdentification;
                                                                           
import common.Utils;
import common.ResourceMapper;
import common.TestDataSource;
                                                                           
public class BankServerA
{
    public static void main(String args[]) throws Exception
    {
    |   mainBody(args, "bankA", TestDataSource._testDBkA, "FleeceNationalBank");
    }
                                                                           
    static void mainBody(String args[], String iorName,
        String dbName, String bankName) throws Exception
    {
    |   // Create a ResourceHandleMapper
    |   ResourceMapper resMapper = new ResourceMapper();
    |                                                                      
    |   System.setProperty("transaction.service.id", bankName);
    |                                                                      
    |   // Initialize the ORB and get instance of TM
    |   ORB orb = ORB.init(args, null);
    |   TransactionService ts = (TransactionService)
    |       orb.resolve_initial_references("TransactionService");
    |                                                                      
    |   //
    |   // Recover TM.
    |   //
    |   ts.recover(resMapper, false, null);
    |                                                                      
    |   // get the root POA
    |   POA rootPOA = (POA) orb.resolve_initial_references("RootPOA");
    |                                                                      
    |   // create the POA for Transient Account Objects
    |   Any otsPolicy = orb.create_any();
    |   otsPolicy.insert_short(ADAPTS.value);
    |                                                                      
    |   Any invPolicy = orb.create_any();
    |   invPolicy.insert_short(SHARED.value);
    |                                                                      
    |   POA bankPOA = rootPOA.create_POA("poa_"+bankName,
    |       rootPOA.the_POAManager(),
    |       new Policy[] {
    |       |   rootPOA.create_id_assignment_policy(IdAssignmentPolicyValue.USER_ID)
    |       |   , rootPOA.create_lifespan_policy(LifespanPolicyValue.TRANSIENT)
    |       |   , orb.create_policy(OTS_POLICY_TYPE.value, otsPolicy)
    |       |   , orb.create_policy(INVOCATION_POLICY_TYPE.value, invPolicy)
    |           }
    |       );
    |   bankPOA.the_POAManager().activate();
    |                                                                      
    |   Servant servant;
    |   org.omg.CORBA.Object bankobj;
    |                                                                      
    |   servant = new BankImpl(orb, bankPOA, dbName, bankName, resMapper);
    |   bankPOA.activate_object_with_id(("bank_"+bankName).getBytes(), servant);
    |   bankobj = bankPOA.servant_to_reference(servant);
    |                                                                      
    |   String bankAIOR = orb.object_to_string(bankobj);
    |                                                                      
    |   // write the stringified object reference
    |   Utils.writeIOR(bankAIOR, iorName+".ior"); 
    |                                                                      
    |   System.out.println("+ "+bankName+" is now open for business ...");
    |   orb.run();
    }
}

BankServerB Implementation

Extends BankServerA for initializing the Bank object with a different name and database.

package txBank;
                                                                           
import common.TestDataSource;
                                                                           
public class BankServerB extends BankServerA
{
    public static void main(String args[]) throws Exception
    {
    |   mainBody(args, "bankB", TestDataSource._testDBkB, "InsolventBank");
    }
}

BankClient Implementation

This is the client program that establishes connections with the two Bank servers, and transfers funds from one account to another in a distributed transaction.

package txBank;
                                                                           
import java.io.*;
import java.util.*;
                                                                           
import org.omg.CORBA.ORB;
                                                                           
import org.omg.CosTransactions.Current;
                                                                           
import com.sssw.jts.api.TransactionService;
import com.sssw.jbroker.api.transaction.TSIdentification;
                                                                           
import common.Utils;
import common.ResourceMapper;
                                                                           
import txBank.BankPackage.noSuchAccount;
import txBank.BankPackage.AccountCreationError;
                                                                           
public class BankClient
{
    public static void main(String args[]) throws Exception
    {
    |   // Create a ResourceHandleMapper
    |   ResourceMapper resMapper = new ResourceMapper();
    |                                                                      
    |   System.setProperty("transaction.service.id", "BankClient");
    |                                                                      
    |   // Initialize the ORB and get instance of TM
    |   ORB orb = ORB.init(args, null);
    |   TransactionService ts = (TransactionService)
    |       orb.resolve_initial_references("TransactionService");
    |                                                                      
    |   //
    |   // Recover TM
    |   //
    |   ts.recover(resMapper, false, null);
    |                                                                      
    |   //
    |   // Locate BankA and BankB
    |   //
    |   String bankIOR;
    |                                                                      
    |   bankIOR = Utils.readIOR("bankA.ior");
    |   Bank bankA = BankHelper.narrow(orb.string_to_object(bankIOR));
    |   System.out.println("+ Connected to : "+bankA.bankname());
    |                                                                      
    |   bankIOR = Utils.readIOR("bankB.ior");
    |   Bank bankB = BankHelper.narrow(orb.string_to_object(bankIOR));
    |   System.out.println("+ Connected to : "+bankB.bankname());
    |                                                                      
    |   //
    |   // Make sure accounts exists
    |   //
    |   Account accntWithdraw;
    |   Account accntDeposit;
    |                                                                      
    |   try {
    |   |   accntWithdraw = bankA.getAccount(1);
    |   } catch (noSuchAccount ex) {
    |   |   accntWithdraw = bankA.getAccount(bankA.newAccount());
    |   |   accntWithdraw.deposit(100);
    |   }
    |                                                                      
    |   float diff = 100 - accntWithdraw.balance();
    |   if (diff > 0) {
    |   |   System.out.println("+ Adding funds to "+accntWithdraw.accountname()+" "+diff);
    |   |   accntWithdraw.deposit(diff);
    |   }
    |   try {
    |   |   accntDeposit = bankB.getAccount(1);
    |   } catch (noSuchAccount ex) {
    |   |   accntDeposit = bankB.getAccount(bankB.newAccount());
    |   |   accntDeposit.deposit(100);
    |   }
    |                                                                      
    |   Current txCurrent = (Current)
    |       orb.resolve_initial_references("TransactionCurrent");
    |                                                                      
    |   System.out.println("+ Staring balances");
    |   System.out.println("+    Account: "+accntWithdraw.accountname()+" Balance: "+accntWithdraw.balance());
    |   System.out.println("+    Account: "+accntDeposit.accountname()+" Balance: "+accntDeposit.balance());
    |   System.out.println();
    |                                                                      
    |   transfer(txCurrent, accntWithdraw, accntDeposit, 75);
    |                                                                      
    |   System.out.println();
    |   System.out.println("+ Balances after first transfer");
    |   System.out.println("+    Account: "+accntWithdraw.accountname()+" Balance: "+accntWithdraw.balance());
    |   System.out.println("+    Account: "+accntDeposit.accountname()+" Balance: "+accntDeposit.balance());
    |   System.out.println();
    |                                                                      
    |   transfer(txCurrent, accntWithdraw, accntDeposit, 50);
    |                                                                      
    |   System.out.println();
    |   System.out.println("+ Balances after second transfer");
    |   System.out.println("+    Account: "+accntWithdraw.accountname()+" Balance: "+accntWithdraw.balance());
    |   System.out.println("+    Account: "+accntDeposit.accountname()+" Balance: "+accntDeposit.balance());
    |                                                                      
//      try { bankA.closedown(); } catch (Throwable T) {}
//      try { bankB.closedown(); } catch (Throwable T) {}
    }
                                                                           
    static void transfer(Current txCurrent, Account accntWithdraw, Account accntDeposit, float amnt)
        throws Exception
    {
    |   boolean success = false;
    |   txCurrent.begin();
    |                                                                      
    |   System.out.print("+ Transfer of "+amnt+" from ["
    |       + accntWithdraw.accountname() + "] to [" + accntDeposit.accountname() + "] ");
    |                                                                      
    |   try {
    |   |   accntDeposit.deposit(amnt);
    |   |   accntWithdraw.withdraw(amnt);
    |   |   success = true;
    |   |   txCurrent.commit(false);
    |   |   System.out.println("Succeeded.");
    |   } catch (Throwable T) {
    |   |   System.out.println("Failed due to "+T);
    |   |   if (!success)
    |   |       txCurrent.rollback();
    |   }
    }
}

Bank IDL

The following is IDL describing the Bank and Account interfaces.

// Interfaces of the transactional bank example.
module txBank
{
    // A single account held at a bank.
    interface Account
    {
        exception InsufficientFunds {};
        exception CannotDeposit {};
        exception CannotWithdraw {};
        exception BalanceNotAvailable {};

        // Name identifying the account.
        string accountname();

        // Adds funds to the account; returns a balance.
        float deposit(in float sum)
            raises (CannotDeposit);

        // Removes funds from the account; returns a balance.
        float withdraw(in float sum)
            raises (InsufficientFunds, CannotWithdraw);

        // Current balance of the account.
        float balance()
            raises (BalanceNotAvailable);
    };

    // A bank that creates accounts and hands out references to them.
    interface Bank
    {
        exception noSuchAccount {};
        exception AccountCreationError {};

        // Name identifying the bank.
        string bankname();

        // Asks the bank server to shut down.
        long closedown();

        // Creates an account and returns its number.
        long newAccount()
            raises (AccountCreationError);

        // Returns the account with the given number.
        Account getAccount(in long accountNum)
            raises (noSuchAccount);
    };
};


Copyright © 2003, 2004 Novell, Inc. All rights reserved. Copyright © 2001, 2002, 2003 SilverStream Software, LLC. All rights reserved.