This example illustrates how to implement CORBA recoverable resource objects for a transctional application. In this implementation, updates to each account in a transaction are represented as activity objects. Activity is a resource that participates in a transaction, and based on the transaction outcome the Activity object updates the account balance.
There are two database tables used for storage, the first, Accounts
table, keeps track of account activities, and the second, TXInfo
table, keeps track of prepared resources for recovery. The Accounts
table has a TXInfoID
field that associates an activity with a
transaction. If this field is zero, then the activity is considered
permanent. If it is non-zero then the activity belongs to in a
transaction that was prepared.
When deposits or withdrawals are made in a transaction, they are kept track of in an activity object and do not reflect on the permanent balance of the account.
When the transaction is rolled back, the activity object is released and the account permanent balance is not effected by the transaction. There are no updates to the database tables.
When the transaction is committed with one phase, the activity record
is written to the Accounts
table as a permanent record and the
account permanent balance is updated.
When the transaction is completed with two phase commit, during the
prepare stage a transaction recovery record is written to the TXInfo
table, and the activity is written to the Accounts
table with
associated link to the TXInfo
table in its TXInfoID
field. If the
final outcome of the transction is to commit then, the the TXInfoID
field of the activity record in the Accounts
table is cleared, the
transaction record is deleted from the TXInfo
table, and the account
permanent balance is updated. If the final outcome is to rollback
then the activity record in the Accounts
table, and the transaction
record in the TXInfo
table are deleted.
During startup of the bank servers the TXInfo
table is used for
recovery. All entries in that table are considered transactions that
were prepared but not completed. The RecoveryCoordinator
for each
transaction is located and the Accounts
table is updated based on the
completion state of the transaction.
The source files are:
bank.idl // IDL describing the Bank and Account objects AccountImpl.java // Implementation of the Account object. ActivityImpl.java // Implementation of the Activity Resource object. BankImpl.java // Implementation of the Bank object. BankClient.java // Client program that transfers funds. BankServerA.java // First Bank Server program. BankServerB.java // Second Bank Server program. TableBase.java // Abstract class for DB access. TableAccount.java // Accounts table access class. TableTXInfo.java // Transaction Information table access class.
Run the build script to compile and execute the example programs. Use the supplied ANT build script to compile:
ant
To execute the test, you need to start the two bank servers, and when the two bank server programs are ready to receive requests you can run the bank client program.
To start the servers, in separate console windows run:
ant servera ant serverb
When each server program prints out a message like "Bank ... open for business", start the client program by running:
ant client
package resBank; import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.util.Hashtable; import org.omg.CORBA.ORB; import org.omg.CORBA.PERSIST_STORE; import resBank.AccountPackage.CannotDeposit; import resBank.AccountPackage.CannotWithdraw; import resBank.AccountPackage.InsufficientFunds; import resBank.AccountPackage.BalanceNotAvailable; import org.omg.CosTransactions.Vote; import org.omg.CosTransactions.Control; import org.omg.CosTransactions.Current; import org.omg.CosTransactions.Resource; import org.omg.CosTransactions.Coordinator; import org.omg.CosTransactions.Unavailable; import org.omg.CosTransactions.NotPrepared; import org.omg.CosTransactions.HeuristicMixed; import org.omg.CosTransactions.HeuristicHazard; import org.omg.CosTransactions.HeuristicCommit; import org.omg.CosTransactions.HeuristicRollback; import common.Utils; import common.TestDataSource; /* * Implementation of the Account interface. * It keeps track of the current account balance * and transactional activity balance. The account * balance is based on the current balance plus * any activity that belongs to the current transaction * if one is present. * * If the Transaction prepared the activity is stored * with a transaction information. When the Transaction * is then commited the activity is made permanent by clearing * the TXINFOID field in that row. If the Transaction is * rolled back after a prepare, the activity row is deleted. * * If the Transaciton is completed with one-phase commit * the activity is stored as permanent record. * * If the Transction is rolledback without prepare, then * there are no updates to the storage. */ public class AccountImpl extends AccountPOA { int _id; BankImpl _bank; Account _account; float _balance; Hashtable _txActivity; public AccountImpl(BankImpl bank, int id, float balance) { | _id = id; | _bank = bank; | _balance = balance; | _txActivity = new Hashtable(); } ORB getORB() { | return _bank.getORB(); } Account getReference() { | return _account; } void setReference(Account account) { | _account = account; } public String accountname() { | return _bank.bankname()+".#."+_id; } public synchronized float deposit(float amnt) throws CannotDeposit { | float balance = _balance + amnt; | ActivityImpl activity; | | try { | | activity = getTXActivity(); | } catch (Exception ex) { | | Utils.dp("Deposit: Unexpected error "+ex, ex); | | throw new CannotDeposit(); | } | | if (activity != null) { | | // Transactional Deposit | | balance += activity._balance; | | activity._balance += amnt; | } else { | | // Non-Transactional Deposit | | try { | | | _bank.insertActivity(_id, amnt, "credit", 0); | | | _balance += amnt; | | } catch (Exception ex) { | | | Utils.dp("Deposit: Unexpected error "+ex, ex); | | | throw new CannotDeposit(); | | } | } | Utils.ifdp("Deposit "+amnt+" -> "+balance); | return balance; } public synchronized float withdraw(float amnt) throws InsufficientFunds, CannotWithdraw { | float balance = _balance - amnt; | ActivityImpl activity; | | try { | | activity = getTXActivity(); | } catch (Exception ex) { | | Utils.dp("Withdraw: Unexpected error "+ex,ex); | | throw new CannotWithdraw(); | } | | if (activity != null) { | | // Transactional Withdrawal | | balance += activity._balance; | | if (balance >= 0) | | activity._balance -= amnt; | | | } else if (balance >= 0) { | | // Non-Transactional Withdrawal | | try { | | | _bank.insertActivity(_id, (0-amnt), "debit", 0); | | | _balance -= amnt; | | } catch (Exception ex) { | | | Utils.dp("Withdraw: Unexpected error "+ex,ex); | | | throw new CannotWithdraw(); | | } | } | | if (balance < 0) | throw new InsufficientFunds(); | | Utils.ifdp("Withdraw "+amnt+" -> "+balance); | return balance; } public synchronized float balance() throws BalanceNotAvailable { | return _balance; } void adjustBalance(float amount) { | _balance += amount; } void releaseActivity(int hash, ActivityImpl activity) { | Integer key = new Integer(hash); | ActivityImpl act = (ActivityImpl) _txActivity.get(key); | if (act != null) { | | act = act.remove(activity); | | if (act != null) | | _txActivity.put(key, act); | } } private ActivityImpl getTXActivity() throws Exception { | ActivityImpl activity; | Control control = _bank._ts.getTransactionCurrent().get_control(); | | Coordinator coord = null; | if (control != null) { | | try { | | | coord = control.get_coordinator(); | | } catch (Unavailable ex) { | | | if (Utils._debug) Utils.dp("getTXActivity: Unavailable",ex); | | } | } | if (coord == null) { | | Utils.ifdp("No transction"); | | return null; | } | | int hash = coord.hash_top_level_tran(); | Integer key = new Integer(hash); | ActivityImpl act = (ActivityImpl) _txActivity.get(key); | Resource res = null; | if (act == null) { | | // Transaciton not in hashtable | | Utils.ifdp("new Transction"); | | activity = new ActivityImpl(null, hash, this, coord); | | res = (Resource)_bank.activateServant(activity.getID(), activity); | | _txActivity.put(key, activity); | } else { | | // Transaction is in the hash bucket | | // Find the exact one. | | Utils.ifdp("found Transction"); | | activity = act.find(coord); | | if (activity == null) { | | | // Add to this hash bucket. | | | Utils.ifdp("new Transction"); | | | activity = new ActivityImpl(null, hash, this, coord); | | | res = (Resource)_bank.activateServant(activity.getID(), activity); | | | act.append(activity); | | } | } | if (res != null) { | | activity.setRecoveryCoord(coord.register_resource(res)); | } | return activity; } }
package resBank; import java.io.IOException; import java.io.Serializable; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import org.omg.CORBA.ORB; import org.omg.CosTransactions.Vote; import org.omg.CosTransactions.Status; import org.omg.CosTransactions.Resource; import org.omg.CosTransactions.Coordinator; import org.omg.CosTransactions.NotPrepared; import org.omg.CosTransactions.HeuristicMixed; import org.omg.CosTransactions.HeuristicHazard; import org.omg.CosTransactions.HeuristicCommit; import org.omg.CosTransactions.HeuristicRollback; import org.omg.CosTransactions.RecoveryCoordinator; import common.Utils; import common.TestDataSource; /* * Implementation of the Activity interface that extends * CosTransacitons.Resource interface. This object represents * the activity to an account during a transaction. It is * used to keep track of individual transaction outcomes, as * well as, providing recovery during startup. * * A Resource can be registered with one-and-only-one Transaction. * It is up to the application to keep track of Resources that are * registered in Transactions, an to make sure a Resource is only * registered once with a Transaction and that it is not reused. * In this example the AccountImpl object keeps track of the * Activity-Resource to Transaction mapping. * */ class ActivityImpl extends ActivityPOA implements Serializable, Runnable { ActivityImpl _next = null; int _hash = 0; AccountImpl _account = null; int _accntID = 0; float _balance = 0; String _refobjid = null; Coordinator _coord = null; RecoveryCoordinator _recCoord = null; Resource _resource = null; String _recCoordIOR = null; boolean _recovered = false; boolean _prepared = false; int _preptxid = 0; boolean _prepcommit = false; public ActivityImpl() { } ActivityImpl(ActivityImpl next, int hash, AccountImpl account, Coordinator coord) { | _next = next; | _hash = hash; | _account = account; | _accntID = account._id; | _balance = 0; | _refobjid = genID(); | _coord = coord; | _recovered = true; } ORB getORB() { | return _account.getORB(); } void setRecoveryCoord(RecoveryCoordinator recCoord) { | _recCoord = recCoord; | _recCoordIOR = getORB().object_to_string(recCoord); } void recover() { | Thread thread = new Thread(this); | thread.setDaemon(true); | thread.start(); } public void run() { | for (;;) | { | | Utils.ifdp("Recovery for "+_preptxid+" start ..."); | | | | // Make sure we have a recovery coordinator | | if (_recCoord == null) { | | | try { | | | | Utils.ifdp("Recovery resolve "+_recCoordIOR); | | | | _recCoord = (RecoveryCoordinator) | | | | getORB().string_to_object(_recCoordIOR); | | | } catch (org.omg.CORBA.COMM_FAILURE ex) { | | | | Utils.ifdp("Recovery RecCoord COMM_FAILURE"); | | | | // Cant be found...try later | | | } catch (org.omg.CORBA.OBJECT_NOT_EXIST ex) { | | | | // Does not exists... | | | | Utils.ifdp("Recovery RecCoord OBJECT_NOT_EXIST ... presumed rollback"); | | | | rollbackWork(); | | | | return; | | | } | | } | | | | if (_recCoord != null) { | | | try { | | | | Utils.ifdp("Recovery find status ... "); | | | | Status status = _recCoord.replay_completion(_resource); | | | | switch (status.value()) { | | | | | case Status._StatusActive: | | | | | case Status._StatusCommitting: | | | | | case Status._StatusPrepared: | | | | | case Status._StatusPreparing: | | | | | case Status._StatusRollingBack: | | | | | case Status._StatusMarkedRollback: | | | | | case Status._StatusCommitted: | | | | | case Status._StatusUnknown: | | | | | // Coordinator still active ... | | | | | // Wait to be called. | | | | | Utils.ifdp("Recovery Status Wait to be called back"); | | | | | return; | | | | | | | | | | case Status._StatusRolledBack: | | | | | case Status._StatusNoTransaction: | | | | | Utils.ifdp("Recovery Status Rolledback"); | | | | | rollbackWork(); | | | | | return; | | | | } | | | } catch (NotPrepared ex) { | | | | // Wait for the ccordinator to call us | | | | Utils.ifdp("Recovery NotPrepared"); | | | | break; | | | } catch (Exception ex) { | | | | // try later... | | | } | | } | | | | // Wait to try later... | | try { | | | Utils.ifdp("Recovery for "+_preptxid+" sleep ..."); | | | Thread.sleep(10000); // sleep for 10 seconds | | } catch (InterruptedException ex) { | | | break; | | } | } } static long _incr = System.currentTimeMillis(); static private synchronized String genID() { | return "ACT-"+_incr++; } byte[] getID() { | return _refobjid.getBytes(); } private void deactivate() { | if (_account != null) { | | _account.releaseActivity(_hash, this); | | if (_resource != null) { | | | try { _account._bank.deactivateObject(_resource); } catch(Exception e) {} | | } | | _resource = null; | } } // // Activity hash bucket chain // void append(ActivityImpl newactivity) { | ActivityImpl activity = this; | while(activity._next != null) { | | activity = activity._next; | } | activity._next = newactivity; | newactivity._next = null; } ActivityImpl remove(ActivityImpl other) { | if (other == this) { | | return _next; | } | for(ActivityImpl curr = this; curr != null; curr = curr._next) { | | if (curr._next == other) { | | | curr._next = other._next; | | | break; | | } | } | return null; } ActivityImpl find(Coordinator coord) { | ActivityImpl activity = this; | while(activity != null) { | | if (coord.is_same_transaction(activity._coord)) | | return activity; | | activity = activity._next; | } | return null; } // // CosTransactions.Resource methods // public synchronized org.omg.CosTransactions.Vote prepare() throws HeuristicMixed, HeuristicHazard { | Utils.ifdp("Prepare: prepd="+_prepared+" bal="+_balance); | if (_prepared) { | | throw new HeuristicHazard(); | } | if (_balance == 0) { | | Utils.ifdp("Prepare -> VoteReadOnly"); | | return Vote.VoteReadOnly; | } | | // Write self into TXINFO table | try { | | Utils.ifdp("Prepare: save TXInfo"); | | _preptxid = _account._bank.insertTXInfo(this); | } catch (Throwable ex) { | | Utils.ifdp("Prepare: unexpected error: "+ex, ex); | | throw new HeuristicHazard(); | } | | _prepared = true; | | // Write activity intto ACCOUNTS table | try { | | Utils.ifdp("Prepare: save activity tx="+_preptxid); | | _account._bank.insertActivity(_accntID, _balance, | | "TX-"+_preptxid, _preptxid); | } catch (Throwable ex) { | | Utils.ifdp("Prepare: unexpected error: "+ex, ex); | | Utils.ifdp("Prepare -> VoteRollback"); | | return Vote.VoteRollback; | } | _prepcommit = true; | Utils.ifdp("Prepare -> VoteCommit"); | return Vote.VoteCommit; } private synchronized boolean rollbackWork() { | boolean haveerr = false; | // Undo/delete prepared rows... | try { | | _account._bank.deleteActivity(_preptxid); | } catch (Exception ex) { | | Utils.ifdp("Rollback: delete activity error: "+ex, ex); | | haveerr = true; | } | try { | | _account._bank.deleteTXInfo(_preptxid); | } catch (Exception ex) { | | Utils.ifdp("Rollback: delete TXInfo error: "+ex, ex); | | haveerr = true; | } | return haveerr; } public synchronized void rollback() throws HeuristicCommit, HeuristicMixed, HeuristicHazard { | Utils.ifdp("Rollback: prepd="+_prepared); | try { | | boolean haveerr = false; | | if (_prepared) | | haveerr = rollbackWork(); | | | | if (haveerr) | | throw new HeuristicHazard(); | } finally { | | deactivate(); | } } public synchronized void commit_one_phase() throws HeuristicHazard { | Utils.ifdp("Commit_One_Phase: prepd="+_prepared); | try { | | if (_prepared) { | | | try { rollback(); } catch(Exception ex) {} | | | throw new HeuristicHazard(); | | } | | // insert permanent activity ... | | try { | | | _account._bank.insertActivity(_accntID, _balance, | | | "TX-"+_preptxid, 0); | | | _account.adjustBalance(_balance); | | } catch (Exception ex) { | | | Utils.ifdp("Commit_One_Phase: insert activity error: "+ex, ex); | | | throw new HeuristicHazard(); | | } | } finally { | | deactivate(); | } } public synchronized void commit() throws NotPrepared, HeuristicRollback, HeuristicMixed, HeuristicHazard { | Utils.ifdp("Commit: prepd="+_prepared+" vote-commit="+_prepcommit); //TEST RECOVERY*/ System.exit(0); | try { | | if (!_prepared) | | throw new NotPrepared(); | | | | if (!_prepcommit) { | | | try { rollback(); } catch(Exception ex) {} | | | throw new HeuristicRollback(); | | } | | | | // make activity permanent ... | | try { | | | _account._bank.updateActivity(_preptxid); | | | _account.adjustBalance(_balance); | | } catch (Exception ex) { | | | Utils.ifdp("Commit: update activity error: "+ex, ex); | | | throw new HeuristicHazard(); | | } finally { | | | // delete TX info ... | | | try { | | | | _account._bank.deleteTXInfo(_preptxid); | | | } catch (Exception ex) { | | | | Utils.ifdp("Commit: delete TXInfo error: "+ex, ex); | | | } | | } | } finally { | | deactivate(); | } } public synchronized void forget() { | Utils.ifdp("Forget: "); | // Forget about it... | if (_preptxid != 0) { | | try {_account._bank.deleteActivity(_preptxid);} catch (Exception ex) {} | | try {_account._bank.deleteTXInfo(_preptxid);} catch (Exception ex) {} | } } // // Serializable // private void writeObject(ObjectOutputStream out) throws IOException { | out.writeInt(_accntID); | out.writeFloat(_balance); | out.writeUTF(_refobjid); | out.writeUTF(_recCoordIOR); } private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { | _accntID = in.readInt(); | _balance = in.readFloat(); | _refobjid = in.readUTF(); | _recCoordIOR = in.readUTF(); } }
package resBank; import java.io.File; import java.io.Serializable; import java.util.Date; import java.util.Stack; import java.util.Hashtable; import java.text.SimpleDateFormat; import java.sql.*; import javax.sql.*; import javax.transaction.*; import javax.transaction.xa.*; import org.omg.CORBA.ORB; import org.omg.CORBA.Policy; import org.omg.CORBA.Object; import org.omg.PortableServer.POA; import org.omg.PortableServer.Servant; import org.omg.CosTransactions.Resource; import com.sssw.jts.api.TransactionService; import common.Utils; import common.TestDataSource; import common.ResourceMapper; import resBank.BankPackage.noSuchAccount; import resBank.BankPackage.AccountCreationError; public class BankImpl extends BankPOA { ORB _orb; TransactionService _ts; POA _acctPOA; Hashtable _accounts; ResourceMapper _resmapper; TestDataSource _ds; String _tableName; TableAccount _tabAccnt; TableTXInfo _tabTxInfo; Stack _recovery; static SimpleDateFormat _datefmt = new SimpleDateFormat("dd.MM.yyyy HH:mm:ss.SSS"); BankImpl(ORB orb, POA bankPOA, String db, String tabname, ResourceMapper resmapper) throws Exception { | _orb = orb; | _ts = (TransactionService) | orb.resolve_initial_references("TransactionService"); | _resmapper = resmapper; | _tableName = tabname; | _acctPOA = bankPOA; | _accounts = new Hashtable(); | _ds = new TestDataSource(db); | _tabAccnt = new TableAccount(_ds, this); | _tabTxInfo = new TableTXInfo(_ds, this); | _recovery = new Stack(); | Utils.dp("Initializing Accounts"); | _tabAccnt.initialize(); | | Utils.dp("Initializing TXInfo"); | _tabTxInfo.initialize(); | | // Start recovery of TXInfo... | recoverTXInfo(); } // // Public .... // public int closedown() { | try { | | _ds.shutdown(); | | _orb.shutdown(false); | } catch (Throwable T) { | | Utils.dp("Shutdown problems ... "+T,T); | } | return 0; } public String bankname() { | return _tableName; } public Account getAccount(int acctNum) throws noSuchAccount { | AccountImpl account = null; | Utils.dp("Lookup account "+acctNum); | try { | | account = (AccountImpl) _accounts.get(new Integer(acctNum)); | } catch (Throwable T) { | } | if (account == null) | throw new noSuchAccount(); | | return account.getReference(); } public synchronized int newAccount() throws AccountCreationError { | int id = _tabAccnt.getNextID(); | | try { | | createAccnt(id); | } catch (Exception e) { | | throw new AccountCreationError(); | } | try { | | insertActivity(id, 0, "New account", 0); | } catch (Exception e) { | | Utils.dp("newAccount: insert activity "+e,e); | } | | Utils.dp("Created new account "+id); | return id; } // // Private level // private AccountImpl createAccnt(int accntid) throws Exception { | byte[] objectId = new byte[1]; | objectId[0] = (byte) accntid; | | AccountImpl acctount = new AccountImpl(this, accntid, 0); | acctount.setReference((Account) activateServant(objectId, acctount)); | _accounts.put(new Integer(accntid), acctount); | return acctount; } // // package level // ORB getORB() { | return _orb; } org.omg.CORBA.Object activateServant(byte[] objectId, Servant servant) throws Exception { | _acctPOA.activate_object_with_id(objectId, servant); | return _acctPOA.servant_to_reference(servant); } void deactivateObject(org.omg.CORBA.Object obj) throws Exception { | _acctPOA.deactivate_object(_acctPOA.reference_to_id(obj)); } void loadAccount(int accntid, float amount, String descr, int txinfoid) throws Exception { | AccountImpl account = null; | account = (AccountImpl) _accounts.get(new Integer(accntid)); | if (account == null) { | | account = createAccnt(accntid); | } else if (txinfoid == 0) { | | account.adjustBalance(amount); | } } void insertActivity(int accntid, float amount, String descr, int txinfoid) throws Exception { | descr = "[" + _datefmt.format(new Date()) + "] " + descr; | | _tabAccnt.insertRow(accntid, amount, descr, txinfoid); } void deleteActivity(int txinfoid) throws Exception { | _tabAccnt.deleteRow(txinfoid); } void updateActivity(int txinfoid) throws Exception { | _tabAccnt.updateRow(txinfoid); } void recoverTXInfo() { | while (!_recovery.empty()) { | | ActivityImpl activity = (ActivityImpl) _recovery.pop(); | | | | Utils.ifdp("recover ["+activity._preptxid+"]["+activity._accntID+"]"); | | if (activity._account == null) { | | | _tabTxInfo.deleteRow(activity._preptxid); | | } else { | | | activity.recover(); | | } | } } void loadTXInfo(int txid, ActivityImpl activity) throws Exception { | int acctNum = _tabAccnt.selectTXID(txid); | AccountImpl account; | | account = (AccountImpl) _accounts.get(new Integer(acctNum)); | | Utils.ifdp("tbTXinfo-loadRow: ["+txid+"]["+acctNum+"]["+account+"]"); | | if (account != null) { | | activity._account = account; | | activity._accntID = acctNum; | | activity._resource = (Resource)activateServant(activity.getID(), activity); | | activity._prepared = true; | } | | activity._preptxid = txid; | _recovery.push(activity); } int insertTXInfo(ActivityImpl act) throws Exception { | int id = _tabTxInfo.getNextID(); | _tabTxInfo.insertRow(id, act); | return id; } void deleteTXInfo(int txinfoid) throws Exception { | _tabTxInfo.deleteRow(txinfoid); } // // TXInfo... // static void DbgOut(String s) { | System.out.println("+ "+s); } }
Server program that creates and activates a Bank object. It demonstrates creating
a POA with OTSPolicy
.
package resBank; import java.util.Properties; import org.omg.CORBA.Any; import org.omg.CORBA.ORB; import org.omg.CORBA.Object; import org.omg.CORBA.Policy; import org.omg.PortableServer.POA; import org.omg.PortableServer.Servant; import org.omg.PortableServer.LifespanPolicyValue; import org.omg.PortableServer.IdAssignmentPolicyValue; import org.omg.PortableServer.ImplicitActivationPolicyValue; import org.omg.CosTransactions.ADAPTS; import org.omg.CosTransactions.SHARED; import org.omg.CosTransactions.OTS_POLICY_TYPE; import org.omg.CosTransactions.INVOCATION_POLICY_TYPE; import com.sssw.jbroker.api.transaction.TSIdentification; import com.sssw.jts.api.TransactionService; import common.Utils; import common.ResourceMapper; import common.TestDataSource; public class BankServerA { public static void main(String args[]) throws Exception { | mainBody(args, "bankA", TestDataSource._testDBkA, "FleeceNationalBank", "44441"); } static void mainBody(String args[], String iorName, String dbName, String bankName, String orbPort) throws Exception { | // Create a ResourceHandleMapper | ResourceMapper resMapper = new ResourceMapper(); | | System.setProperty("transaction.service.id", bankName); | | // Initialize the ORB and get instance of TM | System.setProperty("ORBPort", orbPort); | ORB orb = ORB.init(args, null); | TransactionService ts = (TransactionService) | orb.resolve_initial_references("TransactionService"); | | // | // Recover TM. | // | ts.recover(resMapper, false, null); | | // get the root POA | POA rootPOA = (POA) orb.resolve_initial_references("RootPOA"); | | // create the POA for Transient Account Objects | Any otsPolicy = orb.create_any(); | otsPolicy.insert_short(ADAPTS.value); | | Any invPolicy = orb.create_any(); | invPolicy.insert_short(SHARED.value); | | POA bankPOA = rootPOA.create_POA("poa_"+bankName, | rootPOA.the_POAManager(), | new Policy[] { | | rootPOA.create_id_assignment_policy(IdAssignmentPolicyValue.USER_ID), | | rootPOA.create_lifespan_policy(LifespanPolicyValue.TRANSIENT), | | orb.create_policy(OTS_POLICY_TYPE.value, otsPolicy), | | orb.create_policy(INVOCATION_POLICY_TYPE.value, invPolicy) | } | ); | bankPOA.the_POAManager().activate(); | | Servant servant; | org.omg.CORBA.Object bankobj; | | servant = new BankImpl(orb, bankPOA, dbName, bankName, resMapper); | bankPOA.activate_object_with_id(("bank_"+bankName).getBytes(), servant); | bankobj = bankPOA.servant_to_reference(servant); | | String bankAIOR = orb.object_to_string(bankobj); | | // write the stringified object reference | Utils.writeIOR(bankAIOR, iorName + ".ior"); | | System.out.println("+ " + bankName + " is now open for business ..."); | orb.run(); } }
Extends BankServerA
for initializing the Bank object with a
different name and database.
package resBank; import common.TestDataSource; public class BankServerB extends BankServerA { public static void main(String args[]) throws Exception { | mainBody(args, "bankB", TestDataSource._testDBkB, "InsolventBank", "44442"); } }
This the client program that establishes connection with the two bank servers, and transfers funds from one account to another in a distributed transaction.
package resBank; import java.io.*; import java.util.*; import org.omg.CORBA.ORB; import org.omg.CosTransactions.Current; import com.sssw.jbroker.api.transaction.TSIdentification; import com.sssw.jts.api.TransactionService; import common.Utils; import common.ResourceMapper; import resBank.BankPackage.noSuchAccount; import resBank.BankPackage.AccountCreationError; public class BankClient { public static void main(String args[]) throws Exception { | // Create a ResourceHandleMapper | ResourceMapper resMapper = new ResourceMapper(); | | // Initialize the ORB and get instance of TM | ORB orb = ORB.init(args, null); | TransactionService ts = (TransactionService) | orb.resolve_initial_references("TransactionService"); | | // | // Recover TM | // | ts.recover(resMapper, false, null); | | // | // Locate BankA and BankB | // | String bankIOR; | | bankIOR = Utils.readIOR("bankA.ior"); | Bank bankA = BankHelper.narrow(orb.string_to_object(bankIOR)); | System.out.println("+ Connected to : "+bankA.bankname()); | | bankIOR = Utils.readIOR("bankB.ior"); | Bank bankB = BankHelper.narrow(orb.string_to_object(bankIOR)); | System.out.println("+ Connected to : "+bankB.bankname()); | | // | // Make sure accounts exists | // | Account acctA; | Account acctB; | | try { | | acctA = bankA.getAccount(1); | } catch (noSuchAccount ex) { | | acctA = bankA.getAccount(bankA.newAccount()); | | acctA.deposit(100); | } | | float diff = 100 - acctA.balance(); | if (diff > 0) { | | System.out.println("+ Adding funds to "+acctA.accountname()+" "+diff); | | acctA.deposit(diff); | } | try { | | acctB = bankB.getAccount(1); | } catch (noSuchAccount ex) { | | acctB = bankB.getAccount(bankB.newAccount()); | | acctB.deposit(100); | } | | Current txCurrent = ts.getTransactionCurrent(); | | System.out.println("+ Staring balances"); | System.out.println("+ Account: "+acctA.accountname()+" Balance: "+acctA.balance()); | System.out.println("+ Account: "+acctB.accountname()+" Balance: "+acctB.balance()); | System.out.println(); | | transfer(txCurrent, acctA, acctB, 75); | | System.out.println(); | System.out.println("+ Balances after first transfer"); | System.out.println("+ Account: "+acctA.accountname()+" Balance: "+acctA.balance()); | System.out.println("+ Account: "+acctB.accountname()+" Balance: "+acctB.balance()); | System.out.println(); | | transfer(txCurrent, acctA, acctB, 50); | | System.out.println(); | System.out.println("+ Balances after second transfer"); | System.out.println("+ Account: "+acctA.accountname()+" Balance: "+acctA.balance()); | System.out.println("+ Account: "+acctB.accountname()+" Balance: "+acctB.balance()); | // try { bankA.closedown(); } catch (Throwable T) {} // try { bankB.closedown(); } catch (Throwable T) {} | | System.out.println(); | System.out.print("+ Press <ENTER> to stop client "); | System.in.read(); } static void transfer(Current txCurrent, Account acctA, Account acctB, float amnt) throws Exception { | boolean success = false; | txCurrent.begin(); | | System.out.print("+ Transfer of "+amnt+" from [" | + acctA.accountname() + "] to [" + acctB.accountname() + "] "); | | try { | | acctB.deposit(amnt); | | acctA.withdraw(amnt); | | success = true; | | txCurrent.commit(false); | | System.out.println("Succeeded."); | } catch (Throwable T) { | | System.out.println("Failed due to "+T); | | if (!success) | | txCurrent.rollback(); | } } }
This a base class that provides generic database access routines.
package resBank; import java.sql.*; import common.Utils; import common.TestDataSource; abstract class TableBase { TestDataSource _ds; String _tabname; String _sqlCreate; String _sqlSelOne; String _sqlSelAll; String _sqlInsert; String _sqlDelete; String _sqlUpdate; TableBase(TestDataSource ds, String tabname) { | _ds = ds; | _tabname = tabname; } synchronized void initialize() { | if (!loadTable()) | createTable(); } abstract void loadRow(ResultSet rset) throws SQLException; abstract void prepIns(PreparedStatement stmt, Object rowdata) throws SQLException; abstract void prepSel(PreparedStatement stmt, Object seldata) throws SQLException; abstract void loadSel(ResultSet rset, Object seldata) throws SQLException; abstract void prepDel(PreparedStatement stmt, Object rowdata) throws SQLException; abstract void prepUpd(PreparedStatement stmt, Object rowdata) throws SQLException; private boolean loadTable() { | boolean retv = false; | try { | | Connection conn = _ds.getConnection(); | | try { | | | Statement stmt = conn.createStatement(); | | | ResultSet rset = null; | | | try { | | | | try { | | | | | rset = stmt.executeQuery(_sqlSelAll); | | | | | retv = true; | | | | } catch (SQLException ex) { | | | | } | | | | if (rset != null) { | | | | | while(rset.next()) { | | | | | | loadRow(rset); | | | | | } | | | | | BankImpl.DbgOut("Table "+_tabname+" Loaded"); | | | | | rset.close(); | | | | } | | | } finally { | | | | stmt.close(); | | | } | | } finally { | | | conn.close(); | | } | } catch (Exception ex) { | | Utils.displayError("ERROR: Loading Table "+_tabname+" : "+ex, ex); | } | return retv; } private boolean createTable() { | boolean retv = false; | try { | | Connection conn = _ds.getConnection(); | | try { | | | Statement stmt = conn.createStatement(); | | | try { | | | | stmt.execute(_sqlCreate); | | | | BankImpl.DbgOut("Created table "+_tabname); | | | | retv = true; | | | } finally { | | | | stmt.close(); | | | } | | } finally { | | | conn.close(); | | } | } catch (SQLException ex) { | | Utils.displayError("ERROR: Creating Table "+_tabname+" : "+ex, ex); | } | return retv; } protected synchronized int insertRow(Object rowdata) { | int cnt = 0; | try { | | Connection conn = _ds.getConnection(); | | try { | | | PreparedStatement stmt = conn.prepareStatement(_sqlInsert); | | | try { | | | | prepIns(stmt, rowdata); | | | | cnt = stmt.executeUpdate(); | | | } finally { | | | | stmt.close(); | | | } | | } finally { | | | conn.close(); | | } | } catch (SQLException ex) { | | Utils.displayError("ERROR: Updating Table "+_tabname+" : "+ex, ex); | } | return cnt; } protected synchronized int selectRow(Object seldata) { | int cnt = 0; | try { | | Connection conn = _ds.getConnection(); | | ResultSet rset = null; | | try { | | | PreparedStatement stmt = conn.prepareStatement(_sqlSelOne); | | | try { | | | | prepSel(stmt, seldata); | | | | rset = stmt.executeQuery(); | | | | if (rset != null) { | | | | | while (rset.next()) { | | | | | | loadSel(rset, seldata); | | | | | } | | | | | rset.close(); | | | | } | | | } finally { | | | | stmt.close(); | | | } | | } finally { | | | conn.close(); | | } | } catch (SQLException ex) { | | Utils.displayError("ERROR: Select from Table "+_tabname+" : "+ex, ex); | } | return cnt; } protected synchronized int deleteRow(Object deldata) { | int cnt = 0; | try { | | Connection conn = _ds.getConnection(); | | ResultSet rset = null; | | try { | | | PreparedStatement stmt = conn.prepareStatement(_sqlDelete); | | | try { | | | | prepDel(stmt, deldata); | | | | cnt = stmt.executeUpdate(); | | | } finally { | | | | stmt.close(); | | | } | | } finally { | | | conn.close(); | | } | } catch (SQLException ex) { | | Utils.displayError("ERROR: Delete from Table "+_tabname+" : "+ex, ex); | } | return cnt; } protected synchronized int updateRow(Object upddata) { | int cnt = 0; | try { | | Connection conn = _ds.getConnection(); | | ResultSet rset = null; | | try { | | | PreparedStatement stmt = conn.prepareStatement(_sqlUpdate); | | | try { | | | | prepDel(stmt, upddata); | | | | cnt = stmt.executeUpdate(); | | | } finally { | | | | stmt.close(); | | | } | | } finally { | | | conn.close(); | | } | } catch (SQLException ex) { | | Utils.displayError("ERROR: Update from Table "+_tabname+" : "+ex, ex); | } | return cnt; } }
This class defines the Accounts
table, which holds activities for
all accounts. Each row contains the account ID, activity amount,
activity description, and TXInfoID
to corresponding TXInfo
table record
if one is associated. If the TXInfoID
is zero then the activity is
permanent. Otherwise, the taransaction to which this activity belongs
to has not completed.
package resBank; import java.sql.*; import common.Utils; import common.TestDataSource; class TableAccount extends TableBase { BankImpl _bank; int _nextid; TableAccount(TestDataSource ds, BankImpl bank) { | super(ds, bank.bankname()+"_ACCOUNTS"); | | _bank = bank; | _nextid = 1; | _sqlCreate = "CREATE TABLE "+_tabname | + " (ACCNTID INTEGER" | + " ,ACTIVITY NUMERIC(5,2)" | + " ,DESCRIPTION VARCHAR(200)" | + " ,TXINFOID INTEGER)"; | | _sqlSelAll = "SELECT ACCNTID, ACTIVITY, DESCRIPTION, TXINFOID" | + " FROM "+_tabname; | | _sqlSelOne = "SELECT ACCNTID" | + " FROM "+_tabname+" WHERE TXINFOID=?"; | | _sqlInsert = "INSERT INTO "+_tabname | + " (ACCNTID, ACTIVITY, DESCRIPTION, TXINFOID)" | + " VALUES (?,?,?,?)"; | | _sqlDelete = "DELETE FROM "+_tabname | + " WHERE TXINFOID=?"; | | _sqlUpdate = "UPDATE "+_tabname | + " SET TXINFOID=0" | + " WHERE TXINFOID=?"; | } synchronized int getNextID() { | return _nextid++; } // // LOAD TABLE // void loadRow(ResultSet rset) throws SQLException { | int accntid = rset.getInt(1); | float activity = rset.getFloat(2); | String descr = rset.getString(3); | int txinfoid = rset.getInt(4); | | if (accntid > _nextid) | _nextid = accntid + 1; | | try { | | Utils.ifdp("tbAccnt-loadRow: ["+accntid+"]["+activity+"]["+txinfoid+"]"); | | _bank.loadAccount(accntid, activity, descr, txinfoid); | } catch (Exception e) { | | throw new SQLException("BankImpl.loadAcount : "+e); | } } // // INSERT ROW // int insertRow(int id, float amnt, String desc, int txid) throws Exception { | Utils.ifdp("tbAccnt-insertRow: ["+id+"]["+amnt+"]["+txid+"]"); | return insertRow(new RowData(id, amnt, desc, txid)); } void prepIns(PreparedStatement stmt, Object rowdata) throws SQLException { | RowData row = (RowData)rowdata; | stmt.setInt (1, row._id); | stmt.setFloat (2, row._amnt); | stmt.setString(3, row._desc); | stmt.setInt (4, row._txid); } // // SELECT ROW // int selectTXID(int txid) throws Exception { | RowData row = new RowData(0, 0, null, txid); | super.selectRow(row); | return row._id; } void prepSel(PreparedStatement stmt, Object seldata) throws SQLException { | RowData row = (RowData)seldata; | stmt.setInt (1, row._txid); } void loadSel(ResultSet rset, Object seldata) throws SQLException { | RowData row = (RowData)seldata; | row._id = rset.getInt(1); } // // DELETE ROW // void deleteRow(int id) { | Utils.ifdp("tbAccnt-deleteRow: ["+id+"]"); | super.deleteRow(new Integer(id)); } void prepDel(PreparedStatement stmt, Object rowdata) throws SQLException { | stmt.setInt(1, ((Integer)rowdata).intValue()); } // // UPDATE ROW // void updateRow(int txid) throws Exception { | Utils.ifdp("tbAccnt-updateRow: ["+txid+"]"); | super.updateRow(new Integer(txid)); } void prepUpd(PreparedStatement stmt, Object rowdata) throws SQLException { | stmt.setInt(1, ((Integer)rowdata).intValue()); } private class RowData { | int _id; | float _amnt; | String _desc; | int _txid; | RowData(int id, float amnt, String desc, int txid) | { | | _id = id; _amnt = amnt; _desc = desc; _txid = txid; | } } }
This class defines the TXInfo
table, which holds information about
a transaciton that was prepared for commit. When the transaction completes
with commit or rollback its corresponding record in this table is deleted.
During start up all rows in this table are used for recovery. The information
stored in the table contains the IOR of the RecoveryCoordinator
object for the
transaction. During recovery the final outcome of the transaction is
determined by relocating the RecoveryCoordinator
object and calling its
replay_completion
method, based on the result the corresponding rows in the
Account
table are made permanent or deleted.
package resBank; import java.io.*; import java.sql.*; import common.Utils; import common.TestDataSource; class TableTXInfo extends TableBase { BankImpl _bank; int _nextid; TableTXInfo(TestDataSource ds, BankImpl bank) { | super(ds, bank.bankname()+"_TXINFO"); | | _bank = bank; | _nextid = 1; | | _sqlCreate = "CREATE TABLE "+_tabname | + " (TXINFOID INTEGER" | + " ,TXINFODATA LONG VARBINARY)"; | | _sqlSelAll = "SELECT TXINFOID, TXINFODATA" | + " FROM "+_tabname; | | _sqlSelOne = "SELECT TXINFODATA" | + " FROM "+_tabname+" WHERE TXINFOID=?"; | | _sqlInsert = "INSERT INTO "+_tabname | + " (TXINFOID, TXINFODATA)" | + " VALUES (?,?)"; | | _sqlDelete = "DELETE FROM "+_tabname | + " WHERE TXINFOID=?"; } synchronized int getNextID() { | return _nextid++; } // // LOAD TABLE // void loadRow(ResultSet rset) throws SQLException { | int id = rset.getInt(1); | InputStream is = rset.getBinaryStream(2); | ActivityImpl activity = null; | try { | | ObjectInputStream ois = new ObjectInputStream(is); | | activity = (ActivityImpl)ois.readObject(); | | ois.close(); | } catch (Exception ex) { | | throw new SQLException("Unable to Read TXINFO: "+ex); | } | | if (id > _nextid) | _nextid = id + 1; | | try { | | _bank.loadTXInfo(id, activity); | } catch (Exception e) { | | throw new SQLException("BankImpl.loadTXInfo : "+e); | } } // // INSERT ROW // int insertRow(int id, Serializable obj) throws Exception { | Utils.ifdp("tbTXinfo-insertRow: ["+id+"]"); | return insertRow(new TxRowData(id, obj)); } void prepIns(PreparedStatement stmt, Object rowdata) throws SQLException { | TxRowData row = (TxRowData)rowdata; | byte[] data; | // Serialize Object into byte array... | // | ByteArrayOutputStream bos = new ByteArrayOutputStream(); | try { | | ObjectOutputStream oos = new ObjectOutputStream(bos); | | oos.writeObject(row._obj); | | oos.flush(); | | oos.close(); | } catch (IOException ex) { | | throw new SQLException("Unable to write TXINFO: "+ex); | } | data = bos.toByteArray(); | // | stmt.setInt (1, row._id); | stmt.setBinaryStream (2, new ByteArrayInputStream(data), data.length); } // // SELECT ROW // float selectRow(int id) throws Exception { | throw new Exception("NO-IMPLEMENTATION"); } void prepSel(PreparedStatement stmt, Object seldata) throws SQLException { | throw new SQLException("NO-IMPLEMENTATION"); } void loadSel(ResultSet rset, Object seldata) throws SQLException { | throw new SQLException("NO-IMPLEMENTATION"); } // // DELETE ROW // void deleteRow(int id) { | Utils.ifdp("tbTXinfo-deleteRow: ["+id+"]"); | super.deleteRow(new Integer(id)); } void prepDel(PreparedStatement stmt, Object rowdata) throws SQLException { | stmt.setInt(1, ((Integer)rowdata).intValue()); } // // UPDATE ROW // void updateRow(int id1, int id2) throws Exception { | throw new Exception("NO-IMPLEMENTATION"); } void prepUpd(PreparedStatement stmt, Object rowdata) throws SQLException { | throw new SQLException("NO-IMPLEMENTATION"); } private class TxRowData { | int _id; | Serializable _obj; | TxRowData(int id, Serializable obj) | { | | _id = id; | | _obj = obj; | } } }
Copyright © 2003, 2004 Novell, Inc. All rights reserved. Copyright © 2001, 2002, 2003 SilverStream Software, LLC. All rights reserved.