[Pkg-nagios-changes] [SCM] UNNAMED PROJECT branch, debian/master, updated. 810edbdd3feedbfe37f4a65bee50b57b2f60fa2a
Gerhard Lausser
gerhard.lausser at consol.de
Tue Feb 28 22:18:36 UTC 2012
The following commit has been merged in the debian/master branch:
commit 3745e9afb92571c38d8bd3e1736591c38cde431a
Author: Gerhard Lausser <gerhard.lausser at consol.de>
Date: Sun Feb 5 02:53:01 2012 +0100
First steps of the implementation of a cache for the livestatus module. (Disabled by default, and I won't tell you how to enable it yet.)
diff --git a/shinken/modules/livestatus_broker/livestatus.py b/shinken/modules/livestatus_broker/livestatus.py
index b62febd..adce32e 100644
--- a/shinken/modules/livestatus_broker/livestatus.py
+++ b/shinken/modules/livestatus_broker/livestatus.py
@@ -31,8 +31,9 @@ class LiveStatus(object):
"""A class that represents the status of all objects in the broker
"""
- def __init__(self, datamgr, db, pnp_path, return_queue):
+ def __init__(self, datamgr, query_cache, db, pnp_path, return_queue):
self.datamgr = datamgr
+ self.query_cache = query_cache
self.db = db
self.pnp_path = pnp_path
self.return_queue = return_queue
@@ -45,7 +46,7 @@ class LiveStatus(object):
handles the execution of the request and formatting of the result.
"""
- request = LiveStatusRequest(data, self.datamgr, self.db, self.pnp_path, self.return_queue, self.counters)
+ request = LiveStatusRequest(data, self.datamgr, self.query_cache, self.db, self.pnp_path, self.return_queue, self.counters)
request.parse_input(data)
if sorted([q.my_type for q in request.queries]) == ['command', 'query', 'wait']:
# The Multisite way
diff --git a/shinken/modules/livestatus_broker/livestatus_broker.py b/shinken/modules/livestatus_broker/livestatus_broker.py
index aefc3f9..8c0a5ec 100644
--- a/shinken/modules/livestatus_broker/livestatus_broker.py
+++ b/shinken/modules/livestatus_broker/livestatus_broker.py
@@ -79,6 +79,7 @@ class LiveStatus_broker(BaseModule, Daemon):
self.pnp_path = getattr(modconf, 'pnp_path', '')
self.debug = getattr(modconf, 'debug', None)
self.debug_queries = (getattr(modconf, 'debug_queries', '0') == '1')
+ self.use_query_cache = (getattr(modconf, 'query_cache', '0') == '1')
# This is an "artificial" module which is used when an old-style
# shinken-specific.cfg without a separate logstore-module is found.
@@ -151,6 +152,10 @@ class LiveStatus_broker(BaseModule, Daemon):
self.rg = LiveStatusRegenerator()
self.datamgr = datamgr
datamgr.load(self.rg)
+ self.query_cache = LiveStatusQueryCache()
+ if not self.use_query_cache:
+ self.query_cache.disable()
+ self.rg.register_cache(self.query_cache)
try:
#import cProfile
@@ -336,7 +341,7 @@ class LiveStatus_broker(BaseModule, Daemon):
def manage_lql_thread(self):
print "Livestatus query thread started"
# This is the main object of this broker where the action takes place
- self.livestatus = LiveStatus(self.datamgr, self.db, self.pnp_path, self.from_q)
+ self.livestatus = LiveStatus(self.datamgr, self.query_cache, self.db, self.pnp_path, self.from_q)
backlog = 5
size = 8192
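
The hunks above introduce a query_cache option for the livestatus module and wire a LiveStatusQueryCache instance into the broker. Roughly, the option handling amounts to the sketch below; FakeModConf is a hypothetical stand-in for the module configuration object, and the "query_cache 1" setting is an assumption based on the getattr() call, not documented behaviour:

    # Minimal sketch, assuming shinken is importable; FakeModConf is a
    # hypothetical stand-in for the Module object the broker receives.
    from shinken.modules.livestatus_broker.livestatus_query_cache import LiveStatusQueryCache

    class FakeModConf(object):
        query_cache = '1'   # presumably what "query_cache 1" in the module definition produces

    use_query_cache = (getattr(FakeModConf, 'query_cache', '0') == '1')
    query_cache = LiveStatusQueryCache()
    if not use_query_cache:
        query_cache.disable()
    print "cache enabled:", query_cache.enabled
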
diff --git a/shinken/modules/livestatus_broker/livestatus_query.py b/shinken/modules/livestatus_broker/livestatus_query.py
index 46011d4..6ce109c 100644
--- a/shinken/modules/livestatus_broker/livestatus_query.py
+++ b/shinken/modules/livestatus_broker/livestatus_query.py
@@ -36,9 +36,10 @@ class LiveStatusQuery(object):
my_type = 'query'
- def __init__(self, datamgr, db, pnp_path, return_queue, counters):
+ def __init__(self, datamgr, query_cache, db, pnp_path, return_queue, counters):
# Runtime data from the global LiveStatus object
self.datamgr = datamgr
+ self.query_cache = query_cache
self.db = db
self.pnp_path = pnp_path
self.return_queue = return_queue
@@ -46,6 +47,7 @@ class LiveStatusQuery(object):
# Private attributes for this specific request
self.response = LiveStatusResponse()
+ self.raw_data = ''
self.table = None
self.columns = []
self.filtercolumns = []
@@ -85,7 +87,6 @@ class LiveStatusQuery(object):
def split_option(self, line, splits=1):
"""Like split_commands, but converts numbers to int data type"""
- #x = [int(i) if i.isdigit() else i for i in [token.strip() for token in re.split(r"[\s]+", line, splits)]]
x = map (lambda i: (i.isdigit() and int(i)) or i, [token.strip() for token in re.split(r"[\s]+", line, splits)])
return x
@@ -111,6 +112,7 @@ class LiveStatusQuery(object):
sets the attributes of the request object
"""
+ self.raw_data = data
for line in data.splitlines():
line = line.strip()
# Tools like NagVis send KEYWORD:option, and we prefer to have
@@ -241,6 +243,12 @@ class LiveStatusQuery(object):
if not self.table:
return []
+ # Ask the cache if this request was already answered under the same
+ # circumstances.
+ cacheable, cached_response = self.query_cache.get_cached_query(self.raw_data)
+ if cached_response:
+ return cached_response
+
# Make columns unique
self.filtercolumns = list(set(self.filtercolumns))
self.prefiltercolumns = list(set(self.prefiltercolumns))
@@ -311,7 +319,12 @@ class LiveStatusQuery(object):
traceback.print_exc(32)
print "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!"
result = []
-
+
+ if cacheable:
+ # We cannot cache generators, so we must first read them into a list
+ result = [r for r in result]
+ self.query_cache.cache_query(self.raw_data, result)
+
return result
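
The get_data() changes above follow a plain check/compute/store pattern: ask the cache with the raw request text, return the cached rows on a hit, otherwise run the query, turn the generator into a list and hand it to the cache. A minimal sketch of the same pattern, with a hypothetical run_query() standing in for the real filter and Stats machinery:

    # Minimal sketch of the caching pattern used in get_data(); cached_get_data
    # and run_query are hypothetical names, the cache interface is the one
    # added in this commit.
    def cached_get_data(raw_data, query_cache, run_query):
        cacheable, cached_response = query_cache.get_cached_query(raw_data)
        if cached_response:
            return cached_response
        result = run_query(raw_data)
        if cacheable:
            # Generators cannot be cached, so read them into a list first
            result = list(result)
            query_cache.cache_query(raw_data, result)
        return result
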
diff --git a/shinken/modules/livestatus_broker/livestatus_query_cache.py b/shinken/modules/livestatus_broker/livestatus_query_cache.py
new file mode 100644
index 0000000..6286267
--- /dev/null
+++ b/shinken/modules/livestatus_broker/livestatus_query_cache.py
@@ -0,0 +1,143 @@
+# -*- coding: utf-8 -*-
+#
+# Copyright (C) 2009-2012:
+# Gabes Jean, naparuba at gmail.com
+# Gerhard Lausser, Gerhard.Lausser at consol.de
+# Gregory Starck, g.starck at gmail.com
+# Hartmut Goebel, h.goebel at goebel-consult.de
+#
+# This file is part of Shinken.
+#
+# Shinken is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Shinken is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with Shinken. If not, see <http://www.gnu.org/licenses/>.
+
+
+import time
+import re
+
+from livestatus_query import LiveStatusQuery
+from livestatus_wait_query import LiveStatusWaitQuery
+from livestatus_command_query import LiveStatusCommandQuery
+
+CACHE_GLOBAL_STATS = 0
+CACHE_GLOBAL_STATS_WITH_STATETYPE = 1
+CACHE_HOST_STATS = 2
+CACHE_SERVICE_STATS = 3
+
+class LiveStatusQueryCache:
+
+ """A class describing a livestatus query cache."""
+
+ def __init__(self):
+ self.categories = [{}, {}, {}]
+ self.enabled = True
+
+ def disable(self):
+ self.enabled = False
+
+ def invalidate_category(self, category):
+ """
+ Throw away all cached results of a certain class.
+ For example, if there is a state change, we must recalculate
+ the data for the tactical overview.
+ """
+ try:
+ print "i wipe sub-cache", category
+ self.categories[category] = {}
+ except Exception:
+ pass
+
+ def wipeout(self):
+ if not self.enabled:
+ return
+ for cat in range(len(self.categories)):
+ print "WIPEOUT CAT", cat
+ self.categories[cat] = {}
+
+ def get_cached_query(self, data):
+ if not self.enabled:
+ return (False, [])
+ print "I SEARCH THE CACHE FOR", data
+ cacheable, category, key = self.strip_query(data)
+ if self.categories[category].get(key, []):
+ print "CACHE HIT"
+ return (cacheable, self.categories[category].get(key, []))
+
+ def strip_query(self, data):
+ cacheable = True
+ category = CACHE_GLOBAL_STATS
+ for line in [l.strip() for l in data.splitlines()]:
+ if ':' in line and not ' ' in line:
+ line = line.replace(':', ': ')
+ keyword = line.split(' ')[0].rstrip(':')
+ if len(line) == 0:
+ pass
+ if keyword in ('Localtime'):
+ # i dont't like this Localtime, but we usually can ignore it
+ pass
+ elif keyword == 'Columns':
+ _, columns = re.split(r"[\s]+", line, 1)
+ if [c for c in re.compile(r'\s+').split(columns) if c.endswith('state_type')]:
+ category = CACHE_GLOBAL_STATS_WITH_STATETYPE
+ elif keyword == 'Stats':
+ pass
+ elif keyword == 'Filter':
+ try:
+ _, attribute, operator, reference = re.split(r"[\s]+", line, 3)
+ except:
+ _, attribute, operator = re.split(r"[\s]+", line, 2)
+ reference = ''
+ if attribute == 'time':
+ # That's a showstopper. We can't cache time-critical
+ # informations, because within one second a lot of things
+ # can change.
+ cacheable = False
+ if cacheable:
+ return (cacheable, category, hash(data))
+ else:
+ return (False, None, None)
+
+
+
+ def find_query(self, data):
+ pass
+
+ def query_hash(self, data):
+ pass
+
+ def query_hash_with_time(self, data):
+ pass
+
+ def cache_query(self, data, result):
+ """Puts the result of a livestatus query into the cache."""
+
+ if not self.enabled:
+ return
+ cacheable, category, key = self.strip_query(data)
+ print "I PUT IN THE CACHE FOR", key
+ self.categories[category][key] = result
+
+ def judge_situation(self, brok, obj):
+ if not self.enabled:
+ return
+ try:
+ if brok.data['state_id'] != obj.state_id:
+ print "DETECTED STATECHANGE", obj
+ self.invalidate_category(CACHE_GLOBAL_STATS)
+ if brok.data['state_type_id'] != obj.state_type_id:
+ print "DETECTED STATETYPECHANGE", obj
+ self.invalidate_category(CACHE_GLOBAL_STATS_WITH_STATETYPE)
+ print obj.state_id, obj.state_type_id, brok.data['state_id'], brok.data['state_type_id']
+ except Exception:
+ pass
+
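
Taken on its own, the cache keeps one dictionary per category, keys each query by a hash of its raw text, and refuses to cache anything that filters on time. A minimal usage sketch, assuming shinken is importable; the result rows are made up for illustration:

    from shinken.modules.livestatus_broker.livestatus_query_cache import \
        LiveStatusQueryCache, CACHE_GLOBAL_STATS

    cache = LiveStatusQueryCache()
    query = "GET services\nStats: state = 0\nStats: state = 1"

    cacheable, cached = cache.get_cached_query(query)    # first lookup: a miss
    if not cached:
        result = [[1993, 3]]                              # pretend query result
        if cacheable:
            cache.cache_query(query, result)

    cacheable, cached = cache.get_cached_query(query)     # now a hit
    print cached

    # A state change wipes the global-stats sub-cache, so the next lookup misses
    cache.invalidate_category(CACHE_GLOBAL_STATS)
    cacheable, cached = cache.get_cached_query(query)
    print cached
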
diff --git a/shinken/modules/livestatus_broker/livestatus_regenerator.py b/shinken/modules/livestatus_broker/livestatus_regenerator.py
index a47386b..a985ff3 100644
--- a/shinken/modules/livestatus_broker/livestatus_regenerator.py
+++ b/shinken/modules/livestatus_broker/livestatus_regenerator.py
@@ -64,6 +64,9 @@ class LiveStatusRegenerator(Regenerator):
setattr(self.hostgroups, '__itersorted__', types.MethodType(itersorted, self.hostgroups))
setattr(self.contactgroups, '__itersorted__', types.MethodType(itersorted, self.contactgroups))
+ # Everything is new now. We should clean the cache
+ self.cache.wipeout()
+
def manage_initial_contact_status_brok(self, b):
"""overwrite it, because the original method deletes some values"""
@@ -115,4 +118,12 @@ class LiveStatusRegenerator(Regenerator):
self.contacts.create_reversed_list()
self.notificationways.create_reversed_list()
+ def register_cache(self, cache):
+ self.cache = cache
+
+ def before_after_hook(self, brok, obj):
+ self.cache.judge_situation(brok, obj)
+
+
+
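
The regenerator now carries a reference to the cache: register_cache() stores it, wipeout() clears everything once the initial broks have been replayed, and before_after_hook() lets the cache compare an incoming brok against the current object so only the affected sub-caches are invalidated. A small sketch of that hook, with hypothetical FakeBrok/FakeHost stand-ins (the Regenerator call site itself is not part of this hunk):

    # Minimal sketch, assuming shinken is importable; FakeBrok and FakeHost are
    # toy stand-ins for the objects the broker normally gets from the scheduler.
    from shinken.modules.livestatus_broker.livestatus_regenerator import LiveStatusRegenerator
    from shinken.modules.livestatus_broker.livestatus_query_cache import LiveStatusQueryCache

    class FakeBrok(object):
        def __init__(self, data):
            self.data = data

    class FakeHost(object):
        state_id = 0        # UP
        state_type_id = 1   # HARD

    rg = LiveStatusRegenerator()
    cache = LiveStatusQueryCache()
    rg.register_cache(cache)

    # A state change (0 -> 1) invalidates the global-stats category, while the
    # unchanged state_type leaves the state_type category alone.
    rg.before_after_hook(FakeBrok({'state_id': 1, 'state_type_id': 1}), FakeHost())
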
diff --git a/shinken/modules/livestatus_broker/livestatus_request.py b/shinken/modules/livestatus_broker/livestatus_request.py
index 142acae..fe3571b 100644
--- a/shinken/modules/livestatus_broker/livestatus_request.py
+++ b/shinken/modules/livestatus_broker/livestatus_request.py
@@ -33,10 +33,11 @@ class LiveStatusRequest:
"""A class describing a livestatus request."""
- def __init__(self, data, datamgr, db, pnp_path, return_queue, counters):
+ def __init__(self, data, datamgr, query_cache, db, pnp_path, return_queue, counters):
self.data = data
# Runtime data from the global LiveStatus object
self.datamgr = datamgr
+ self.query_cache = query_cache
self.db = db
self.pnp_path = pnp_path
self.return_queue = return_queue
@@ -84,6 +85,6 @@ class LiveStatusRequest:
query.parse_input('\n'.join(wait_cmds))
self.queries.append(query)
if len(query_cmds) > 0:
- query = LiveStatusQuery(self.datamgr, self.db, self.pnp_path, self.return_queue, self.counters)
+ query = LiveStatusQuery(self.datamgr, self.query_cache, self.db, self.pnp_path, self.return_queue, self.counters)
query.parse_input('\n'.join(query_cmds))
self.queries.append(query)
diff --git a/test/shinken_test.py b/test/shinken_test.py
index 41eff95..28ad18e 100755
--- a/test/shinken_test.py
+++ b/test/shinken_test.py
@@ -52,6 +52,7 @@ from shinken.modules import livestatus_broker
from shinken.modules.livestatus_broker import LiveStatus_broker
from shinken.modules.livestatus_broker.livestatus import LiveStatus
from shinken.modules.livestatus_broker.livestatus_regenerator import LiveStatusRegenerator
+from shinken.modules.livestatus_broker.livestatus_query_cache import LiveStatusQueryCache
from shinken.misc.datamanager import datamgr
livestatus_modconf = Module()
@@ -403,11 +404,14 @@ class ShinkenTest(unittest.TestCase):
self.livestatus_broker.rg = LiveStatusRegenerator()
self.livestatus_broker.datamgr = datamgr
datamgr.load(self.livestatus_broker.rg)
+ self.livestatus_broker.query_cache = LiveStatusQueryCache()
+ self.livestatus_broker.query_cache.disable()
+ self.livestatus_broker.rg.register_cache(self.livestatus_broker.query_cache)
#--- livestatus_broker.main
self.livestatus_broker.init()
self.livestatus_broker.db = self.livestatus_broker.modules_manager.instances[0]
- self.livestatus_broker.livestatus = LiveStatus(self.livestatus_broker.datamgr, self.livestatus_broker.db, self.livestatus_broker.pnp_path, self.livestatus_broker.from_q)
+ self.livestatus_broker.livestatus = LiveStatus(self.livestatus_broker.datamgr, self.livestatus_broker.query_cache, self.livestatus_broker.db, self.livestatus_broker.pnp_path, self.livestatus_broker.from_q)
#--- livestatus_broker.do_main
self.livestatus_broker.db.open()
diff --git a/test/test_livestatus_cache.py b/test/test_livestatus_cache.py
new file mode 100644
index 0000000..f30846d
--- /dev/null
+++ b/test/test_livestatus_cache.py
@@ -0,0 +1,158 @@
+from shinken_test import *
+
+class TestConfig(ShinkenTest):
+    def update_broker(self, dodeepcopy=False):
+        # The broks should be managed in the right order
+        ids = self.sched.broks.keys()
+        ids.sort()
+        for brok_id in ids:
+            brok = self.sched.broks[brok_id]
+            #print "Managing a brok type", brok.type, "of id", brok_id
+            #if brok.type == 'update_service_status':
+            #    print "Problem?", brok.data['is_problem']
+            if dodeepcopy:
+                brok = copy.deepcopy(brok)
+            self.livestatus_broker.manage_brok(brok)
+        self.sched.broks = {}
+
+        pass
+
+class TestConfigBig(TestConfig):
+    def setUp(self):
+        start_setUp = time.time()
+        self.setup_with_file('etc/nagios_5r_100h_2000s.cfg')
+        self.testid = str(os.getpid() + random.randint(1, 1000))
+        self.init_livestatus()
+        print "Cleaning old broks?"
+        self.sched.fill_initial_broks()
+        self.update_broker()
+        print "************* Overall Setup:", time.time() - start_setUp
+        # add use_aggressive_host_checking so we can mix exit codes 1 and 2
+        # but still get DOWN state
+        host = self.sched.hosts.find_by_name("test_host_000")
+        host.__class__.use_aggressive_host_checking = 1
+
+    def tearDown(self):
+        self.livestatus_broker.db.commit()
+        self.livestatus_broker.db.close()
+        if os.path.exists(self.livelogs):
+            os.remove(self.livelogs)
+        if os.path.exists(self.livelogs+"-journal"):
+            os.remove(self.livelogs+"-journal")
+        if os.path.exists(self.livestatus_broker.pnp_path):
+            shutil.rmtree(self.livestatus_broker.pnp_path)
+        if os.path.exists('var/nagios.log'):
+            os.remove('var/nagios.log')
+        if os.path.exists('var/retention.dat'):
+            os.remove('var/retention.dat')
+        if os.path.exists('var/status.dat'):
+            os.remove('var/status.dat')
+        self.livestatus_broker = None
+
+
+
+    def test_stats(self):
+        self.print_header()
+        now = time.time()
+        objlist = []
+        for host in self.sched.hosts:
+            objlist.append([host, 0, 'UP'])
+        for service in self.sched.services:
+            objlist.append([service, 0, 'OK'])
+        self.scheduler_loop(1, objlist)
+        self.update_broker()
+        svc1 = self.sched.services.find_srv_by_name_and_hostname("test_host_005", "test_ok_00")
+        print svc1
+        svc2 = self.sched.services.find_srv_by_name_and_hostname("test_host_005", "test_ok_15")
+        print svc2
+        svc3 = self.sched.services.find_srv_by_name_and_hostname("test_host_005", "test_ok_16")
+        print svc3
+        svc4 = self.sched.services.find_srv_by_name_and_hostname("test_host_007", "test_ok_05")
+        print svc4
+        svc5 = self.sched.services.find_srv_by_name_and_hostname("test_host_007", "test_ok_11")
+        svc6 = self.sched.services.find_srv_by_name_and_hostname("test_host_025", "test_ok_01")
+        svc7 = self.sched.services.find_srv_by_name_and_hostname("test_host_025", "test_ok_03")
+
+        request = """GET services
+Filter: description = test_ok_11
+Filter: host_name = test_host_007
+Columns: host_name description state state_type"""
+        beforeresponse, keepalive = self.livestatus_broker.livestatus.handle_request(request)
+        print "before", beforeresponse
+
+        self.scheduler_loop(1, [[svc1, 1, 'W'], [svc2, 1, 'W'], [svc3, 1, 'W'], [svc4, 2, 'C'], [svc5, 3, 'U'], [svc6, 2, 'C'], [svc7, 2, 'C']])
+        self.update_broker()
+        # 1993O, 3xW, 3xC, 1xU
+        request = """GET services
+Filter: contacts >= test_contact
+Stats: state != 9999
+Stats: state = 0
+Stats: state = 1
+Stats: state = 2
+Stats: state = 3"""
+        response, keepalive = self.livestatus_broker.livestatus.handle_request(request)
+        print 'query_6_______________\n%s\n%s\n' % (request, response)
+        self.assert_(response == '2000;1993;3;3;1\n')
+
+        # Now we play with the cache
+        request = """GET services
+Filter: description = test_ok_11
+Filter: host_name = test_host_007
+Columns: host_name description state state_type"""
+        afterresponse, keepalive = self.livestatus_broker.livestatus.handle_request(request)
+        print "after", afterresponse
+        self.assert_(beforeresponse != afterresponse)
+        request = """GET services
+Filter: description = test_ok_11
+Filter: host_name = test_host_007
+Columns: host_name description state state_type"""
+        repeatedresponse, keepalive = self.livestatus_broker.livestatus.handle_request(request)
+        print "repeated", repeatedresponse
+        self.assert_(afterresponse == repeatedresponse)
+        self.scheduler_loop(2, [[svc5, 2, 'C']])
+        self.update_broker()
+        request = """GET services
+Filter: description = test_ok_11
+Filter: host_name = test_host_007
+Columns: host_name description state state_type"""
+        notcachedresponse, keepalive = self.livestatus_broker.livestatus.handle_request(request)
+        print "notcached", notcachedresponse
+        self.scheduler_loop(2, [[svc5, 0, 'O']])
+        self.update_broker()
+        # 1994O, 3xW, 3xC, 0xU
+        request = """GET services
+Filter: description = test_ok_11
+Filter: host_name = test_host_007
+Columns: host_name description state state_type"""
+        againnotcachedresponse, keepalive = self.livestatus_broker.livestatus.handle_request(request)
+        print "againnotcached", againnotcachedresponse
+        self.assert_(notcachedresponse != againnotcachedresponse)
+        request = """GET services
+Filter: description = test_ok_11
+Filter: host_name = test_host_007
+Columns: host_name description state state_type"""
+        finallycachedresponse, keepalive = self.livestatus_broker.livestatus.handle_request(request)
+        print "finallycached", finallycachedresponse
+        self.assert_(againnotcachedresponse == finallycachedresponse)
+
+        request = """GET services
+Filter: contacts >= test_contact
+Stats: state != 9999
+Stats: state = 0
+Stats: state = 1
+Stats: state = 2
+Stats: state = 3"""
+        response, keepalive = self.livestatus_broker.livestatus.handle_request(request)
+        print 'query_6_______________\n%s\n%s\n' % (request, response)
+        self.assert_(response == '2000;1994;3;3;0\n')
+
+if __name__ == '__main__':
+    #import cProfile
+    command = """unittest.main()"""
+    unittest.main()
+    #cProfile.runctx( command, globals(), locals(), filename="/tmp/livestatus.profile" )
+
+    #allsuite = unittest.TestLoader.loadTestsFromModule(TestConfig)
+    #unittest.TextTestRunner(verbosity=2).run(allsuite)
+
+
diff --git a/test/test_livestatus_db.py b/test/test_livestatus_db.py
index b79c3db..8e5f57e 100755
--- a/test/test_livestatus_db.py
+++ b/test/test_livestatus_db.py
@@ -727,6 +727,9 @@ class TestConfigNoLogstore(TestConfig):
self.livestatus_broker.rg = LiveStatusRegenerator()
self.livestatus_broker.datamgr = datamgr
datamgr.load(self.livestatus_broker.rg)
+ self.livestatus_broker.query_cache = LiveStatusQueryCache()
+ self.livestatus_broker.query_cache.disable()
+ self.livestatus_broker.rg.register_cache(self.livestatus_broker.query_cache)
#--- livestatus_broker.main
#--- livestatus_broker.do_main
@@ -735,7 +738,7 @@ class TestConfigNoLogstore(TestConfig):
#--- livestatus_broker.do_main
#--- livestatus_broker.manage_lql_thread
- self.livestatus_broker.livestatus = LiveStatus(self.livestatus_broker.datamgr, self.livestatus_broker.db, self.livestatus_broker.pnp_path, self.livestatus_broker.from_q)
+ self.livestatus_broker.livestatus = LiveStatus(self.livestatus_broker.datamgr, self.livestatus_broker.query_cache, self.livestatus_broker.db, self.livestatus_broker.pnp_path, self.livestatus_broker.from_q)
#--- livestatus_broker.manage_lql_thread
--
UNNAMED PROJECT