[Pkg-nagios-changes] [SCM] UNNAMED PROJECT branch, debian/master, updated. 810edbdd3feedbfe37f4a65bee50b57b2f60fa2a

Gerhard Lausser gerhard.lausser at consol.de
Tue Feb 28 22:15:13 UTC 2012


The following commit has been merged in the debian/master branch:
commit f76d6b967f10f13c05c2258972f07a8f9ad9f1d1
Author: Gerhard Lausser <gerhard.lausser at consol.de>
Date:   Sat Jan 21 15:26:09 2012 +0100

    Add two new modules for storing livestatus logs: sqlite and mongodb

diff --git a/shinken/modules/logstore_mongodb.py b/shinken/modules/logstore_mongodb.py
new file mode 100644
index 0000000..8d2e6cc
--- /dev/null
+++ b/shinken/modules/logstore_mongodb.py
@@ -0,0 +1,277 @@
+# import from modules/livestatus_logstore
+
+
+"""
+This class is for attaching a MongoDB database to a livestatus broker module.
+It is one possibility for an exchangeable storage for log broks.
+"""
+
+import os
+import time
+import datetime
+import re
+import pymongo
+from shinken.objects.service import Service
+from livestatus_broker.livestatus_stack import LiveStatusStack
+from livestatus_broker.mapping import LOGCLASS_ALERT, LOGCLASS_PROGRAM, LOGCLASS_NOTIFICATION, LOGCLASS_PASSIVECHECK, LOGCLASS_COMMAND, LOGCLASS_STATE, LOGCLASS_INVALID, LOGOBJECT_INFO, LOGOBJECT_HOST, LOGOBJECT_SERVICE, Logline
+
+from pymongo import Connection
+from pymongo.errors import AutoReconnect
+
+
+from shinken.basemodule import BaseModule
+from shinken.objects.module import Module
+
+properties = {
+    'daemons' : ['livestatus'],
+    'type' : 'logstore_mongodb',
+    'external' : False,
+    'phases' : ['running'],
+    }
+
+
+# called by the plugin manager
+def get_instance(plugin):
+    print "Get a LogStore MongoDB module for plugin %s" % plugin.get_name()
+    instance = LiveStatusLogStoreMongoDB(plugin)
+    return instance
+
+def row_factory(cursor, row):
+    """Handler for the sqlite fetch method (apparently unused in this
+    MongoDB module; kept from the sqlite variant)."""
+    return Logline(cursor.description, row)
+
+
+class LiveStatusLogStoreError(Exception):
+    pass
+
+
+class LiveStatusLogStoreMongoDB(BaseModule):
+
+    def __init__(self, modconf):
+        BaseModule.__init__(self, modconf)
+        self.plugins = []
+        # mongodb://host1,host2,host3/?safe=true;w=2;wtimeoutMS=2000
+        self.mongodb_uri = getattr(modconf, 'mongodb_uri', None)
+        self.database = getattr(modconf, 'database', 'logs')
+        self.use_aggressive_sql = True
+        max_logs_age = getattr(modconf, 'max_logs_age', '365')
+        maxmatch = re.match(r'^(\d+)([dwmy]?)$', max_logs_age)
+        if maxmatch is None:
+            print 'Warning: wrong format for max_logs_age. Must be <number>[d|w|m|y] or <number>, not %s' % max_logs_age
+            return None
+        else:
+            if not maxmatch.group(2):
+                self.max_logs_age = int(maxmatch.group(1))
+            elif maxmatch.group(2) == 'd':
+                self.max_logs_age = int(maxmatch.group(1))
+            elif maxmatch.group(2) == 'w':
+                self.max_logs_age = int(maxmatch.group(1)) * 7
+            elif maxmatch.group(2) == 'm':
+                self.max_logs_age = int(maxmatch.group(1)) * 31
+            elif maxmatch.group(2) == 'y':
+                self.max_logs_age = int(maxmatch.group(1)) * 365
+
+        # This stack is used to create a full-blown select-statement
+        self.mongo_filter_stack = LiveStatusMongoStack()
+        # This stack is used to create a minimal select-statement which
+        # selects only by time >= and time <=
+        self.mongo_time_filter_stack = LiveStatusMongoStack()
+        self.is_connected = False
+        # Now sleep one second, so that we won't get lineno collisions with the last second
+        time.sleep(1)
+        self.lineno = 0
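+    # Illustrative module configuration (an assumption based on the
+    # attributes read above, not part of the original commit):
+    #
+    #   define module {
+    #       module_name      LogStoreMongoDB
+    #       module_type      logstore_mongodb
+    #       mongodb_uri      mongodb://localhost/?safe=true
+    #       database         logs
+    #       max_logs_age     365d        ; plain number, or d/w/m/y suffix
+    #   }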
+
+    def load(self, app):
+        self.app = app
+
+    def init(self):
+        pass
+
+    def open(self):
+        print "open LiveStatusLogStoreMongoDB ok"
+        try:
+            self.conn = pymongo.Connection(self.mongodb_uri, fsync=True)
+            self.db = self.conn[self.database]
+            self.is_connected = True
+        except AutoReconnect, exp:
+            # now what, ha?
+            print "LiveStatusLogStoreMongoDB.AutoReconnect", exp
+            raise
+
+    def close(self):
+        self.conn.disconnect()
+
+    def commit(self):
+        pass
+
+    def do_i_need_this_manage_brok(self, brok):
+        """ Look for a manager function for a brok, and call it """
+        manage = getattr(self, 'manage_' + brok.type + '_brok', None)
+        if manage:
+            return manage(brok)
+
+    def manage_log_brok(self, b):
+        data = b.data
+        line = data['log']
+        try:
+            logline = Logline(line=line)
+            values = logline.as_dict()
+        except Exception, exp:
+            print "Unexpected error:", exp
+            return
+        try:
+            if logline.logclass != LOGCLASS_INVALID:
+                self.db.logs.insert(values)
+        except Exception, exp:
+            print "An error occurred:", exp
+            print "DATABASE ERROR!!!!!!!!!!!!!!!!!"
+        #FIXME need access to this#self.livestatus.count_event('log_message')
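+        # Illustrative (assumed) input: a log brok typically carries a classic
+        # Nagios-style log line, e.g.
+        #   [1327154769] SERVICE ALERT: srv1;disk;CRITICAL;HARD;3;DISK CRITICAL
+        # Logline(line=...) parses it and as_dict() yields the column values
+        # (time, type, host_name, service_description, state, ...) that are
+        # inserted into the logs collection above.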
+
+    def add_filter(self, operator, attribute, reference):
+        if attribute == 'time':
+            self.mongo_time_filter_stack.put_stack(self.make_mongo_filter(operator, attribute, reference))
+        self.mongo_filter_stack.put_stack(self.make_mongo_filter(operator, attribute, reference))
+
+    def add_filter_and(self, andnum):
+        self.mongo_filter_stack.and_elements(andnum)
+
+    def add_filter_or(self, ornum):
+        self.mongo_filter_stack.or_elements(ornum)
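+    # Illustrative: these hooks are presumably driven by the Filter:/And:/Or:
+    # headers of a livestatus query (assumed example, not from this commit):
+    #
+    #   GET log
+    #   Filter: time >= 1327104000
+    #   Filter: host_name = srv1
+    #   And: 2
+    #
+    # which would translate to add_filter('>=', 'time', '1327104000'),
+    # add_filter('=', 'host_name', 'srv1') and add_filter_and(2).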
+
+    def get_live_data_log(self):
+        """Like get_live_data, but for log objects"""
+        # finalize the filter stacks
+        self.mongo_time_filter_stack.and_elements(self.mongo_time_filter_stack.qsize())
+        self.mongo_filter_stack.and_elements(self.mongo_filter_stack.qsize())
+        # FIXME: hardcoded to True here, which overrides the value set in __init__
+        self.use_aggressive_sql = True
+        if self.use_aggressive_sql:
+            # Be aggressive, get preselected data from mongodb and do less
+            # filtering in python. But: only a subset of Filter:-attributes
+            # can be mapped to mongodb query terms, for the others
+            # we must use "always-true"-clauses. This can result in
+            # funny and potentially inefficient queries
+            mongo_filter_func = self.mongo_filter_stack.get_stack()
+        else:
+            # Be conservative, get everything from the database between
+            # two dates and apply the Filter:-clauses in python
+            mongo_filter_func = self.mongo_time_filter_stack.get_stack()
+        result = []
+        mongo_filter = mongo_filter_func()
+        print "mongo filter is", mongo_filter
+        # We can apply the filterstack here as well. we have columns and filtercolumns.
+        # the only additional step is to enrich log lines with host/service-attributes
+        # A timerange can be useful for a faster preselection of lines
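+        # Illustrative: with the filters above, mongo_filter is a string like
+        #   "{'$and' : [{ 'host_name' : 'srv1' }, { 'time' : { '$gte' : 1327104000 } }]}"
+        # which eval() turns into the dict passed to pymongo's find() below.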
+        filter_element = eval(mongo_filter)
+        print "mongo filter is", type(filter_element)
+        print "mongo filter is", filter_element
+        dbresult = []
+        columns = ['logobject', 'attempt', 'logclass', 'command_name', 'comment', 'contact_name', 'host_name', 'lineno', 'message', 'options', 'plugin_output', 'service_description', 'state', 'state_type', 'time', 'type']
+        if not self.is_connected:
+            print "sorry, not connected"
+        else:
+            dbresult = [Logline([(c, ) for c in columns], [x[col] for col in columns]) for x in self.db.logs.find(filter_element)]
+        return dbresult
+
+    def make_mongo_filter(self, operator, attribute, reference):
+        # The filters are text fragments which are put together to form a sql where-condition finally.
+        # Add parameter Class (Host, Service), lookup datatype (default string), convert reference
+        # which attributes are suitable for a sql statement
+        good_attributes = ['time', 'attempt', 'class', 'command_name', 'comment', 'contact_name', 'host_name', 'plugin_output', 'service_description', 'state', 'state_type', 'type']
+        good_operators = ['=', '!=']
+        #  put string references in '' for the query
+        if attribute in ['command_name', 'comment', 'contact_name', 'host_name', 'plugin_output', 'service_description', 'state_type', 'type']:
+            reference = "'%s'" % reference
+
+        def eq_filter():
+            if reference == '':
+                return '\'%s\' : \'\'' % (attribute,)
+            else:
+                return '\'%s\' : %s' % (attribute, reference)
+        def ne_filter():
+            if reference == '':
+                return '\'%s\' : { \'$ne\' : \'\' }' % (attribute,)
+            else:
+                return '\'%s\' : { \'$ne\' : %s }' % (attribute, reference)
+        def gt_filter():
+            return '\'%s\' : { \'$gt\' : %s }' % (attribute, reference)
+        def ge_filter():
+            return '\'%s\' : { \'$gte\' : %s }' % (attribute, reference)
+        def lt_filter():
+            return '\'%s\' : { \'$lt\' : %s }' % (attribute, reference)
+        def le_filter():
+            return '\'%s\' : { \'$lte\' : %s }' % (attribute, reference)
+        def match_filter():
+            return '\'%s\' : { \'$regex\' : \'%s\' }' % (attribute, reference)
+        def no_filter():
+            return '{}'
+        if attribute not in good_attributes:
+            return no_filter
+        if operator == '=':
+            return eq_filter
+        if operator == '>':
+            return gt_filter
+        if operator == '>=':
+            return ge_filter
+        if operator == '<':
+            return lt_filter
+        if operator == '<=':
+            return le_filter
+        if operator == '!=':
+            return ne_filter
+        if operator == '~':
+            return match_filter
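+    # Illustrative example (assumption, not from the original commit):
+    #   f = self.make_mongo_filter('=', 'host_name', 'srv1')
+    #   f()  ->  "'host_name' : 'srv1'"
+    # i.e. a fragment that the stack later wraps in braces and combines
+    # with '$and'/'$or' into a complete query string.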
+
+
+class LiveStatusMongoStack(LiveStatusStack):
+    """A Lifo queue for filter functions.
+
+    This class inherits either from MyLifoQueue or Queue.LifoQueue
+    whatever is available with the current python version.
+
+    Public functions:
+    and_elements -- takes a certain number (given as argument)
+    of filters from the stack, creates a new filter and puts
+    this filter on the stack. If these filters are lambda functions,
+    the new filter is a boolean and of the underlying filters.
+    If the filters are sql where-conditions, they are also concatenated
+    with and to form a new string containing a more complex where-condition.
+
+    or_elements --- the same, only that the single filters are
+    combined with a logical or.
+
+    """
+
+    def __init__(self, *args, **kw):
+        self.type = 'mongo'
+        self.__class__.__bases__[0].__init__(self, *args, **kw)
+
+    def and_elements(self, num):
+        """Take num filters from the stack, and them and put the result back"""
+        if num > 1:
+            filters = []
+            for _ in range(num):
+                filters.append(self.get_stack())
+            # Take from the stack:
+            # Make a combined anded function
+            # Put it on the stack
+            print "filters are", filters
+            and_clause = lambda: '{\'$and\' : [%s]}' % ', '.join('{ ' + x() + ' }' for x in filters)
+            print "and_elements", and_clause()
+            self.put_stack(and_clause)
+
+    def or_elements(self, num):
+        """Take num filters from the stack, or them and put the result back"""
+        if num > 1:
+            filters = []
+            for _ in range(num):
+                filters.append(self.get_stack())
+            or_clause = lambda: '{\'$or\' : [%s]}' % ', '.join('{ ' + x() + ' }' for x in filters)
+            print "or_elements", or_clause()
+            self.put_stack(or_clause)
+
+    def get_stack(self):
+        """Return the top element from the stack or a filter which is always true"""
+        if self.qsize() == 0:
+            return lambda: ''
+        else:
+            return self.get()
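+# Illustrative stack behaviour (assumed example, not part of the commit):
+#   stack = LiveStatusMongoStack()
+#   stack.put_stack(lambda: "'host_name' : 'srv1'")
+#   stack.put_stack(lambda: "'state' : 2")
+#   stack.and_elements(2)
+#   stack.get_stack()()  ->  "{'$and' : [{ 'state' : 2 }, { 'host_name' : 'srv1' }]}"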
diff --git a/shinken/modules/logstore_sqlite.py b/shinken/modules/logstore_sqlite.py
new file mode 100644
index 0000000..f22996a
--- /dev/null
+++ b/shinken/modules/logstore_sqlite.py
@@ -0,0 +1,556 @@
+# import from modules/livestatus_logstore
+
+
+"""
+This class is for attaching a sqlite database to a livestatus broker module.
+It is one possibility for an exchangeable storage for log broks
+"""
+
+import os
+import time
+import datetime
+import re
+from shinken.objects.service import Service
+from livestatus_broker.livestatus_stack import LiveStatusStack
+from livestatus_broker.mapping import LOGCLASS_ALERT, LOGCLASS_PROGRAM, LOGCLASS_NOTIFICATION, LOGCLASS_PASSIVECHECK, LOGCLASS_COMMAND, LOGCLASS_STATE, LOGCLASS_INVALID, LOGOBJECT_INFO, LOGOBJECT_HOST, LOGOBJECT_SERVICE, Logline
+
+old_implementation = False
+try:
+    import sqlite3
+except ImportError: # Python 2.4 does not ship it
+    try:
+        import pysqlite2.dbapi2 as sqlite3 # needs the pysqlite2 package from http://code.google.com/p/pysqlite/downloads/list
+    except ImportError:
+        import sqlite as sqlite3 # one last try
+        old_implementation = True
+
+
+
+from shinken.basemodule import BaseModule
+from shinken.objects.module import Module
+
+properties = {
+    'daemons' : ['livestatus'],
+    'type' : 'logstore_sqlite',
+    'external' : False,
+    'phases' : ['running'],
+    }
+
+
+# called by the plugin manager
+def get_instance(plugin):
+    print "Get a LogStore Sqlite module for plugin %s" % plugin.get_name()
+    instance = LiveStatusLogStoreSqlite(plugin)
+    return instance
+
+def row_factory(cursor, row):
+    """Handler for the sqlite fetch method."""
+    return Logline(sqlite_cursor=cursor.description, sqlite_row=row)
+
+
+class LiveStatusLogStoreError(Exception):
+    pass
+
+
+class LiveStatusLogStoreSqlite(BaseModule):
+
+    def __init__(self, modconf):
+        BaseModule.__init__(self, modconf)
+        self.plugins = []
+        self.database_file = getattr(modconf, 'database_file', os.path.join(os.path.abspath('.'), 'var', 'livestatus.db'))
+        self.archive_path = getattr(modconf, 'archive_path', os.path.join(os.path.dirname(self.database_file), 'archives'))
+        try:
+            os.stat(self.archive_path)
+        except OSError:
+            os.mkdir(self.archive_path)
+        max_logs_age = getattr(modconf, 'max_logs_age', '365')
+        maxmatch = re.match(r'^(\d+)([dwmy]?)$', max_logs_age)
+        if maxmatch is None:
+            print 'Warning: wrong format for max_logs_age. Must be <number>[d|w|m|y] or <number>, not %s' % max_logs_age
+            return None
+        else:
+            if not maxmatch.group(2):
+                self.max_logs_age = int(maxmatch.group(1))
+            elif maxmatch.group(2) == 'd':
+                self.max_logs_age = int(maxmatch.group(1))
+            elif maxmatch.group(2) == 'w':
+                self.max_logs_age = int(maxmatch.group(1)) * 7
+            elif maxmatch.group(2) == 'm':
+                self.max_logs_age = int(maxmatch.group(1)) * 31
+            elif maxmatch.group(2) == 'y':
+                self.max_logs_age = int(maxmatch.group(1)) * 365
+        self.use_aggressive_sql = (getattr(modconf, 'use_aggressive_sql', '0') == '1')
+
+        # This stack is used to create a full-blown select-statement
+        self.sql_filter_stack = LiveStatusSqlStack()
+        # This stack is used to create a minimal select-statement which
+        # selects only by time >= and time <=
+        self.sql_time_filter_stack = LiveStatusSqlStack()
+
+        # Now sleep one second, so that we won't get lineno collisions with the last second
+        time.sleep(1)
+        Logline.lineno = 0
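+    # Illustrative module configuration (an assumption based on the
+    # attributes read above, not part of the original commit):
+    #
+    #   define module {
+    #       module_name         LogStoreSqlite
+    #       module_type         logstore_sqlite
+    #       database_file       /var/lib/shinken/livestatus.db
+    #       archive_path        /var/lib/shinken/archives
+    #       max_logs_age        365d      ; plain number, or d/w/m/y suffix
+    #       use_aggressive_sql  0
+    #   }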
+
+
+    def load(self, app):
+        self.app = app
+
+    def init(self):
+        self.old_implementation = old_implementation
+
+    def open(self):
+        print "open LiveStatusLogStoreSqlite ok"
+        self.dbconn = sqlite3.connect(self.database_file)
+        # Avoid encoding problems with utf8 inserts
+        self.dbconn.text_factory = str
+        self.dbcursor = self.dbconn.cursor()
+        #self.dbconn.row_factory = row_factory
+        #self.execute("PRAGMA cache_size = 200000")
+        # Create db file and tables if not existing
+        self.prepare_log_db_table()
+        # Start with commit and rotate immediately so the interval timers
+        # get initialized properly
+        now = time.time()
+        self.next_log_db_commit = now
+        self.next_log_db_rotate = now
+        # Immediately archive data. This also splits an old-style database
+        # (storing logs from more than one day) up into single-day databases
+        if self.max_logs_age > 0:
+            # open() is also called from log_db_do_archive (with max_logs_age
+            # of 0 though)
+            self.log_db_do_archive()
+
+    def close(self):
+        self.dbconn.commit()
+        self.dbconn.close()
+        self.dbconn = None
+        if self.max_logs_age == 0:
+            # Again, if max_logs_age is 0, we don't care for archives.
+            # If max_logs_age was manually set to 0, we know that we don't
+            # want archives. If it was set by log_db_do_archive(), we don't
+            # want to leave empty directories around.
+            try:
+                os.removedirs(self.archive_path)
+            except OSError:
+                pass
+
+    def commit(self):
+        self.dbconn.commit()
+
+    def prepare_log_db_table(self):
+        # 'attempt', 'class', 'command_name', 'comment', 'contact_name', 'host_name', 'lineno', 'message',
+        # 'options', 'plugin_output', 'service_description', 'state', 'state_type', 'time', 'type',
+        cmd = "CREATE TABLE IF NOT EXISTS logs(logobject INT, attempt INT, class INT, command_name VARCHAR(64), comment VARCHAR(256), contact_name VARCHAR(64), host_name VARCHAR(64), lineno INT, message VARCHAR(512), options VARCHAR(512), plugin_output VARCHAR(256), service_description VARCHAR(64), state INT, state_type VARCHAR(10), time INT, type VARCHAR(64))"
+        self.execute(cmd)
+        cmd = "CREATE INDEX IF NOT EXISTS logs_time ON logs (time)"
+        self.execute(cmd)
+        cmd = "PRAGMA journal_mode=truncate"
+        self.execute(cmd)
+        self.commit()
+
+    def commit_and_rotate_log_db(self):
+        """Submit a commit or rotate the complete database file.
+
+        This function is called whenever the mainloop doesn't handle a request.
+        The database updates are committed every second.
+        Every day at 00:05 the database contents with a timestamp of past days
+        are moved to their own datafiles (one for each day). We wait until 00:05
+        because in a distributed environment even after 00:00 (on the broker host)
+        we might receive data from other hosts with a timestamp dating from yesterday.
+        """
+        now = time.time()
+        if self.next_log_db_commit <= now:
+            self.commit()
+            print "commit....."
+            self.next_log_db_commit = now + 1
+        if self.next_log_db_rotate <= now:
+            print "at %s we rotate the database file" % time.asctime(time.localtime(now))
+            # Take the current database file
+            # Move the messages into daily files
+            self.log_db_do_archive()
+
+            today = datetime.date.today()
+            today0005 = datetime.datetime(today.year, today.month, today.day, 0, 5, 0)
+            if now < time.mktime(today0005.timetuple()):
+                nextrotation = today0005
+            else:
+                nextrotation = today0005 + datetime.timedelta(days=1)
+
+            # See you tomorrow
+            self.next_log_db_rotate = time.mktime(nextrotation.timetuple())
+            print "next rotation at %s " % time.asctime(time.localtime(self.next_log_db_rotate))
+
+
+    def log_db_historic_contents(self):
+        """
+        Find out which time range is covered by the current datafile.
+        Return a list of historic datafiles which can be used to split up
+        the contents of the current datafile.
+        """
+        mintime = maxtime = None
+        try:
+            dbresult = self.execute('SELECT MIN(time),MAX(time) FROM logs')
+            mintime = dbresult[0][0]
+            maxtime = dbresult[0][1]
+        except sqlite3.Error, e:
+            print "An error occurred:", e.args[0]
+        if mintime is None:
+            mintime = int(time.time())
+        if maxtime is None:
+            maxtime = int(time.time())
+        return self.log_db_relevant_files(mintime, maxtime, True)
+
+    def log_db_relevant_files(self, mintime, maxtime, preview=False):
+        """
+        Logging data created on different days are stored in separate
+        datafiles. For each day there is one of them.
+        This function takes a time range and returns the names of the files
+        where the logging data of the days covered by the time range can
+        be found.
+        If the preview parameter is false, only names of existing files
+        are returned.
+        The result is a list with one element for each day. The elements
+        themselves are lists consisting of the following items:
+        - A Datetime object
+        - A short string of a day's date in the form db%Y%m%d
+        - The filename of a datafile which contains the loggings of this day.
+        - The unix timestamp of this day's 00:00 (start)
+        - The unix timestamp of the day's 00:00 (end + 1s)
+        Item no.2 can be used as a handle for the ATTACH-statement of sqlite.
+        If the list element describes the current day, item no.2 is "main".
+        """
+        #print time.asctime(time.localtime(mintime))
+        #print time.asctime(time.localtime(maxtime))
+        minday = datetime.datetime.fromtimestamp(mintime)
+        maxday = datetime.datetime.fromtimestamp(maxtime)
+        minday = datetime.datetime(minday.year, minday.month, minday.day, 0, 0, 0)
+        maxday = datetime.datetime(maxday.year, maxday.month, maxday.day, 0, 0, 0)
+        #print time.asctime(time.localtime(time.mktime(minday.timetuple())))
+        #print time.asctime(time.localtime(time.mktime(maxday.timetuple())))
+        result = []
+        today = datetime.date.today()
+        today = datetime.datetime(today.year, today.month, today.day, 0, 0, 0)
+        if maxday >= today:
+            # Only loop until yesterday
+            maxday = today - datetime.timedelta(days=1)
+        thisday = minday
+        while thisday <= maxday:
+            nextday = thisday + datetime.timedelta(days=1)
+            handle = "db" + thisday.strftime("%Y%m%d")
+            archive = os.path.join(self.archive_path, os.path.splitext(os.path.basename(self.database_file))[0] + "-" + thisday.strftime("%Y-%m-%d") + ".db")
+            if os.path.exists(archive) or preview:
+                result.append([thisday, handle, archive, int(time.mktime(thisday.timetuple())), int(time.mktime(nextday.timetuple()))])
+            thisday = nextday
+        if maxtime >= int(time.mktime(today.timetuple())):
+            # Also today's data are relevant, so we add the current database
+            result.append([today, "main", self.database_file, int(time.mktime(today.timetuple())), maxtime])
+        return result
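+    # Illustrative return value (assumed example): for a range spanning
+    # 2012-01-19 .. 2012-01-20 (both in the past), with database_file
+    # 'livestatus.db', the result would look like
+    #   [[datetime(2012, 1, 19), 'db20120119', '<archive_path>/livestatus-2012-01-19.db', 1326927600, 1327014000],
+    #    [datetime(2012, 1, 20), 'db20120120', '<archive_path>/livestatus-2012-01-20.db', 1327014000, 1327100400]]
+    # (the timestamps depend on the local timezone).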
+
+    def log_db_do_archive(self):
+        """
+        In order to limit the datafile's sizes we flush logs dating from
+        before today/00:00 to their own datafiles.
+        """
+        try:
+            os.stat(self.archive_path)
+        except OSError:
+            os.mkdir(self.archive_path)
+        for day in self.log_db_historic_contents():
+            dayobj, handle, archive, starttime, stoptime = day
+            if handle == "main":
+                # Skip archiving of today's contents
+                continue
+            if not os.path.exists(archive):
+                # Create an empty datafile with the logs table
+                #tmpconn = LiveStatusDb(archive, None, 0)
+                #tmpconn.prepare_log_db_table()
+                #tmpconn.close()
+
+                dbmodconf = Module({'module_name' : 'LogStore',
+                                    'module_type' : 'logstore_sqlite',
+                                    'use_aggressive_sql' : '0',
+                                    'database_file' : archive,
+                                    'max_logs_age' : '0',
+                                    })
+                tmpconn = LiveStatusLogStoreSqlite(dbmodconf)
+                tmpconn.open()
+                tmpconn.close()
+
+            self.commit()
+            print "move logs from %s - %s to database %s" % (time.asctime(time.localtime(starttime)), time.asctime(time.localtime(stoptime)), archive)
+            cmd = "ATTACH DATABASE '%s' AS %s" % (archive, handle)
+            self.execute_attach(cmd)
+            cmd = "INSERT INTO %s.logs SELECT * FROM logs WHERE time >= %d AND time < %d" % (handle, starttime, stoptime)
+            self.execute(cmd)
+            cmd = "DELETE FROM logs WHERE time >= %d AND time < %d" % (starttime, stoptime)
+            self.execute(cmd)
+            self.commit()
+            cmd = "DETACH DATABASE %s" % handle
+            self.execute(cmd)
+            # This is necessary to shrink the database file
+            try:
+                self.execute('VACUUM')
+            except sqlite3.DatabaseError, exp:
+                print "WARNING : it seems your database is corrupted. Please recreate it"
+            self.commit()
+
+    def execute(self, cmd, values=None, row_factory=None):
+        try:
+            if values is None:
+                values = []
+            if sqlite3.paramstyle == 'pyformat':
+                matchcount = 0
+                for m in re.finditer(r"\?", cmd):
+                    cmd = re.sub('\\?', '%(' + str(matchcount) + ')s', cmd, 1)
+                    matchcount += 1
+                values = dict(zip([str(x) for x in xrange(len(values))], values))
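+                # Illustrative: 'DELETE FROM logs WHERE time >= ? AND time < ?'
+                # is rewritten to '... time >= %(0)s AND time < %(1)s' and the
+                # values sequence to the dict {'0': starttime, '1': stoptime}.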
+            if cmd.startswith("SELECT"):
+                already_had_row_factory = hasattr(self.dbconn, "row_factory")
+                if row_factory is not None:
+                    self.dbcursor.close()
+                    if already_had_row_factory:
+                        orig_row_factory = self.dbconn.row_factory
+                    self.dbconn.row_factory = row_factory
+                    # We need to create a new cursor which knows how to row_factory
+                    # Simply setting conn.row_factory and using the old cursor
+                    # would not work
+                    self.dbcursor = self.dbconn.cursor()
+                self.dbcursor.execute(cmd, values)
+                dbresult = self.dbcursor.fetchall()
+                if row_factory is not None:
+                    if sqlite3.paramstyle == 'pyformat':
+                        dbresult = [row_factory(self.dbcursor, d) for d in dbresult]
+                    if already_had_row_factory:
+                        self.dbconn.row_factory = orig_row_factory
+                    else:
+                        delattr(self.dbconn, "row_factory")
+                    self.dbcursor.close()
+                    self.dbcursor = self.dbconn.cursor()
+                return [x for x in dbresult]
+            else:
+                self.dbcursor.execute(cmd, values)
+        except sqlite3.Error, e:
+            print "execute error", e
+            raise LiveStatusLogStoreError(e)
+
+    def execute_attach(self, cmd):
+        """
+        Python 2.4 and old sqlite implementations show strange behavior.
+        Attaching fails with the error:
+        cannot ATTACH database within transaction
+        That's why the ATTACH statement must be executed in its own context.
+        """
+        if self.old_implementation:
+            self.commit()
+            orig_autocommit = self.dbconn.autocommit
+            self.dbconn.autocommit = True
+            cursor = self.dbconn.cursor()
+            cursor.execute(cmd)
+            cursor.close()
+            self.dbconn.autocommit = orig_autocommit
+            self.dbconn.commit()
+        else:
+            self.dbcursor.execute(cmd)
+
+
+    def manage_brok(self, brok):
+        """ Look for a manager function for a brok, and call it """
+        manage = getattr(self, 'manage_' + brok.type + '_brok', None)
+        if manage:
+            return manage(brok)
+
+    def manage_log_brok(self, b):
+        data = b.data
+        line = data['log']
+        try:
+            logline = Logline(line=line)
+            values = logline.as_tuple()
+        except Exception, exp:
+            print "Unexpected error:", exp
+            return
+        try:
+            if logline.logclass != LOGCLASS_INVALID:
+                self.execute('INSERT INTO LOGS VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)', values)
+        except LiveStatusLogStoreError, exp:
+            print "An error occurred:", exp.args[0]
+            print "DATABASE ERROR!!!!!!!!!!!!!!!!!"
+        #FIXME need access to this#self.livestatus.count_event('log_message')
+
+    def add_filter(self, operator, attribute, reference):
+        print "isql add", operator, attribute, reference
+        if attribute == 'time':
+            #self.sql_time_filter_stack.put_stack(self.make_sql_filter(operator, attribute, reference))
+            pass
+        self.sql_filter_stack.put_stack(self.make_sql_filter(operator, attribute, reference))
+
+    def add_filter_and(self, andnum):
+        self.sql_filter_stack.and_elements(andnum)
+
+    def add_filter_or(self, ornum):
+        self.sql_filter_stack.or_elements(ornum)
+
+
+    def get_live_data_log(self):
+        """Like get_live_data, but for log objects"""
+        # finalize the filter stacks
+        #self.sql_time_filter_stack.and_elements(self.sql_time_filter_stack.qsize())
+        self.sql_filter_stack.and_elements(self.sql_filter_stack.qsize())
+        # FIXME: hardcoded to True here, which overrides the configured use_aggressive_sql
+        self.use_aggressive_sql = True
+        if self.use_aggressive_sql:
+            # Be aggressive, get preselected data from sqlite and do less
+            # filtering in python. But: only a subset of Filter:-attributes
+            # can be mapped to columns in the logs-table, for the others
+            # we must use "always-true"-clauses. This can result in
+            # funny and potentially ineffective sql-statements
+            sql_filter_func = self.sql_filter_stack.get_stack()
+        else:
+            # Be conservative, get everything from the database between
+            # two dates and apply the Filter:-clauses in python
+            sql_filter_func = self.sql_time_filter_stack.get_stack()
+        result = []
+        # We can apply the filterstack here as well. we have columns and filtercolumns.
+        # the only additional step is to enrich log lines with host/service-attributes
+        # A timerange can be useful for a faster preselection of lines
+        x = sql_filter_func
+        print "x is", x
+        filter_clause, filter_values = sql_filter_func()
+        full_filter_clause = filter_clause
+        matchcount = 0
+        for m in re.finditer(r"\?", full_filter_clause):
+            full_filter_clause = re.sub('\\?', str(filter_values[matchcount]), full_filter_clause, 1)
+            matchcount += 1
+        fromtime = 0
+        totime = int(time.time()) + 1
+        gtpat = re.search(r'^(\(*time|(.*\s+time))\s+>\s+(\d+)', full_filter_clause)
+        gepat = re.search(r'^(\(*time|(.*\s+time))\s+>=\s+(\d+)', full_filter_clause)
+        ltpat = re.search(r'^(\(*time|(.*\s+time))\s+<\s+(\d+)', full_filter_clause)
+        lepat = re.search(r'^(\(*time|(.*\s+time))\s+<=\s+(\d+)', full_filter_clause)
+        if gtpat is not None:
+            fromtime = int(gtpat.group(3)) + 1
+        if gepat is not None:
+            fromtime = int(gepat.group(3))
+        if ltpat is not None:
+            totime = int(ltpat.group(3)) - 1
+        if lepat is not None:
+            totime = int(lepat.group(3))
+        # now find the list of datafiles
+        dbresult = []
+        for dateobj, handle, archive, fromtime, totime in self.log_db_relevant_files(fromtime, totime):
+            selectresult = self.select_live_data_log(filter_clause, filter_values, handle, archive, fromtime, totime)
+            dbresult.extend(selectresult)
+        return dbresult
+
+
+    def select_live_data_log(self, filter_clause, filter_values, handle, archive, fromtime, totime):
+        dbresult = []
+        try:
+            if handle == "main":
+                dbresult = self.execute('SELECT * FROM logs WHERE %s' % filter_clause, filter_values, row_factory)
+            else:
+                self.commit()
+                self.execute_attach("ATTACH DATABASE '%s' AS %s" % (archive, handle))
+                dbresult = self.execute('SELECT * FROM %s.logs WHERE %s' % (handle, filter_clause), filter_values, row_factory)
+                self.execute("DETACH DATABASE %s" % handle)
+        except LiveStatusLogStoreError, e:
+            print "An error occurred:", e.args[0]
+        return dbresult
+
+    def make_sql_filter(self, operator, attribute, reference):
+        # The filters are text fragments which are put together to form a sql where-condition finally.
+        # Add parameter Class (Host, Service), lookup datatype (default string), convert reference
+        # which attributes are suitable for a sql statement
+        good_attributes = ['time', 'attempt', 'class', 'command_name', 'comment', 'contact_name', 'host_name', 'plugin_output', 'service_description', 'state', 'state_type', 'type']
+        good_operators = ['=', '!=']
+
+        def eq_filter():
+            if reference == '':
+                return ['%s IS NULL' % attribute, ()]
+            else:
+                return ['%s = ?' % attribute, (reference, )]
+        def ne_filter():
+            if reference == '':
+                return ['%s IS NOT NULL' % attribute, ()]
+            else:
+                return ['%s != ?' % attribute, (reference, )]
+        def gt_filter():
+            return ['%s > ?' % attribute, (reference, )]
+        def ge_filter():
+            return ['%s >= ?' % attribute, (reference, )]
+        def lt_filter():
+            return ['%s < ?' % attribute, (reference, )]
+        def le_filter():
+            return ['%s <= ?' % attribute, (reference, )]
+        def match_filter():
+            return ['%s LIKE ?' % attribute, ('%'+reference+'%', )]
+        def no_filter():
+            return ['1 = 1', ()]
+        if attribute not in good_attributes:
+            return no_filter
+        if operator == '=':
+            return eq_filter
+        if operator == '>':
+            return gt_filter
+        if operator == '>=':
+            return ge_filter
+        if operator == '<':
+            return lt_filter
+        if operator == '<=':
+            return le_filter
+        if operator == '!=':
+            return ne_filter
+        if operator == '~':
+            return match_filter
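+    # Illustrative example (assumption, not from the original commit):
+    #   f = self.make_sql_filter('>=', 'time', 1327104000)
+    #   f()  ->  ['time >= ?', (1327104000,)]
+    # i.e. a where-clause fragment plus its bind values, ready to be
+    # combined by LiveStatusSqlStack and passed to execute().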
+
+
+class LiveStatusSqlStack(LiveStatusStack):
+
+    def __init__(self, *args, **kw):
+        self.type = 'sql'
+        self.__class__.__bases__[0].__init__(self, *args, **kw)
+
+    def not_elements(self):
+        top_filter = self.get_stack()
+        negate_clause = '(NOT ' + top_filter()[0] + ')'
+        negate_values = top_filter()[1]
+        negate_filter = lambda: [negate_clause, negate_values]
+        print "not_element", negate_clause
+        self.put_stack(negate_filter)
+
+    def and_elements(self, num):
+        """Take num filters from the stack, and them and put the result back"""
+        print "this is sql and_elements", num, self.qsize()
+        if num > 1:
+            filters = []
+            for _ in range(num):
+                filters.append(self.get_stack())
+            print "now i and", [x() for x in filters]
+            # Take from the stack:
+            # Make a combined anded function
+            # Put it on the stack
+            and_clause = '(' + (' AND ').join([x()[0] for x in filters]) + ')'
+            and_values = reduce(lambda x, y: x+y, [ x()[1] for x in filters ])
+            and_filter = lambda : [and_clause, and_values]
+            #  print "and_elements", and_clause, and_values
+            self.put_stack(and_filter)
+
+    def or_elements(self, num):
+        """Take num filters from the stack, or them and put the result back"""
+        if num > 1:
+            filters = []
+            for _ in range(num):
+                filters.append(self.get_stack())
+            or_clause = '(' + (' OR ').join([ x()[0] for x in filters ]) + ')'
+            or_values = reduce(lambda x, y: x+y, [ x()[1] for x in filters ])
+            or_filter = lambda : [or_clause, or_values]
+            #  print "or_elements", or_clause
+            self.put_stack(or_filter)
+
+    def get_stack(self):
+        """Return the top element from the stack or a filter which is always true"""
+        if self.qsize() == 0:
+            return lambda : ["1 = ?", (1, )]
+        else:
+            return self.get()
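+# Illustrative stack behaviour (assumed example, not part of the commit):
+#   stack = LiveStatusSqlStack()
+#   stack.put_stack(lambda: ['host_name = ?', ('srv1',)])
+#   stack.put_stack(lambda: ['time >= ?', (1327104000,)])
+#   stack.and_elements(2)
+#   stack.get_stack()()  ->  ['(time >= ? AND host_name = ?)', (1327104000, 'srv1')]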

-- 
UNNAMED PROJECT



More information about the Pkg-nagios-changes mailing list