OnePhaseResource in JTA Transactions

This example illustrates the last-resource optimization feature of the Novell exteNd Messaging Platform's JTS, which lets a database that is not XA-capable participate in a distributed transaction by wrapping its connection in a class that implements the OnePhaseResource interface.

The source files are:

Test1PH.java            // Test program using two XAResources and OnePhaseResource
XAResourceWrapper.java  // The OnePhaseResource class that wraps a JDBC connection.

1 Compiling and Executing

Use the supplied ANT script to compile and execute the example programs. To compile run:
ant
To execute the tests run:
ant run

2 Test1PH Implementation

This program illustrates how to initialize the transaction manager and how to manage a distributed transaction that spans two XA-capable databases and one OnePhaseResource.
package tx1ph;
                                                                           
import java.util.Properties;
                                                                           
import java.sql.*;
import javax.sql.*;
                                                                           
import javax.transaction.xa.XAResource;
import javax.transaction.TransactionManager;
                                                                           
import org.omg.CORBA.ORB;
                                                                           
import com.sssw.jts.api.TransactionService;
import com.sssw.jbroker.api.transaction.TSIdentification;
                                                                           
import common.ResourceMapper;
import common.TestDataSource;
                                                                           
public class Test1PH
{
    private static TestDataSource  _db1;
    private static TestDataSource  _db2;
    private static TestDataSource  _db3;
                                                                           
    public static void main(String args[]) throws Exception
    {
    |   // Create a ResourceHandleMapper
    |   ResourceMapper resMapper = new ResourceMapper();
    |                                                                      
    |   System.setProperty("transaction.service.id", "EXAMPLE-RECRESOURCE");
    |                                                                      
    |   // Initialize the ORB and get instance of TM
    |   ORB orb = ORB.init(args, null);
    |   TransactionService ts = (TransactionService)
    |       orb.resolve_initial_references("TransactionService");
    |                                                                      
    |   //
    |   // Recover TM with COLD Start.
    |   //
    |   ts.recover(resMapper, true, null);
    |                                                                      
    |   TransactionManager tm = ts.getTransactionManager();
    |                                                                      
    |   //
    |   _db1 = new TestDataSource(TestDataSource._testDB1);
    |   _db2 = new TestDataSource(TestDataSource._testDB2);
    |   _db3 = new TestDataSource(TestDataSource._testDB3);
    |                                                                      
    |   // create test tables
    |   createTables();
    |                                                                      
    |   // Establish connections to 3 databases
    |   //  One Non-XA with OnePhaseResource wrapper.
    |   //  Two XAConnections
    |   Connection          conn1 = _db1.getConnection();
    |   XAResourceWrapper   xars1 = new XAResourceWrapper(conn1);
    |   resMapper.addResource(xars1);
    |                                                                      
    |   XAConnection        xaco2 = _db2.getXAConnection();
    |   XAResource          xars2 = xaco2.getXAResource();
    |   Connection          conn2 = xaco2.getConnection();
    |   resMapper.addResource(xars2);
    |                                                                      
    |   XAConnection        xaco3 = _db3.getXAConnection();
    |   XAResource          xars3 = xaco3.getXAResource();
    |   Connection          conn3 = xaco3.getConnection();
    |   resMapper.addResource(xars3);
    |                                                                      
    |                                                                      
    |   // Start a transaction
    |   System.out.println("Begin First Transaction");
    |   tm.begin();
    |   tm.getTransaction().enlistResource(xars1);
    |   tm.getTransaction().enlistResource(xars2);
    |   tm.getTransaction().enlistResource(xars3);
    |                                                                      
    |   System.out.println("Insert values(11, 12) into "+_db1+" - "+TestDataSource._testTB1);
    |   insertValue(conn1, TestDataSource._testTB1, "11", "12");
    |                                                                      
    |   System.out.println("Insert values(11, 12) into "+_db2+" - "+TestDataSource._testTB2);
    |   insertValue(conn2, TestDataSource._testTB2, "11", "12");
    |                                                                      
    |   System.out.println("Insert values(11, 12) into "+_db3+" - "+TestDataSource._testTB3);
    |   insertValue(conn3, TestDataSource._testTB3, "11", "12");
    |                                                                      
    |   tm.getTransaction().delistResource(xars1, XAResource.TMSUCCESS);
    |   tm.getTransaction().delistResource(xars2, XAResource.TMSUCCESS);
    |   tm.getTransaction().delistResource(xars3, XAResource.TMSUCCESS);
    |   tm.rollback();
    |   System.out.println("Rollback First Transaction");
    |   // Ended the Transaction with ROLLBACK
    |                                                                      
    |   // Make sure rollback removed the rows
    |   checkNotExists(_db1, TestDataSource._testTB1, "11", "12");
    |   checkNotExists(_db2, TestDataSource._testTB2, "11", "12");
    |   checkNotExists(_db3, TestDataSource._testTB3, "11", "12");
    |                                                                      
    |   // Start a second transaction
    |   System.out.println("Begin Second Transaction");
    |   tm.begin();
    |   tm.getTransaction().enlistResource(xars3);
    |   tm.getTransaction().enlistResource(xars2);
    |   tm.getTransaction().enlistResource(xars1);
    |                                                                      
    |   System.out.println("Insert values(21, 22) into "+_db1+" - "+TestDataSource._testTB1);
    |   insertValue(conn1, TestDataSource._testTB1, "21", "22");
    |                                                                      
    |   System.out.println("Insert values(21, 22) into "+_db2+" - "+TestDataSource._testTB2);
    |   insertValue(conn2, TestDataSource._testTB2, "21", "22");
    |                                                                      
    |   System.out.println("Insert values(21, 22) into "+_db3+" - "+TestDataSource._testTB3);
    |   insertValue(conn3, TestDataSource._testTB3, "21", "22");
    |                                                                      
    |   tm.getTransaction().delistResource(xars1, XAResource.TMSUCCESS);
    |   tm.getTransaction().delistResource(xars2, XAResource.TMSUCCESS);
    |   tm.getTransaction().delistResource(xars3, XAResource.TMSUCCESS);
    |   tm.commit();
    |   System.out.println("Commit Second Transaction");
    |   // Ended the Transaction with COMMIT
    |                                                                      
    |   // Make sure commit completed...
    |   checkExists(_db1, TestDataSource._testTB1, "21", "22");
    |   checkExists(_db2, TestDataSource._testTB2, "21", "22");
    |   checkExists(_db3, TestDataSource._testTB3, "21", "22");
    |                                                                      
    |   conn1.close();
    |   xaco2.close();
    |   xaco3.close();
    |                                                                      
    |   _db1.shutdown();
    |   _db2.shutdown();
    |   _db3.shutdown();
    }
                                                                           
    //
    // DATABASE Connection Initialization
    //
    static Connection getSQLConnection(String dbname) throws Exception
    {
    |   Class.forName("COM.cloudscape.core.JDBCDriver").newInstance();
    |   return DriverManager.getConnection("jdbc:cloudscape:../../"+dbname+";create=true");
    }
                                                                           
    //
    // Create test tables
    //
    static void createTables() throws Exception
    {
    |   Connection conn;
    |   conn = _db1.getConnection();
    |   createTable(conn, TestDataSource._testTB1);
    |   conn.close();
    |                                                                      
    |   conn = _db2.getConnection();
    |   createTable(conn, TestDataSource._testTB2);
    |   conn.close();
    |                                                                      
    |   conn = _db3.getConnection();
    |   createTable(conn, TestDataSource._testTB3);
    |   conn.close();
    }
                                                                           
    //
    // Create test table
    //
    static void createTable(Connection conn, String tablename) throws Exception
    {
    |   Statement stmt = null;
    |   try 
    |   {
    |   |   stmt = conn.createStatement();
    |   |   try
    |   |   {
    |   |   |   int nrow = stmt.executeUpdate("DELETE FROM "+tablename);
    |   |   |   System.out.println("Deleted "+nrow+" rows from table "+tablename);
    |   |   }
    |   |   catch (SQLException ex)
    |   |   {
    |   |   |   stmt.execute("CREATE TABLE "+tablename+"(COL1 INTEGER, COL2 INTEGER)");
    |   |   |   System.out.println("Created table "+tablename);
    |   |   }
    |   }
    |   finally
    |   {
    |   |   if (stmt != null) { try { stmt.close(); } catch (Exception ex) {} }
    |   }
    }
                                                                           
    static void insertValue(Connection conn, String tab, String v1, String v2)
    throws Exception
    {
    |   Statement stmt = conn.createStatement();
    |   stmt.execute("INSERT INTO "+tab+" (COL1, COL2) VALUES ("+v1+", "+v2+")");
    |   stmt.close();
    }
                                                                           
    static void checkNotExists(TestDataSource db, String tab, String v1, String v2)
    throws Exception
    {
    |   Connection  conn;
    |   Statement   stmt;
    |   ResultSet   rset;
    |   conn = db.getConnection();
    |   stmt = conn.createStatement();
    |   rset = stmt.executeQuery("SELECT COL2 FROM "+tab+" WHERE COL1="+v1);
    |   if (rset.next())
    |   {
    |   |   System.out.println("FAILED: Value("+v1+","+rset.getInt(1)+") in "+db+"-"+tab);
    |   }
    |   else
    |   {
    |   |   System.out.println("Value("+v1+","+v2+") not in table "+db+"-"+tab+" ... Ok");
    |   }
    |   rset.close();
    |   stmt.close();
    |   conn.close();
    }
    static void checkExists(TestDataSource db, String tab, String v1, String v2)
    throws Exception
    {
    |   Connection  conn;
    |   Statement   stmt;
    |   ResultSet   rset;
    |   conn = db.getConnection();
    |   stmt = conn.createStatement();
    |   rset = stmt.executeQuery("SELECT COL2 FROM "+tab+" WHERE COL1="+v1);
    |   if (!rset.next())
    |   {
    |   |   System.out.println("FAILED: Value("+v1+","+v2+") not in "+db+"-"+tab);
    |   }
    |   else
    |   {
    |   |   System.out.println("Values("+v1+","+rset.getInt(1)+") in table "+db+"-"+tab+" ... Ok");
    |   }
    |   rset.close();
    |   stmt.close();
    |   conn.close();
    }
}

3 XAResourceWrapper Implementation

This is the implementation of the OnePhaseResource that wraps a JDBC Connection as an XAResource.
package tx1ph;
                                                                           
import java.sql.*;
                                                                           
import javax.transaction.xa.Xid;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.XAException;
                                                                           
import com.sssw.jts.api.OnePhaseResource;
                                                                           
//
// This class is provides a XAResource layer to a JDBC Connection,
// JTS will only call this XAResource for One Phase commits since
// it implements OnePhaseResource.
//
class XAResourceWrapper implements OnePhaseResource
{
    Connection _conn;
    XAResourceWrapper(Connection conn)
    {
    |   _conn = conn;
    }
    public boolean isSameRM(XAResource res) throws XAException
    {
    |   return (res instanceof XAResourceWrapper);
    }
    public boolean setTransactionTimeout(int t) throws XAException
    {
    |   return true;
    }
    public int getTransactionTimeout() throws XAException
    {
    |   return 60;
    }
    public Xid[] recover(int flags) throws XAException
    {
    |   return null;
    }
    public void forget(Xid xid) throws XAException
    {
    }
    public void start(Xid xid, int flags) throws XAException
    {
    |   if (flags == TMRESUME)
    |   {
    |   |   // Nothing to do ...
    |   }
    |   else if (flags == TMNOFLAGS)
    |   {
    |   |   try
    |   |   {
    |   |   |   // Turn auto commit off.
    |   |   |   _conn.setAutoCommit(false);
    |   |   }
    |   |   catch (SQLException e)
    |   |   {
    |   |   |   throw new XAException(e.toString());
    |   |   }
    |   }
    |   else
    |   {
    |   |   throw new XAException(XAException.XAER_RMERR);
    |   }
    }
    public void end(Xid xid, int flags) throws XAException
    {
    }
    public void commit(Xid xid, boolean onePhase) throws XAException
    {
    |   try
    |   {
    |   |   _conn.commit();
    |   }
    |   catch (SQLException e)
    |   {
    |   |   throw new XAException(e.toString());
    |   }
    }
    public void rollback(Xid xid) throws XAException
    {
    |   try
    |   {
    |   |   _conn.rollback();
    |   }
    |   catch (SQLException e)
    |   {
    |   |   throw new XAException(e.toString());
    |   }
    }
    public int prepare(Xid xid) throws XAException
    {
    |   throw new XAException(XAException.XAER_RMERR);
    }
}
Copyright © 2000-2003, Novell, Inc. All rights reserved.