/*
 * Copyright © [2008-2009] Novell, Inc. All Rights Reserved.
 *
 * USE AND REDISTRIBUTION OF THIS WORK IS SUBJECT TO THE DEVELOPER LICENSE AGREEMENT
 * OR OTHER AGREEMENT THROUGH WHICH NOVELL, INC. MAKES THE WORK AVAILABLE. THIS WORK
 * MAY NOT BE ADAPTED WITHOUT NOVELL'S PRIOR WRITTEN CONSENT.
 *
 * NOVELL PROVIDES THE WORK "AS IS," WITHOUT ANY EXPRESS OR IMPLIED WARRANTY,
 * INCLUDING WITHOUT LIMITATION THE IMPLIED WARRANTIES OF MERCHANTABILITY, FITNESS FOR
 * A PARTICULAR PURPOSE, AND NON-INFRINGEMENT. NOVELL, THE AUTHORS OF THE WORK, AND THE
 * OWNERS OF COPYRIGHT IN THE WORK ARE NOT LIABLE FOR ANY CLAIM, DAMAGES, OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT, OR OTHERWISE, ARISING FROM, OUT OF,
 * OR IN CONNECTION WITH THE WORK OR THE USE OR OTHER DEALINGS IN THE WORK.
 */

/**
 * @fileoverview
 * This file defines the Connector class, which represents the Connector(s) that pass
 * input data to the Collector.
 * @name Connector Class
 */

/**
 * This class represents the Connector(s) passing data to the Collector.
 * Connectors actually feed a single inbound queue of event data, so you cannot
 * address them individually. This has implications for scenarios where you must
 * send data to the Connector: in that case you can have only a single Connector
 * per Collector (a DB Connector, for example).
 * The template handles most of the reading from and writing to the Connector, but
 * in some unusual circumstances you may need to use this class directly.
 * @author Novell Engineering
 * @version 6.1
 * @constructor
 * @param {String} uuid  UUID of the Connector in ESM
 */
function Connector(uuid) {

    if (typeof uuid != "undefined" && typeof ESM != "undefined") {
        this.UUID = uuid;
        this.connectorObject = ESM.groupForId(this.UUID);
    }

    /**
     * Starts this Connector in ESM.
     */
    this.start = function() {
        if (typeof this.connectorObject != "undefined") {
            this.connectorObject.start();
        }
    };

    /**
     * Stops this Connector in ESM.
     */
    this.stop = function() {
        if (typeof this.connectorObject != "undefined") {
            this.connectorObject.stop();
        }
    };

    /**
     * Adds a default parser that will automatically be called to determine the current offset.
     * The parser should be defined as a function that can be attached to a SQLQuery;
     * instance.PARSER is provided as a convenient place to store pre-defined parser routines.
     * Any time a new event source is discovered and a SQLQuery is allocated for it, this
     * default parser will be attached to it.
     * <p>
     * Example:
     * <pre>
     * // ...in initialize() method
     * instance.PARSER.getOffset = function(input) {
     *     return input.RXMap.RecNumber;
     *     ...
     * };
     * conn.addParser(instance.PARSER.getOffset);
     * </pre>
     * Note that you can simply create a single parser that will be assigned by default to all
     * SQLQuery objects, but you can also predefine several parsers and dynamically attach them
     * directly to SQLQuery objects; see the SQLQuery object API for details.
     * @see SQLQuery
     * @param {Function} parser  The parser function to attach to this session
     * @return {Boolean} Result
     */
    this.addParser = function(parser) {
        if (typeof parser != 'function') { return false; }
        this.Parser = parser;
        return true;
    };
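    /* Illustration only (not part of the shipped template): one way to predefine several
     * offset parsers and pick among them, as the addParser() docs above describe. The
     * parser names and column names below are hypothetical, and "query" stands for a
     * SQLQuery instance obtained as shown in the SQLQuery API.
     *
     *   // ...in initialize() method
     *   instance.PARSER.byRecNumber = function(input) { return input.RXMap.RecNumber; };
     *   instance.PARSER.byEventTime = function(input) { return input.RXMap.EventTime; };
     *   conn.addParser(instance.PARSER.byRecNumber);  // default for new SQLQuery objects
     *   query.addParser(instance.PARSER.byEventTime); // per-query override (see SQLQuery)
     */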
    /**
     * The parser used to pull the latest offset out of the last database record.
     */
    this.Parser = function(input) {
        return true;
    };

    /**
     * Enables suboffset handling for SQLQuery objects and adds a handler that will be
     * automatically called to determine the suboffset.
     * Many databases don't have an incremental event ID that is guaranteed to be unique
     * per event and to increase properly over time. To handle this scenario, we commonly
     * query based on a timestamp offset, but in many cases the resolution of the timestamp
     * is such that we may get multiple events for any given offset. Since we can't guarantee
     * that we will get all the events assigned to a single offset (either because we hit
     * MaxRows or because some of the events hadn't been generated yet), we need to query the
     * same offset again until we see the next offset in the received data (which implies that
     * we've passed into the next timeslice).
     * The resulting issue, however, is that we may get duplicate events in subsequent queries,
     * which we must filter out by keeping track of which events we've already processed. To do
     * this, we need a way to uniquely identify each event by some field or combination of
     * fields, but this value need not be incremental.
     * This feature uses an internal cache of processed events, plus a registered function to
     * extract the unique value from processed events.
     * <p>
     * Example:
     * <pre>
     * // ...in initialize() method
     * instance.PARSER.getSubOffset = function(input) {
     *     return input.RXMap.RecID;
     *     ...
     * };
     * conn.addSubParser(instance.PARSER.getSubOffset);
     * </pre>
     * Note that if you define a subparser, it will be automatically attached to all SQLQuery
     * objects that are allocated for event sources. If you have some event sources that use
     * suboffsets and some that don't, you will have to manually disable suboffsets for the
     * SQLQuery objects that don't use them (see the SQLQuery API).
     * @param {Function} subparser  The parser function to attach to this session
     * @return {Boolean} Result
     * @see SQLQuery
     */
    this.addSubParser = function(subparser) {
        if (typeof subparser != 'function') { return false; }
        this.SubParser = subparser;
        instance.CONFIG.params.SubOffsets = true;
        return true;
    };
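    /* Illustration only: a suboffset parser sketch for a source with no single unique column,
     * composing a per-event key from a combination of fields as the addSubParser() docs
     * describe. The column names RecTime and RecSeq are hypothetical; the key need not be
     * incremental, only unique per event.
     *
     *   // ...in initialize() method
     *   instance.PARSER.getSubOffset = function(input) {
     *       return input.RXMap.RecTime + "|" + input.RXMap.RecSeq;
     *   };
     *   conn.addSubParser(instance.PARSER.getSubOffset);
     */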
    /**
     * The parser used to pull the latest suboffset out of the last database record.
     */
    this.SubParser = function(input) {
        return true;
    };


    if (typeof scriptEnv != "undefined") { // This is an Action - short-circuit
        return true;
    }

    if (instance.CONFIG.params.Conn_Retry != "none") {
        // Set up a queue, and spawn a separate thread to feed the queue with ScriptEngineContext.readData()
        var queue = new java.util.concurrent.SynchronousQueue();
        this.readThread = new java.lang.Thread(function() {
            var ctx = instance.CONFIG.scriptContext;
            while (ctx.getKeepRunning()) {
                queue.put(ctx.readData());
            }
        });
        this.readThread.setName("collector-read-thread-" + this.UUID);
        this.readThread.setDaemon(true);
        this.readThread.start();

        this.retries = 1; // Initial retry count for falloff

        function getData(secs) {
            if (typeof secs == "undefined") {
                return queue.poll(0, java.util.concurrent.TimeUnit.SECONDS);
            }
            if (secs > 0) {
                return queue.poll(secs, java.util.concurrent.TimeUnit.SECONDS);
            } else if (secs == 0) {
                return queue.take();
            }
        }
    }


    /**
     * This method reads the next set of input data from the Connector queue.
     * Connector.read() returns a Record object that is a map of the input data received
     * plus Connector meta-information.
     * @return {Record} The record received from the Connector queue, basically a hash map
     */
    this.read = function() {
        var record = new Record();
        try {
            if (instance.CONFIG.params.Conn_Retry != "none") {
                if (instance.CONFIG.params.Conn_Retry == "fixed") {
                    record.connectorData = getData(instance.CONFIG.params.RX_Timeout);
                } else {
                    for (var i = 0; i < this.retries; i++) {
                        // Short retry, to ensure that we check whether we should keep running
                        record.connectorData = getData(5);
                        // Check if the Collector was stopped
                        if (!ScriptEngineUtil.keepRunning() || record.connectorData != null) {
                            break;
                        }
                        // else loop - no data yet
                    }
                }
            }

            if (record.connectorData === null || typeof record.CONNECTION_ERROR != "undefined") {
                /* If we didn't get any data, or the data we got was a notification that another
                 * query was needed, then set NODATA on the record.
                 */
                record.CONNECTION_ERROR = record.CONNECTION_ERROR ? record.CONNECTION_ERROR : "NODATA";
                if (this.retries < 24) {
                    /* If the last maximum number of retries was less than 24, double it
                     * so that we'll retry more next time.
                     */
                    this.retries = this.retries * 2;
                }
                return record;
            }

            // We got data!
            this.retries = 1; // We got data, so reset retries to 1
            var connectorUnmodData = record.connectorData.getUnmodifiableData();
            var entries = connectorUnmodData.entrySet().toArray();
            for (var j = entries.length - 1; j > -1; --j) { // DGC Check this!!!
                var key = String(entries[j].key);
                var value = String(entries[j].value);

                // Put anything starting with "s_" or "i_" into the record.
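                /* For example, a hypothetical entry set of
                 *   s_RXBufferString=..., i_TXPort=1468, CONNECTION_METHOD=FILE, RecNumber=17
                 * would land in record.s_RXBufferString, record.i_TXPort,
                 * record.CONNECTION_METHOD, and record.RXMap.RecNumber, respectively.
                 */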
                if (key.length >= 2 && key.charAt(1) == '_' && (key.charAt(0) == 's' || key.charAt(0) == 'i')) {
                    record[key] = value;
                } else {
                    switch (key) {
                        case "CONNECTION_METHOD":
                        case "CONNECTION_MODE":
                        case "o":
                        case "Row_Left":
                        case "Application":
                        case "CurrentFile":
                        case "SourceIP":
                        case "QueryID":
                            record[key] = value;
                            break;
                        default:
                            record.RXMap[key] = value;
                            break;
                    }
                }
            }
        } catch (err) {
            record.CONNECTION_ERROR = err;
            log("Error reading next connector data object: " + err);
            return record;
        }

        // Handle re-read of files created by Connectors ("Replay" mode)
        if (typeof record.CONNECTION_METHOD != 'undefined' && record.CONNECTION_MODE == "Replay") {
            record.CONNECTION_REPLAY = true;
            var tmpObj;
            switch (record.s_RXBufferString.charAt(0)) {
                case "{": // New-style JSON dump
                    tmpObj = JSON.parse(record.s_RXBufferString);
                    break;
                default: // Old-style dump
                    // The old dump format doubles embedded quotes - replace the doubled quotes first
                    var tmpString = record.s_RXBufferString.replace(/\"\"/g, "@QUOTE@");
                    tmpString = tmpString.replace(/\"/g, "@EMBEDQUOTE@");
                    tmpString = tmpString.replace(/@QUOTE@/g, "\"");
                    tmpString = tmpString.trim(); // Remove trailing whitespace
                    tmpString = tmpString.replace(/\\n/g, "\n"); // Get rid of encoded newlines
                    // Convert Java unicode escapes to JS characters for some common separators
                    tmpString = tmpString.replace(/\/x(\d\d\d\d)/g, function(match, hex) {
                        return String.fromCharCode(parseInt(hex, 16));
                    });
                    tmpObj = tmpString.parseNVP(" ", "=", "\"");
                    if (typeof tmpObj.s_RXBufferString != "undefined") {
                        tmpObj.s_RXBufferString = tmpObj.s_RXBufferString.replace(/@EMBEDQUOTE@/g, "\"");
                    }
                    if (typeof tmpObj.s_Body != "undefined") {
                        tmpObj.s_Body = tmpObj.s_Body.replace(/@EMBEDQUOTE@/g, "\"");
                    }
                    break;
                // End of Connector dump switch
            }
            for (var key in tmpObj) {
                // Replace the input map with the replayed values
                if (tmpObj.hasOwnProperty(key)) {
                    var value = String(tmpObj[key]);
                    // Put anything starting with "s_" or "i_" into the record.
                    if (key.length >= 2 && key.charAt(1) == '_' && (key.charAt(0) == 's' || key.charAt(0) == 'i')) {
                        record[key] = value;
                    } else {
                        switch (key) {
                            case "CONNECTION_METHOD":
                            case "CONNECTION_MODE":
                            case "o":
                            case "Row_Left":
                            case "Application":
                            case "CurrentFile":
                            case "SourceIP":
                            case "QueryID":
                                record[key] = value;
                                break;
                            default:
                                record.RXMap[key] = value;
                                break;
                        }
                    }
                }
            }
        } // End of Replay processing

        if (record.CONNECTION_METHOD == "SNMP") {
            /* The current implementation of SNMP has a subtle bug that renders the indexes of
             * two arrays into named properties. Convert the named properties into proper JS
             * arrays so that upstream consumption of the data matches the documentation of
             * the Connector.
             */
            if ((typeof (record.s_Trap_Value) == "undefined") || (typeof (record.s_Trap_OID) == "undefined")) {
                // Declare these as arrays in all cases to make sure we have them
                record.s_Trap_Value = [];
                record.s_Trap_OID = [];
                if (record.i_Trap_Vars > 0) {
                    for (var i = 0; i < record.i_Trap_Vars; i++) {
                        record.s_Trap_Value[i] = record["s_Trap_Value[" + i + "]"];
                        record.s_Trap_OID[i] = record["s_Trap_OID[" + i + "]"];
                    }
                }
            }
        }
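        /* For example, a hypothetical trap with record.i_Trap_Vars == 2 arrives with the
         * flattened properties record["s_Trap_OID[0]"] and record["s_Trap_OID[1]"]; after the
         * conversion above they are addressable as record.s_Trap_OID[0] and record.s_Trap_OID[1].
         */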
        /* Databases are a bit special, mostly because we have to query them interactively.
         * The general way that the Connector works is that it sends us a special "offset"
         * record when it is first started to tell us where we left off; we then send it a
         * query, and the Connector fetches a batch of data which it feeds to us one record
         * at a time.
         * If the query is a normal SQL query, the Row_Left variable counts down from
         * "batchsize" to 0. If it's a stored procedure, it currently appears to count up.
         * When we see the last record, we need to re-issue the query.
         * We will also get a Row_Left=-1 record when a query returns no data (unlike other
         * Connectors, which block).
         */
        if (record.CONNECTION_METHOD == "DATABASE") { // This is a DB Connector
            /* First, check whether we have seen any data from this particular event source before.
             * If not, allocate a new SQLQuery object and associate it with the event source,
             * initialized with the default query and parsers.
             */
            if (typeof instance.QUERIES[record.s_RV24] == "undefined") {
                if (typeof instance.CONFIG.params.Query_Variant != "undefined") {
                    instance.QUERIES[record.s_RV24] = new SQLQuery("sqlquery" + instance.CONFIG.params.Query_Variant + ".base");
                } else {
                    instance.QUERIES[record.s_RV24] = new SQLQuery();
                }

                instance.QUERIES[record.s_RV24].addParser(this.Parser);
                if (instance.CONFIG.params.SubOffsets == true) {
                    instance.QUERIES[record.s_RV24].addSubParser(this.SubParser);
                }

                // Special support for a test query specified in a parameter; DELETE PARAM FROM RELEASE COLLECTORS
                if (typeof instance.CONFIG.params.Test_Query != "undefined" && instance.CONFIG.params.Test_Query != "") {
                    instance.QUERIES[record.s_RV24].baseQuery = instance.CONFIG.params.Test_Query;
                }
            }

            // Next, handle special records that should trigger new queries or other special behavior
            switch (record.Row_Left) {
                case "-2":
                    instance.QUERIES[record.s_RV24].setOffset(record.connectorData.offset);
                    /* This handles a case where multiple -2 records are sitting in the Connector
                     * queue and have to be consumed before we get to the results from our query.
                     */
                    if (instance.QUERIES[record.s_RV24].queryDelay > -1) {
                        // Send the initial query to the DB
                        conn.send(instance.QUERIES[record.s_RV24]);
                        instance.QUERIES[record.s_RV24].queryDelay = -1;
                    }
                    record.CONNECTION_ERROR = "OFFSET_NOT_EVENT";
                    return record;
                case "-1":
                    /* No data was returned from our query, so we should pause, then query again.
                     * We keep the status and schedule for the next query in the SQLQuery object.
                     */
                    instance.QUERIES[record.s_RV24].needQuery = true;
                    if (instance.QUERIES[record.s_RV24].queryDelay > 0 && instance.QUERIES[record.s_RV24].queryDelay < 33) {
                        instance.QUERIES[record.s_RV24].queryDelay = instance.QUERIES[record.s_RV24].queryDelay * 2;
                    } else {
                        instance.QUERIES[record.s_RV24].queryDelay = 1;
                    }
                    instance.QUERIES[record.s_RV24].queryScheduled = new Date().addSeconds(instance.QUERIES[record.s_RV24].queryDelay * 2);
                    instance.QUERIES[record.s_RV24].noNewData = true;
                    instance.QUERIES[record.s_RV24].fullBatch = false;
                    record.CONNECTION_ERROR = "NODATA";
                    return record;
                case "0":
                    instance.QUERIES[record.s_RV24].needQuery = true;
                default:
                    // Note: since case "0" has no break, this is also executed for case 0
                    instance.QUERIES[record.s_RV24].setOffset(instance.QUERIES[record.s_RV24].Parser(record));
                    instance.QUERIES[record.s_RV24].queryDelay = 0;
                    break;
            }
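            /* To recap the cycle above with a hypothetical event source:
             *   Row_Left "-2"      -> offset record; send the initial query
             *   Row_Left counting  -> event data; track the offset via the attached Parser
             *   Row_Left "0"       -> last record of the batch; flag that a new query is needed
             *   Row_Left "-1"      -> empty result; back off with a doubling delay, then re-query
             */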
            // Special suboffset handling
            if (instance.QUERIES[record.s_RV24].SubOffsets == true) {
                // Check whether we got a full batch back and this is the first record in the batch
                if (record.CONNECTION_MODE == "sp") { // Handle stored procedures, which count up instead of down
                    if (record.Row_Left >= instance.QUERIES[record.s_RV24].maxRows) {
                        instance.QUERIES[record.s_RV24].fullBatch = true;
                    }
                    if (record.Row_Left == 1) {
                        instance.QUERIES[record.s_RV24].baseoffset = instance.QUERIES[record.s_RV24].Parser(record);
                    }
                } else {
                    if (record.Row_Left >= (instance.QUERIES[record.s_RV24].maxRows - 1)) { // This will only work for regular (non-SP) queries
                        instance.QUERIES[record.s_RV24].fullBatch = true;
                        instance.QUERIES[record.s_RV24].baseoffset = instance.QUERIES[record.s_RV24].Parser(record);
                    }
                }
                // If this record has been seen before, suppress it
                if (instance.QUERIES[record.s_RV24].CACHE.hasOwnProperty(instance.QUERIES[record.s_RV24].SubParser(record))) {
                    record.CONNECTION_ERROR = "ALREADY_SEEN"; // Should short-circuit parsing
                } else {
                    instance.QUERIES[record.s_RV24].noNewData = false; // Got new data in this batch
                }
                // If we processed the entire batch without any new data, pause as though we got no data
                if (record.Row_Left == 0 && instance.QUERIES[record.s_RV24].noNewData) {
                    instance.QUERIES[record.s_RV24].needQuery = true;
                    if (instance.QUERIES[record.s_RV24].queryDelay > 0 && instance.QUERIES[record.s_RV24].queryDelay < 33) {
                        instance.QUERIES[record.s_RV24].queryDelay = instance.QUERIES[record.s_RV24].queryDelay * 2;
                    } else {
                        instance.QUERIES[record.s_RV24].queryDelay = 1;
                    }
                    instance.QUERIES[record.s_RV24].queryScheduled = new Date().addSeconds(instance.QUERIES[record.s_RV24].queryDelay * 2);
                    // If the offset didn't go up, then maxRows is too small
                    if (instance.QUERIES[record.s_RV24].fullBatch) {
                        instance.CONFIG.scriptContext.clearError(instance.UUID);
                        if (instance.QUERIES[record.s_RV24].maxRows < instance.CONFIG.params.batchLimit) {
                            instance.QUERIES[record.s_RV24].maxRows = instance.QUERIES[record.s_RV24].maxRows * 2;
                        } else {
                            log("MaxRows query size too large to handle!", 5);
                            instance.CONFIG.scriptContext.addError(instance.UUID, "MaxRows query size too large to handle!");
                            ScriptEngineUtil.setStopFlag(instance.UUID);
                        }
                        instance.QUERIES[record.s_RV24].fullBatch = false;
                        log("MaxRows parameter is too small for this database source!", 3);
                    }
                }
            }
        }

        return record;
    };

    /**
     * This method sends data to the attached Connector.
     * Note that if this function is used, the Collector can support attachment of only a
     * single Connector. It handles both sending strings to Process Connectors and SQL
     * queries to DB Connectors.
     *
     * @param {Object} msg  Message to send to the Connector
     * @return {Boolean} Status of the send attempt
     * @see SQLQuery
     */
    this.send = function(msg) {
        // Ideally we'd have a return result in case the message could not be sent
        var data = msg;
        if (msg instanceof SQLQuery) {
            data = msg.buildQuery();
            // Reset the flags for the next batch
            msg.noNewData = true;
            msg.fullBatch = false;
        }
        return instance.CONFIG.scriptContext.sendTXData(data);
    };
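    /* Illustration only: typical uses of send(). "query" stands for a hypothetical SQLQuery
     * instance (e.g. one held in instance.QUERIES); the string form applies to Process
     * Connectors.
     *
     *   conn.send(query);              // builds the SQL text via query.buildQuery() and sends it
     *   conn.send("get more data\n");  // sends a raw string to the single attached Process Connector
     */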
    /**
     * This method frees up resources held by this object.
     */
    this.cleanup = function() {
        if (this.readThread != null) {
            /* Interrupt the thread so that it exits now, rather than
             * blocking on a read from the queue.
             */
            this.readThread.interrupt();
            this.readThread = null;
        }
    };

    /**
     * The DBQuery object is used to hold the default SQL query that will be issued
     * against a database source. It is only used if the Connector is a DB Connector,
     * and will be initialized on receipt of the first record.
     * @see SQLQuery
     */
    this.DBQuery = null;
}