1 /*
  2  * Copyright © [2008-2009] Novell, Inc.  All Rights Reserved.
  3  * 
  4  * USE AND REDISTRIBUTION OF THIS WORK IS SUBJECT TO THE DEVELOPER LICENSE AGREEMENT
  5  * OR OTHER AGREEMENT THROUGH WHICH NOVELL, INC. MAKES THE WORK AVAILABLE.  THIS WORK 
  6  * MAY NOT BE ADAPTED WITHOUT NOVELL'S PRIOR WRITTEN CONSENT.
  7  * 
  8  * NOVELL PROVIDES THE WORK "AS IS," WITHOUT ANY EXPRESS OR IMPLIED WARRANTY, 
  9  * INCLUDING WITHOUT LIMITATION THE IMPLIED WARRANTIES OF MERCHANTABILITY, FITNESS FOR 
 10  * A PARTICULAR PURPOSE, AND NON-INFRINGEMENT.  NOVELL, THE AUTHORS OF THE WORK, AND THE 
 11  * OWNERS OF COPYRIGHT IN THE WORK ARE NOT LIABLE FOR ANY CLAIM, DAMAGES, OR OTHER 
 12  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT, OR OTHERWISE, ARISING FROM, OUT OF, 
 13  * OR IN CONNECTION WITH THE WORK OR THE USE OR OTHER DEALINGS IN THE WORK.
 14  */
 15 
 16 /**
 17  * @fileoverview
 18  * This file defines the Connector class, which represents the Connector(s) that pass
 19  * input data to the Collector.
 20  * @name Connector Class
 21  */
 22 
 23 /**
 24  * This class represents the Connector(s) passing data to the Collector.
 25  * Connectors actually feed a single inbound queue of event data, so you cannot 
 26  * address them individually. This has implications for scenarios where you must
 27  * send data to the Connector - in this case you can only have a single Connector
 28  * per Collector (DB Connector, for example).
 29  * The template handles most things to do with reading and writing to the Connector,
 30  * but in some unusual circumstances you may need to use this class.
 31  * @author   Novell Engineering
 32  * @version  6.1
 33  * @constructor
 34  * @param {String} uuid  UUID of the Connector in ESM
 35  */
 36 function Connector(uuid) {
 37 	
 38 	if (typeof uuid != "undefined" && typeof ESM != "undefined") {
 39   	this.UUID = uuid;
 40   	this.connectorObject = ESM.groupForId(this.UUID);
 41   }
 42 		
 43 	/**
 44 	 * Starts this Connector in ESM.
 45 	 */
 46 	this.start = function() {
 47 		if( typeof this.connectorObject != "undefined" ) {
 48 			this.connectorObject.start();
 49 		}
 50 	};
 51 
 52 	/**
 53  	* Stops this Connector in ESM.
 54  	*/
 55 	this.stop = function() {
 56 		if( typeof this.connectorObject != "undefined" ) {
 57 			this.connectorObject.stop();
 58 		}
 59 	};
 60 	
 61 	/**
 62 	 * Adds a default parser that will automatically be called to determine the current offset.
 63 	 * The parser should be defined as a function that can be attached to a SQLQuery.
 64 	 * instance.PARSER is provided as a convenient place to store pre-defined parser routines.
 65 	 * Any time a new event source is discovered and a SQLQuery is allocated for it, this default parser
 66 	 * will be attached to it.
 67 	 * <p>
 68 	 * Example:
 69 	 * <pre>
 70 	 * // ...in initialize() method
 71 	 * instance.PARSER.getOffset = function(input) {
 72 	 *   return input.RXMap.RecNumber;
 73 	 *   ...
 74 	 * };
 75 	 * conn.addParser(instance.PARSER.getOffset);
 76 	 * </pre>
 77 	 * Note that you could simply create a single parser that will be assigned by default to all SQLQuery objects,
 78 	 * but you can also predefine several parsers and dynamically attach them directly to SQLQuery objects; see
 79 	 * the SQLQuery object API for details.
 80 	 * @see SQLQuery
 81 	 * @param {Function} parser  - The parser function to attach to this session
 82 	 * @return {Boolean} Result
 83 	 */
 84 	this.addParser = function( parser ) {
 85 			if ( typeof parser != 'function' ) { return false; }
 86 			this.Parser = parser;
 87 			return true;
 88 	};
 89 		
 90 	/**
 91 	 * The parser used to pull the latest offset out of the last database record.
 92 	 */
 93 	this.Parser = function(input) {
 94 		return true;
 95 	};
 96 
 97 	/**
 98 	 * Enable suboffset handling for SQLQuery objects and add a handler that will be automatically called to determine the suboffset.
 99 	 * Many databases don't use an incremental event ID that can be guaranteed to be unique per event, and properly increment over time.
100 	 * To handle this scenario, we commonly query based on a timestamp offset, but in many cases the resolution of the timestamp is such that 
101 	 * we may get multiple events for any given offset. Since we can't guarantee that we will get all the events assigned to a single offset
102 	 * (either because we hit MaxRows or because some of the events hadn't been generated yet), we need to query the same offset again until we
103 	 * see the next offset in the received data (which implies that we've passed into the next timeslice).
104 	 * The resulting issue, however, is that we may get duplicate events in subsequent queries which we must filter out be keeping track of 
105 	 * which events we've already processed. To do this, we need to have a way to uniquely identify each event by some field or combination of fields, 
106 	 * but this value need not be incremental.
107 	 * This feature uses an internal cache of processed events, plus a registered function to extract the unique value from processed events.
108 	 * @example
109 	 * <p>
110 	 * Example:
111 	 * <pre>
112 	 * // ...in initialize() method
113 	 * instance.PARSER.getSubOffset = function(input) {
114 	 *   return input.RXMap.RecID;
115 	 *   ...
116 	 * };
117 	 * conn.addSubParser(instance.PARSER.getSubOffset);
118 	 * </pre>
119 	 * Note that if you define a subparser, it will be automatically attached to all SQLQuery objects that are allocated for event sources.
120 	 * If you have some event sources that use suboffsets and some that don't, you'll manually have to disable suboffsets for those SQLQuery objects
121 	 * that don't use suboffsets (see SQLQuery API).
122 	 * @param {Function} parser  - The parser function to attach to this session
123 	 * @return {Boolean} Result
124 	 * @see SQLQuery 
125 	 */
126 	this.addSubParser = function( subparser ) {
127 		if ( typeof subparser != 'function' ) { return false; }
128 		this.SubParser = subparser;
129 		instance.CONFIG.params.SubOffsets = true;
130 		return true;
131 	}
132 	
133 	/**
134 	 * The parser used to pull the latest suboffset out of the last database record.
135 	 */
136 	this.SubParser = function(input) {
137 		return true;
138 	};
139 
140 		
141 	if ( typeof scriptEnv != "undefined" ) { // This is an Action - short circuit
142 		return true;
143 	}
144 	
145 	if ( instance.CONFIG.params.Conn_Retry != "none" ) {
146 		// Set up queue and invoke separate thread to feed the queue with ScriptEngineContext.readData()
147 		var queue = new java.util.concurrent.SynchronousQueue();
148 		this.readThread = java.lang.Thread(function() {
149 			var ctx = instance.CONFIG.scriptContext;
150 			while(ctx.getKeepRunning()) {
151 				queue.put(ctx.readData());
152 			}
153 		});
154 		this.readThread.setName("collector-read-thread-"+this.UUID);
155 		this.readThread.setDaemon(true);
156 		this.readThread.start();
157 	
158 		this.retries = 1; // Initial retries for falloff
159 	
160 		function getData(secs){
161 		if (typeof secs == "undefined") {
162 			return queue.poll(0, java.util.concurrent.TimeUnit.SECONDS);
163 		}	
164 		if (secs > 0) {
165 			return queue.poll(secs, java.util.concurrent.TimeUnit.SECONDS);
166 		}
167 		else 
168 			if (secs == 0) {
169 				return queue.take();
170 			}
171 		}
172 	}
173 
174 	
175 	/**
176 	 * This method reads the next set of input data from the Connector queue.
177 	 * Connector.read() returns a Record object that is a map of the input data received plus Connector
178 	 * meta-information.
179 	 * @return {Record}  The record received from the Connector queue, basically a hash map
180 	 */
181 	this.read = function() {
182 		var record = new Record();
183 		try {
184 			if (instance.CONFIG.params.Conn_Retry != "none") {
185 				if (instance.CONFIG.params.Conn_Retry == "fixed") {
186 					record.connectorData = getData(instance.CONFIG.params.RX_Timeout);
187 				} else {
188 					for (var i=0; i < this.retries; i++) {
189 						// short retry to ensure that we check if we should keep running
190 						record.connectorData = getData(5);
191 						// Check if the Collector was stopped
192 						if(!ScriptEngineUtil.keepRunning() || record.connectorData != null) {
193 							break;
194 						}
195 						// else loop - no data yet
196 					}
197 				}
198 			}
199 
200 			if ( record.connectorData === null || typeof record.CONNECTION_ERROR != "undefined" ) {
201 				/* If we didn't get any data, or the data we got was a notification that another
202 				 * query was needed, then set NODATA on the record.
203 				 */
204 				record.CONNECTION_ERROR = record.CONNECTION_ERROR ? record.CONNECTION_ERROR : "NODATA";
205 				if (this.retries < 24) {
206 					/* If the last max number of retries we did was less than 24, then
207 					 * set max number of retries higher so that we'll retry more next time.
208 					 */
209 					this.retries = this.retries * 2;
210 				}
211 				return record; 
212 			}
213 						
214 			// We got data!
215 			this.retries = 1; // we got data so reset retries to 1
216 			var connectorUnmodData = record.connectorData.getUnmodifiableData();
217 			var entries = connectorUnmodData.entrySet().toArray();
218 			for (var j = entries.length-1; j > -1; --j) {  //  DGC Check this!!!
219 				var key = String(entries[j].key);
220 				var value = String(entries[j].value);
221 				
222 				// Put anything starting with "s_" or "i_" into the record.
223 				if (key.length >= 2 && key.charAt(1) == '_' && (key.charAt( 0) == 's' || key.charAt( 0) == 'i')) {
224 					record[key] = value;
225 				} else {
226 					switch (key) {
227 						case "CONNECTION_METHOD":
228 						case "CONNECTION_MODE":
229 						case "o":
230 						case "Row_Left":
231 						case "Application":
232 						case "CurrentFile":
233 						case "SourceIP":
234 						case "QueryID":
235 							record[key] = value;
236 							break;
237 						default:
238 							record.RXMap[key] = value;
239 							break;
240 					}
241 				}
242 					
243 			}
244 		} catch(err) {
245 			record.CONNECTION_ERROR = err;
246 			log("Error reading next connector data object: "+err);
247 			return record;
248 		}
249 			
250 		// Handle re-read of files created by Connectors ("Replay" mode)
251 		if (typeof record.CONNECTION_METHOD != 'undefined' && record.CONNECTION_MODE == "Replay") {
252 			switch (record.s_RXBufferString[0]) {
253 				case "{":  // New-style JSON dump
254 					var tmpObj = JSON.parse(record.s_RXBufferString);
255 					break;
256 				default:   // old-style dump
257 					// this is a weird quoted thingy - replace the double quotes first
258 					var tmpString = record.s_RXBufferString.replace(/\"\"/g, "@QUOTE@");
259 					tmpString = tmpString.replace(/\"/g, "@EMBEDQUOTE@");
260 					tmpString = tmpString.replace(/@QUOTE@/g, "\"");
261 					tmpString = tmpString.trim();  // remove trailing whitespace
262 					tmpString = tmpString.replace(/\\n/g, "\n");  // get rid of encoded newlines
263 					tmpString = tmpString.replace(/\/x(\d\d\d\d)/g, "\u$1"); // and convert Java unicode to JS unicode for some common separators
264 					var tmpObj = tmpString.parseNVP(" ", "=", "\"");
265 					if (typeof tmpObj.s_RXBufferString != "undefined") {
266 						tmpObj.s_RXBufferString = tmpObj.s_RXBufferString.replace(/@EMBEDQUOTE@/g, "\"");
267 					}
268 					if (typeof tmpObj.s_Body != "undefined") {
269 						tmpObj.s_Body = tmpObj.s_Body.replace(/@EMBEDQUOTE@/g, "\"");
270 					}
271 					break;
272 				// End of Connector Dump switch
273 			}
274 			for (var key in tmpObj) {
275 				// replace input map with replayed values
276 				if (tmpObj.hasOwnProperty(key)) {
277 					var value = String(tmpObj[key]);
278 					// Put anything starting with "s_" or "i_" into the record.
279 					if (key.length >= 2 && key.charAt(1) == '_' && (key.charAt( 0) == 's' || key.charAt( 0) == 'i')) {
280 						record[key] = value;
281 					} else {
282 						switch (key) {
283 							case "CONNECTION_METHOD":
284 							case "CONNECTION_MODE":
285 							case "o":
286 							case "Row_Left":
287 							case "Application":
288 							case "CurrentFile":
289 							case "SourceIP":
290 							case "QueryID":
291 								record[key] = value;
292 								break;
293 							default:
294 								record.RXMap[key] = value;
295 								break;
296 						}
297 					}
298 				}
299 			}
300 		}  // End of Replay processing
301 		
302 		/* Databases are a bit special, mostly because we have to query them interactively. The general way that the Connector
303 		 * works is that it will send us a special "offset" record when it is first started to tell us where we left off, 
304 		 * then we send it a query, then the Connector gets a batch of data which it feeds to us one record at a time.
305 		 * If the query is a normal SQL query, the Row_Left variable will count down from "batchsize" to 0.
306 		 * If it's a stored procedure, I think right now it counts up.
307 		 * When we see the last record, we need to re-issue the query.
308 		 * We will also get a Row_Left=-1 record when a query returns no data (unlike other Connectors, which block).
309 		 */
310 		if (record.CONNECTION_METHOD == "DATABASE") {	// this is a DB Connector
311 			// First, check if we have seen any data from this particular event source before
312 			// If not, allocate a new SQLQuery object and associate it with the event source; initialize with the default query and parsers.
313 			if (typeof instance.QUERIES[record.s_RV24] == "undefined") {
314 				if(typeof instance.CONFIG.params.Query_Variant != "undefined") {
315 					instance.QUERIES[record.s_RV24] = new SQLQuery("sqlquery" + instance.CONFIG.params.Query_Variant + ".base");
316 				} else {
317 					instance.QUERIES[record.s_RV24] = new SQLQuery();
318 				}
319 				
320 				instance.QUERIES[record.s_RV24].addParser(this.Parser);
321 				if (instance.CONFIG.params.SubOffsets == true) { instance.QUERIES[record.s_RV24].addSubParser(this.SubParser); }
322 				
323 				// Special support for a test query specified in a parameter; DELETE PARAM FROM RELEASE COLLECTORS
324 				if(typeof instance.CONFIG.params.Test_Query != "undefined" && instance.CONFIG.params.Test_Query != "") {
325 					instance.QUERIES[record.s_RV24].baseQuery = instance.CONFIG.params.Test_Query;
326 				}	
327 			}
328 
329 			// Next, handle special records that should trigger new queries or other special behavior
330 			switch(record.Row_Left) {
331 				case "-2":
332 					instance.QUERIES[record.s_RV24].setOffset(record.connectorData.offset);
333 					// This handles a case where multiple -2 records are sitting in the Connector queue and have to be consumed before
334 					// we get to the results from our query
335 					if (instance.QUERIES[record.s_RV24].queryDelay > -1) {
336 						conn.send(instance.QUERIES[record.s_RV24]);
337 						instance.QUERIES[record.s_RV24].queryDelay = -1;
338 					}
339 					// Send the initial query to the DB
340 					record.CONNECTION_ERROR = "OFFSET_NOT_EVENT";
341 					return record;
342 				case "-1":
343 					// This means that no data was returned from our query, which means that we should pause, then query again
344 					// We keep the status and schedule for the next query in the SQLQuery object
345 					instance.QUERIES[record.s_RV24].needQuery = true;
346 					if (instance.QUERIES[record.s_RV24].queryDelay > 0 && instance.QUERIES[record.s_RV24].queryDelay < 33) {
347 						instance.QUERIES[record.s_RV24].queryDelay=instance.QUERIES[record.s_RV24].queryDelay * 2;
348 					} else {
349 						instance.QUERIES[record.s_RV24].queryDelay = 1;
350 					}
351 					instance.QUERIES[record.s_RV24].queryScheduled = new Date().addSeconds(instance.QUERIES[record.s_RV24].queryDelay * 2);
352 					instance.QUERIES[record.s_RV24].noNewData = true;
353 					instance.QUERIES[record.s_RV24].fullBatch = false;
354 					record.CONNECTION_ERROR = "NODATA";
355 					return record;
356 				case "0":
357 					instance.QUERIES[record.s_RV24].needQuery = true
358 				default:
359 					// Note since there's no break, this is also executed for case 0
360 					instance.QUERIES[record.s_RV24].setOffset(instance.QUERIES[record.s_RV24].Parser(record));
361 					instance.QUERIES[record.s_RV24].queryDelay = 0;
362 					break;
363 			}
364 			
365 			// Special suboffset handling
366 			if ( instance.QUERIES[record.s_RV24].SubOffsets == true ) {
367 				// Check if we got a full batch back and it's the first record in the batch 
368 				if ( record.CONNECTION_MODE == "sp" )  { // Handle stored procedures, which count the wrong way
369 					if (record.Row_Left >= instance.QUERIES[record.s_RV24].maxRows) {
370 						instance.QUERIES[record.s_RV24].fullBatch = true;
371 					}
372 					if (record.Row_Left == 1) {
373 						instance.QUERIES[record.s_RV24].baseoffset = instance.QUERIES[record.s_RV24].Parser(record);
374 					}
375 				} else {
376 					if (record.Row_Left >= (instance.QUERIES[record.s_RV24].maxRows-1) ) { // This will only work for regular (non-SP) queries
377 						instance.QUERIES[record.s_RV24].fullBatch = true;
378 						instance.QUERIES[record.s_RV24].baseoffset = instance.QUERIES[record.s_RV24].Parser(record);
379 					}
380 				}
381 				// If this record has been seen before, suppress it
382 				if ( instance.QUERIES[record.s_RV24].CACHE.hasOwnProperty(instance.QUERIES[record.s_RV24].SubParser(record)) ) {
383 					record.CONNECTION_ERROR = "ALREADY_SEEN";  // Should short-circuit parsing
384 				} else {
385 					instance.QUERIES[record.s_RV24].noNewData = false;  // Got new data in this batch
386 				}
387 				// If we processed the entire batch without any new data, then we should pause as we got no data
388 				if (record.Row_Left == 0 && instance.QUERIES[record.s_RV24].noNewData) {
389 					instance.QUERIES[record.s_RV24].needQuery = true;
390 					if (instance.QUERIES[record.s_RV24].queryDelay > 0 && instance.QUERIES[record.s_RV24].queryDelay < 33) {
391 						instance.QUERIES[record.s_RV24].queryDelay=instance.QUERIES[record.s_RV24].queryDelay * 2;
392 					} else {
393 						instance.QUERIES[record.s_RV24].queryDelay = 1;
394 					}
395 					instance.QUERIES[record.s_RV24].queryScheduled = new Date().addSeconds(instance.QUERIES[record.s_RV24].queryDelay * 2);
396 					// If the offset didn't go up, then maxRows is too small
397 					if (instance.QUERIES[record.s_RV24].fullBatch) {
398 						instance.CONFIG.scriptContext.clearError(instance.UUID);
399 						if ( instance.QUERIES[record.s_RV24].maxRows < 5000 ) {
400 							instance.QUERIES[record.s_RV24].maxRows = instance.QUERIES[record.s_RV24].maxRows * 2;
401 						} else {
402 							log("MaxRows query size too large to handle!", 5);
403 							instance.CONFIG.scriptContext.addError(instance.UUID, "MaxRows query size too large to handle!"); 
404 							ScriptEngineUtil.setStopFlag(instance.UUID);
405 						}
406 						instance.QUERIES[record.s_RV24].fullBatch = false;
407 						log("MaxRows parameter is too small for this database source!", 3);
408 					}
409 				}
410 			}
411 		}
412 
413 		return record;
414 	};
415 	
416 	/**
417 	 * This method sends data to the attached Connector. 
418 	 * Note that if this function is used the Collector can support attachment 
419 	 * of only a single Connector. It handles both sending strings to Process Connectors
420 	 * and SQL Queries to DB Connectors.
421 	 * 
422 	 * @param {Object} msg  Message to send to the Connector
423 	 * @return {Boolean}  Status of send attempt
424 	 * @see SQLQuery
425 	 */
426 	this.send = function(msg) {
427 		// Ideally I'd like a return result in case the message could not be sent
428 		var data = msg;
429 		if ( msg instanceof SQLQuery ) {
430 			data = msg.buildQuery();
431 			// Reset the noNewData flag for the next batch
432 			msg.noNewData=true;
433 		}
434 		return instance.CONFIG.scriptContext.sendTXData(data);
435 	};
436 
437 	/**
438 	 * This method frees up resources held by this object.
439 	 */
440 	this.cleanup = function() {
441 		if ( this.readThread != null ) {
442 			/* Interrupt the thread so that it exists now, rather than
443 			 * blocking on reading from the queue.
444 			 */
445 			this.readThread.interrupt();
446 			this.readThread = null;
447 		}
448 	}
449 	
450 	/**
451 	 * The DBQuery object is used to hold the default SQL query that will be issued
452 	 * against a database source. It is only used if the Connector is a DB Connector,
453 	 * and will be initialized on receipt of the first record.
454 	 * @see SQLQuery
455 	 */
456 		this.DBQuery = null;
457 }
458 
459