import com.novell.ldap.*;
import com.novell.ldap.extensions.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Date;
import java.io.UnsupportedEncodingException;
import com.novell.ldap.util.LDIFReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
/**
 * Client side of an LBURP (LDAP Bulk Update/Replication Protocol) session.
 *
 * <p>{@link #StartLburpOperation} reads LDAP messages from an LDIF stream,
 * groups them into transactions of {@code tranSize} operations and sends each
 * transaction asynchronously. The instance itself is the reply thread:
 * {@link #run()} consumes the response queue of every sent transaction in
 * sequence order and prints per-record errors to {@code System.err}.
 *
 * <p>Producer and reply thread share a ring of {@code WINDOWSIZE} slots
 * ({@code LRWin.LRArray}). {@code winSize} counts the slots handed to the
 * reply thread and not yet consumed; every access to it is made while holding
 * {@code mutex}, and all waiting is done with {@code mutex.wait()} inside that
 * monitor.
 *
 * <p>Most state is {@code static}, and the thread is started at most once, so
 * an instance is meant to drive a single bulk update.
 */
public class LburpClient extends Thread{
    /** Number of slots in the reply ring shared with the reply thread. */
    final static int WINDOWSIZE = 10;
    /** Operations per transaction, taken from the server's start response. */
    static int tranSize;
    /** Sequence number of the transaction currently being assembled or sent. */
    static int seqNumber;
    /** Slots handed to the reply thread and not yet consumed; guarded by mutex. */
    static int winSize;
    /** Monitor guarding winSize; created when the first transaction is sent. */
    private Object mutex;
    /** Set by the reply thread after it frees a slot (not read in this file). */
    static boolean notEmpty;
    /** Cleared by the producer after it fills a slot (not read in this file). */
    static boolean notFull;
    /** Connection used for all extended operations of the session. */
    static LDAPConnection ld;
    /** Ring of reply slots shared between producer and reply thread. */
    static LBURPReplyWindow LRWin;
    /** Sentinel stored in a slot's seqNumber to tell the reply thread to stop. */
    final static int FIN_FLAG = -2;

    /**
     * Runs a complete bulk update: start request, one operation request per
     * transaction read from the LDIF stream, then the end request.
     *
     * <p>Any failure is reported on {@code System.out} and terminates the JVM
     * with exit status 1.
     *
     * @param ldc connection on which the extended operations are sent
     * @param fis LDIF input; it is read until {@code readMessage()} returns
     *            {@code null} and is not closed here
     */
    void StartLburpOperation(LDAPConnection ldc,
                             FileInputStream fis)
    {
        final int version = 1;
        LDIFReader reader;
        LDAPMessage msg;
        LDAPLburpRequest[] sreq;
        LDAPExtendedOperation request;
        LDAPExtendedResponse sresponse;
        int currSize = 0;
        seqNumber = 0;
        try
        {
            ld = ldc;
            reader = new LDIFReader(fis, version);
            request = new LburpStartRequest(LburpConstants.LBURPFullUpdateOID);
            sresponse = ld.extendedOperation(request);
            if ( (sresponse.getResultCode() == LDAPException.SUCCESS) &&
                 ( sresponse instanceof LburpStartResponse )) {
                tranSize = ((LburpStartResponse)sresponse).getTranSize();
            } else {
                System.out.println("Could not get Transaction information.\n");
                throw new LDAPException( sresponse.getErrorMessage(),
                                         sresponse.getResultCode(),
                                         (String)null);
            }
            // A freshly allocated array is already filled with null.
            sreq = new LDAPLburpRequest[tranSize];
            LRWin = new LBURPReplyWindow(tranSize);
            while ( true ) {
                msg = reader.readMessage();
                if ( msg != null )
                {
                    sreq[currSize] = new LDAPLburpRequest(msg);
                    // seqNumber is only advanced when the transaction is sent,
                    // so seqNumber+1 is the slot this transaction will occupy.
                    LRWin.LRArray[(seqNumber+1)%WINDOWSIZE].opList[currSize] =
                        sreq[currSize];
                    ++currSize;
                }
                if ( (msg == null ) && ( currSize != tranSize ) )
                {
                    // End of input: flush a partial transaction, if any.
                    ++seqNumber;
                    if (currSize != 0)
                    {
                        sendTransaction(sreq);
                        ++seqNumber;
                    }
                    // seqNumber is still 1 only when nothing was ever sent; in
                    // that case no reply thread exists and mutex was never
                    // created, so there is nobody to signal.
                    if (seqNumber > 1)
                    {
                        publishFin();
                    }
                    LDAPExtendedOperation ereq = new LburpEndRequest(seqNumber++);
                    ld.extendedOperation(ereq);
                    return;
                }
                if ( currSize == tranSize )
                {
                    // A full transaction has been collected.
                    ++seqNumber;
                    sendTransaction(sreq);
                    for (int i = 0; i < tranSize; ++i)
                        sreq[i] = null;
                    currSize = 0;
                }
            }
        } catch ( LDAPException e ) {
            System.out.println( "Error: " + e.toString() );
            System.exit( 1 );
        } catch ( IOException ioe ) {
            System.out.println("Error: " + ioe.toString() );
            System.exit( 1 );
        } catch ( Exception e ) {
            System.out.println( "Error: " + e.toString() );
            e.printStackTrace();
            System.exit( 1 );
        }
        return;
    }

    /**
     * Sends the operations in {@code sreq} as transaction {@code seqNumber}
     * and hands the matching ring slot to the reply thread. The first call
     * (seqNumber == 1) also initialises the window state and starts the reply
     * thread.
     *
     * @throws LDAPException if the send fails with a result code other than 80
     */
    private void sendTransaction(LDAPLburpRequest[] sreq) throws LDAPException
    {
        if (seqNumber == 1)
        {
            winSize = 0;
            mutex = new Object();
            notFull = true;
            notEmpty = false;
            this.start();
        }
        awaitWindowSpace();
        int slot = seqNumber % WINDOWSIZE;
        // Stamp the slot before sending so that a failed send never leaves a
        // response queue from an earlier lap of the ring behind.
        LRWin.LRArray[slot].seqNumber = seqNumber;
        LRWin.LRArray[slot].msgQ = null;
        LDAPExtendedOperation request = new LburpOperationRequest(sreq, seqNumber);
        try {
            LRWin.LRArray[slot].msgQ =
                ld.extendedOperation(request, (LDAPResponseQueue)null);
        } catch (LDAPException e) {
            // Result code 80 ("other") is deliberately tolerated; the slot is
            // still handed over, with a null queue the reply thread skips.
            if ( e.getResultCode() != 80)
                throw e;
        }
        handOffSlot();
    }

    /** Marks the slot for {@code seqNumber} as the end marker and hands it over. */
    private void publishFin()
    {
        awaitWindowSpace();
        LRWin.LRArray[seqNumber % WINDOWSIZE].seqNumber = FIN_FLAG;
        handOffSlot();
    }

    /** Counts one more filled slot and wakes the reply thread. */
    private void handOffSlot()
    {
        synchronized (mutex) {
            ++winSize;
            notFull = false;
            mutex.notifyAll();
        }
    }

    /**
     * Blocks the producer until fewer than WINDOWSIZE-1 slots are outstanding.
     * An interrupt does not abort the wait; the interrupt status is restored
     * once the wait is over.
     */
    private void awaitWindowSpace()
    {
        boolean interrupted = false;
        synchronized (mutex) {
            while (winSize >= WINDOWSIZE-1)
            {
                try {
                    mutex.wait();
                } catch (InterruptedException e) {
                    interrupted = true;
                }
            }
        }
        if (interrupted)
            Thread.currentThread().interrupt();
    }

    /**
     * Blocks the reply thread until the producer has handed over at least one
     * slot. An interrupt does not abort the wait; the interrupt status is
     * restored once the wait is over.
     */
    private void awaitPendingReply()
    {
        boolean interrupted = false;
        synchronized (mutex) {
            while (winSize <= 0)
            {
                try {
                    mutex.wait();
                } catch (InterruptedException e) {
                    interrupted = true;
                }
            }
        }
        if (interrupted)
            Thread.currentThread().interrupt();
    }

    /**
     * Reply thread body: consumes the ring slots in sequence order, starting
     * at sequence number 1, until it finds a slot marked {@code FIN_FLAG}.
     * For every operation response it prints the error messages contained in
     * the response map to {@code System.err}.
     */
    public void run() {
        int LRIdx = 1;
        while (true) {
            // Wait for the hand-off first: the slot content, including the
            // FIN marker, is only valid once the producer has published it.
            awaitPendingReply();
            int slot = LRIdx % WINDOWSIZE;
            if ( LRWin.LRArray[slot].seqNumber == FIN_FLAG )
            {
                break;
            }
            try {
                LDAPResponseQueue lresponse = LRWin.LRArray[slot].msgQ;
                // A null queue means the send was tolerated as failed (code 80).
                if (lresponse != null)
                {
                    // getResponse() blocks until a response is available, so
                    // no polling of isResponseReceived() is needed.
                    LDAPMessage lmsg = lresponse.getResponse();
                    if (lmsg instanceof LburpOperationResponse)
                    {
                        HashMap resp = ((LburpOperationResponse)lmsg).getResponse();
                        // NOTE(review): the loop stops at tranSize-1; if the map
                        // is keyed 1..tranSize the last record is never reported
                        // - confirm against LburpOperationResponse.
                        for (int i = 1; i < LRWin.tranSize; ++i) {
                            int rec = i + (LRWin.LRArray[slot].seqNumber-1)*LRWin.tranSize;
                            String emsg = (String)resp.get(new Integer(i));
                            if (emsg != null )
                                System.err.println("Record Number: "+ rec + " Error: " + emsg);
                        }
                    }
                }
            } catch (LDAPException e) {
                e.printStackTrace();
            }
            // Free the slot and wake a producer waiting for window space.
            synchronized (mutex) {
                --winSize;
                notEmpty = true;
                mutex.notifyAll();
            }
            ++LRIdx;
        }
        return;
    }
}
/**
 * Fixed-size table of reply slots. It holds {@code LburpClient.WINDOWSIZE}
 * entries, each created with sequence number -1, no response queue and an
 * operation array of {@code tranSize} elements.
 */
class LBURPReplyWindow {
    /** Length of every slot's operation array, as given to the constructor. */
    int tranSize;
    /** The slots; callers index this array directly. */
    LBURPReply[] LRArray;

    /** One slot: a sequence number, its response queue and its operations. */
    protected class LBURPReply{
        int seqNumber;
        LDAPResponseQueue msgQ;
        LDAPLburpRequest[] opList;

        /**
         * @param sn  initial sequence number of the slot
         * @param Q   response queue of the slot, may be null
         * @param opl array that holds the slot's operations
         */
        LBURPReply(int sn,
                   LDAPResponseQueue Q,
                   LDAPLburpRequest[] opl)
        {
            this.seqNumber = sn;
            this.msgQ = Q;
            this.opList = opl;
        }
    }

    /**
     * Allocates the slot table and fills every entry with an empty slot.
     *
     * @param tSize number of operations each slot can hold
     */
    LBURPReplyWindow(int tSize)
    {
        this.tranSize = tSize;
        final int slotCount = LburpClient.WINDOWSIZE;
        this.LRArray = new LBURPReply[slotCount];
        int slot = 0;
        while (slot < slotCount)
        {
            LDAPLburpRequest[] operations = new LDAPLburpRequest[this.tranSize];
            this.LRArray[slot] = new LBURPReply(-1, null, operations);
            slot++;
        }
    }
}