Two database tables are used for storage. The first, the Accounts table, keeps track of account activities; the second, the TXInfo table, keeps track of prepared Resources for recovery. The Accounts table has a TXInfoID field that associates an activity with a transaction. If this field is zero, the activity is considered permanent. If it is non-zero, the activity belongs to a transaction that was prepared but has not yet completed.
When deposits or withdrawals are made in a transaction, they are kept track of in an activity object and do not reflect on the permanent balance of the account.
When the transaction is rolled back, the activity object is released and the account's permanent balance is not affected by the transaction. There are no updates to the database tables.
When the transaction is committed with one phase, the activity record is written to the Accounts table as a permanent record and the account permanent balance is updated.
When the transaction is completed with two-phase commit, during the prepare stage a Transaction Recovery record is written to the TXInfo table, and the activity is written to the Accounts table with a link to the TXInfo table in its TXInfoID field. If the final outcome of the transaction is to commit, then the TXInfoID field of the activity record in the Accounts table is cleared, the Transaction record is deleted from the TXInfo table, and the account's permanent balance is updated. If the final outcome is to roll back, then the activity record in the Accounts table and the Transaction record in the TXInfo table are both deleted.
During startup of the Bank servers the TXInfo table is used for recovery. All entries in that table are considered transactions that were prepared but not completed. The RecoveryCoordinator for each transaction is located and the Accounts table is updated based on the completion state of the transaction.
The source files are:
bank.idl            // IDL describing the Bank and Account objects.
AccountImpl.java    // Implementation of the Account object.
ActivityImpl.java   // Implementation of the Activity Resource object.
BankImpl.java       // Implementation of the Bank object.
BankClient.java     // Client program that transfers funds.
BankServerA.java    // First Bank Server program.
BankServerB.java    // Second Bank Server program.
TableBase.java      // Abstract class for DB access.
TableAccount.java   // Accounts table access class.
TableTXInfo.java    // Transaction Information table access class.
ant
To execute the test, you need to start the two Bank servers; once both Bank server programs are ready to receive requests, you can run the Bank client program.
ant servera
ant serverb
ant client
package resBank; import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.util.Hashtable; import org.omg.CORBA.ORB; import org.omg.CORBA.PERSIST_STORE; import resBank.AccountPackage.CannotDeposit; import resBank.AccountPackage.CannotWithdraw; import resBank.AccountPackage.InsufficientFunds; import resBank.AccountPackage.BalanceNotAvailable; import org.omg.CosTransactions.Vote; import org.omg.CosTransactions.Control; import org.omg.CosTransactions.Current; import org.omg.CosTransactions.Resource; import org.omg.CosTransactions.Coordinator; import org.omg.CosTransactions.Unavailable; import org.omg.CosTransactions.NotPrepared; import org.omg.CosTransactions.HeuristicMixed; import org.omg.CosTransactions.HeuristicHazard; import org.omg.CosTransactions.HeuristicCommit; import org.omg.CosTransactions.HeuristicRollback; import common.Utils; import common.TestDataSource; /* * Implementation of the Account interface. * It keeps track of the current account balance * and transactional activity balance. The account * balance is based on the current balance plus * any activity that belongs to the current transaction * if one is present. * * If the Transaction prepared the activity is stored * with a transaction information. When the Transaction * is then commited the activity is made permanent by clearing * the TXINFOID field in that row. If the Transaction is * rolled back after a prepare, the activity row is deleted. * * If the Transaciton is completed with one-phase commit * the activity is stored as permanent record. * * If the Transction is rolledback without prepare, then * there are no updates to the storage. 
*/ public class AccountImpl extends AccountPOA { int _id; BankImpl _bank; Account _account; float _balance; Hashtable _txActivity; public AccountImpl(BankImpl bank, int id, float balance) { | _id = id; | _bank = bank; | _balance = balance; | _txActivity = new Hashtable(); } ORB getORB() { | return _bank.getORB(); } Account getReference() { | return _account; } void setReference(Account account) { | _account = account; } public String accountname() { | return _bank.bankname()+".#."+_id; } public synchronized float deposit(float amnt) throws CannotDeposit { | float balance = _balance + amnt; | ActivityImpl activity; | | try { | | activity = getTXActivity(); | } catch (Exception ex) { | | Utils.dp("Deposit: Unexpected error "+ex, ex); | | throw new CannotDeposit(); | } | | if (activity != null) { | | // Transactional Deposit | | balance += activity._balance; | | activity._balance += amnt; | } else { | | // Non-Transactional Deposit | | try { | | | _bank.insertActivity(_id, amnt, "credit", 0); | | | _balance += amnt; | | } catch (Exception ex) { | | | Utils.dp("Deposit: Unexpected error "+ex, ex); | | | throw new CannotDeposit(); | | } | } | Utils.ifdp("Deposit "+amnt+" -> "+balance); | return balance; } public synchronized float withdraw(float amnt) throws InsufficientFunds, CannotWithdraw { | float balance = _balance - amnt; | ActivityImpl activity; | | try { | | activity = getTXActivity(); | } catch (Exception ex) { | | Utils.dp("Withdraw: Unexpected error "+ex,ex); | | throw new CannotWithdraw(); | } | | if (activity != null) { | | // Transactional Withdrawal | | balance += activity._balance; | | if (balance >= 0) | | activity._balance -= amnt; | | | } else if (balance >= 0) { | | // Non-Transactional Withdrawal | | try { | | | _bank.insertActivity(_id, (0-amnt), "debit", 0); | | | _balance -= amnt; | | } catch (Exception ex) { | | | Utils.dp("Withdraw: Unexpected error "+ex,ex); | | | throw new CannotWithdraw(); | | } | } | | if (balance < 0) | throw new 
InsufficientFunds(); | | Utils.ifdp("Withdraw "+amnt+" -> "+balance); | return balance; } public synchronized float balance() throws BalanceNotAvailable { | return _balance; } void adjustBalance(float amount) { | _balance += amount; } void releaseActivity(int hash, ActivityImpl activity) { | Integer key = new Integer(hash); | ActivityImpl act = (ActivityImpl) _txActivity.get(key); | if (act != null) { | | act = act.remove(activity); | | if (act != null) | | _txActivity.put(key, act); | } } private ActivityImpl getTXActivity() throws Exception { | ActivityImpl activity; | Control control = _bank._ts.getTransactionCurrent().get_control(); | | Coordinator coord = null; | if (control != null) { | | try { | | | coord = control.get_coordinator(); | | } catch (Unavailable ex) { | | | if (Utils._debug) Utils.dp("getTXActivity: Unavailable",ex); | | } | } | if (coord == null) { | | Utils.ifdp("No transction"); | | return null; | } | | int hash = coord.hash_top_level_tran(); | Integer key = new Integer(hash); | ActivityImpl act = (ActivityImpl) _txActivity.get(key); | Resource res = null; | if (act == null) { | | // Transaciton not in hashtable | | Utils.ifdp("new Transction"); | | activity = new ActivityImpl(null, hash, this, coord); | | res = (Resource)_bank.activateServant(activity.getID(), activity); | | _txActivity.put(key, activity); | } else { | | // Transaction is in the hash bucket | | // Find the exact one. | | Utils.ifdp("found Transction"); | | activity = act.find(coord); | | if (activity == null) { | | | // Add to this hash bucket. | | | Utils.ifdp("new Transction"); | | | activity = new ActivityImpl(null, hash, this, coord); | | | res = (Resource)_bank.activateServant(activity.getID(), activity); | | | act.append(activity); | | } | } | if (res != null) { | | activity.setRecoveryCoord(coord.register_resource(res)); | } | return activity; } }
package resBank; import java.io.IOException; import java.io.Serializable; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import org.omg.CORBA.ORB; import org.omg.CosTransactions.Vote; import org.omg.CosTransactions.Status; import org.omg.CosTransactions.Resource; import org.omg.CosTransactions.Coordinator; import org.omg.CosTransactions.NotPrepared; import org.omg.CosTransactions.HeuristicMixed; import org.omg.CosTransactions.HeuristicHazard; import org.omg.CosTransactions.HeuristicCommit; import org.omg.CosTransactions.HeuristicRollback; import org.omg.CosTransactions.RecoveryCoordinator; import common.Utils; import common.TestDataSource; /* * Implementation of the Activity interface that extends * CosTransacitons.Resource interface. This object represents * the activity to an account during a transaction. It is * used to keep track of individual transaction outcomes, as * well as, providing recovery during startup. * * A Resource can be registered with one-and-only-one Transaction. * It is up to the application to keep track of Resources that are * registered in Transactions, an to make sure a Resource is only * registered once with a Transaction and that it is not reused. * In this example the AccountImpl object keeps track of the * Activity-Resource to Transaction mapping. 
* */ class ActivityImpl extends ActivityPOA implements Serializable, Runnable { ActivityImpl _next = null; int _hash = 0; AccountImpl _account = null; int _accntID = 0; float _balance = 0; String _refobjid = null; Coordinator _coord = null; RecoveryCoordinator _recCoord = null; Resource _resource = null; String _recCoordIOR = null; boolean _recovered = false; boolean _prepared = false; int _preptxid = 0; boolean _prepcommit = false; public ActivityImpl() { } ActivityImpl(ActivityImpl next, int hash, AccountImpl account, Coordinator coord) { | _next = next; | _hash = hash; | _account = account; | _accntID = account._id; | _balance = 0; | _refobjid = genID(); | _coord = coord; | _recovered = true; } ORB getORB() { | return _account.getORB(); } void setRecoveryCoord(RecoveryCoordinator recCoord) { | _recCoord = recCoord; | _recCoordIOR = getORB().object_to_string(recCoord); } void recover() { | Thread thread = new Thread(this); | thread.setDaemon(true); | thread.start(); } public void run() { | for (;;) | { | | Utils.ifdp("Recovery for "+_preptxid+" start ..."); | | | | // Make sure we have a recovery coordinator | | if (_recCoord == null) { | | | try { | | | | Utils.ifdp("Recovery resolve "+_recCoordIOR); | | | | _recCoord = (RecoveryCoordinator) | | | | getORB().string_to_object(_recCoordIOR); | | | } catch (org.omg.CORBA.COMM_FAILURE ex) { | | | | Utils.ifdp("Recovery RecCoord COMM_FAILURE"); | | | | // Cant be found...try later | | | } catch (org.omg.CORBA.OBJECT_NOT_EXIST ex) { | | | | // Does not exists... | | | | Utils.ifdp("Recovery RecCoord OBJECT_NOT_EXIST ... presumed rollback"); | | | | rollbackWork(); | | | | return; | | | } | | } | | | | if (_recCoord != null) { | | | try { | | | | Utils.ifdp("Recovery find status ... 
"); | | | | Status status = _recCoord.replay_completion(_resource); | | | | switch (status.value()) { | | | | | case Status._StatusActive: | | | | | case Status._StatusCommitting: | | | | | case Status._StatusPrepared: | | | | | case Status._StatusPreparing: | | | | | case Status._StatusRollingBack: | | | | | case Status._StatusMarkedRollback: | | | | | case Status._StatusCommitted: | | | | | case Status._StatusUnknown: | | | | | // Coordinator still active ... | | | | | // Wait to be called. | | | | | Utils.ifdp("Recovery Status Wait to be called back"); | | | | | return; | | | | | | | | | | case Status._StatusRolledBack: | | | | | case Status._StatusNoTransaction: | | | | | Utils.ifdp("Recovery Status Rolledback"); | | | | | rollbackWork(); | | | | | return; | | | | } | | | } catch (NotPrepared ex) { | | | | // Wait for the ccordinator to call us | | | | Utils.ifdp("Recovery NotPrepared"); | | | | break; | | | } catch (Exception ex) { | | | | // try later... | | | } | | } | | | | // Wait to try later... 
| | try { | | | Utils.ifdp("Recovery for "+_preptxid+" sleep ..."); | | | Thread.sleep(10000); // sleep for 10 seconds | | } catch (InterruptedException ex) { | | | break; | | } | } } static long _incr = System.currentTimeMillis(); static private synchronized String genID() { | return "ACT-"+_incr++; } byte[] getID() { | return _refobjid.getBytes(); } private void deactivate() { | if (_account != null) { | | _account.releaseActivity(_hash, this); | | if (_resource != null) { | | | try { _account._bank.deactivateObject(_resource); } catch(Exception e) {} | | } | | _resource = null; | } } // // Activity hash bucket chain // void append(ActivityImpl newactivity) { | ActivityImpl activity = this; | while(activity._next != null) { | | activity = activity._next; | } | activity._next = newactivity; | newactivity._next = null; } ActivityImpl remove(ActivityImpl other) { | if (other == this) { | | return _next; | } | for(ActivityImpl curr = this; curr != null; curr = curr._next) { | | if (curr._next == other) { | | | curr._next = other._next; | | | break; | | } | } | return null; } ActivityImpl find(Coordinator coord) { | ActivityImpl activity = this; | while(activity != null) { | | if (coord.is_same_transaction(activity._coord)) | | return activity; | | activity = activity._next; | } | return null; } // // CosTransactions.Resource methods // public synchronized org.omg.CosTransactions.Vote prepare() throws HeuristicMixed, HeuristicHazard { | Utils.ifdp("Prepare: prepd="+_prepared+" bal="+_balance); | if (_prepared) { | | throw new HeuristicHazard(); | } | if (_balance == 0) { | | Utils.ifdp("Prepare -> VoteReadOnly"); | | return Vote.VoteReadOnly; | } | | // Write self into TXINFO table | try { | | Utils.ifdp("Prepare: save TXInfo"); | | _preptxid = _account._bank.insertTXInfo(this); | } catch (Throwable ex) { | | Utils.ifdp("Prepare: unexpected error: "+ex, ex); | | throw new HeuristicHazard(); | } | | _prepared = true; | | // Write activity intto ACCOUNTS table | try { | 
| Utils.ifdp("Prepare: save activity tx="+_preptxid); | | _account._bank.insertActivity(_accntID, _balance, | | "TX-"+_preptxid, _preptxid); | } catch (Throwable ex) { | | Utils.ifdp("Prepare: unexpected error: "+ex, ex); | | Utils.ifdp("Prepare -> VoteRollback"); | | return Vote.VoteRollback; | } | _prepcommit = true; | Utils.ifdp("Prepare -> VoteCommit"); | return Vote.VoteCommit; } private synchronized boolean rollbackWork() { | boolean haveerr = false; | // Undo/delete prepared rows... | try { | | _account._bank.deleteActivity(_preptxid); | } catch (Exception ex) { | | Utils.ifdp("Rollback: delete activity error: "+ex, ex); | | haveerr = true; | } | try { | | _account._bank.deleteTXInfo(_preptxid); | } catch (Exception ex) { | | Utils.ifdp("Rollback: delete TXInfo error: "+ex, ex); | | haveerr = true; | } | return haveerr; } public synchronized void rollback() throws HeuristicCommit, HeuristicMixed, HeuristicHazard { | Utils.ifdp("Rollback: prepd="+_prepared); | try { | | boolean haveerr = false; | | if (_prepared) | | haveerr = rollbackWork(); | | | | if (haveerr) | | throw new HeuristicHazard(); | } finally { | | deactivate(); | } } public synchronized void commit_one_phase() throws HeuristicHazard { | Utils.ifdp("Commit_One_Phase: prepd="+_prepared); | try { | | if (_prepared) { | | | try { rollback(); } catch(Exception ex) {} | | | throw new HeuristicHazard(); | | } | | // insert permanent activity ... 
| | try { | | | _account._bank.insertActivity(_accntID, _balance, | | | "TX-"+_preptxid, 0); | | | _account.adjustBalance(_balance); | | } catch (Exception ex) { | | | Utils.ifdp("Commit_One_Phase: insert activity error: "+ex, ex); | | | throw new HeuristicHazard(); | | } | } finally { | | deactivate(); | } } public synchronized void commit() throws NotPrepared, HeuristicRollback, HeuristicMixed, HeuristicHazard { | Utils.ifdp("Commit: prepd="+_prepared+" vote-commit="+_prepcommit); //TEST RECOVERY*/ System.exit(0); | try { | | if (!_prepared) | | throw new NotPrepared(); | | | | if (!_prepcommit) { | | | try { rollback(); } catch(Exception ex) {} | | | throw new HeuristicRollback(); | | } | | | | // make activity permanent ... | | try { | | | _account._bank.updateActivity(_preptxid); | | | _account.adjustBalance(_balance); | | } catch (Exception ex) { | | | Utils.ifdp("Commit: update activity error: "+ex, ex); | | | throw new HeuristicHazard(); | | } finally { | | | // delete TX info ... | | | try { | | | | _account._bank.deleteTXInfo(_preptxid); | | | } catch (Exception ex) { | | | | Utils.ifdp("Commit: delete TXInfo error: "+ex, ex); | | | } | | } | } finally { | | deactivate(); | } } public synchronized void forget() { | Utils.ifdp("Forget: "); | // Forget about it... | if (_preptxid != 0) { | | try {_account._bank.deleteActivity(_preptxid);} catch (Exception ex) {} | | try {_account._bank.deleteTXInfo(_preptxid);} catch (Exception ex) {} | } } // // Serializable // private void writeObject(ObjectOutputStream out) throws IOException { | out.writeInt(_accntID); | out.writeFloat(_balance); | out.writeUTF(_refobjid); | out.writeUTF(_recCoordIOR); } private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { | _accntID = in.readInt(); | _balance = in.readFloat(); | _refobjid = in.readUTF(); | _recCoordIOR = in.readUTF(); } }
package resBank; import java.io.File; import java.io.Serializable; import java.util.Date; import java.util.Stack; import java.util.Hashtable; import java.text.SimpleDateFormat; import java.sql.*; import javax.sql.*; import javax.transaction.*; import javax.transaction.xa.*; import org.omg.CORBA.ORB; import org.omg.CORBA.Policy; import org.omg.CORBA.Object; import org.omg.PortableServer.POA; import org.omg.PortableServer.Servant; import org.omg.CosTransactions.Resource; import com.sssw.jts.api.TransactionService; import common.Utils; import common.TestDataSource; import common.ResourceMapper; import resBank.BankPackage.noSuchAccount; import resBank.BankPackage.AccountCreationError; public class BankImpl extends BankPOA { ORB _orb; TransactionService _ts; POA _acctPOA; Hashtable _accounts; ResourceMapper _resmapper; TestDataSource _ds; String _tableName; TableAccount _tabAccnt; TableTXInfo _tabTxInfo; Stack _recovery; static SimpleDateFormat _datefmt = new SimpleDateFormat("dd.MM.yyyy HH:mm:ss.SSS"); BankImpl(ORB orb, POA bankPOA, String db, String tabname, ResourceMapper resmapper) throws Exception { | _orb = orb; | _ts = (TransactionService) | orb.resolve_initial_references("TransactionService"); | _resmapper = resmapper; | _tableName = tabname; | _acctPOA = bankPOA; | _accounts = new Hashtable(); | _ds = new TestDataSource(db); | _tabAccnt = new TableAccount(_ds, this); | _tabTxInfo = new TableTXInfo(_ds, this); | _recovery = new Stack(); | Utils.dp("Initializing Accounts"); | _tabAccnt.initialize(); | | Utils.dp("Initializing TXInfo"); | _tabTxInfo.initialize(); | | // Start recovery of TXInfo... | recoverTXInfo(); } // // Public .... // public int closedown() { | try { | | _ds.shutdown(); | | _orb.shutdown(false); | } catch (Throwable T) { | | Utils.dp("Shutdown problems ... 
"+T,T); | } | return 0; } public String bankname() { | return _tableName; } public Account getAccount(int acctNum) throws noSuchAccount { | AccountImpl account = null; | Utils.dp("Lookup account "+acctNum); | try { | | account = (AccountImpl) _accounts.get(new Integer(acctNum)); | } catch (Throwable T) { | } | if (account == null) | throw new noSuchAccount(); | | return account.getReference(); } public synchronized int newAccount() throws AccountCreationError { | int id = _tabAccnt.getNextID(); | | try { | | createAccnt(id); | } catch (Exception e) { | | throw new AccountCreationError(); | } | try { | | insertActivity(id, 0, "New account", 0); | } catch (Exception e) { | | Utils.dp("newAccount: insert activity "+e,e); | } | | Utils.dp("Created new account "+id); | return id; } // // Private level // private AccountImpl createAccnt(int accntid) throws Exception { | byte[] objectId = new byte[1]; | objectId[0] = (byte) accntid; | | AccountImpl acctount = new AccountImpl(this, accntid, 0); | acctount.setReference((Account) activateServant(objectId, acctount)); | _accounts.put(new Integer(accntid), acctount); | return acctount; } // // package level // ORB getORB() { | return _orb; } org.omg.CORBA.Object activateServant(byte[] objectId, Servant servant) throws Exception { | _acctPOA.activate_object_with_id(objectId, servant); | return _acctPOA.servant_to_reference(servant); } void deactivateObject(org.omg.CORBA.Object obj) throws Exception { | _acctPOA.deactivate_object(_acctPOA.reference_to_id(obj)); } void loadAccount(int accntid, float amount, String descr, int txinfoid) throws Exception { | AccountImpl account = null; | account = (AccountImpl) _accounts.get(new Integer(accntid)); | if (account == null) { | | account = createAccnt(accntid); | } else if (txinfoid == 0) { | | account.adjustBalance(amount); | } } void insertActivity(int accntid, float amount, String descr, int txinfoid) throws Exception { | descr = "[" + _datefmt.format(new Date()) + "] " + descr; | | 
_tabAccnt.insertRow(accntid, amount, descr, txinfoid); } void deleteActivity(int txinfoid) throws Exception { | _tabAccnt.deleteRow(txinfoid); } void updateActivity(int txinfoid) throws Exception { | _tabAccnt.updateRow(txinfoid); } void recoverTXInfo() { | while (!_recovery.empty()) { | | ActivityImpl activity = (ActivityImpl) _recovery.pop(); | | | | Utils.ifdp("recover ["+activity._preptxid+"]["+activity._accntID+"]"); | | if (activity._account == null) { | | | _tabTxInfo.deleteRow(activity._preptxid); | | } else { | | | activity.recover(); | | } | } } void loadTXInfo(int txid, ActivityImpl activity) throws Exception { | int acctNum = _tabAccnt.selectTXID(txid); | AccountImpl account; | | account = (AccountImpl) _accounts.get(new Integer(acctNum)); | | Utils.ifdp("tbTXinfo-loadRow: ["+txid+"]["+acctNum+"]["+account+"]"); | | if (account != null) { | | activity._account = account; | | activity._accntID = acctNum; | | activity._resource = (Resource)activateServant(activity.getID(), activity); | | activity._prepared = true; | } | | activity._preptxid = txid; | _recovery.push(activity); } int insertTXInfo(ActivityImpl act) throws Exception { | int id = _tabTxInfo.getNextID(); | _tabTxInfo.insertRow(id, act); | return id; } void deleteTXInfo(int txinfoid) throws Exception { | _tabTxInfo.deleteRow(txinfoid); } // // TXInfo... // static void DbgOut(String s) { | System.out.println("+ "+s); } }
package resBank; import java.util.Properties; import org.omg.CORBA.Any; import org.omg.CORBA.ORB; import org.omg.CORBA.Object; import org.omg.CORBA.Policy; import org.omg.PortableServer.POA; import org.omg.PortableServer.Servant; import org.omg.PortableServer.LifespanPolicyValue; import org.omg.PortableServer.IdAssignmentPolicyValue; import org.omg.PortableServer.ImplicitActivationPolicyValue; import org.omg.CosTransactions.ADAPTS; import org.omg.CosTransactions.SHARED; import org.omg.CosTransactions.OTS_POLICY_TYPE; import org.omg.CosTransactions.INVOCATION_POLICY_TYPE; import com.sssw.jbroker.api.transaction.TSIdentification; import com.sssw.jts.api.TransactionService; import common.Utils; import common.ResourceMapper; import common.TestDataSource; public class BankServerA { public static void main(String args[]) throws Exception { | mainBody(args, "bankA", TestDataSource._testDBkA, "FleeceNationalBank", "44441"); } static void mainBody(String args[], String iorName, String dbName, String bankName, String orbPort) throws Exception { | // Create a ResourceHandleMapper | ResourceMapper resMapper = new ResourceMapper(); | | System.setProperty("transaction.service.id", bankName); | | // Initialize the ORB and get instance of TM | System.setProperty("ORBPort", orbPort); | ORB orb = ORB.init(args, null); | TransactionService ts = (TransactionService) | orb.resolve_initial_references("TransactionService"); | | // | // Recover TM. 
| // | ts.recover(resMapper, false, null); | | // get the root POA | POA rootPOA = (POA) orb.resolve_initial_references("RootPOA"); | | // create the POA for Transient Account Objects | Any otsPolicy = orb.create_any(); | otsPolicy.insert_short(ADAPTS.value); | | Any invPolicy = orb.create_any(); | invPolicy.insert_short(SHARED.value); | | POA bankPOA = rootPOA.create_POA("poa_"+bankName, | rootPOA.the_POAManager(), | new Policy[] { | | rootPOA.create_id_assignment_policy(IdAssignmentPolicyValue.USER_ID), | | rootPOA.create_lifespan_policy(LifespanPolicyValue.TRANSIENT), | | orb.create_policy(OTS_POLICY_TYPE.value, otsPolicy), | | orb.create_policy(INVOCATION_POLICY_TYPE.value, invPolicy) | } | ); | bankPOA.the_POAManager().activate(); | | Servant servant; | org.omg.CORBA.Object bankobj; | | servant = new BankImpl(orb, bankPOA, dbName, bankName, resMapper); | bankPOA.activate_object_with_id(("bank_"+bankName).getBytes(), servant); | bankobj = bankPOA.servant_to_reference(servant); | | String bankAIOR = orb.object_to_string(bankobj); | | // write the stringified object reference | Utils.writeIOR(bankAIOR, iorName + ".ior"); | | System.out.println("+ " + bankName + " is now open for business ..."); | orb.run(); } }
package resBank; import common.TestDataSource; public class BankServerB extends BankServerA { public static void main(String args[]) throws Exception { | mainBody(args, "bankB", TestDataSource._testDBkB, "InsolventBank", "44442"); } }
package resBank; import java.io.*; import java.util.*; import org.omg.CORBA.ORB; import org.omg.CosTransactions.Current; import com.sssw.jbroker.api.transaction.TSIdentification; import com.sssw.jts.api.TransactionService; import common.Utils; import common.ResourceMapper; import resBank.BankPackage.noSuchAccount; import resBank.BankPackage.AccountCreationError; public class BankClient { public static void main(String args[]) throws Exception { | // Create a ResourceHandleMapper | ResourceMapper resMapper = new ResourceMapper(); | | // Initialize the ORB and get instance of TM | ORB orb = ORB.init(args, null); | TransactionService ts = (TransactionService) | orb.resolve_initial_references("TransactionService"); | | // | // Recover TM | // | ts.recover(resMapper, false, null); | | // | // Locate BankA and BankB | // | String bankIOR; | | bankIOR = Utils.readIOR("bankA.ior"); | Bank bankA = BankHelper.narrow(orb.string_to_object(bankIOR)); | System.out.println("+ Connected to : "+bankA.bankname()); | | bankIOR = Utils.readIOR("bankB.ior"); | Bank bankB = BankHelper.narrow(orb.string_to_object(bankIOR)); | System.out.println("+ Connected to : "+bankB.bankname()); | | // | // Make sure accounts exists | // | Account acctA; | Account acctB; | | try { | | acctA = bankA.getAccount(1); | } catch (noSuchAccount ex) { | | acctA = bankA.getAccount(bankA.newAccount()); | | acctA.deposit(100); | } | | float diff = 100 - acctA.balance(); | if (diff > 0) { | | System.out.println("+ Adding funds to "+acctA.accountname()+" "+diff); | | acctA.deposit(diff); | } | try { | | acctB = bankB.getAccount(1); | } catch (noSuchAccount ex) { | | acctB = bankB.getAccount(bankB.newAccount()); | | acctB.deposit(100); | } | | Current txCurrent = ts.getTransactionCurrent(); | | System.out.println("+ Staring balances"); | System.out.println("+ Account: "+acctA.accountname()+" Balance: "+acctA.balance()); | System.out.println("+ Account: "+acctB.accountname()+" Balance: "+acctB.balance()); | 
System.out.println(); | | transfer(txCurrent, acctA, acctB, 75); | | System.out.println(); | System.out.println("+ Balances after first transfer"); | System.out.println("+ Account: "+acctA.accountname()+" Balance: "+acctA.balance()); | System.out.println("+ Account: "+acctB.accountname()+" Balance: "+acctB.balance()); | System.out.println(); | | transfer(txCurrent, acctA, acctB, 50); | | System.out.println(); | System.out.println("+ Balances after second transfer"); | System.out.println("+ Account: "+acctA.accountname()+" Balance: "+acctA.balance()); | System.out.println("+ Account: "+acctB.accountname()+" Balance: "+acctB.balance()); | // try { bankA.closedown(); } catch (Throwable T) {} // try { bankB.closedown(); } catch (Throwable T) {} | | System.out.println(); | System.out.print("+ Press <ENTER> to stop client "); | System.in.read(); } static void transfer(Current txCurrent, Account acctA, Account acctB, float amnt) throws Exception { | boolean success = false; | txCurrent.begin(); | | System.out.print("+ Transfer of "+amnt+" from [" | + acctA.accountname() + "] to [" + acctB.accountname() + "] "); | | try { | | acctB.deposit(amnt); | | acctA.withdraw(amnt); | | success = true; | | txCurrent.commit(false); | | System.out.println("Succeeded."); | } catch (Throwable T) { | | System.out.println("Failed due to "+T); | | if (!success) | | txCurrent.rollback(); | } } }
package resBank; import java.sql.*; import common.Utils; import common.TestDataSource; abstract class TableBase { TestDataSource _ds; String _tabname; String _sqlCreate; String _sqlSelOne; String _sqlSelAll; String _sqlInsert; String _sqlDelete; String _sqlUpdate; TableBase(TestDataSource ds, String tabname) { | _ds = ds; | _tabname = tabname; } synchronized void initialize() { | if (!loadTable()) | createTable(); } abstract void loadRow(ResultSet rset) throws SQLException; abstract void prepIns(PreparedStatement stmt, Object rowdata) throws SQLException; abstract void prepSel(PreparedStatement stmt, Object seldata) throws SQLException; abstract void loadSel(ResultSet rset, Object seldata) throws SQLException; abstract void prepDel(PreparedStatement stmt, Object rowdata) throws SQLException; abstract void prepUpd(PreparedStatement stmt, Object rowdata) throws SQLException; private boolean loadTable() { | boolean retv = false; | try { | | Connection conn = _ds.getConnection(); | | try { | | | Statement stmt = conn.createStatement(); | | | ResultSet rset = null; | | | try { | | | | try { | | | | | rset = stmt.executeQuery(_sqlSelAll); | | | | | retv = true; | | | | } catch (SQLException ex) { | | | | } | | | | if (rset != null) { | | | | | while(rset.next()) { | | | | | | loadRow(rset); | | | | | } | | | | | BankImpl.DbgOut("Table "+_tabname+" Loaded"); | | | | | rset.close(); | | | | } | | | } finally { | | | | stmt.close(); | | | } | | } finally { | | | conn.close(); | | } | } catch (Exception ex) { | | Utils.displayError("ERROR: Loading Table "+_tabname+" : "+ex, ex); | } | return retv; } private boolean createTable() { | boolean retv = false; | try { | | Connection conn = _ds.getConnection(); | | try { | | | Statement stmt = conn.createStatement(); | | | try { | | | | stmt.execute(_sqlCreate); | | | | BankImpl.DbgOut("Created table "+_tabname); | | | | retv = true; | | | } finally { | | | | stmt.close(); | | | } | | } finally { | | | conn.close(); | | } | } catch 
(SQLException ex) { | | Utils.displayError("ERROR: Creating Table "+_tabname+" : "+ex, ex); | } | return retv; } protected synchronized int insertRow(Object rowdata) { | int cnt = 0; | try { | | Connection conn = _ds.getConnection(); | | try { | | | PreparedStatement stmt = conn.prepareStatement(_sqlInsert); | | | try { | | | | prepIns(stmt, rowdata); | | | | cnt = stmt.executeUpdate(); | | | } finally { | | | | stmt.close(); | | | } | | } finally { | | | conn.close(); | | } | } catch (SQLException ex) { | | Utils.displayError("ERROR: Updating Table "+_tabname+" : "+ex, ex); | } | return cnt; } protected synchronized int selectRow(Object seldata) { | int cnt = 0; | try { | | Connection conn = _ds.getConnection(); | | ResultSet rset = null; | | try { | | | PreparedStatement stmt = conn.prepareStatement(_sqlSelOne); | | | try { | | | | prepSel(stmt, seldata); | | | | rset = stmt.executeQuery(); | | | | if (rset != null) { | | | | | while (rset.next()) { | | | | | | loadSel(rset, seldata); | | | | | } | | | | | rset.close(); | | | | } | | | } finally { | | | | stmt.close(); | | | } | | } finally { | | | conn.close(); | | } | } catch (SQLException ex) { | | Utils.displayError("ERROR: Select from Table "+_tabname+" : "+ex, ex); | } | return cnt; } protected synchronized int deleteRow(Object deldata) { | int cnt = 0; | try { | | Connection conn = _ds.getConnection(); | | ResultSet rset = null; | | try { | | | PreparedStatement stmt = conn.prepareStatement(_sqlDelete); | | | try { | | | | prepDel(stmt, deldata); | | | | cnt = stmt.executeUpdate(); | | | } finally { | | | | stmt.close(); | | | } | | } finally { | | | conn.close(); | | } | } catch (SQLException ex) { | | Utils.displayError("ERROR: Delete from Table "+_tabname+" : "+ex, ex); | } | return cnt; } protected synchronized int updateRow(Object upddata) { | int cnt = 0; | try { | | Connection conn = _ds.getConnection(); | | ResultSet rset = null; | | try { | | | PreparedStatement stmt = 
conn.prepareStatement(_sqlUpdate); | | | try { | | | | prepDel(stmt, upddata); | | | | cnt = stmt.executeUpdate(); | | | } finally { | | | | stmt.close(); | | | } | | } finally { | | | conn.close(); | | } | } catch (SQLException ex) { | | Utils.displayError("ERROR: Update from Table "+_tabname+" : "+ex, ex); | } | return cnt; } }
package resBank; import java.sql.*; import common.Utils; import common.TestDataSource; class TableAccount extends TableBase { BankImpl _bank; int _nextid; TableAccount(TestDataSource ds, BankImpl bank) { | super(ds, bank.bankname()+"_ACCOUNTS"); | | _bank = bank; | _nextid = 1; | _sqlCreate = "CREATE TABLE "+_tabname | + " (ACCNTID INTEGER" | + " ,ACTIVITY NUMERIC(5,2)" | + " ,DESCRIPTION VARCHAR(200)" | + " ,TXINFOID INTEGER)"; | | _sqlSelAll = "SELECT ACCNTID, ACTIVITY, DESCRIPTION, TXINFOID" | + " FROM "+_tabname; | | _sqlSelOne = "SELECT ACCNTID" | + " FROM "+_tabname+" WHERE TXINFOID=?"; | | _sqlInsert = "INSERT INTO "+_tabname | + " (ACCNTID, ACTIVITY, DESCRIPTION, TXINFOID)" | + " VALUES (?,?,?,?)"; | | _sqlDelete = "DELETE FROM "+_tabname | + " WHERE TXINFOID=?"; | | _sqlUpdate = "UPDATE "+_tabname | + " SET TXINFOID=0" | + " WHERE TXINFOID=?"; | } synchronized int getNextID() { | return _nextid++; } // // LOAD TABLE // void loadRow(ResultSet rset) throws SQLException { | int accntid = rset.getInt(1); | float activity = rset.getFloat(2); | String descr = rset.getString(3); | int txinfoid = rset.getInt(4); | | if (accntid > _nextid) | _nextid = accntid + 1; | | try { | | Utils.ifdp("tbAccnt-loadRow: ["+accntid+"]["+activity+"]["+txinfoid+"]"); | | _bank.loadAccount(accntid, activity, descr, txinfoid); | } catch (Exception e) { | | throw new SQLException("BankImpl.loadAcount : "+e); | } } // // INSERT ROW // int insertRow(int id, float amnt, String desc, int txid) throws Exception { | Utils.ifdp("tbAccnt-insertRow: ["+id+"]["+amnt+"]["+txid+"]"); | return insertRow(new RowData(id, amnt, desc, txid)); } void prepIns(PreparedStatement stmt, Object rowdata) throws SQLException { | RowData row = (RowData)rowdata; | stmt.setInt (1, row._id); | stmt.setFloat (2, row._amnt); | stmt.setString(3, row._desc); | stmt.setInt (4, row._txid); } // // SELECT ROW // int selectTXID(int txid) throws Exception { | RowData row = new RowData(0, 0, null, txid); | 
super.selectRow(row); | return row._id; } void prepSel(PreparedStatement stmt, Object seldata) throws SQLException { | RowData row = (RowData)seldata; | stmt.setInt (1, row._txid); } void loadSel(ResultSet rset, Object seldata) throws SQLException { | RowData row = (RowData)seldata; | row._id = rset.getInt(1); } // // DELETE ROW // void deleteRow(int id) { | Utils.ifdp("tbAccnt-deleteRow: ["+id+"]"); | super.deleteRow(new Integer(id)); } void prepDel(PreparedStatement stmt, Object rowdata) throws SQLException { | stmt.setInt(1, ((Integer)rowdata).intValue()); } // // UPDATE ROW // void updateRow(int txid) throws Exception { | Utils.ifdp("tbAccnt-updateRow: ["+txid+"]"); | super.updateRow(new Integer(txid)); } void prepUpd(PreparedStatement stmt, Object rowdata) throws SQLException { | stmt.setInt(1, ((Integer)rowdata).intValue()); } private class RowData { | int _id; | float _amnt; | String _desc; | int _txid; | RowData(int id, float amnt, String desc, int txid) | { | | _id = id; _amnt = amnt; _desc = desc; _txid = txid; | } } }
package resBank; import java.io.*; import java.sql.*; import common.Utils; import common.TestDataSource; class TableTXInfo extends TableBase { BankImpl _bank; int _nextid; TableTXInfo(TestDataSource ds, BankImpl bank) { | super(ds, bank.bankname()+"_TXINFO"); | | _bank = bank; | _nextid = 1; | | _sqlCreate = "CREATE TABLE "+_tabname | + " (TXINFOID INTEGER" | + " ,TXINFODATA LONG VARBINARY)"; | | _sqlSelAll = "SELECT TXINFOID, TXINFODATA" | + " FROM "+_tabname; | | _sqlSelOne = "SELECT TXINFODATA" | + " FROM "+_tabname+" WHERE TXINFOID=?"; | | _sqlInsert = "INSERT INTO "+_tabname | + " (TXINFOID, TXINFODATA)" | + " VALUES (?,?)"; | | _sqlDelete = "DELETE FROM "+_tabname | + " WHERE TXINFOID=?"; } synchronized int getNextID() { | return _nextid++; } // // LOAD TABLE // void loadRow(ResultSet rset) throws SQLException { | int id = rset.getInt(1); | InputStream is = rset.getBinaryStream(2); | ActivityImpl activity = null; | try { | | ObjectInputStream ois = new ObjectInputStream(is); | | activity = (ActivityImpl)ois.readObject(); | | ois.close(); | } catch (Exception ex) { | | throw new SQLException("Unable to Read TXINFO: "+ex); | } | | if (id > _nextid) | _nextid = id + 1; | | try { | | _bank.loadTXInfo(id, activity); | } catch (Exception e) { | | throw new SQLException("BankImpl.loadTXInfo : "+e); | } } // // INSERT ROW // int insertRow(int id, Serializable obj) throws Exception { | Utils.ifdp("tbTXinfo-insertRow: ["+id+"]"); | return insertRow(new TxRowData(id, obj)); } void prepIns(PreparedStatement stmt, Object rowdata) throws SQLException { | TxRowData row = (TxRowData)rowdata; | byte[] data; | // Serialize Object into byte array... 
| // | ByteArrayOutputStream bos = new ByteArrayOutputStream(); | try { | | ObjectOutputStream oos = new ObjectOutputStream(bos); | | oos.writeObject(row._obj); | | oos.flush(); | | oos.close(); | } catch (IOException ex) { | | throw new SQLException("Unable to write TXINFO: "+ex); | } | data = bos.toByteArray(); | // | stmt.setInt (1, row._id); | stmt.setBinaryStream (2, new ByteArrayInputStream(data), data.length); } // // SELECT ROW // float selectRow(int id) throws Exception { | throw new Exception("NO-IMPLEMENTATION"); } void prepSel(PreparedStatement stmt, Object seldata) throws SQLException { | throw new SQLException("NO-IMPLEMENTATION"); } void loadSel(ResultSet rset, Object seldata) throws SQLException { | throw new SQLException("NO-IMPLEMENTATION"); } // // DELETE ROW // void deleteRow(int id) { | Utils.ifdp("tbTXinfo-deleteRow: ["+id+"]"); | super.deleteRow(new Integer(id)); } void prepDel(PreparedStatement stmt, Object rowdata) throws SQLException { | stmt.setInt(1, ((Integer)rowdata).intValue()); } // // UPDATE ROW // void updateRow(int id1, int id2) throws Exception { | throw new Exception("NO-IMPLEMENTATION"); } void prepUpd(PreparedStatement stmt, Object rowdata) throws SQLException { | throw new SQLException("NO-IMPLEMENTATION"); } private class TxRowData { | int _id; | Serializable _obj; | TxRowData(int id, Serializable obj) | { | | _id = id; | | _obj = obj; | } } }
Copyright © 2000-2003, Novell, Inc. All rights reserved. |