1 /*
  2  * Copyright © [2008-2009] Novell, Inc.  All Rights Reserved.
  3  * 
  4  * USE AND REDISTRIBUTION OF THIS WORK IS SUBJECT TO THE DEVELOPER LICENSE AGREEMENT
  5  * OR OTHER AGREEMENT THROUGH WHICH NOVELL, INC. MAKES THE WORK AVAILABLE.  THIS WORK 
  6  * MAY NOT BE ADAPTED WITHOUT NOVELL'S PRIOR WRITTEN CONSENT.
  7  * 
  8  * NOVELL PROVIDES THE WORK "AS IS," WITHOUT ANY EXPRESS OR IMPLIED WARRANTY, 
  9  * INCLUDING WITHOUT LIMITATION THE IMPLIED WARRANTIES OF MERCHANTABILITY, FITNESS FOR 
 10  * A PARTICULAR PURPOSE, AND NON-INFRINGEMENT.  NOVELL, THE AUTHORS OF THE WORK, AND THE 
 11  * OWNERS OF COPYRIGHT IN THE WORK ARE NOT LIABLE FOR ANY CLAIM, DAMAGES, OR OTHER 
 12  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT, OR OTHERWISE, ARISING FROM, OUT OF, 
 13  * OR IN CONNECTION WITH THE WORK OR THE USE OR OTHER DEALINGS IN THE WORK.
 14  */
 15 
 16 /**
 17  * @fileoverview
 18  * This file defines the Connector class, which represents the Connector(s) that pass
 19  * input data to the Collector.
 20  * @name Connector Class
 21  */
 22 
 23 /**
 24  * This class represents the Connector(s) passing data to the Collector.
 25  * Connectors actually feed a single inbound queue of event data, so you cannot 
 26  * address them individually. This has implications for scenarios where you must
 27  * send data to the Connector - in this case you can only have a single Connector
 28  * per Collector (DB Connector, for example).
 29  * The template handles most things to do with reading and writing to the Connector,
 30  * but in some unusual circumstances you may need to use this class.
 31  * @author   Novell Engineering
 32  * @version  6.1
 33  * @constructor
 34  * @param {String} uuid  UUID of the Connector in ESM
 35  */
function Connector(uuid) {
	
	// Resolve the live ESM group object for this Connector so start()/stop()
	// can control it later. ESM is only defined when running inside the
	// Sentinel script engine, so guard against standalone/offline use.
	if (typeof uuid != "undefined" && typeof ESM != "undefined") {
  	this.UUID = uuid;
  	this.connectorObject = ESM.groupForId(this.UUID);
  }
		
 43 	/**
 44 	 * Starts this Connector in ESM.
 45 	 */
 46 	this.start = function() {
 47 		if( typeof this.connectorObject != "undefined" ) {
 48 			this.connectorObject.start();
 49 		}
 50 	};
 51 
 52 	/**
 53  	* Stops this Connector in ESM.
 54  	*/
 55 	this.stop = function() {
 56 		if( typeof this.connectorObject != "undefined" ) {
 57 			this.connectorObject.stop();
 58 		}
 59 	};
 60 	
 61 	/**
 62 	 * Adds a default parser that will automatically be called to determine the current offset.
 63 	 * The parser should be defined as a function that can be attached to a SQLQuery.
 64 	 * instance.PARSER is provided as a convenient place to store pre-defined parser routines.
 65 	 * Any time a new event source is discovered and a SQLQuery is allocated for it, this default parser
 66 	 * will be attached to it.
 67 	 * <p>
 68 	 * Example:
 69 	 * <pre>
 70 	 * // ...in initialize() method
 71 	 * instance.PARSER.getOffset = function(input) {
 72 	 *   return input.RXMap.RecNumber;
 73 	 *   ...
 74 	 * };
 75 	 * conn.addParser(instance.PARSER.getOffset);
 76 	 * </pre>
 77 	 * Note that you could simply create a single parser that will be assigned by default to all SQLQuery objects,
 78 	 * but you can also predefine several parsers and dynamically attach them directly to SQLQuery objects; see
 79 	 * the SQLQuery object API for details.
 80 	 * @see SQLQuery
 81 	 * @param {Function} parser  - The parser function to attach to this session
 82 	 * @return {Boolean} Result
 83 	 */
	this.addParser = function( parser ) {
		// Only a real function may be installed as the offset parser.
		if (typeof parser == 'function') {
			this.Parser = parser;
			return true;
		}
		return false;
	};
 89 		
 90 	/**
 91 	 * The parser used to pull the latest offset out of the last database record.
 92 	 */
 93 	this.Parser = function(input) {
 94 		return true;
 95 	};
 96 
 97 	/**
 98 	 * Enable suboffset handling for SQLQuery objects and add a handler that will be automatically called to determine the suboffset.
 99 	 * Many databases don't use an incremental event ID that can be guaranteed to be unique per event, and properly increment over time.
100 	 * To handle this scenario, we commonly query based on a timestamp offset, but in many cases the resolution of the timestamp is such that 
101 	 * we may get multiple events for any given offset. Since we can't guarantee that we will get all the events assigned to a single offset
102 	 * (either because we hit MaxRows or because some of the events hadn't been generated yet), we need to query the same offset again until we
103 	 * see the next offset in the received data (which implies that we've passed into the next timeslice).
104 	 * The resulting issue, however, is that we may get duplicate events in subsequent queries which we must filter out be keeping track of 
105 	 * which events we've already processed. To do this, we need to have a way to uniquely identify each event by some field or combination of fields, 
106 	 * but this value need not be incremental.
107 	 * This feature uses an internal cache of processed events, plus a registered function to extract the unique value from processed events.
108 	 * @example
109 	 * <p>
110 	 * Example:
111 	 * <pre>
112 	 * // ...in initialize() method
113 	 * instance.PARSER.getSubOffset = function(input) {
114 	 *   return input.RXMap.RecID;
115 	 *   ...
116 	 * };
117 	 * conn.addSubParser(instance.PARSER.getSubOffset);
118 	 * </pre>
119 	 * Note that if you define a subparser, it will be automatically attached to all SQLQuery objects that are allocated for event sources.
120 	 * If you have some event sources that use suboffsets and some that don't, you'll manually have to disable suboffsets for those SQLQuery objects
121 	 * that don't use suboffsets (see SQLQuery API).
122 	 * @param {Function} parser  - The parser function to attach to this session
123 	 * @return {Boolean} Result
124 	 * @see SQLQuery 
125 	 */
126 	this.addSubParser = function( subparser ) {
127 		if ( typeof subparser != 'function' ) { return false; }
128 		this.SubParser = subparser;
129 		instance.CONFIG.params.SubOffsets = true;
130 		return true;
131 	}
132 	
133 	/**
134 	 * The parser used to pull the latest suboffset out of the last database record.
135 	 */
136 	this.SubParser = function(input) {
137 		return true;
138 	};
139 
140 		
	if ( typeof scriptEnv != "undefined" ) { // This is an Action - short circuit
		/* In the Action (remediation) environment there is no inbound event
		 * queue to service, so skip all of the queue/thread setup below.
		 * NOTE(review): "return true" from a constructor invoked with "new"
		 * still yields the new object (primitives are ignored); confirm how
		 * the template actually invokes Connector in Action mode.
		 */
		return true;
	}
144 	
	if ( instance.CONFIG.params.Conn_Retry != "none" ) {
		// Set up queue and invoke separate thread to feed the queue with ScriptEngineContext.readData()
		// A SynchronousQueue has no capacity: the reader thread blocks in put()
		// until getData() removes each item, so at most one record is in flight.
		var queue = new java.util.concurrent.SynchronousQueue();
		// NOTE(review): java.lang.Thread(...) is called without "new"; Rhino
		// accepts this as a constructor invocation, but confirm on the engine
		// version in use.
		this.readThread = java.lang.Thread(function() {
			var ctx = instance.CONFIG.scriptContext;
			while(ctx.getKeepRunning()) {
				queue.put(ctx.readData());
			}
		});
		this.readThread.setName("collector-read-thread-"+this.UUID);
		this.readThread.setDaemon(true);
		this.readThread.start();
	
		this.retries = 1; // Initial retries for falloff
	
		// Fetch the next item from the rendezvous queue:
		//   secs undefined -> non-blocking poll (null if nothing is ready)
		//   secs > 0       -> poll with a timeout of "secs" seconds (null on timeout)
		//   secs == 0      -> block indefinitely until data arrives
		// (Function declaration inside a block: hoisted to function scope by
		// the engine, which is why this.read() below can call it.)
		function getData(secs){
		if (typeof secs == "undefined") {
			return queue.poll(0, java.util.concurrent.TimeUnit.SECONDS);
		}	
		if (secs > 0) {
			return queue.poll(secs, java.util.concurrent.TimeUnit.SECONDS);
		}
		else 
			if (secs == 0) {
				return queue.take();
			}
		}
	}
173 
174 	
175 	/**
176 	 * This method reads the next set of input data from the Connector queue.
177 	 * Connector.read() returns a Record object that is a map of the input data received plus Connector
178 	 * meta-information.
179 	 * @return {Record}  The record received from the Connector queue, basically a hash map
180 	 */
181 	this.read = function() {
182 		var record = new Record();
183 		try {
184 			if (instance.CONFIG.params.Conn_Retry != "none") {
185 				if (instance.CONFIG.params.Conn_Retry == "fixed") {
186 					record.connectorData = getData(instance.CONFIG.params.RX_Timeout);
187 				} else {
188 					for (var i=0; i < this.retries; i++) {
189 						// short retry to ensure that we check if we should keep running
190 						record.connectorData = getData(5);
191 						// Check if the Collector was stopped
192 						if(!ScriptEngineUtil.keepRunning() || record.connectorData != null) {
193 							break;
194 						}
195 						// else loop - no data yet
196 					}
197 				}
198 			}
199 
200 			if ( record.connectorData === null || typeof record.CONNECTION_ERROR != "undefined" ) {
201 				/* If we didn't get any data, or the data we got was a notification that another
202 				 * query was needed, then set NODATA on the record.
203 				 */
204 				record.CONNECTION_ERROR = record.CONNECTION_ERROR ? record.CONNECTION_ERROR : "NODATA";
205 				if (this.retries < 24) {
206 					/* If the last max number of retries we did was less than 24, then
207 					 * set max number of retries higher so that we'll retry more next time.
208 					 */
209 					this.retries = this.retries * 2;
210 				}
211 				return record; 
212 			}
213 						
214 			// We got data!
215 			this.retries = 1; // we got data so reset retries to 1
216 			var connectorUnmodData = record.connectorData.getUnmodifiableData();
217 			var entries = connectorUnmodData.entrySet().toArray();
218 			for (var j = entries.length-1; j > -1; --j) {  //  DGC Check this!!!
219 				var key = String(entries[j].key);
220 				var value = String(entries[j].value);
221 				
222 				// Put anything starting with "s_" or "i_" into the record.
223 				if (key.length >= 2 && key.charAt(1) == '_' && (key.charAt( 0) == 's' || key.charAt( 0) == 'i')) {
224 					record[key] = value;
225 				} else {
226 					switch (key) {
227 						case "CONNECTION_METHOD":
228 						case "CONNECTION_MODE":
229 						case "o":
230 						case "Row_Left":
231 						case "Application":
232 						case "CurrentFile":
233 						case "SourceIP":
234 						case "QueryID":
235 							record[key] = value;
236 							break;
237 						default:
238 							record.RXMap[key] = value;
239 							break;
240 					}
241 				}
242 					
243 			}
244 		} catch(err) {
245 			record.CONNECTION_ERROR = err;
246 			log("Error reading next connector data object: "+err);
247 			return record;
248 		}
249 			
250 		// Handle re-read of files created by Connectors ("Replay" mode)
251 		if (typeof record.CONNECTION_METHOD != 'undefined' && record.CONNECTION_MODE == "Replay") {
252 			record.CONNECTION_REPLAY = true;
253 			switch (record.s_RXBufferString[0]) {
254 				case "{":  // New-style JSON dump
255 					var tmpObj = JSON.parse(record.s_RXBufferString);
256 					break;
257 				default:   // old-style dump
258 					// this is a weird quoted thingy - replace the double quotes first
259 					var tmpString = record.s_RXBufferString.replace(/\"\"/g, "@QUOTE@");
260 					tmpString = tmpString.replace(/\"/g, "@EMBEDQUOTE@");
261 					tmpString = tmpString.replace(/@QUOTE@/g, "\"");
262 					tmpString = tmpString.trim();  // remove trailing whitespace
263 					tmpString = tmpString.replace(/\\n/g, "\n");  // get rid of encoded newlines
264 					tmpString = tmpString.replace(/\/x(\d\d\d\d)/g, "\u$1"); // and convert Java unicode to JS unicode for some common separators
265 					var tmpObj = tmpString.parseNVP(" ", "=", "\"");
266 					if (typeof tmpObj.s_RXBufferString != "undefined") {
267 						tmpObj.s_RXBufferString = tmpObj.s_RXBufferString.replace(/@EMBEDQUOTE@/g, "\"");
268 					}
269 					if (typeof tmpObj.s_Body != "undefined") {
270 						tmpObj.s_Body = tmpObj.s_Body.replace(/@EMBEDQUOTE@/g, "\"");
271 					}
272 					break;
273 				// End of Connector Dump switch
274 			}
275 			for (var key in tmpObj) {
276 				// replace input map with replayed values
277 				if (tmpObj.hasOwnProperty(key)) {
278 					var value = String(tmpObj[key]);
279 					// Put anything starting with "s_" or "i_" into the record.
280 					if (key.length >= 2 && key.charAt(1) == '_' && (key.charAt( 0) == 's' || key.charAt( 0) == 'i')) {
281 						record[key] = value;
282 					} else {
283 						switch (key) {
284 							case "CONNECTION_METHOD":
285 							case "CONNECTION_MODE":
286 							case "o":
287 							case "Row_Left":
288 							case "Application":
289 							case "CurrentFile":
290 							case "SourceIP":
291 							case "QueryID":
292 								record[key] = value;
293 								break;
294 							default:
295 								record.RXMap[key] = value;
296 								break;
297 						}
298 					}
299 				}
300 			}
301 		}  // End of Replay processing
302 		
303 		if (record.CONNECTION_METHOD == "SNMP")
304 		{
305 			// The current implementation of SNMP has a subtle bug that renders the indexes of two arrays into
306 			// named properties.   We're going to convert the named properties into appropriate arrays in JS to 
307 			// make the upstream consumption of that data match the documentation of the connector.
308 			if ((typeof (record.s_Trap_Value) == "undefined") || (typeof (record.s_Trap_OID) == "undefined"))
309 			{
310 				// declare these as arrays in all cases to make sure we have 
311 				record.s_Trap_Value = [];
312 				record.s_Trap_OID = [];
313 				if (record.i_Trap_Vars > 0)
314 				{
315 					for (i=0; i < record.i_Trap_Vars; i++)
316 					{
317 						record.s_Trap_Value[i] = record["s_Trap_Value["+i+"]"];
318 						record.s_Trap_OID[i] = record["s_Trap_OID["+i+"]"];
319 					} 
320 				}
321 			}
322 		}
323 		
324 		/* Databases are a bit special, mostly because we have to query them interactively. The general way that the Connector
325 		 * works is that it will send us a special "offset" record when it is first started to tell us where we left off, 
326 		 * then we send it a query, then the Connector gets a batch of data which it feeds to us one record at a time.
327 		 * If the query is a normal SQL query, the Row_Left variable will count down from "batchsize" to 0.
328 		 * If it's a stored procedure, I think right now it counts up.
329 		 * When we see the last record, we need to re-issue the query.
330 		 * We will also get a Row_Left=-1 record when a query returns no data (unlike other Connectors, which block).
331 		 */
332 		if (record.CONNECTION_METHOD == "DATABASE") {	// this is a DB Connector
333 			// First, check if we have seen any data from this particular event source before
334 			// If not, allocate a new SQLQuery object and associate it with the event source; initialize with the default query and parsers.
335 			if (typeof instance.QUERIES[record.s_RV24] == "undefined") {
336 				if(typeof instance.CONFIG.params.Query_Variant != "undefined") {
337 					instance.QUERIES[record.s_RV24] = new SQLQuery("sqlquery" + instance.CONFIG.params.Query_Variant + ".base");
338 				} else {
339 					instance.QUERIES[record.s_RV24] = new SQLQuery();
340 				}
341 				
342 				instance.QUERIES[record.s_RV24].addParser(this.Parser);
343 				if (instance.CONFIG.params.SubOffsets == true) { instance.QUERIES[record.s_RV24].addSubParser(this.SubParser); }
344 				
345 				// Special support for a test query specified in a parameter; DELETE PARAM FROM RELEASE COLLECTORS
346 				if(typeof instance.CONFIG.params.Test_Query != "undefined" && instance.CONFIG.params.Test_Query != "") {
347 					instance.QUERIES[record.s_RV24].baseQuery = instance.CONFIG.params.Test_Query;
348 				}	
349 			}
350 
351 			// Next, handle special records that should trigger new queries or other special behavior
352 			switch(record.Row_Left) {
353 				case "-2":
354 					instance.QUERIES[record.s_RV24].setOffset(record.connectorData.offset);
355 					// This handles a case where multiple -2 records are sitting in the Connector queue and have to be consumed before
356 					// we get to the results from our query
357 					if (instance.QUERIES[record.s_RV24].queryDelay > -1) {
358 						conn.send(instance.QUERIES[record.s_RV24]);
359 						instance.QUERIES[record.s_RV24].queryDelay = -1;
360 					}
361 					// Send the initial query to the DB
362 					record.CONNECTION_ERROR = "OFFSET_NOT_EVENT";
363 					return record;
364 				case "-1":
365 					// This means that no data was returned from our query, which means that we should pause, then query again
366 					// We keep the status and schedule for the next query in the SQLQuery object
367 					instance.QUERIES[record.s_RV24].needQuery = true;
368 					if (instance.QUERIES[record.s_RV24].queryDelay > 0 && instance.QUERIES[record.s_RV24].queryDelay < 33) {
369 						instance.QUERIES[record.s_RV24].queryDelay=instance.QUERIES[record.s_RV24].queryDelay * 2;
370 					} else {
371 						instance.QUERIES[record.s_RV24].queryDelay = 1;
372 					}
373 					instance.QUERIES[record.s_RV24].queryScheduled = new Date().addSeconds(instance.QUERIES[record.s_RV24].queryDelay * 2);
374 					instance.QUERIES[record.s_RV24].noNewData = true;
375 					instance.QUERIES[record.s_RV24].fullBatch = false;
376 					record.CONNECTION_ERROR = "NODATA";
377 					return record;
378 				case "0":
379 					instance.QUERIES[record.s_RV24].needQuery = true
380 				default:
381 					// Note since there's no break, this is also executed for case 0
382 					instance.QUERIES[record.s_RV24].setOffset(instance.QUERIES[record.s_RV24].Parser(record));
383 					instance.QUERIES[record.s_RV24].queryDelay = 0;
384 					break;
385 			}
386 			
387 			// Special suboffset handling
388 			if ( instance.QUERIES[record.s_RV24].SubOffsets == true ) {
389 				// Check if we got a full batch back and it's the first record in the batch 
390 				if ( record.CONNECTION_MODE == "sp" )  { // Handle stored procedures, which count the wrong way
391 					if (record.Row_Left >= instance.QUERIES[record.s_RV24].maxRows) {
392 						instance.QUERIES[record.s_RV24].fullBatch = true;
393 					}
394 					if (record.Row_Left == 1) {
395 						instance.QUERIES[record.s_RV24].baseoffset = instance.QUERIES[record.s_RV24].Parser(record);
396 					}
397 				} else {
398 					if (record.Row_Left >= (instance.QUERIES[record.s_RV24].maxRows-1) ) { // This will only work for regular (non-SP) queries
399 						instance.QUERIES[record.s_RV24].fullBatch = true;
400 						instance.QUERIES[record.s_RV24].baseoffset = instance.QUERIES[record.s_RV24].Parser(record);
401 					}
402 				}
403 				// If this record has been seen before, suppress it
404 				if ( instance.QUERIES[record.s_RV24].CACHE.hasOwnProperty(instance.QUERIES[record.s_RV24].SubParser(record)) ) {
405 					record.CONNECTION_ERROR = "ALREADY_SEEN";  // Should short-circuit parsing
406 				} else {
407 					instance.QUERIES[record.s_RV24].noNewData = false;  // Got new data in this batch
408 				}
409 				// If we processed the entire batch without any new data, then we should pause as we got no data
410 				if (record.Row_Left == 0 && instance.QUERIES[record.s_RV24].noNewData) {
411 					instance.QUERIES[record.s_RV24].needQuery = true;
412 					if (instance.QUERIES[record.s_RV24].queryDelay > 0 && instance.QUERIES[record.s_RV24].queryDelay < 33) {
413 						instance.QUERIES[record.s_RV24].queryDelay=instance.QUERIES[record.s_RV24].queryDelay * 2;
414 					} else {
415 						instance.QUERIES[record.s_RV24].queryDelay = 1;
416 					}
417 					instance.QUERIES[record.s_RV24].queryScheduled = new Date().addSeconds(instance.QUERIES[record.s_RV24].queryDelay * 2);
418 					// If the offset didn't go up, then maxRows is too small
419 					if (instance.QUERIES[record.s_RV24].fullBatch) {
420 						instance.CONFIG.scriptContext.clearError(instance.UUID);
421 						if ( instance.QUERIES[record.s_RV24].maxRows < instance.CONFIG.params.batchLimit ) {
422 							instance.QUERIES[record.s_RV24].maxRows = instance.QUERIES[record.s_RV24].maxRows * 2;
423 						} else {
424 							log("MaxRows query size too large to handle!", 5);
425 							instance.CONFIG.scriptContext.addError(instance.UUID, "MaxRows query size too large to handle!"); 
426 							ScriptEngineUtil.setStopFlag(instance.UUID);
427 						}
428 						instance.QUERIES[record.s_RV24].fullBatch = false;
429 						log("MaxRows parameter is too small for this database source!", 3);
430 					}
431 				}
432 			}
433 		}
434 
435 		return record;
436 	};
437 	
438 	/**
439 	 * This method sends data to the attached Connector. 
440 	 * Note that if this function is used the Collector can support attachment 
441 	 * of only a single Connector. It handles both sending strings to Process Connectors
442 	 * and SQL Queries to DB Connectors.
443 	 * 
444 	 * @param {Object} msg  Message to send to the Connector
445 	 * @return {Boolean}  Status of send attempt
446 	 * @see SQLQuery
447 	 */
448 	this.send = function(msg) {
449 		// Ideally I'd like a return result in case the message could not be sent
450 		var data = msg;
451 		if ( msg instanceof SQLQuery ) {
452 			data = msg.buildQuery();
453 			// Reset the flags for the next batch
454 			msg.noNewData=true;
455 			msg.fullBatch=false;
456 		}
457 		return instance.CONFIG.scriptContext.sendTXData(data);
458 	};
459 
460 	/**
461 	 * This method frees up resources held by this object.
462 	 */
463 	this.cleanup = function() {
464 		if ( this.readThread != null ) {
465 			/* Interrupt the thread so that it exists now, rather than
466 			 * blocking on reading from the queue.
467 			 */
468 			this.readThread.interrupt();
469 			this.readThread = null;
470 		}
471 	}
472 	
473 	/**
474 	 * The DBQuery object is used to hold the default SQL query that will be issued
475 	 * against a database source. It is only used if the Connector is a DB Connector,
476 	 * and will be initialized on receipt of the first record.
477 	 * @see SQLQuery
478 	 */
479 		this.DBQuery = null;
480 }
481 
482