[Pkg-nagios-changes] [SCM] UNNAMED PROJECT branch, debian/master, updated. 810edbdd3feedbfe37f4a65bee50b57b2f60fa2a

Gerhard Lausser gerhard.lausser at consol.de
Tue Feb 28 22:18:36 UTC 2012


The following commit has been merged in the debian/master branch:
commit 3745e9afb92571c38d8bd3e1736591c38cde431a
Author: Gerhard Lausser <gerhard.lausser at consol.de>
Date:   Sun Feb 5 02:53:01 2012 +0100

    First steps of the implementation of a cache for the livestatus module. (Disabled by default and i don't tell you how to enable it yet)

diff --git a/shinken/modules/livestatus_broker/livestatus.py b/shinken/modules/livestatus_broker/livestatus.py
index b62febd..adce32e 100644
--- a/shinken/modules/livestatus_broker/livestatus.py
+++ b/shinken/modules/livestatus_broker/livestatus.py
@@ -31,8 +31,9 @@ class LiveStatus(object):
     """A class that represents the status of all objects in the broker
 
     """
-    def __init__(self, datamgr, db, pnp_path, return_queue):
+    def __init__(self, datamgr, query_cache, db, pnp_path, return_queue):
         self.datamgr = datamgr
+        self.query_cache = query_cache
         self.db = db
         self.pnp_path = pnp_path
         self.return_queue = return_queue
@@ -45,7 +46,7 @@ class LiveStatus(object):
         handles the execution of the request and formatting of the result.
 
         """
-        request = LiveStatusRequest(data, self.datamgr, self.db, self.pnp_path, self.return_queue, self.counters)
+        request = LiveStatusRequest(data, self.datamgr, self.query_cache, self.db, self.pnp_path, self.return_queue, self.counters)
         request.parse_input(data)
         if sorted([q.my_type for q in request.queries]) == ['command', 'query', 'wait']:
             # The Multisite way
diff --git a/shinken/modules/livestatus_broker/livestatus_broker.py b/shinken/modules/livestatus_broker/livestatus_broker.py
index aefc3f9..8c0a5ec 100644
--- a/shinken/modules/livestatus_broker/livestatus_broker.py
+++ b/shinken/modules/livestatus_broker/livestatus_broker.py
@@ -79,6 +79,7 @@ class LiveStatus_broker(BaseModule, Daemon):
         self.pnp_path = getattr(modconf, 'pnp_path', '')
         self.debug = getattr(modconf, 'debug', None)
         self.debug_queries = (getattr(modconf, 'debug_queries', '0') == '1')
+        self.use_query_cache = (getattr(modconf, 'query_cache', '0') == '1')
 
         #  This is an "artificial" module which is used when an old-style
         #  shinken-specific.cfg without a separate logstore-module is found.
@@ -151,6 +152,10 @@ class LiveStatus_broker(BaseModule, Daemon):
         self.rg = LiveStatusRegenerator()
         self.datamgr = datamgr
         datamgr.load(self.rg)
+        self.query_cache = LiveStatusQueryCache()
+        if not self.use_query_cache:
+            self.query_cache.disable()
+        self.rg.register_cache(self.query_cache)
 
         try:
             #import cProfile
@@ -336,7 +341,7 @@ class LiveStatus_broker(BaseModule, Daemon):
     def manage_lql_thread(self):
         print "Livestatus query thread started"
         # This is the main object of this broker where the action takes place
-        self.livestatus = LiveStatus(self.datamgr, self.db, self.pnp_path, self.from_q)
+        self.livestatus = LiveStatus(self.datamgr, self.query_cache, self.db, self.pnp_path, self.from_q)
 
         backlog = 5
         size = 8192
diff --git a/shinken/modules/livestatus_broker/livestatus_query.py b/shinken/modules/livestatus_broker/livestatus_query.py
index 46011d4..6ce109c 100644
--- a/shinken/modules/livestatus_broker/livestatus_query.py
+++ b/shinken/modules/livestatus_broker/livestatus_query.py
@@ -36,9 +36,10 @@ class LiveStatusQuery(object):
 
     my_type = 'query'
 
-    def __init__(self, datamgr, db, pnp_path, return_queue, counters):
+    def __init__(self, datamgr, query_cache, db, pnp_path, return_queue, counters):
         # Runtime data form the global LiveStatus object
         self.datamgr = datamgr
+        self.query_cache = query_cache
         self.db = db
         self.pnp_path = pnp_path
         self.return_queue = return_queue
@@ -46,6 +47,7 @@ class LiveStatusQuery(object):
 
         # Private attributes for this specific request
         self.response = LiveStatusResponse()
+        self.raw_data = ''
         self.table = None
         self.columns = []
         self.filtercolumns = []
@@ -85,7 +87,6 @@ class LiveStatusQuery(object):
 
     def split_option(self, line, splits=1):
         """Like split_commands, but converts numbers to int data type"""
-        #x = [int(i) if i.isdigit() else i for i in [token.strip() for token in re.split(r"[\s]+", line, splits)]]
         x = map (lambda i: (i.isdigit() and int(i)) or i, [token.strip() for token in re.split(r"[\s]+", line, splits)])
         return x
 
@@ -111,6 +112,7 @@ class LiveStatusQuery(object):
         sets the attributes of the request object
         
         """
+        self.raw_data = data
         for line in data.splitlines():
             line = line.strip()
             # Tools like NagVis send KEYWORK:option, and we prefer to have
@@ -241,6 +243,12 @@ class LiveStatusQuery(object):
         if not self.table:
             return []
 
+        # Ask the cache if this request was already answered under the same
+        # circumstances.
+        cacheable, cached_response = self.query_cache.get_cached_query(self.raw_data)
+        if cached_response:
+            return cached_response
+
         # Make columns unique
         self.filtercolumns = list(set(self.filtercolumns))
         self.prefiltercolumns = list(set(self.prefiltercolumns))
@@ -311,7 +319,12 @@ class LiveStatusQuery(object):
             traceback.print_exc(32) 
             print "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!"
             result = []
-        
+
+        if cacheable:
+            # We cannot cache generators, so we must first read them into a list
+            result = [r for r in result]
+            self.query_cache.cache_query(self.raw_data, result)
+            
         return result
 
     
diff --git a/shinken/modules/livestatus_broker/livestatus_query_cache.py b/shinken/modules/livestatus_broker/livestatus_query_cache.py
new file mode 100644
index 0000000..6286267
--- /dev/null
+++ b/shinken/modules/livestatus_broker/livestatus_query_cache.py
@@ -0,0 +1,143 @@
+# -*- coding: utf-8 -*-
+#
+# Copyright (C) 2009-2012:
+#     Gabes Jean, naparuba at gmail.com
+#     Gerhard Lausser, Gerhard.Lausser at consol.de
+#     Gregory Starck, g.starck at gmail.com
+#     Hartmut Goebel, h.goebel at goebel-consult.de
+#
+# This file is part of Shinken.
+#
+# Shinken is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Shinken is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with Shinken.  If not, see <http://www.gnu.org/licenses/>.
+
+
+import time
+import re
+
+from livestatus_query import LiveStatusQuery
+from livestatus_wait_query import LiveStatusWaitQuery
+from livestatus_command_query import LiveStatusCommandQuery
+
+CACHE_GLOBAL_STATS = 0
+CACHE_GLOBAL_STATS_WITH_STATETYPE = 1
+CACHE_HOST_STATS = 2
+CACHE_SERVICE_STATS = 3
+
+class LiveStatusQueryCache:
+
+    """A class describing a livestatus query cache."""
+
+    def __init__(self):
+        self.categories = [{}, {}, {}]
+        self.enabled = True
+
+    def disable(self):
+        self.enabled = False
+
+    def invalidate_category(self, category):
+        """
+        Throw away all cached results of a certain class.
+        For example, if there is a state change, we must recalculate
+        the data for the tactical overview.
+        """
+        try:
+            print "i wipe sub-cache", category
+            self.categories[category] = {}
+        except Exception:
+            pass
+
+    def wipeout(self):
+        if not self.enabled:
+            return
+        for cat in range(len(self.categories)):
+            print "WIPEOUT CAT", cat
+            self.categories[cat] = {}
+
+    def get_cached_query(self, data):
+        if not self.enabled:
+            return (False, [])
+        print "I SEARCH THE CACHE FOR", data
+        cacheable, category, key = self.strip_query(data)
+        if self.categories[category].get(key, []):
+            print "CACHE HIT"
+        return (cacheable, self.categories[category].get(key, []))
+
+    def strip_query(self, data):
+        cacheable = True
+        category = CACHE_GLOBAL_STATS
+        for line in [l.strip() for l in data.splitlines()]:
+            if ':' in line and not ' ' in line:
+                line = line.replace(':', ': ')
+            keyword = line.split(' ')[0].rstrip(':')
+            if len(line) == 0:
+                continue
+            if keyword in ('Localtime',):
+                # i don't like this Localtime, but we usually can ignore it
+                pass
+            elif keyword == 'Columns':
+                _, columns = re.split(r"[\s]+", line, 1)
+                if [c for c in re.compile(r'\s+').split(columns) if c.endswith('state_type')]:
+                    category = CACHE_GLOBAL_STATS_WITH_STATETYPE
+            elif keyword == 'Stats':
+                pass
+            elif keyword == 'Filter':
+                try:
+                    _, attribute, operator, reference = re.split(r"[\s]+", line, 3)
+                except:
+                    _, attribute, operator = re.split(r"[\s]+", line, 2)
+                    reference = ''
+                if attribute == 'time':
+                    # That's a showstopper. We can't cache time-critical
+                    # informations, because within one second a lot of things
+                    # can change.
+                    cacheable = False
+        if cacheable:
+            return (cacheable, category, hash(data))
+        else:
+            return (False, None, None)
+
+            
+
+    def find_query(self, data):
+        pass
+
+    def query_hash(self, data):
+        pass
+
+    def query_hash_with_time(self, data):
+        pass
+
+    def cache_query(self, data, result):
+        """Puts the result of a livestatus query into the cache."""
+
+        if not self.enabled:
+            return
+        cacheable, category, key = self.strip_query(data)
+        print "I PUT IN THE CACHE FOR", key
+        self.categories[category][key] = result
+
+    def judge_situation(self, brok, obj):
+        if not self.enabled:
+            return
+        try:
+            if brok.data['state_id'] != obj.state_id:
+                print "DETECTED STATECHANGE", obj
+                self.invalidate_category(CACHE_GLOBAL_STATS)
+            if brok.data['state_type_id'] != obj.state_type_id:
+                print "DETECTED STATETYPECHANGE", obj
+                self.invalidate_category(CACHE_GLOBAL_STATS_WITH_STATETYPE)
+            print obj.state_id, obj.state_type_id, brok.data['state_id'], brok.data['state_type_id']
+        except Exception:
+            pass
+
diff --git a/shinken/modules/livestatus_broker/livestatus_regenerator.py b/shinken/modules/livestatus_broker/livestatus_regenerator.py
index a47386b..a985ff3 100644
--- a/shinken/modules/livestatus_broker/livestatus_regenerator.py
+++ b/shinken/modules/livestatus_broker/livestatus_regenerator.py
@@ -64,6 +64,9 @@ class LiveStatusRegenerator(Regenerator):
         setattr(self.hostgroups, '__itersorted__', types.MethodType(itersorted, self.hostgroups))
         setattr(self.contactgroups, '__itersorted__', types.MethodType(itersorted, self.contactgroups))
 
+        # Everything is new now. We should clean the cache
+        self.cache.wipeout()
+
 
     def manage_initial_contact_status_brok(self, b):
         """overwrite it, because the original method deletes some values"""
@@ -115,4 +118,12 @@ class LiveStatusRegenerator(Regenerator):
         self.contacts.create_reversed_list()
         self.notificationways.create_reversed_list()
 
+    def register_cache(self, cache):
+        self.cache = cache
+
+    def before_after_hook(self, brok, obj):
+        self.cache.judge_situation(brok, obj)
+
+
+
 
diff --git a/shinken/modules/livestatus_broker/livestatus_request.py b/shinken/modules/livestatus_broker/livestatus_request.py
index 142acae..fe3571b 100644
--- a/shinken/modules/livestatus_broker/livestatus_request.py
+++ b/shinken/modules/livestatus_broker/livestatus_request.py
@@ -33,10 +33,11 @@ class LiveStatusRequest:
 
     """A class describing a livestatus request."""
 
-    def __init__(self, data, datamgr, db, pnp_path, return_queue, counters):
+    def __init__(self, data, datamgr, query_cache, db, pnp_path, return_queue, counters):
         self.data = data
         # Runtime data form the global LiveStatus object
         self.datamgr = datamgr
+        self.query_cache = query_cache
         self.db = db
         self.pnp_path = pnp_path
         self.return_queue = return_queue
@@ -84,6 +85,6 @@ class LiveStatusRequest:
             query.parse_input('\n'.join(wait_cmds))
             self.queries.append(query)
         if len(query_cmds) > 0:
-            query = LiveStatusQuery(self.datamgr, self.db, self.pnp_path, self.return_queue, self.counters)
+            query = LiveStatusQuery(self.datamgr, self.query_cache, self.db, self.pnp_path, self.return_queue, self.counters)
             query.parse_input('\n'.join(query_cmds))
             self.queries.append(query)
diff --git a/test/shinken_test.py b/test/shinken_test.py
index 41eff95..28ad18e 100755
--- a/test/shinken_test.py
+++ b/test/shinken_test.py
@@ -52,6 +52,7 @@ from shinken.modules import livestatus_broker
 from shinken.modules.livestatus_broker import LiveStatus_broker
 from shinken.modules.livestatus_broker.livestatus import LiveStatus
 from shinken.modules.livestatus_broker.livestatus_regenerator import LiveStatusRegenerator
+from shinken.modules.livestatus_broker.livestatus_query_cache import LiveStatusQueryCache
 from shinken.misc.datamanager import datamgr
 
 livestatus_modconf = Module()
@@ -403,11 +404,14 @@ class ShinkenTest(unittest.TestCase):
         self.livestatus_broker.rg = LiveStatusRegenerator()
         self.livestatus_broker.datamgr = datamgr
         datamgr.load(self.livestatus_broker.rg)
+        self.livestatus_broker.query_cache = LiveStatusQueryCache()
+        self.livestatus_broker.query_cache.disable()
+        self.livestatus_broker.rg.register_cache(self.livestatus_broker.query_cache)
         #--- livestatus_broker.main
 
         self.livestatus_broker.init()
         self.livestatus_broker.db = self.livestatus_broker.modules_manager.instances[0]
-        self.livestatus_broker.livestatus = LiveStatus(self.livestatus_broker.datamgr, self.livestatus_broker.db, self.livestatus_broker.pnp_path, self.livestatus_broker.from_q)
+        self.livestatus_broker.livestatus = LiveStatus(self.livestatus_broker.datamgr, self.livestatus_broker.query_cache, self.livestatus_broker.db, self.livestatus_broker.pnp_path, self.livestatus_broker.from_q)
 
         #--- livestatus_broker.do_main
         self.livestatus_broker.db.open()
diff --git a/test/test_livestatus_cache.py b/test/test_livestatus_cache.py
new file mode 100644
index 0000000..f30846d
--- /dev/null
+++ b/test/test_livestatus_cache.py
@@ -0,0 +1,158 @@
+from shinken_test import *
+
+class TestConfig(ShinkenTest):
+    def update_broker(self, dodeepcopy=False):
+        #The brok should be manage in the good order
+        ids = self.sched.broks.keys()
+        ids.sort()
+        for brok_id in ids:
+            brok = self.sched.broks[brok_id]
+            #print "Managing a brok type", brok.type, "of id", brok_id
+            #if brok.type == 'update_service_status':
+            #    print "Problem?", brok.data['is_problem']
+            if dodeepcopy:
+                brok = copy.deepcopy(brok)
+            self.livestatus_broker.manage_brok(brok)
+        self.sched.broks = {}
+
+    pass
+
+class TestConfigBig(TestConfig):
+    def setUp(self):
+        start_setUp = time.time()
+        self.setup_with_file('etc/nagios_5r_100h_2000s.cfg')
+        self.testid = str(os.getpid() + random.randint(1, 1000))
+        self.init_livestatus()
+        print "Cleaning old broks?"
+        self.sched.fill_initial_broks()
+        self.update_broker()
+        print "************* Overall Setup:", time.time() - start_setUp
+        # add use_aggressive_host_checking so we can mix exit codes 1 and 2
+        # but still get DOWN state
+        host = self.sched.hosts.find_by_name("test_host_000")
+        host.__class__.use_aggressive_host_checking = 1
+
+    def tearDown(self):
+        self.livestatus_broker.db.commit()
+        self.livestatus_broker.db.close()
+        if os.path.exists(self.livelogs):
+            os.remove(self.livelogs)
+        if os.path.exists(self.livelogs+"-journal"):
+            os.remove(self.livelogs+"-journal")
+        if os.path.exists(self.livestatus_broker.pnp_path):
+            shutil.rmtree(self.livestatus_broker.pnp_path)
+        if os.path.exists('var/nagios.log'):
+            os.remove('var/nagios.log')
+        if os.path.exists('var/retention.dat'):
+            os.remove('var/retention.dat')
+        if os.path.exists('var/status.dat'):
+            os.remove('var/status.dat')
+        self.livestatus_broker = None
+
+
+
+    def test_stats(self):
+        self.print_header()
+        now = time.time()
+        objlist = []
+        for host in self.sched.hosts:
+            objlist.append([host, 0, 'UP'])
+        for service in self.sched.services:
+            objlist.append([service, 0, 'OK'])
+        self.scheduler_loop(1, objlist)
+        self.update_broker()
+        svc1 = self.sched.services.find_srv_by_name_and_hostname("test_host_005", "test_ok_00")
+        print svc1
+        svc2 = self.sched.services.find_srv_by_name_and_hostname("test_host_005", "test_ok_15")
+        print svc2
+        svc3 = self.sched.services.find_srv_by_name_and_hostname("test_host_005", "test_ok_16")
+        print svc3
+        svc4 = self.sched.services.find_srv_by_name_and_hostname("test_host_007", "test_ok_05")
+        print svc4
+        svc5 = self.sched.services.find_srv_by_name_and_hostname("test_host_007", "test_ok_11")
+        svc6 = self.sched.services.find_srv_by_name_and_hostname("test_host_025", "test_ok_01")
+        svc7 = self.sched.services.find_srv_by_name_and_hostname("test_host_025", "test_ok_03")
+
+        request = """GET services
+Filter: description = test_ok_11
+Filter: host_name = test_host_007
+Columns: host_name description state state_type"""
+        beforeresponse, keepalive = self.livestatus_broker.livestatus.handle_request(request)
+        print "before", beforeresponse
+
+        self.scheduler_loop(1, [[svc1, 1, 'W'], [svc2, 1, 'W'], [svc3, 1, 'W'], [svc4, 2, 'C'], [svc5, 3, 'U'], [svc6, 2, 'C'], [svc7, 2, 'C']])
+        self.update_broker()
+        # 1993O, 3xW, 3xC, 1xU
+        request = """GET services
+Filter: contacts >= test_contact
+Stats: state != 9999
+Stats: state = 0
+Stats: state = 1
+Stats: state = 2
+Stats: state = 3"""
+        response, keepalive = self.livestatus_broker.livestatus.handle_request(request)
+        print 'query_6_______________\n%s\n%s\n' % (request, response)
+        self.assert_(response == '2000;1993;3;3;1\n')
+
+        # Now we play with the cache
+        request = """GET services
+Filter: description = test_ok_11
+Filter: host_name = test_host_007
+Columns: host_name description state state_type"""
+        afterresponse, keepalive = self.livestatus_broker.livestatus.handle_request(request)
+        print "after", afterresponse
+        self.assert_(beforeresponse != afterresponse)
+        request = """GET services
+Filter: description = test_ok_11
+Filter: host_name = test_host_007
+Columns: host_name description state state_type"""
+        repeatedresponse, keepalive = self.livestatus_broker.livestatus.handle_request(request)
+        print "repeated", repeatedresponse
+        self.assert_(afterresponse == repeatedresponse)
+        self.scheduler_loop(2, [[svc5, 2, 'C']])
+        self.update_broker()
+        request = """GET services
+Filter: description = test_ok_11
+Filter: host_name = test_host_007
+Columns: host_name description state state_type"""
+        notcachedresponse, keepalive = self.livestatus_broker.livestatus.handle_request(request)
+        print "notcached", notcachedresponse
+        self.scheduler_loop(2, [[svc5, 0, 'O']])
+        self.update_broker()
+        # 1994O, 3xW, 3xC, 0xU
+        request = """GET services
+Filter: description = test_ok_11
+Filter: host_name = test_host_007
+Columns: host_name description state state_type"""
+        againnotcachedresponse, keepalive = self.livestatus_broker.livestatus.handle_request(request)
+        print "againnotcached", againnotcachedresponse
+        self.assert_(notcachedresponse != againnotcachedresponse)
+        request = """GET services
+Filter: description = test_ok_11
+Filter: host_name = test_host_007
+Columns: host_name description state state_type"""
+        finallycachedresponse, keepalive = self.livestatus_broker.livestatus.handle_request(request)
+        print "finallycached", finallycachedresponse
+        self.assert_(againnotcachedresponse == finallycachedresponse)
+
+        request = """GET services
+Filter: contacts >= test_contact
+Stats: state != 9999
+Stats: state = 0
+Stats: state = 1
+Stats: state = 2
+Stats: state = 3"""
+        response, keepalive = self.livestatus_broker.livestatus.handle_request(request)
+        print 'query_6_______________\n%s\n%s\n' % (request, response)
+        self.assert_(response == '2000;1994;3;3;0\n')
+
+if __name__ == '__main__':
+    #import cProfile
+    command = """unittest.main()"""
+    unittest.main()
+    #cProfile.runctx( command, globals(), locals(), filename="/tmp/livestatus.profile" )
+
+    #allsuite = unittest.TestLoader.loadTestsFromModule(TestConfig)
+    #unittest.TextTestRunner(verbosity=2).run(allsuite)
+
+
diff --git a/test/test_livestatus_db.py b/test/test_livestatus_db.py
index b79c3db..8e5f57e 100755
--- a/test/test_livestatus_db.py
+++ b/test/test_livestatus_db.py
@@ -727,6 +727,9 @@ class TestConfigNoLogstore(TestConfig):
         self.livestatus_broker.rg = LiveStatusRegenerator()
         self.livestatus_broker.datamgr = datamgr
         datamgr.load(self.livestatus_broker.rg)
+        self.livestatus_broker.query_cache = LiveStatusQueryCache()
+        self.livestatus_broker.query_cache.disable()
+        self.livestatus_broker.rg.register_cache(self.livestatus_broker.query_cache)
         #--- livestatus_broker.main
 
         #--- livestatus_broker.do_main
@@ -735,7 +738,7 @@ class TestConfigNoLogstore(TestConfig):
         #--- livestatus_broker.do_main
 
         #--- livestatus_broker.manage_lql_thread
-        self.livestatus_broker.livestatus = LiveStatus(self.livestatus_broker.datamgr, self.livestatus_broker.db, self.livestatus_broker.pnp_path, self.livestatus_broker.from_q)
+        self.livestatus_broker.livestatus = LiveStatus(self.livestatus_broker.datamgr, self.livestatus_broker.query_cache, self.livestatus_broker.db, self.livestatus_broker.pnp_path, self.livestatus_broker.from_q)
         #--- livestatus_broker.manage_lql_thread
 
 

-- 
UNNAMED PROJECT



More information about the Pkg-nagios-changes mailing list