[Pkg-nagios-changes] [SCM] UNNAMED PROJECT branch, debian/master, updated. 810edbdd3feedbfe37f4a65bee50b57b2f60fa2a

Gerhard Lausser gerhard.lausser at consol.de
Tue Feb 28 22:20:17 UTC 2012


The following commit has been merged in the debian/master branch:
commit 0bbd34420a20050061ff3739c060a65ab71fa482
Author: Gerhard Lausser <gerhard.lausser at consol.de>
Date:   Sun Feb 12 01:09:43 2012 +0100

    Fix some bugs in the livestatus cache and add a solid test case for the host/service-history check

diff --git a/shinken/modules/livestatus_broker/livestatus_query.py b/shinken/modules/livestatus_broker/livestatus_query.py
index 6ce109c..c38c919 100644
--- a/shinken/modules/livestatus_broker/livestatus_query.py
+++ b/shinken/modules/livestatus_broker/livestatus_query.py
@@ -245,9 +245,11 @@ class LiveStatusQuery(object):
 
         # Ask the cache if this request was already answered under the same
         # circumstances.
-        cacheable, cached_response = self.query_cache.get_cached_query(self.raw_data)
-        if cached_response:
-            return cached_response
+        cacheable, cache_hit, cached_response = self.query_cache.get_cached_query(self.raw_data)
+        if cache_hit:
+            self.columns = cached_response['columns']
+            self.response.columnheaders = cached_response['columnheaders']
+            return cached_response['result']
 
         # Make columns unique
         self.filtercolumns = list(set(self.filtercolumns))
@@ -320,10 +322,16 @@ class LiveStatusQuery(object):
             print "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!"
             result = []
 
-        if cacheable:
+        if cacheable and not cache_hit:
             # We cannot cache generators, so we must first read them into a list
             result = [r for r in result]
-            self.query_cache.cache_query(self.raw_data, result)
+            # Stats requests in particular also modify the columns and
+            # headers, so we need to save them too.
+            self.query_cache.cache_query(self.raw_data, {
+                'result': result,
+                'columns': self.columns,
+                'columnheaders': self.response.columnheaders,
+            })
             
         return result
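
The lookup now returns a (cacheable, cache_hit, payload) triple instead of
inferring a miss from an empty result, and the cached payload bundles the
result rows with the columns and column headers that stats requests rewrite.
One caveat: the `if not self.enabled` early return in get_cached_query (see
the cache module below) still hands back a two-tuple, so this minimal sketch
of the protocol, with illustrative names only, keeps both paths returning
three values:

    # Minimal sketch of the (cacheable, cache_hit, payload) protocol; the
    # class and key handling are illustrative, not the shinken API.
    class SketchQueryCache(object):
        def __init__(self):
            self.storage = {}

        def get_cached_query(self, raw_data):
            # cacheable would normally depend on the query category;
            # here every query is considered cacheable
            try:
                return (True, True, self.storage[raw_data])
            except KeyError:
                return (True, False, None)

        def cache_query(self, raw_data, payload):
            self.storage[raw_data] = payload

    cache = SketchQueryCache()
    cacheable, cache_hit, payload = cache.get_cached_query('GET services ...')
    if cacheable and not cache_hit:
        # generators cannot be cached, and stats requests rewrite the
        # columns/headers, so everything is materialized into the payload
        payload = {'result': [['host1', 'svc1', 0, 'HARD']],
                   'columns': ['host_name', 'description', 'state', 'state_type'],
                   'columnheaders': 'off'}
        cache.cache_query('GET services ...', payload)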
 
diff --git a/shinken/modules/livestatus_broker/livestatus_query_cache.py b/shinken/modules/livestatus_broker/livestatus_query_cache.py
index 1ab0b57..86da341 100644
--- a/shinken/modules/livestatus_broker/livestatus_query_cache.py
+++ b/shinken/modules/livestatus_broker/livestatus_query_cache.py
@@ -78,6 +78,9 @@ class Counter(dict):
     def __missing__(self, key):
         return 0
 
+class LFUCacheMiss(Exception):
+    pass
+
 class LFU(object):
     """
     This class implements a dictionary which has a limited number of elements.
@@ -102,10 +105,12 @@ class LFU(object):
         self.use_count[key] += 1
         try:
             result = self.storage[key]
+            print "CACHE HIT"
             self.hits += 1
         except KeyError:
             result = []
             self.misses += 1
+            raise LFUCacheMiss
         return result
 
     def put(self, key, data):
@@ -115,6 +120,17 @@ class LFU(object):
                 del self.storage[key], self.use_count[key]
         pass
 
+    def __str__(self):
+        text = 'LFU-------------------\n'
+        try:
+            text += 'hit rate %.2f%%\n' % (100 * self.hits / (self.hits + self.misses))
+        except ZeroDivisionError:
+            text += 'hit rate 0%\n'
+        for k in self.storage:
+            text += 'key %10s (%d used)\n' % (str(k), self.use_count[k])
+        return text
+        
+
 class QueryData(object):
     """
     This class implements a more "machine-readable" form of a livestatus query.
@@ -134,7 +150,7 @@ class QueryData(object):
         self.client_localtime = int(time.time())
         self.stats_columns = [f[1] for f in self.structured_data if f[0] == 'Stats']
         self.filter_columns = [f[1] for f in self.structured_data if f[0] == 'Filter']
-        self.columns = [f[1] for f in self.structured_data if f[0] == 'Columns'][0]
+        self.columns = [x for f in self.structured_data if f[0] == 'Columns' for x in f[1]]
         self.categorize()
 
     def __str__(self):
@@ -292,6 +308,11 @@ class QueryData(object):
             self.category = CACHE_GLOBAL_STATS
         elif self.is_a_closed_chapter():
             self.category = CACHE_IRREVERSIBLE_HISTORY
+        elif self.table == 'services' and not self.is_stats and has_not_more_than(self.columns, ['host_name', 'description', 'state', 'state_type']):
+            self.category = CACHE_SERVICE_STATS
+        else:
+            pass
+            print "i cannot cache this", self
 
 
 
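The new CACHE_SERVICE_STATS branch only fires for plain (non-stats) service
queries whose column set stays inside the four state-related columns.
has_not_more_than itself is not shown in this diff; the assumed semantics
are a simple subset test, sketched here:

    # Assumed semantics of has_not_more_than (the real helper lives in
    # livestatus_query_cache.py and may differ): a subset check.
    def has_not_more_than(columns, reference):
        return set(columns).issubset(set(reference))

    allowed = ['host_name', 'description', 'state', 'state_type']
    print has_not_more_than(['host_name', 'state'], allowed)         # True
    print has_not_more_than(['host_name', 'plugin_output'], allowed) # False
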
@@ -326,7 +347,7 @@ class LiveStatusQueryCache(object):
         the data for the tactical overview.
         """
         try:
-            print "i wipe sub-cache", category
+            #print "i wipe sub-cache", category
             self.categories[category].clear()
         except Exception:
             pass
@@ -335,17 +356,17 @@ class LiveStatusQueryCache(object):
         if not self.enabled:
             return
         for cat in range(len(self.categories)):
-            print "WIPEOUT CAT", cat
             self.categories[cat].clear()
 
     def get_cached_query(self, data):
         if not self.enabled:
             return (False, [])
         query = QueryData(data)
-        print "I SEARCH THE CACHE FOR", query.category, query.key, data
-        if self.categories[query.category].get(query.key):
-            print "CACHE HIT"
-        return (query.category != CACHE_IMPOSSIBLE, self.categories[query.category].get(query.key))
+        #print "I SEARCH THE CACHE FOR", query.category, query.key, data
+        try:
+            return (query.category != CACHE_IMPOSSIBLE, True, self.categories[query.category].get(query.key))
+        except LFUCacheMiss:
+            return (query.category != CACHE_IMPOSSIBLE, False, [])
 
     def cache_query(self, data, result):
         """Puts the result of a livestatus query into the cache."""
@@ -353,7 +374,7 @@ class LiveStatusQueryCache(object):
         if not self.enabled:
             return
         query = QueryData(data)
-        print "I PUT IN THE CACHE FOR", query.category, query.key
+        #print "I PUT IN THE CACHE FOR", query.category, query.key
         self.categories[query.category].put(query.key, result)
 
     def impact_assessment(self, brok, obj):
@@ -367,9 +388,11 @@ class LiveStatusQueryCache(object):
             if brok.data['state_id'] != obj.state_id:
                 print "DETECTED STATECHANGE", obj
                 self.invalidate_category(CACHE_GLOBAL_STATS)
+                self.invalidate_category(CACHE_SERVICE_STATS)
             if brok.data['state_type_id'] != obj.state_type_id:
                 print "DETECTED STATETYPECHANGE", obj
                 self.invalidate_category(CACHE_GLOBAL_STATS_WITH_STATETYPE)
+                self.invalidate_category(CACHE_SERVICE_STATS)
             print obj.state_id, obj.state_type_id, brok.data['state_id'], brok.data['state_type_id']
         except Exception:
             pass
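
With LFUCacheMiss, LFU.get() can now distinguish a genuinely missing key
from a cached empty result, which the old return-[] convention could not;
the sub-caches are still wiped wholesale by invalidate_category() whenever a
relevant state or state-type change arrives. A self-contained sketch of the
miss-as-exception contract (the eviction policy here is simplified, not the
shinken one):

    class LFUCacheMiss(Exception):
        pass

    class SketchLFU(object):
        def __init__(self, maxsize=256):
            self.maxsize = maxsize
            self.storage = {}
            self.use_count = {}
            self.hits = 0
            self.misses = 0

        def get(self, key):
            self.use_count[key] = self.use_count.get(key, 0) + 1
            try:
                result = self.storage[key]
            except KeyError:
                self.misses += 1
                raise LFUCacheMiss
            self.hits += 1
            return result

        def put(self, key, data):
            self.storage[key] = data
            if len(self.storage) > self.maxsize:
                # evict the least frequently used key
                loser = min(self.storage, key=lambda k: self.use_count.get(k, 0))
                del self.storage[loser]

    lfu = SketchLFU()
    lfu.put('q1', [])              # an empty result is a legitimate entry
    print lfu.get('q1')            # prints [] -- a hit, not a miss
    try:
        lfu.get('q2')
    except LFUCacheMiss:
        print "q2 is a real miss"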
diff --git a/shinken/modules/logstore_sqlite.py b/shinken/modules/logstore_sqlite.py
index 6c7be8f..ad3a0e6 100644
--- a/shinken/modules/logstore_sqlite.py
+++ b/shinken/modules/logstore_sqlite.py
@@ -397,7 +397,6 @@ class LiveStatusLogStoreSqlite(BaseModule):
         # finalize the filter stacks
 	#self.sql_time_filter_stack.and_elements(self.sql_time_filter_stack.qsize())
 	self.sql_filter_stack.and_elements(self.sql_filter_stack.qsize())
-        self.use_aggressive_sql = True
         if self.use_aggressive_sql:
             # Be aggressive, get preselected data from sqlite and do less
             # filtering in python. But: only a subset of Filter:-attributes
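
Dropping the hard-coded self.use_aggressive_sql = True restores whatever
value was configured; the test below now flips the flag explicitly instead.
The idea behind the flag, illustrated with a throwaway sqlite table (not the
shinken logs schema):

    import sqlite3

    conn = sqlite3.connect(':memory:')
    conn.execute("CREATE TABLE logs (time INTEGER, type TEXT)")
    conn.executemany("INSERT INTO logs VALUES (?, ?)",
                     [(100, 'HOST ALERT'), (200, 'SERVICE ALERT'),
                      (300, 'HOST ALERT')])

    use_aggressive_sql = True
    if use_aggressive_sql:
        # aggressive: let sqlite preselect, so python filters less
        rows = conn.execute("SELECT * FROM logs WHERE type = ?",
                            ('HOST ALERT',)).fetchall()
    else:
        # defensive: fetch everything, filter in python
        rows = [r for r in conn.execute("SELECT * FROM logs")
                if r[1] == 'HOST ALERT']
    print rows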
diff --git a/test/test_livestatus_cache.py b/test/test_livestatus_cache.py
index ac63459..7faf698 100644
--- a/test/test_livestatus_cache.py
+++ b/test/test_livestatus_cache.py
@@ -1,4 +1,10 @@
 from shinken_test import *
+import datetime
+
+def set_to_midnight(dt): 
+    midnight = datetime.time(0) 
+    return datetime.datetime.combine(dt.date(), midnight) 
+
 
 class TestConfig(ShinkenTest):
     def update_broker(self, dodeepcopy=False):
@@ -40,6 +46,8 @@ class TestConfigBig(TestConfig):
             os.remove(self.livelogs)
         if os.path.exists(self.livelogs+"-journal"):
             os.remove(self.livelogs+"-journal")
+        for arch in os.listdir('tmp/archives'):
+            os.remove('tmp/archives/' + arch)
         if os.path.exists(self.livestatus_broker.pnp_path):
             shutil.rmtree(self.livestatus_broker.pnp_path)
         if os.path.exists('var/nagios.log'):
@@ -84,54 +92,36 @@ Columns: host_name description state state_type"""
         self.scheduler_loop(1, [[svc1, 1, 'W'], [svc2, 1, 'W'], [svc3, 1, 'W'], [svc4, 2, 'C'], [svc5, 3, 'U'], [svc6, 2, 'C'], [svc7, 2, 'C']])
         self.update_broker()
         # 1993O, 3xW, 3xC, 1xU
-        request = """GET services
+        statsrequest = """GET services
 Filter: contacts >= test_contact
 Stats: state != 9999
 Stats: state = 0
 Stats: state = 1
 Stats: state = 2
 Stats: state = 3"""
-        response, keepalive = self.livestatus_broker.livestatus.handle_request(request)
-        print 'query_6_______________\n%s\n%s\n' % (request, response)
+        response, keepalive = self.livestatus_broker.livestatus.handle_request(statsrequest)
+        print 'query_6_______________\n%s\n%s\n' % (statsrequest, response)
         self.assert_(response == '2000;1993;3;3;1\n')
 
         # Now we play with the cache
-        request = """GET services
-Filter: description = test_ok_11
-Filter: host_name = test_host_007
-Columns: host_name description state state_type"""
         afterresponse, keepalive = self.livestatus_broker.livestatus.handle_request(request)
         print "after", afterresponse
         self.assert_(beforeresponse != afterresponse)
-        request = """GET services
-Filter: description = test_ok_11
-Filter: host_name = test_host_007
-Columns: host_name description state state_type"""
         repeatedresponse, keepalive = self.livestatus_broker.livestatus.handle_request(request)
         print "repeated", repeatedresponse
         self.assert_(afterresponse == repeatedresponse)
+
         self.scheduler_loop(2, [[svc5, 2, 'C']])
         self.update_broker()
-        request = """GET services
-Filter: description = test_ok_11
-Filter: host_name = test_host_007
-Columns: host_name description state state_type"""
         notcachedresponse, keepalive = self.livestatus_broker.livestatus.handle_request(request)
         print "notcached", notcachedresponse
+
         self.scheduler_loop(2, [[svc5, 0, 'O']])
         self.update_broker()
         # 1994O, 3xW, 3xC, 0xU
-        request = """GET services
-Filter: description = test_ok_11
-Filter: host_name = test_host_007
-Columns: host_name description state state_type"""
         againnotcachedresponse, keepalive = self.livestatus_broker.livestatus.handle_request(request)
         print "againnotcached", againnotcachedresponse
         self.assert_(notcachedresponse != againnotcachedresponse)
-        request = """GET services
-Filter: description = test_ok_11
-Filter: host_name = test_host_007
-Columns: host_name description state state_type"""
         finallycachedresponse, keepalive = self.livestatus_broker.livestatus.handle_request(request)
         print "finallycached", finallycachedresponse
         self.assert_(againnotcachedresponse == finallycachedresponse)
@@ -143,12 +133,16 @@ Stats: state = 0
 Stats: state = 1
 Stats: state = 2
 Stats: state = 3"""
-        response, keepalive = self.livestatus_broker.livestatus.handle_request(request)
-        print 'query_6_______________\n%s\n%s\n' % (request, response)
+        response, keepalive = self.livestatus_broker.livestatus.handle_request(statsrequest)
+        print 'query_6_______________\n%s\n%s\n' % (statsrequest, response)
+        response, keepalive = self.livestatus_broker.livestatus.handle_request(statsrequest)
+        print 'query_6_______________\n%s\n%s\n' % (statsrequest, response)
         self.assert_(response == '2000;1994;3;3;0\n')
 
     def test_a_long_history(self):
         #return
+        print datetime.datetime.now()
+        print datetime.datetime.today()
         test_host_005 = self.sched.hosts.find_by_name("test_host_005")
         test_host_099 = self.sched.hosts.find_by_name("test_host_099")
         test_ok_00 = self.sched.services.find_srv_by_name_and_hostname("test_host_005", "test_ok_00")
@@ -156,13 +150,42 @@ Stats: state = 3"""
         test_ok_04 = self.sched.services.find_srv_by_name_and_hostname("test_host_005", "test_ok_04")
         test_ok_16 = self.sched.services.find_srv_by_name_and_hostname("test_host_005", "test_ok_16")
         test_ok_99 = self.sched.services.find_srv_by_name_and_hostname("test_host_099", "test_ok_01")
-        time_warp(-1 * 20 * 24 * 3600)
-        starttime = time.time()
-        numlogs = self.livestatus_broker.db.execute("SELECT COUNT(*) FROM logs")
-        if numlogs[0][0] == 0:
-            # run silently
-            old_stdout = sys.stdout
-            sys.stdout = open(os.devnull, "w")
+        days = 4
+
+        etime = time.time()
+        print "now it is", time.ctime(etime)
+        print "now it is", time.gmtime(etime)
+        etime_midnight = (etime - (etime % 86400)) + time.altzone
+        print "midnight was", time.ctime(etime_midnight)
+        print "midnight was", time.gmtime(etime_midnight)
+        query_start = etime_midnight - (days - 1) * 86400
+        query_end = etime_midnight
+        print "query_start", time.ctime(query_start)
+        print "query_end ", time.ctime(query_end)
+
+        # |----------|----------|----------|----------|----------|---x
+        #                                                            etime
+        #                                                        etime_midnight
+        #             ---x------
+        #                etime -  4 days
+        #                       |---
+        #                       query_start
+        #                      
+        #                ............................................
+        #                events in the log database ranging till now
+        #
+        #                       |________________________________|
+        #                       events which will be read from db
+        #
+        loops = int(86400 / 192)
+        time_warp(-1 * days * 86400)
+        print "warp back to", time.ctime(time.time())
+        # run silently
+        old_stdout = sys.stdout
+        sys.stdout = open(os.devnull, "w")
+        should_be = 0
+        for day in xrange(days):
+            sys.stderr.write("day %d now it is %s i run %d loops\n" % (day, time.ctime(time.time()), loops))
             self.scheduler_loop(2, [
                 [test_ok_00, 0, "OK"],
                 [test_ok_01, 0, "OK"],
@@ -171,9 +194,8 @@ Stats: state = 3"""
                 [test_ok_99, 0, "OK"],
             ])
             self.update_broker()
-            should_be = 0
             #for i in xrange(3600 * 24 * 7):
-            for i in xrange(10000):
+            for i in xrange(loops):
                 if i % 10000 == 0:
                     sys.stderr.write(str(i))
                 if i % 399 == 0:
@@ -184,7 +206,9 @@ Stats: state = 3"""
                         [test_ok_16, 1, "WARN"],
                         [test_ok_99, 2, "CRIT"],
                     ])
-                    should_be += 3
+                    if int(time.time()) >= query_start and int(time.time()) <= query_end:
+                        should_be += 3
+                        sys.stderr.write("now it should be %s\n" % should_be)
                 time.sleep(62)
                 if i % 399 == 0:
                     self.scheduler_loop(1, [
@@ -194,34 +218,37 @@ Stats: state = 3"""
                         [test_ok_16, 0, "OK"],
                         [test_ok_99, 0, "OK"],
                     ])
-                    should_be += 1
+                    if int(time.time()) >= query_start and int(time.time()) <= query_end:
+                        should_be += 1
+                        sys.stderr.write("now it should be %s\n" % should_be)
                 time.sleep(2)
-                if i % 199 == 0:
+                if i % 9 == 0:
                     self.scheduler_loop(3, [
                         [test_ok_00, 1, "WARN"],
                         [test_ok_01, 2, "CRIT"],
                     ])
+    
                 time.sleep(62)
-                if i % 199 == 0:
+                if i % 9 == 0:
                     self.scheduler_loop(1, [
                         [test_ok_00, 0, "OK"],
                         [test_ok_01, 0, "OK"],
                     ])
                 time.sleep(2)
-                if i % 299 == 0:
+                if i % 9 == 0:
                     self.scheduler_loop(3, [
                         [test_host_005, 2, "DOWN"],
                     ])
-                if i % 19 == 0:
+                if i % 2 == 0:
                     self.scheduler_loop(3, [
                         [test_host_099, 2, "DOWN"],
                     ])
                 time.sleep(62)
-                if i % 299 == 0:
+                if i % 9 == 0:
                     self.scheduler_loop(3, [
                         [test_host_005, 0, "UP"],
                     ])
-                if i % 19 == 0:
+                if i % 2 == 0:
                     self.scheduler_loop(3, [
                         [test_host_099, 0, "UP"],
                     ])
@@ -230,25 +257,21 @@ Stats: state = 3"""
                 if i % 1000 == 0:
                     self.livestatus_broker.db.commit()
             endtime = time.time()
-            sys.stdout.close()
-            sys.stdout = old_stdout
             self.livestatus_broker.db.commit()
-        else:
-            should_be = numlogs[0][0]
-            xxx = self.livestatus_broker.db.execute("SELECT min(time), max(time) FROM logs")
-            print xxx
-            starttime, endtime = [self.livestatus_broker.db.execute("SELECT min(time), max(time) FROM logs")][0][0]
+            sys.stderr.write("day %d end it is %s\n" % (day, time.ctime(time.time())))
+        sys.stdout.close()
+        sys.stdout = old_stdout
+        self.livestatus_broker.db.commit_and_rotate_log_db()
+        numlogs = self.livestatus_broker.db.execute("SELECT count(*) FROM logs")
+        print "numlogs is", numlogs
 
 
         # now we have a lot of events
         # find type = HOST ALERT for test_host_005
-        q = int((endtime - starttime) / 8)
-        starttime += q
-        endtime -= q
         request = """GET log
 Columns: class time type state host_name service_description plugin_output message options contact_name command_name state_type current_host_groups current_service_groups
-Filter: time >= """ + str(int(starttime)) + """
-Filter: time <= """ + str(int(endtime)) + """
+Filter: time >= """ + str(int(query_start)) + """
+Filter: time <= """ + str(int(query_end)) + """
 Filter: type = SERVICE ALERT
 And: 1
 Filter: type = HOST ALERT
@@ -269,18 +292,54 @@ OutputFormat: json"""
         fake_time_sleep = time.sleep
         time.time = original_time_time
         time.sleep = original_time_sleep
+        print self.livestatus_broker.db.database_file
         print request
+        print "query 1 --------------------------------------------------"
+        tic = time.time()
         response, keepalive = self.livestatus_broker.livestatus.handle_request(request)
+        tac = time.time()
+        elapsed1 = tac - tic
         pyresponse = eval(response)
-        print "number of records", len(pyresponse)
+        print "pyresponse", len(pyresponse)
         print "should be", should_be
-        print "now with cache"
+        self.assert_(len(pyresponse) == should_be)
+        print "query 2 cache---------------------------------------------"
+        tic = time.time()
         response, keepalive = self.livestatus_broker.livestatus.handle_request(request)
+        tac = time.time()
+        elapsed2 = tac - tic
         pyresponse = eval(response)
-        print "number of records", len(pyresponse)
-        print "should be", should_be
-        time.time = fake_time_time
-        time.sleep = fake_time_sleep
+        self.assert_(len(pyresponse) == should_be)
+        print "clear the cache"
+        print "use aggressive sql"
+        print "query 3 --------------------------------------------------"
+        self.livestatus_broker.query_cache.wipeout()
+        self.livestatus_broker.db.use_aggressive_sql = True
+        tic = time.time()
+        response, keepalive = self.livestatus_broker.livestatus.handle_request(request)
+        tac = time.time()
+        elapsed3 = tac - tic
+        pyresponse = eval(response)
+        self.assert_(len(pyresponse) == should_be)
+        print "query 4 cache---------------------------------------------"
+        tic = time.time()
+        response, keepalive = self.livestatus_broker.livestatus.handle_request(request)
+        tac = time.time()
+        elapsed4 = tac - tic
+        pyresponse = eval(response)
+        self.assert_(len(pyresponse) == should_be)
+        print "elapsed1", elapsed1
+        print "elapsed2", elapsed2
+        print "elapsed3", elapsed3
+        print "elapsed4", elapsed4
+        self.assert_(elapsed2 < elapsed1 / 10)
+        self.assert_(elapsed3 < elapsed1)
+        self.assert_(elapsed4 < elapsed3 / 2)
+
+        #time.time = fake_time_time
+        #time.sleep = fake_time_sleep
+
+       
 
 if __name__ == '__main__':
     #import cProfile
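
For reference, the window arithmetic that the rewritten test_a_long_history
relies on, extracted into a standalone snippet: etime % 86400 is the offset
from the last UTC midnight, and adding time.altzone shifts that toward the
local (DST) midnight, from which the four-day query window is derived. The
datetime-based set_to_midnight helper at the top of the test file expresses
the same idea:

    import time
    import datetime

    def set_to_midnight(dt):
        return datetime.datetime.combine(dt.date(), datetime.time(0))

    days = 4
    etime = time.time()
    etime_midnight = (etime - (etime % 86400)) + time.altzone
    query_start = etime_midnight - (days - 1) * 86400
    query_end = etime_midnight
    print "window", time.ctime(query_start), "->", time.ctime(query_end)
    print "datetime midnight", set_to_midnight(datetime.datetime.now())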

-- 
UNNAMED PROJECT


