/*
 * Copyright © [2008-2009] Novell, Inc.  All Rights Reserved.
 *
 * USE AND REDISTRIBUTION OF THIS WORK IS SUBJECT TO THE DEVELOPER LICENSE AGREEMENT
 * OR OTHER AGREEMENT THROUGH WHICH NOVELL, INC. MAKES THE WORK AVAILABLE.  THIS WORK
 * MAY NOT BE ADAPTED WITHOUT NOVELL'S PRIOR WRITTEN CONSENT.
 *
 * NOVELL PROVIDES THE WORK "AS IS," WITHOUT ANY EXPRESS OR IMPLIED WARRANTY,
 * INCLUDING WITHOUT LIMITATION THE IMPLIED WARRANTIES OF MERCHANTABILITY, FITNESS FOR
 * A PARTICULAR PURPOSE, AND NON-INFRINGEMENT.  NOVELL, THE AUTHORS OF THE WORK, AND THE
 * OWNERS OF COPYRIGHT IN THE WORK ARE NOT LIABLE FOR ANY CLAIM, DAMAGES, OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT, OR OTHERWISE, ARISING FROM, OUT OF,
 * OR IN CONNECTION WITH THE WORK OR THE USE OR OTHER DEALINGS IN THE WORK.
 */

/**
 * @fileoverview
 * This file defines the Connector class, which represents the Connector(s) that pass
 * input data to the Collector.
 * @name Connector Class
 */

/**
 * This class represents the Connector(s) passing data to the Collector.
 * Connectors actually feed a single inbound queue of event data, so you cannot
 * address them individually. This has implications for scenarios where you must
 * send data to the Connector - in this case you can only have a single Connector
 * per Collector (DB Connector, for example).
 * The template handles most things to do with reading and writing to the Connector,
 * but in some unusual circumstances you may need to use this class.
 * <p>
 * When the global <code>scriptEnv</code> is defined (the code is running as an Action),
 * construction stops after the start/stop/parser members are set up; read(), send(),
 * cleanup() and DBQuery are not defined on such an instance.
 * @author   Novell Engineering
 * @version  6.1
 * @constructor
 * @param {String} uuid  UUID of the Connector in ESM
 */
function Connector(uuid) {

  // The ESM lookup is only attempted when both a UUID and the ESM global are available.
  if (typeof uuid != "undefined" && typeof ESM != "undefined") {
    this.UUID = uuid;
    this.connectorObject = ESM.groupForId(this.UUID);
  }

  /**
   * Starts this Connector in ESM.
   * Does nothing if no ESM object was resolved for this Connector.
   */
  this.start = function() {
    if (typeof this.connectorObject != "undefined") {
      this.connectorObject.start();
    }
  };

  /**
   * Stops this Connector in ESM.
   * Does nothing if no ESM object was resolved for this Connector.
   */
  this.stop = function() {
    if (typeof this.connectorObject != "undefined") {
      this.connectorObject.stop();
    }
  };

  /**
   * Adds a default parser that will automatically be called to determine the current offset.
   * The parser should be defined as a function that can be attached to a SQLQuery.
   * instance.PARSER is provided as a convenient place to store pre-defined parser routines.
   * Any time a new event source is discovered and a SQLQuery is allocated for it, this default parser
   * will be attached to it.
   * <p>
   * Example:
   * <pre>
   * // ...in initialize() method
   * instance.PARSER.getOffset = function(input) {
   *   return input.RXMap.RecNumber;
   *   ...
   * };
   * conn.addParser(instance.PARSER.getOffset);
   * </pre>
   * Note that you could simply create a single parser that will be assigned by default to all SQLQuery objects,
   * but you can also predefine several parsers and dynamically attach them directly to SQLQuery objects; see
   * the SQLQuery object API for details.
   * @see SQLQuery
   * @param {Function} parser  - The parser function to attach to this session
   * @return {Boolean}  true if the parser was stored, false if the argument is not a function
   */
  this.addParser = function(parser) {
    if (typeof parser != 'function') { return false; }
    this.Parser = parser;
    return true;
  };

  /**
   * The parser used to pull the latest offset out of the last database record.
   * This default implementation simply returns true until replaced via addParser().
   */
  this.Parser = function(input) {
    return true;
  };

  /**
   * Enable suboffset handling for SQLQuery objects and add a handler that will be automatically called to determine the suboffset.
   * Many databases don't use an incremental event ID that can be guaranteed to be unique per event, and properly increment over time.
   * To handle this scenario, we commonly query based on a timestamp offset, but in many cases the resolution of the timestamp is such that
   * we may get multiple events for any given offset. Since we can't guarantee that we will get all the events assigned to a single offset
   * (either because we hit MaxRows or because some of the events hadn't been generated yet), we need to query the same offset again until we
   * see the next offset in the received data (which implies that we've passed into the next timeslice).
   * The resulting issue, however, is that we may get duplicate events in subsequent queries which we must filter out by keeping track of
   * which events we've already processed. To do this, we need to have a way to uniquely identify each event by some field or combination of fields,
   * but this value need not be incremental.
   * This feature uses an internal cache of processed events, plus a registered function to extract the unique value from processed events.
   * <p>
   * Example:
   * <pre>
   * // ...in initialize() method
   * instance.PARSER.getSubOffset = function(input) {
   *   return input.RXMap.RecID;
   *   ...
   * };
   * conn.addSubParser(instance.PARSER.getSubOffset);
   * </pre>
   * Note that if you define a subparser, it will be automatically attached to all SQLQuery objects that are allocated for event sources.
   * If you have some event sources that use suboffsets and some that don't, you'll manually have to disable suboffsets for those SQLQuery objects
   * that don't use suboffsets (see SQLQuery API).
   * <p>
   * Side effect: sets instance.CONFIG.params.SubOffsets to true.
   * @param {Function} subparser  - The suboffset parser function to attach to this session
   * @return {Boolean}  true if the subparser was stored, false if the argument is not a function
   * @see SQLQuery
   */
  this.addSubParser = function(subparser) {
    if (typeof subparser != 'function') { return false; }
    this.SubParser = subparser;
    instance.CONFIG.params.SubOffsets = true;
    return true;
  };

  /**
   * The parser used to pull the latest suboffset out of the last database record.
   * This default implementation simply returns true until replaced via addSubParser().
   */
  this.SubParser = function(input) {
    return true;
  };


  if (typeof scriptEnv != "undefined") {  // This is an Action - short circuit
    return true;
  }

  /*
   * Reads from the internal queue. Only assigned when Conn_Retry is not "none".
   * Held in a variable (rather than a function declaration nested in the "if" below)
   * so that read() can reach it regardless of how the engine scopes block-level functions.
   */
  var getData;

  if (instance.CONFIG.params.Conn_Retry != "none") {
    // Set up queue and invoke separate thread to feed the queue with ScriptEngineContext.readData()
    var queue = new java.util.concurrent.SynchronousQueue();
    this.readThread = new java.lang.Thread(function() {
      var ctx = instance.CONFIG.scriptContext;
      while (ctx.getKeepRunning()) {
        queue.put(ctx.readData());
      }
    });
    this.readThread.setName("collector-read-thread-" + this.UUID);
    this.readThread.setDaemon(true);
    this.readThread.start();

    this.retries = 1;  // Initial retries for falloff

    /*
     * secs undefined -> poll with a zero-second timeout
     * secs > 0       -> poll for up to that many seconds
     * secs == 0      -> take(), i.e. wait until an item is available
     * anything else  -> returns undefined
     */
    getData = function(secs) {
      if (typeof secs == "undefined") {
        return queue.poll(0, java.util.concurrent.TimeUnit.SECONDS);
      }
      if (secs > 0) {
        return queue.poll(secs, java.util.concurrent.TimeUnit.SECONDS);
      }
      if (secs == 0) {
        return queue.take();
      }
    };
  }

  /*
   * Private helper: stores one key/value pair on the record. Keys starting with "s_" or "i_",
   * plus a fixed list of Connector meta fields, go directly onto the record; everything else
   * goes into record.RXMap.
   */
  function storeField(record, key, value) {
    var first = key.charAt(0);
    if (key.length >= 2 && key.charAt(1) == '_' && (first == 's' || first == 'i')) {
      record[key] = value;
      return;
    }
    switch (key) {
      case "CONNECTION_METHOD":
      case "CONNECTION_MODE":
      case "o":
      case "Row_Left":
      case "Application":
      case "CurrentFile":
      case "SourceIP":
      case "QueryID":
        record[key] = value;
        break;
      default:
        record.RXMap[key] = value;
        break;
    }
  }

  /*
   * Private helper: converts the text of a replayed Connector dump into an object of fields.
   * A dump starting with "{" is parsed as JSON; anything else is treated as the old-style
   * quoted name/value dump.
   */
  function parseReplayDump(dump) {
    if (dump.charAt(0) == "{") {  // New-style JSON dump
      return JSON.parse(dump);
    }
    // Old-style dump: this is a weird quoted thingy - replace the double quotes first
    var text = dump.replace(/\"\"/g, "@QUOTE@");
    text = text.replace(/\"/g, "@EMBEDQUOTE@");
    text = text.replace(/@QUOTE@/g, "\"");
    text = text.trim();  // remove leading/trailing whitespace
    text = text.replace(/\\n/g, "\n");  // get rid of encoded newlines
    // Convert "/xNNNN" sequences into the character they encode. A replacement *string*
    // cannot build a Unicode escape out of a capture group, so a replacer function is used.
    // NOTE(review): the four digits are read as a hexadecimal code unit (Java-style) - confirm.
    text = text.replace(/\/x(\d\d\d\d)/g, function(match, code) {
      return String.fromCharCode(parseInt(code, 16));
    });
    var fields = text.parseNVP(" ", "=", "\"");
    if (typeof fields.s_RXBufferString != "undefined") {
      fields.s_RXBufferString = fields.s_RXBufferString.replace(/@EMBEDQUOTE@/g, "\"");
    }
    if (typeof fields.s_Body != "undefined") {
      fields.s_Body = fields.s_Body.replace(/@EMBEDQUOTE@/g, "\"");
    }
    return fields;
  }

  /*
   * Private helper: flags the query as needing to be re-issued and schedules it with a
   * doubling delay. The delay restarts at 1 whenever it is not between 1 and 32 inclusive.
   */
  function scheduleBackoff(query) {
    query.needQuery = true;
    if (query.queryDelay > 0 && query.queryDelay < 33) {
      query.queryDelay = query.queryDelay * 2;
    } else {
      query.queryDelay = 1;
    }
    query.queryScheduled = new Date().addSeconds(query.queryDelay * 2);
  }


  /**
   * This method reads the next set of input data from the Connector queue.
   * Connector.read() returns a Record object that is a map of the input data received plus Connector
   * meta-information.
   * <p>
   * Problems are reported through record.CONNECTION_ERROR rather than by throwing from the
   * queue-reading step: "NODATA" when nothing was received, "OFFSET_NOT_EVENT" for a database
   * offset record, "ALREADY_SEEN" for a record suppressed by suboffset handling, or the caught
   * error object if reading the Connector data failed.
   * @return {Record}  The record received from the Connector queue, basically a hash map
   */
  this.read = function() {
    var record = new Record();
    try {
      if (instance.CONFIG.params.Conn_Retry != "none") {
        if (instance.CONFIG.params.Conn_Retry == "fixed") {
          record.connectorData = getData(instance.CONFIG.params.RX_Timeout);
        } else {
          for (var i = 0; i < this.retries; i++) {
            // short retry to ensure that we check if we should keep running
            record.connectorData = getData(5);
            // Check if the Collector was stopped
            if (!ScriptEngineUtil.keepRunning() || record.connectorData != null) {
              break;
            }
            // else loop - no data yet
          }
        }
      }

      if (record.connectorData === null || typeof record.CONNECTION_ERROR != "undefined") {
        /* If we didn't get any data, or the record already carries an error,
         * then make sure an error (NODATA by default) is set on the record.
         */
        record.CONNECTION_ERROR = record.CONNECTION_ERROR ? record.CONNECTION_ERROR : "NODATA";
        if (this.retries < 24) {
          /* Double the number of short retries for next time; since doubling stops
           * once the count is 24 or more, the sequence is 1, 2, 4, 8, 16, 32.
           */
          this.retries = this.retries * 2;
        }
        return record;
      }

      // We got data!
      this.retries = 1;  // we got data so reset retries to 1
      var entries = record.connectorData.getUnmodifiableData().entrySet().toArray();
      for (var j = entries.length - 1; j > -1; --j) {  // DGC Check this!!!
        storeField(record, String(entries[j].key), String(entries[j].value));
      }
    } catch (err) {
      record.CONNECTION_ERROR = err;
      log("Error reading next connector data object: " + err);
      return record;
    }

    // Handle re-read of files created by Connectors ("Replay" mode).
    // A Replay record without a buffer string has nothing to replay, so it is passed through as-is.
    if (typeof record.CONNECTION_METHOD != 'undefined' && record.CONNECTION_MODE == "Replay" &&
        typeof record.s_RXBufferString == "string") {
      var replayed = parseReplayDump(record.s_RXBufferString);
      for (var key in replayed) {
        // replace input map with replayed values
        if (replayed.hasOwnProperty(key)) {
          storeField(record, key, String(replayed[key]));
        }
      }
    }  // End of Replay processing

    /* Databases are a bit special, mostly because we have to query them interactively. The general way that the Connector
     * works is that it will send us a special "offset" record when it is first started to tell us where we left off,
     * then we send it a query, then the Connector gets a batch of data which it feeds to us one record at a time.
     * If the query is a normal SQL query, the Row_Left variable will count down from "batchsize" to 0.
     * If it's a stored procedure, I think right now it counts up.
     * When we see the last record, we need to re-issue the query.
     * We will also get a Row_Left=-1 record when a query returns no data (unlike other Connectors, which block).
     */
    if (record.CONNECTION_METHOD == "DATABASE") {  // this is a DB Connector
      // First, check if we have seen any data from this particular event source before
      // If not, allocate a new SQLQuery object and associate it with the event source; initialize with the default query and parsers.
      if (typeof instance.QUERIES[record.s_RV24] == "undefined") {
        if (typeof instance.CONFIG.params.Query_Variant != "undefined") {
          instance.QUERIES[record.s_RV24] = new SQLQuery("sqlquery" + instance.CONFIG.params.Query_Variant + ".base");
        } else {
          instance.QUERIES[record.s_RV24] = new SQLQuery();
        }

        instance.QUERIES[record.s_RV24].addParser(this.Parser);
        if (instance.CONFIG.params.SubOffsets == true) { instance.QUERIES[record.s_RV24].addSubParser(this.SubParser); }

        // Special support for a test query specified in a parameter; DELETE PARAM FROM RELEASE COLLECTORS
        if (typeof instance.CONFIG.params.Test_Query != "undefined" && instance.CONFIG.params.Test_Query != "") {
          instance.QUERIES[record.s_RV24].baseQuery = instance.CONFIG.params.Test_Query;
        }
      }

      var query = instance.QUERIES[record.s_RV24];

      // Next, handle special records that should trigger new queries or other special behavior
      switch (record.Row_Left) {
        case "-2":
          query.setOffset(record.connectorData.offset);
          // This handles a case where multiple -2 records are sitting in the Connector queue and have to be consumed before
          // we get to the results from our query: only the first one sends the initial query to the DB.
          if (query.queryDelay > -1) {
            // NOTE(review): this goes through the global "conn" rather than "this" - presumably the same object; confirm.
            conn.send(query);
            query.queryDelay = -1;
          }
          record.CONNECTION_ERROR = "OFFSET_NOT_EVENT";
          return record;
        case "-1":
          // This means that no data was returned from our query, which means that we should pause, then query again
          // We keep the status and schedule for the next query in the SQLQuery object
          scheduleBackoff(query);
          query.noNewData = true;
          query.fullBatch = false;
          record.CONNECTION_ERROR = "NODATA";
          return record;
        case "0":
          query.needQuery = true;
          // deliberate fall through: the last record of a batch is also a normal data record
        default:
          query.setOffset(query.Parser(record));
          query.queryDelay = 0;
          break;
      }

      // Special suboffset handling
      if (query.SubOffsets == true) {
        // Check if we got a full batch back and it's the first record in the batch
        if (record.CONNECTION_MODE == "sp") {  // Handle stored procedures, which count the wrong way
          if (record.Row_Left >= query.maxRows) {
            query.fullBatch = true;
          }
          if (record.Row_Left == 1) {
            query.baseoffset = query.Parser(record);
          }
        } else {
          if (record.Row_Left >= (query.maxRows - 1)) {  // This will only work for regular (non-SP) queries
            query.fullBatch = true;
            query.baseoffset = query.Parser(record);
          }
        }
        // If this record has been seen before, suppress it
        if (query.CACHE.hasOwnProperty(query.SubParser(record))) {
          record.CONNECTION_ERROR = "ALREADY_SEEN";  // Should short-circuit parsing
        } else {
          query.noNewData = false;  // Got new data in this batch
        }
        // If we processed the entire batch without any new data, then we should pause as we got no data
        if (record.Row_Left == 0 && query.noNewData) {
          scheduleBackoff(query);
          // If the offset didn't go up, then maxRows is too small
          if (query.fullBatch) {
            instance.CONFIG.scriptContext.clearError(instance.UUID);
            if (query.maxRows < 5000) {
              query.maxRows = query.maxRows * 2;
            } else {
              log("MaxRows query size too large to handle!", 5);
              instance.CONFIG.scriptContext.addError(instance.UUID, "MaxRows query size too large to handle!");
              ScriptEngineUtil.setStopFlag(instance.UUID);
            }
            query.fullBatch = false;
            log("MaxRows parameter is too small for this database source!", 3);
          }
        }
      }
    }

    return record;
  };

  /**
   * This method sends data to the attached Connector.
   * Note that if this function is used the Collector can support attachment
   * of only a single Connector. It handles both sending strings to Process Connectors
   * and SQL Queries to DB Connectors. When given a SQLQuery, the text produced by its
   * buildQuery() is sent and its noNewData flag is reset to true.
   *
   * @param {Object} msg  Message to send to the Connector
   * @return {Boolean}  Whatever the script context's sendTXData() returns for the send attempt
   * @see SQLQuery
   */
  this.send = function(msg) {
    // Ideally I'd like a return result in case the message could not be sent
    var data = msg;
    if (msg instanceof SQLQuery) {
      data = msg.buildQuery();
      // Reset the noNewData flag for the next batch
      msg.noNewData = true;
    }
    return instance.CONFIG.scriptContext.sendTXData(data);
  };

  /**
   * This method frees up resources held by this object.
   */
  this.cleanup = function() {
    if (this.readThread != null) {
      /* Interrupt the thread so that it exits now, rather than
       * blocking on reading from the queue.
       */
      this.readThread.interrupt();
      this.readThread = null;
    }
  };

  /**
   * The DBQuery object is used to hold the default SQL query that will be issued
   * against a database source. It is only used if the Connector is a DB Connector,
   * and will be initialized on receipt of the first record.
   * @see SQLQuery
   */
  this.DBQuery = null;
}