This example illustrates how to implement a transactional object that use a database for storage. It also illustrates a distributed transaction across multiple processes. The example uses two server processes that provide "Bank" objects, and a client process that transfers funds from an account in one bank into an account in the other bank.
The source files are:
BankImpl // Implementation of the Bank object. AccountImpl // Implementation of the Account object. BankServerA // First Bank Server program. BankServerB // Second Bank Server program. BankClient // Client program that transfers funds.
Use the supplied ANT script to compile and execute the example programs. To compile run:
ant
To execute the test, you need to start the two Bank servers, and when the two Bank server programs are ready to receive requests you can run the Bank client program.
To start the servers, in separate console windows run:
ant servera ant serverb
When each server program prints out a message like "Bank ... open for business", start the client program by running:
ant client
This implementation uses a database table to store the activity for each account. If there is a transaction associated with the current thread, then the access to the database is via XA connection, which allows the creator of the transaction to control the outcome of the transaction. If there is no transaction associated with the current thread, then the database access is via normal JDBC connection, where each update is automatically committed.
package txBank; import java.io.File; import java.util.Date; import java.util.Hashtable; import java.text.SimpleDateFormat; import java.sql.*; import javax.sql.*; import javax.transaction.*; import javax.transaction.xa.*; import com.sssw.jts.api.TransactionService; import org.omg.CORBA.ORB; import org.omg.CORBA.Policy; import org.omg.PortableServer.POA; import org.omg.PortableServer.Servant; import txBank.BankPackage.noSuchAccount; import txBank.BankPackage.AccountCreationError; import common.Utils; import common.TestDataSource; import common.ResourceMapper; public class BankImpl extends BankPOA { ORB _orb; TransactionManager _tm; String _tableName; ResourceMapper _resmapper; TestDataSource _ds; POA _acctPOA; Hashtable _accounts; int _nextAccnt; private String _sqlCreate; private String _sqlUpdate; private String _sqlInsert; private String _sqlSelect; private String _sqlSelAll; static SimpleDateFormat _datefmt = new SimpleDateFormat("dd.MM.yyyy HH:mm:ss.SSS"); BankImpl(ORB orb, POA bankPOA, String db, String tabname, ResourceMapper resmapper) throws Exception { | TransactionService ts = (TransactionService) | orb.resolve_initial_references("TransactionService"); | | | _orb = orb; | _tm = ts.getTransactionManager(); | _resmapper = resmapper; | _tableName = tabname; | _nextAccnt = 1; | _acctPOA = bankPOA; | _accounts = new Hashtable(); | _ds = new TestDataSource(db); | | _sqlCreate = "CREATE TABLE " + _tableName | + "(ACCNTID INTEGER" | + ",ACTIVITY NUMERIC(5,2)" | + ",DESCRIPTION VARCHAR(200)" | + ")"; | _sqlInsert = "INSERT INTO " + _tableName | + " (ACCNTID, ACTIVITY, DESCRIPTION) VALUES (?,?,?)"; | _sqlSelect = "SELECT ACTIVITY FROM " + _tableName | + " WHERE ACCNTID=?"; | _sqlSelAll = "SELECT ACCNTID, ACTIVITY FROM " + _tableName; | | if (!loadTable()) { | | createTable(); | } } public int closedown() { | try { | | _ds.shutdown(); | | _orb.shutdown(false); | } catch (Throwable T) { | | DbgOut("Shutdown problems ... "+T); | } | return 0; } public String bankname() { | return _tableName; } public Account getAccount(int acctNum) throws noSuchAccount { | Account account = null; | DbgOut("Lookup account "+acctNum); | try { | | account = (Account) _accounts.get(new Integer(acctNum)); | } catch (Throwable T) { | } | if (account == null) | throw new noSuchAccount(); | return account; } public synchronized int newAccount() throws AccountCreationError { | int id = _nextAccnt; | try { | | createAccnt(id); | } catch (Exception e) { | | throw new AccountCreationError(); | } | try { | | txAddActivity(id, 0); | } catch (Exception e) { | } | _nextAccnt = id + 1; | | DbgOut("Created new account "+id); | return id; } private void createAccnt(int accntid) throws Exception { | byte[] objectId = new byte[1]; | objectId[0] = (byte) accntid; | | Servant servant = new AccountImpl(this, accntid, 0); | _acctPOA.activate_object_with_id(objectId, servant); | Account account = (Account) _acctPOA.servant_to_reference(servant); | _accounts.put(new Integer(accntid), account); } void txAddActivity(int id, float amount) throws Exception { | Transaction tx = _tm.getTransaction(); | XAConnection xaconn = null; | XAResource xares = null; | Connection conn = null; | String descr = _datefmt.format(new Date()); | try { | | if (tx != null) { | | | xaconn = _ds.getXAConnection(); | | | xares = xaconn.getXAResource(); | | | conn = xaconn.getConnection(); | | | tx.enlistResource(xares); | | | _resmapper.addResource(xares); | | | String txname = tx.toString(); | | | if (txname.length() > 80) { | | | | txname = txname.substring(0, 80); | | | } | | | descr += " TX: "+txname; | | } else { | | | conn = _ds.getConnection(); | | | descr += " <no-transaction>"; | | } | | try { | | | PreparedStatement stmt = conn.prepareStatement(_sqlInsert); | | | try { | | | | stmt.setInt(1, id); | | | | stmt.setFloat(2, amount); | | | | stmt.setString(3, descr); | | | | int upd = stmt.executeUpdate(); | | | | DbgOut("Add activity: ("+id+", "+amount+", "+descr); | | | } finally { | | | | stmt.close(); | | | } | | } finally { | | | conn.close(); | | } | } catch (SQLException ex) { | | Utils.displayError("ERROR ACCESING TABLE : "+ex, ex); | | throw ex; | } finally { | | if (tx != null) { | | | if (xares != null) { | | | | try { | | | | | tx.delistResource(xares, xares.TMSUSPEND); | | | | } catch (Exception ex) { | | | | | Utils.displayError("ERROR : "+ex, ex); | | | | | throw ex; | | | | } | | | } | | } | } } float txReadBalance(int id) throws Exception { | float balance = 0; | Transaction tx = _tm.getTransaction(); | XAConnection xaconn = null; | XAResource xares = null; | Connection conn = null; | try { | | if (tx != null) { | | | xaconn = _ds.getXAConnection(); | | | xares = xaconn.getXAResource(); | | | conn = xaconn.getConnection(); | | | tx.enlistResource(xares); | | | _resmapper.addResource(xares); | | } else { | | | conn = _ds.getConnection(); | | } | | try { | | | PreparedStatement stmt = conn.prepareStatement(_sqlSelect); | | | ResultSet rset = null; | | | try { | | | | stmt.setInt(1, id); | | | | rset = stmt.executeQuery(); | | | | while (rset.next()) { | | | | | balance = balance + rset.getFloat(1); | | | | } | | | | DbgOut("read acct("+id+") balance("+balance+")"); | | | | rset.close(); | | | } finally { | | | | stmt.close(); | | | } | | } finally { | | | conn.close(); | | } | } catch (SQLException ex) { | | Utils.displayError("ERROR READING FROM TABLE : "+ex, ex); | | throw ex; | } finally { | | if (tx != null) { | | | if (xares != null) { | | | | try { | | | | | tx.delistResource(xares, xares.TMSUSPEND); | | | | } catch (Exception ex) { | | | | | Utils.displayError("ERROR : "+ex, ex); | | | | | throw ex; | | | | } | | | } | | } | } | return balance; } private boolean loadTable() { | boolean retv = false; | try { | | Connection conn = _ds.getConnection(); | | try { | | | Statement stmt = conn.createStatement(); | | | ResultSet rset = null; | | | try { | | | | try { | | | | | rset = stmt.executeQuery(_sqlSelAll); | | | | | retv = true; | | | | } catch (SQLException ex) { | | | | } | | | | if (rset != null) { | | | | | DbgOut("Loading account table "+_tableName); | | | | | while(rset.next()) { | | | | | | | | | | | | int accntid = rset.getInt(1); | | | | | | float amount = rset.getFloat(2); | | | | | | Integer accntkey = new Integer(accntid); | | | | | | if (_accounts.get(accntkey) == null) { | | | | | | | createAccnt(accntid); | | | | | | | DbgOut("Account : "+accntid+" : "+amount); | | | | | | | if (_nextAccnt <= accntid) { | | | | | | | | _nextAccnt = accntid+1; | | | | | | | } | | | | | | } else { | | | | | | | DbgOut("Activity: "+accntid+" : "+amount); | | | | | | } | | | | | } | | | | | DbgOut("Table Loaded"); | | | | | rset.close(); | | | | } | | | } finally { | | | | stmt.close(); | | | } | | } finally { | | | conn.close(); | | } | } catch (Exception ex) { | | Utils.displayError("ERROR LOADING TABLE : "+ex, ex); | } | return retv; } private boolean createTable() { | boolean retv = false; | try { | | Connection conn = _ds.getConnection(); | | try { | | | Statement stmt = conn.createStatement(); | | | try { | | | | stmt.execute(_sqlCreate); | | | | DbgOut("Created accounts table "+_tableName); | | | | retv = true; | | | } finally { | | | | stmt.close(); | | | } | | } finally { | | | conn.close(); | | } | } catch (SQLException ex) { | | Utils.displayError("ERROR CREATING TABLE : "+ex, ex); | } | return retv; } private static void DbgOut(String s) { | System.out.println("+ "+s); } }
This is a simple implementation that allows deposit and withdrawal of funds from an account. It checks for sufficient funds before withdrawing funds. Each activity, deposit or withdrawal, is recorded separately. The balance is always computed as a sum of all activities on an account.
package txBank; import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import org.omg.CORBA.PERSIST_STORE; import txBank.AccountPackage.CannotDeposit; import txBank.AccountPackage.CannotWithdraw; import txBank.AccountPackage.InsufficientFunds; import txBank.AccountPackage.BalanceNotAvailable; import common.TestDataSource; public class AccountImpl extends AccountPOA { int _id; BankImpl _bank; public AccountImpl(BankImpl bank, int id, float balance) { | _id = id; | _bank = bank; } public String accountname() { | return _bank.bankname()+".#."+_id; } public synchronized float deposit(float sum) throws CannotDeposit { | try { | | writeActivity(sum); | | return balance(); | } catch (Exception ex) { | | throw new CannotDeposit(); | } } public synchronized float withdraw(float sum) throws InsufficientFunds, CannotWithdraw { | try { | | float balance = balance() - sum; | | if (balance < 0) | | throw new InsufficientFunds(); | | | | writeActivity(0-sum); | | return balance(); | } catch (InsufficientFunds ex) { | | throw ex; | } catch (Exception ex) { | | throw new CannotWithdraw(); | } } public synchronized float balance() throws BalanceNotAvailable { | try { | | return readBalance(); | } catch (Exception ex) { | | throw new BalanceNotAvailable(); | } } void writeActivity(float amount) throws Exception { | _bank.txAddActivity(_id, amount); } float readBalance() throws Exception { | return _bank.txReadBalance(_id); } }
Server program that creates and activates a Bank object. It demonstrates creating
a POA with OTSPolicy
.
package txBank; import java.util.Properties; import org.omg.CORBA.Any; import org.omg.CORBA.ORB; import org.omg.CORBA.Object; import org.omg.CORBA.Policy; import org.omg.PortableServer.POA; import org.omg.PortableServer.Servant; import org.omg.PortableServer.LifespanPolicyValue; import org.omg.PortableServer.IdAssignmentPolicyValue; import org.omg.PortableServer.ImplicitActivationPolicyValue; import org.omg.CosTransactions.ADAPTS; import org.omg.CosTransactions.SHARED; import org.omg.CosTransactions.OTS_POLICY_TYPE; import org.omg.CosTransactions.INVOCATION_POLICY_TYPE; import com.sssw.jts.api.TransactionService; import com.sssw.jbroker.api.transaction.TSIdentification; import common.Utils; import common.ResourceMapper; import common.TestDataSource; public class BankServerA { public static void main(String args[]) throws Exception { | mainBody(args, "bankA", TestDataSource._testDBkA, "FleeceNationalBank"); } static void mainBody(String args[], String iorName, String dbName, String bankName) throws Exception { | // Create a ResourceHandleMapper | ResourceMapper resMapper = new ResourceMapper(); | | System.setProperty("transaction.service.id", bankName); | | // Initialize the ORB and get instance of TM | ORB orb = ORB.init(args, null); | TransactionService ts = (TransactionService) | orb.resolve_initial_references("TransactionService"); | | // | // Recover TM. | // | ts.recover(resMapper, false, null); | | // get the root POA | POA rootPOA = (POA) orb.resolve_initial_references("RootPOA"); | | // create the POA for Transient Account Objects | Any otsPolicy = orb.create_any(); | otsPolicy.insert_short(ADAPTS.value); | | Any invPolicy = orb.create_any(); | invPolicy.insert_short(SHARED.value); | | POA bankPOA = rootPOA.create_POA("poa_"+bankName, | rootPOA.the_POAManager(), | new Policy[] { | | rootPOA.create_id_assignment_policy(IdAssignmentPolicyValue.USER_ID) | | , rootPOA.create_lifespan_policy(LifespanPolicyValue.TRANSIENT) | | , orb.create_policy(OTS_POLICY_TYPE.value, otsPolicy) | | , orb.create_policy(INVOCATION_POLICY_TYPE.value, invPolicy) | } | ); | bankPOA.the_POAManager().activate(); | | Servant servant; | org.omg.CORBA.Object bankobj; | | servant = new BankImpl(orb, bankPOA, dbName, bankName, resMapper); | bankPOA.activate_object_with_id(("bank_"+bankName).getBytes(), servant); | bankobj = bankPOA.servant_to_reference(servant); | | String bankAIOR = orb.object_to_string(bankobj); | | // write the stringified object reference | Utils.writeIOR(bankAIOR, iorName+".ior"); | | System.out.println("+ "+bankName+" is now open for business ..."); | orb.run(); } }
Extends BankServerA
for initializing the Bank object with a different
name and database.
package txBank; import common.TestDataSource; public class BankServerB extends BankServerA { public static void main(String args[]) throws Exception { | mainBody(args, "bankB", TestDataSource._testDBkB, "InsolventBank"); } }
This the client program that establishes connection with the two Bank servers, and transfers funds from one account to another in a distributed transaction.
package txBank; import java.io.*; import java.util.*; import org.omg.CORBA.ORB; import org.omg.CosTransactions.Current; import com.sssw.jts.api.TransactionService; import com.sssw.jbroker.api.transaction.TSIdentification; import common.Utils; import common.ResourceMapper; import txBank.BankPackage.noSuchAccount; import txBank.BankPackage.AccountCreationError; public class BankClient { public static void main(String args[]) throws Exception { | // Create a ResourceHandleMapper | ResourceMapper resMapper = new ResourceMapper(); | | System.setProperty("transaction.service.id", "BankClient"); | | // Initialize the ORB and get instance of TM | ORB orb = ORB.init(args, null); | TransactionService ts = (TransactionService) | orb.resolve_initial_references("TransactionService"); | | // | // Recover TM | // | ts.recover(resMapper, false, null); | | // | // Locate BankA and BankB | // | String bankIOR; | | bankIOR = Utils.readIOR("bankA.ior"); | Bank bankA = BankHelper.narrow(orb.string_to_object(bankIOR)); | System.out.println("+ Connected to : "+bankA.bankname()); | | bankIOR = Utils.readIOR("bankB.ior"); | Bank bankB = BankHelper.narrow(orb.string_to_object(bankIOR)); | System.out.println("+ Connected to : "+bankB.bankname()); | | // | // Make sure accounts exists | // | Account accntWithdraw; | Account accntDeposit; | | try { | | accntWithdraw = bankA.getAccount(1); | } catch (noSuchAccount ex) { | | accntWithdraw = bankA.getAccount(bankA.newAccount()); | | accntWithdraw.deposit(100); | } | | float diff = 100 - accntWithdraw.balance(); | if (diff > 0) { | | System.out.println("+ Adding funds to "+accntWithdraw.accountname()+" "+diff); | | accntWithdraw.deposit(diff); | } | try { | | accntDeposit = bankB.getAccount(1); | } catch (noSuchAccount ex) { | | accntDeposit = bankB.getAccount(bankB.newAccount()); | | accntDeposit.deposit(100); | } | | Current txCurrent = (Current) | orb.resolve_initial_references("TransactionCurrent"); | | System.out.println("+ Staring balances"); | System.out.println("+ Account: "+accntWithdraw.accountname()+" Balance: "+accntWithdraw.balance()); | System.out.println("+ Account: "+accntDeposit.accountname()+" Balance: "+accntDeposit.balance()); | System.out.println(); | | transfer(txCurrent, accntWithdraw, accntDeposit, 75); | | System.out.println(); | System.out.println("+ Balances after first transfer"); | System.out.println("+ Account: "+accntWithdraw.accountname()+" Balance: "+accntWithdraw.balance()); | System.out.println("+ Account: "+accntDeposit.accountname()+" Balance: "+accntDeposit.balance()); | System.out.println(); | | transfer(txCurrent, accntWithdraw, accntDeposit, 50); | | System.out.println(); | System.out.println("+ Balances after second transfer"); | System.out.println("+ Account: "+accntWithdraw.accountname()+" Balance: "+accntWithdraw.balance()); | System.out.println("+ Account: "+accntDeposit.accountname()+" Balance: "+accntDeposit.balance()); | // try { bankA.closedown(); } catch (Throwable T) {} // try { bankB.closedown(); } catch (Throwable T) {} } static void transfer(Current txCurrent, Account accntWithdraw, Account accntDeposit, float amnt) throws Exception { | boolean success = false; | txCurrent.begin(); | | System.out.print("+ Transfer of "+amnt+" from [" | + accntWithdraw.accountname() + "] to [" + accntDeposit.accountname() + "] "); | | try { | | accntDeposit.deposit(amnt); | | accntWithdraw.withdraw(amnt); | | success = true; | | txCurrent.commit(false); | | System.out.println("Succeeded."); | } catch (Throwable T) { | | System.out.println("Failed due to "+T); | | if (!success) | | txCurrent.rollback(); | } } }
The following is IDL describing the Bank
and Account
interfaces.
module txBank { interface Account { | exception InsufficientFunds {}; | exception CannotDeposit {}; | exception CannotWithdraw {}; | exception BalanceNotAvailable {}; | | string accountname(); | float deposit(in float sum) raises (CannotDeposit); | float withdraw(in float sum) raises (InsufficientFunds, CannotWithdraw); | float balance() raises (BalanceNotAvailable); }; interface Bank { | exception noSuchAccount {}; | exception AccountCreationError{}; | | string bankname(); | long closedown(); | long newAccount() raises (AccountCreationError); | Account getAccount(in long accountNum) raises (noSuchAccount); }; };
Copyright © 2003, 2004 Novell, Inc. All rights reserved. Copyright © 2001, 2002, 2003 SilverStream Software, LLC. All rights reserved.