[Pkg-nagios-changes] [SCM] UNNAMED PROJECT branch, debian/master, updated. 810edbdd3feedbfe37f4a65bee50b57b2f60fa2a
Gerhard Lausser
gerhard.lausser at consol.de
Tue Feb 28 22:20:17 UTC 2012
The following commit has been merged in the debian/master branch:
commit 0bbd34420a20050061ff3739c060a65ab71fa482
Author: Gerhard Lausser <gerhard.lausser at consol.de>
Date: Sun Feb 12 01:09:43 2012 +0100
Fix some bugs in the livestatus cache and add a proper test case for the host/service history check
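Taken together, the hunks below change the cache protocol: get_cached_query() now returns a (cacheable, cache_hit, payload) triple instead of a pair, so an empty cached result is no longer mistaken for a miss, and the payload carries the columns and column headers alongside the result, since stats requests rewrite both. A minimal sketch of the new round trip, assuming a query_cache with the patched interface and a compute callback standing in for the real query execution:

    def answer_query(query_cache, raw_data, compute):
        # New contract: three values, so 'cached but empty' != 'not cached'.
        cacheable, cache_hit, payload = query_cache.get_cached_query(raw_data)
        if cache_hit:
            # Restore the columns/headers a stats request may have rewritten.
            return payload['result'], payload['columns'], payload['columnheaders']
        result, columns, columnheaders = compute(raw_data)
        if cacheable:
            # Generators cannot be cached; materialize first.
            result = list(result)
            query_cache.cache_query(raw_data, {
                'result': result,
                'columns': columns,
                'columnheaders': columnheaders,
            })
        return result, columns, columnheaders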
diff --git a/shinken/modules/livestatus_broker/livestatus_query.py b/shinken/modules/livestatus_broker/livestatus_query.py
index 6ce109c..c38c919 100644
--- a/shinken/modules/livestatus_broker/livestatus_query.py
+++ b/shinken/modules/livestatus_broker/livestatus_query.py
@@ -245,9 +245,11 @@ class LiveStatusQuery(object):
# Ask the cache if this request was already answered under the same
# circumstances.
- cacheable, cached_response = self.query_cache.get_cached_query(self.raw_data)
- if cached_response:
- return cached_response
+ cacheable, cache_hit, cached_response = self.query_cache.get_cached_query(self.raw_data)
+ if cache_hit:
+ self.columns = cached_response['columns']
+ self.response.columnheaders = cached_response['columnheaders']
+ return cached_response['result']
# Make columns unique
self.filtercolumns = list(set(self.filtercolumns))
@@ -320,10 +322,16 @@ class LiveStatusQuery(object):
print "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!"
result = []
- if cacheable:
+ if cacheable and not cache_hit:
# We cannot cache generators, so we must first read them into a list
result = [r for r in result]
- self.query_cache.cache_query(self.raw_data, result)
+ # For stats requests in particular, the columns and headers are
+ # also modified, so we need to save them as well.
+ self.query_cache.cache_query(self.raw_data, {
+ 'result': result,
+ 'columns': self.columns,
+ 'columnheaders': self.response.columnheaders,
+ })
return result
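The comment about generators is worth unpacking: a generator can only be consumed once, so caching one would hand every later cache hit an already-exhausted iterator. A standalone illustration of the failure mode the list comprehension avoids (not from the patch):

    def rows():
        yield 'row1'
        yield 'row2'

    g = rows()
    assert list(g) == ['row1', 'row2']
    assert list(g) == []        # exhausted: a cached generator replays nothing

    cached = list(rows())       # materialized once, as the patch does
    assert list(cached) == ['row1', 'row2']
    assert list(cached) == ['row1', 'row2']   # every hit sees the full result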
diff --git a/shinken/modules/livestatus_broker/livestatus_query_cache.py b/shinken/modules/livestatus_broker/livestatus_query_cache.py
index 1ab0b57..86da341 100644
--- a/shinken/modules/livestatus_broker/livestatus_query_cache.py
+++ b/shinken/modules/livestatus_broker/livestatus_query_cache.py
@@ -78,6 +78,9 @@ class Counter(dict):
def __missing__(self, key):
return 0
+class LFUCacheMiss(Exception):
+ pass
+
class LFU(object):
"""
This class implements a dictionary which has a limited number of elements.
@@ -102,10 +105,12 @@ class LFU(object):
self.use_count[key] += 1
try:
result = self.storage[key]
+ print "CACHE HIT"
self.hits += 1
except KeyError:
result = []
self.misses += 1
+ raise LFUCacheMiss
return result
def put(self, key, data):
@@ -115,6 +120,17 @@ class LFU(object):
del self.storage[key], self.use_count[key]
pass
+ def __str__(self):
+ text = 'LFU-------------------\n'
+ try:
+ text += 'hit rate %.2f%%\n' % (100.0 * self.hits / (self.hits + self.misses))
+ except ZeroDivisionError:
+ text += 'hit rate 0%\n'
+ for k in self.storage:
+ text += 'key %10s (%d used)\n' % (str(k), self.use_count[k])
+ return text
+
+
class QueryData(object):
"""
This class implements a more "machine-readable" form of a livestatus query.
@@ -134,7 +150,7 @@ class QueryData(object):
self.client_localtime = int(time.time())
self.stats_columns = [f[1] for f in self.structured_data if f[0] == 'Stats']
self.filter_columns = [f[1] for f in self.structured_data if f[0] == 'Filter']
- self.columns = [f[1] for f in self.structured_data if f[0] == 'Columns'][0]
+ self.columns = [x for f in self.structured_data if f[0] == 'Columns' for x in f[1]]
self.categorize()
def __str__(self):
@@ -292,6 +308,11 @@ class QueryData(object):
self.category = CACHE_GLOBAL_STATS
elif self.is_a_closed_chapter():
self.category = CACHE_IRREVERSIBLE_HISTORY
+ elif self.table == 'services' and not self.is_stats and has_not_more_than(self.columns, ['host_name', 'description', 'state', 'state_type']):
+ self.category = CACHE_SERVICE_STATS
+ else:
+ pass
+ print "i cannot cache this", self
@@ -326,7 +347,7 @@ class LiveStatusQueryCache(object):
the data for the tactical overview.
"""
try:
- print "i wipe sub-cache", category
+ #print "i wipe sub-cache", category
self.categories[category].clear()
except Exception:
pass
@@ -335,17 +356,17 @@ class LiveStatusQueryCache(object):
if not self.enabled:
return
for cat in range(len(self.categories)):
- print "WIPEOUT CAT", cat
self.categories[cat].clear()
def get_cached_query(self, data):
if not self.enabled:
- return (False, [])
+ return (False, False, [])
query = QueryData(data)
- print "I SEARCH THE CACHE FOR", query.category, query.key, data
- if self.categories[query.category].get(query.key):
- print "CACHE HIT"
- return (query.category != CACHE_IMPOSSIBLE, self.categories[query.category].get(query.key))
+ #print "I SEARCH THE CACHE FOR", query.category, query.key, data
+ try:
+ return (query.category != CACHE_IMPOSSIBLE, True, self.categories[query.category].get(query.key))
+ except LFUCacheMiss:
+ return (query.category != CACHE_IMPOSSIBLE, False, [])
def cache_query(self, data, result):
"""Puts the result of a livestatus query into the cache."""
@@ -353,7 +374,7 @@ class LiveStatusQueryCache(object):
if not self.enabled:
return
query = QueryData(data)
- print "I PUT IN THE CACHE FOR", query.category, query.key
+ #print "I PUT IN THE CACHE FOR", query.category, query.key
self.categories[query.category].put(query.key, result)
def impact_assessment(self, brok, obj):
@@ -367,9 +388,11 @@ class LiveStatusQueryCache(object):
if brok.data['state_id'] != obj.state_id:
print "DETECTED STATECHANGE", obj
self.invalidate_category(CACHE_GLOBAL_STATS)
+ self.invalidate_category(CACHE_SERVICE_STATS)
if brok.data['state_type_id'] != obj.state_type_id:
print "DETECTED STATETYPECHANGE", obj
self.invalidate_category(CACHE_GLOBAL_STATS_WITH_STATETYPE)
+ self.invalidate_category(CACHE_SERVICE_STATS)
print obj.state_id, obj.state_type_id, brok.data['state_id'], brok.data['state_type_id']
except Exception:
pass
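The LFUCacheMiss exception introduced above exists because get() has no safe sentinel: an empty list is a perfectly legitimate cached result (and is exactly what the old code returned on a miss), so only an exception can signal "not present" unambiguously. Reduced to its essentials, the pattern looks roughly like this (a simplified sketch; the real LFU keeps more bookkeeping than shown here):

    class LFUCacheMiss(Exception):
        pass

    class LFU(object):
        """A dict with a bounded number of elements, evicting the least used."""
        def __init__(self, maxsize=100):
            self.storage = {}
            self.use_count = {}
            self.maxsize = maxsize
            self.hits = self.misses = 0

        def get(self, key):
            self.use_count[key] = self.use_count.get(key, 0) + 1
            try:
                result = self.storage[key]
            except KeyError:
                self.misses += 1
                raise LFUCacheMiss(key)    # a miss is an exception, not a value
            self.hits += 1
            return result

        def put(self, key, data):
            if len(self.storage) >= self.maxsize:
                # Evict the least frequently used key to stay bounded.
                victim = min(self.storage, key=lambda k: self.use_count.get(k, 0))
                del self.storage[victim]
                del self.use_count[victim]
            self.storage[key] = data

The cache categories built on top of this (CACHE_GLOBAL_STATS, CACHE_SERVICE_STATS, ...) are then invalidated wholesale whenever a state-change brok arrives, as the impact_assessment() hunk shows.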
diff --git a/shinken/modules/logstore_sqlite.py b/shinken/modules/logstore_sqlite.py
index 6c7be8f..ad3a0e6 100644
--- a/shinken/modules/logstore_sqlite.py
+++ b/shinken/modules/logstore_sqlite.py
@@ -397,7 +397,6 @@ class LiveStatusLogStoreSqlite(BaseModule):
# finalize the filter stacks
#self.sql_time_filter_stack.and_elements(self.sql_time_filter_stack.qsize())
self.sql_filter_stack.and_elements(self.sql_filter_stack.qsize())
- self.use_aggressive_sql = True
if self.use_aggressive_sql:
# Be aggressive, get preselected data from sqlite and do less
# filtering in python. But: only a subset of Filter:-attributes
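This hunk removes a leftover line that forced use_aggressive_sql on for every request; the flag now keeps whatever the module configuration (or, below, the test) sets. The idea behind the flag, roughly: push the time filters into the sqlite WHERE clause so less data crosses into Python, at the price that only SQL-expressible Filter: attributes can be pushed down. A hedged sketch of the two paths, with the schema reduced to the columns the test actually queries:

    import sqlite3

    def fetch_logs(conn, start, end, use_aggressive_sql=False):
        if use_aggressive_sql:
            # Preselect in sqlite: fast, but only for SQL-expressible filters.
            cur = conn.execute(
                "SELECT time, type FROM logs WHERE time >= ? AND time <= ?",
                (start, end))
            return cur.fetchall()
        # Conservative path: fetch everything, filter in Python.
        cur = conn.execute("SELECT time, type FROM logs")
        return [row for row in cur if start <= row[0] <= end]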
diff --git a/test/test_livestatus_cache.py b/test/test_livestatus_cache.py
index ac63459..7faf698 100644
--- a/test/test_livestatus_cache.py
+++ b/test/test_livestatus_cache.py
@@ -1,4 +1,10 @@
from shinken_test import *
+import datetime
+
+def set_to_midnight(dt):
+ midnight = datetime.time(0)
+ return datetime.datetime.combine(dt.date(), midnight)
+
class TestConfig(ShinkenTest):
def update_broker(self, dodeepcopy=False):
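The new set_to_midnight() helper truncates a datetime to local midnight; test_a_long_history below computes the same boundary with epoch arithmetic, where etime % 86400 strips the time of day relative to UTC and time.altzone shifts UTC midnight to local DST midnight. A small check of both approaches (illustrative only; the epoch form can be off by a day close to local midnight):

    import datetime
    import time

    def set_to_midnight(dt):
        midnight = datetime.time(0)
        return datetime.datetime.combine(dt.date(), midnight)

    expected = datetime.datetime(2012, 2, 12, 0, 0, 0)
    assert set_to_midnight(datetime.datetime(2012, 2, 12, 1, 9, 43)) == expected

    etime = time.time()
    etime_midnight = (etime - (etime % 86400)) + time.altzone   # local DST midnight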
@@ -40,6 +46,8 @@ class TestConfigBig(TestConfig):
os.remove(self.livelogs)
if os.path.exists(self.livelogs+"-journal"):
os.remove(self.livelogs+"-journal")
+ for arch in os.listdir('tmp/archives'):
+ os.remove('tmp/archives/' + arch)
if os.path.exists(self.livestatus_broker.pnp_path):
shutil.rmtree(self.livestatus_broker.pnp_path)
if os.path.exists('var/nagios.log'):
@@ -84,54 +92,36 @@ Columns: host_name description state state_type"""
self.scheduler_loop(1, [[svc1, 1, 'W'], [svc2, 1, 'W'], [svc3, 1, 'W'], [svc4, 2, 'C'], [svc5, 3, 'U'], [svc6, 2, 'C'], [svc7, 2, 'C']])
self.update_broker()
# 1993O, 3xW, 3xC, 1xU
- request = """GET services
+ statsrequest = """GET services
Filter: contacts >= test_contact
Stats: state != 9999
Stats: state = 0
Stats: state = 1
Stats: state = 2
Stats: state = 3"""
- response, keepalive = self.livestatus_broker.livestatus.handle_request(request)
- print 'query_6_______________\n%s\n%s\n' % (request, response)
+ response, keepalive = self.livestatus_broker.livestatus.handle_request(statsrequest)
+ print 'query_6_______________\n%s\n%s\n' % (statsrequest, response)
self.assert_(response == '2000;1993;3;3;1\n')
# Now we play with the cache
- request = """GET services
-Filter: description = test_ok_11
-Filter: host_name = test_host_007
-Columns: host_name description state state_type"""
afterresponse, keepalive = self.livestatus_broker.livestatus.handle_request(request)
print "after", afterresponse
self.assert_(beforeresponse != afterresponse)
- request = """GET services
-Filter: description = test_ok_11
-Filter: host_name = test_host_007
-Columns: host_name description state state_type"""
repeatedresponse, keepalive = self.livestatus_broker.livestatus.handle_request(request)
print "repeated", repeatedresponse
self.assert_(afterresponse == repeatedresponse)
+
self.scheduler_loop(2, [[svc5, 2, 'C']])
self.update_broker()
- request = """GET services
-Filter: description = test_ok_11
-Filter: host_name = test_host_007
-Columns: host_name description state state_type"""
notcachedresponse, keepalive = self.livestatus_broker.livestatus.handle_request(request)
print "notcached", notcachedresponse
+
self.scheduler_loop(2, [[svc5, 0, 'O']])
self.update_broker()
# 1994O, 3xW, 3xC, 0xU
- request = """GET services
-Filter: description = test_ok_11
-Filter: host_name = test_host_007
-Columns: host_name description state state_type"""
againnotcachedresponse, keepalive = self.livestatus_broker.livestatus.handle_request(request)
print "againnotcached", againnotcachedresponse
self.assert_(notcachedresponse != againnotcachedresponse)
- request = """GET services
-Filter: description = test_ok_11
-Filter: host_name = test_host_007
-Columns: host_name description state state_type"""
finallycachedresponse, keepalive = self.livestatus_broker.livestatus.handle_request(request)
print "finallycached", finallycachedresponse
self.assert_(againnotcachedresponse == finallycachedresponse)
@@ -143,12 +133,16 @@ Stats: state = 0
Stats: state = 1
Stats: state = 2
Stats: state = 3"""
- response, keepalive = self.livestatus_broker.livestatus.handle_request(request)
- print 'query_6_______________\n%s\n%s\n' % (request, response)
+ response, keepalive = self.livestatus_broker.livestatus.handle_request(statsrequest)
+ print 'query_6_______________\n%s\n%s\n' % (statsrequest, response)
+ response, keepalive = self.livestatus_broker.livestatus.handle_request(statsrequest)
+ print 'query_6_______________\n%s\n%s\n' % (statsrequest, response)
self.assert_(response == '2000;1994;3;3;0\n')
def test_a_long_history(self):
#return
+ print datetime.datetime.now()
+ print datetime.datetime.today()
test_host_005 = self.sched.hosts.find_by_name("test_host_005")
test_host_099 = self.sched.hosts.find_by_name("test_host_099")
test_ok_00 = self.sched.services.find_srv_by_name_and_hostname("test_host_005", "test_ok_00")
@@ -156,13 +150,42 @@ Stats: state = 3"""
test_ok_04 = self.sched.services.find_srv_by_name_and_hostname("test_host_005", "test_ok_04")
test_ok_16 = self.sched.services.find_srv_by_name_and_hostname("test_host_005", "test_ok_16")
test_ok_99 = self.sched.services.find_srv_by_name_and_hostname("test_host_099", "test_ok_01")
- time_warp(-1 * 20 * 24 * 3600)
- starttime = time.time()
- numlogs = self.livestatus_broker.db.execute("SELECT COUNT(*) FROM logs")
- if numlogs[0][0] == 0:
- # run silently
- old_stdout = sys.stdout
- sys.stdout = open(os.devnull, "w")
+ days = 4
+
+ etime = time.time()
+ print "now it is", time.ctime(etime)
+ print "now it is", time.gmtime(etime)
+ etime_midnight = (etime - (etime % 86400)) + time.altzone
+ print "midnight was", time.ctime(etime_midnight)
+ print "midnight was", time.gmtime(etime_midnight)
+ query_start = etime_midnight - (days - 1) * 86400
+ query_end = etime_midnight
+ print "query_start", time.ctime(query_start)
+ print "query_end ", time.ctime(query_end)
+
+ # |----------|----------|----------|----------|----------|---x
+ # etime
+ # etime_midnight
+ # ---x------
+ # etime - 4 days
+ # |---
+ # query_start
+ #
+ # ............................................
+ # events in the log database ranging till now
+ #
+ # |________________________________|
+ # events which will be read from db
+ #
+ loops = int(86400 / 192)
+ time_warp(-1 * days * 86400)
+ print "warp back to", time.ctime(time.time())
+ # run silently
+ old_stdout = sys.stdout
+ sys.stdout = open(os.devnull, "w")
+ should_be = 0
+ for day in xrange(days):
+ sys.stderr.write("day %d now it is %s i run %d loops\n" % (day, time.ctime(time.time()), loops))
self.scheduler_loop(2, [
[test_ok_00, 0, "OK"],
[test_ok_01, 0, "OK"],
@@ -171,9 +194,8 @@ Stats: state = 3"""
[test_ok_99, 0, "OK"],
])
self.update_broker()
- should_be = 0
#for i in xrange(3600 * 24 * 7):
- for i in xrange(10000):
+ for i in xrange(loops):
if i % 10000 == 0:
sys.stderr.write(str(i))
if i % 399 == 0:
@@ -184,7 +206,9 @@ Stats: state = 3"""
[test_ok_16, 1, "WARN"],
[test_ok_99, 2, "CRIT"],
])
- should_be += 3
+ if int(time.time()) >= query_start and int(time.time()) <= query_end:
+ should_be += 3
+ sys.stderr.write("now it should be %s\n" % should_be)
time.sleep(62)
if i % 399 == 0:
self.scheduler_loop(1, [
@@ -194,34 +218,37 @@ Stats: state = 3"""
[test_ok_16, 0, "OK"],
[test_ok_99, 0, "OK"],
])
- should_be += 1
+ if int(time.time()) >= query_start and int(time.time()) <= query_end:
+ should_be += 1
+ sys.stderr.write("now it should be %s\n" % should_be)
time.sleep(2)
- if i % 199 == 0:
+ if i % 9 == 0:
self.scheduler_loop(3, [
[test_ok_00, 1, "WARN"],
[test_ok_01, 2, "CRIT"],
])
+
time.sleep(62)
- if i % 199 == 0:
+ if i % 9 == 0:
self.scheduler_loop(1, [
[test_ok_00, 0, "OK"],
[test_ok_01, 0, "OK"],
])
time.sleep(2)
- if i % 299 == 0:
+ if i % 9 == 0:
self.scheduler_loop(3, [
[test_host_005, 2, "DOWN"],
])
- if i % 19 == 0:
+ if i % 2 == 0:
self.scheduler_loop(3, [
[test_host_099, 2, "DOWN"],
])
time.sleep(62)
- if i % 299 == 0:
+ if i % 9 == 0:
self.scheduler_loop(3, [
[test_host_005, 0, "UP"],
])
- if i % 19 == 0:
+ if i % 2 == 0:
self.scheduler_loop(3, [
[test_host_099, 0, "UP"],
])
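The bookkeeping changes in the preceding hunks all serve one purpose: should_be only counts an alert when the simulated clock sits inside [query_start, query_end], mirroring the Filter: time bounds of the log query checked later. The core of it, factored out (illustrative):

    def count_in_window(event_times, query_start, query_end):
        # Count only events whose timestamp falls inside the queried window,
        # exactly what the should_be bookkeeping does inline.
        return sum(1 for t in event_times if query_start <= t <= query_end)

    assert count_in_window([10, 20, 30, 40], 15, 35) == 2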
@@ -230,25 +257,21 @@ Stats: state = 3"""
if i % 1000 == 0:
self.livestatus_broker.db.commit()
endtime = time.time()
- sys.stdout.close()
- sys.stdout = old_stdout
self.livestatus_broker.db.commit()
- else:
- should_be = numlogs[0][0]
- xxx = self.livestatus_broker.db.execute("SELECT min(time), max(time) FROM logs")
- print xxx
- starttime, endtime = [self.livestatus_broker.db.execute("SELECT min(time), max(time) FROM logs")][0][0]
+ sys.stderr.write("day %d end it is %s\n" % (day, time.ctime(time.time())))
+ sys.stdout.close()
+ sys.stdout = old_stdout
+ self.livestatus_broker.db.commit_and_rotate_log_db()
+ numlogs = self.livestatus_broker.db.execute("SELECT count(*) FROM logs")
+ print "numlogs is", numlogs
# now we have a lot of events
# find type = HOST ALERT for test_host_005
- q = int((endtime - starttime) / 8)
- starttime += q
- endtime -= q
request = """GET log
Columns: class time type state host_name service_description plugin_output message options contact_name command_name state_type current_host_groups current_service_groups
-Filter: time >= """ + str(int(starttime)) + """
-Filter: time <= """ + str(int(endtime)) + """
+Filter: time >= """ + str(int(query_start)) + """
+Filter: time <= """ + str(int(query_end)) + """
Filter: type = SERVICE ALERT
And: 1
Filter: type = HOST ALERT
@@ -269,18 +292,54 @@ OutputFormat: json"""
fake_time_sleep = time.sleep
time.time = original_time_time
time.sleep = original_time_sleep
+ print self.livestatus_broker.db.database_file
print request
+ print "query 1 --------------------------------------------------"
+ tic = time.time()
response, keepalive = self.livestatus_broker.livestatus.handle_request(request)
+ tac = time.time()
+ elapsed1 = tac - tic
pyresponse = eval(response)
- print "number of records", len(pyresponse)
+ print "pyresponse", len(pyresponse)
print "should be", should_be
- print "now with cache"
+ self.assert_(len(pyresponse) == should_be)
+ print "query 2 cache---------------------------------------------"
+ tic = time.time()
response, keepalive = self.livestatus_broker.livestatus.handle_request(request)
+ tac = time.time()
+ elapsed2 = tac - tic
pyresponse = eval(response)
- print "number of records", len(pyresponse)
- print "should be", should_be
- time.time = fake_time_time
- time.sleep = fake_time_sleep
+ self.assert_(len(pyresponse) == should_be)
+ print "clear the cache"
+ print "use aggressive sql"
+ print "query 3 --------------------------------------------------"
+ self.livestatus_broker.query_cache.wipeout()
+ self.livestatus_broker.db.use_aggressive_sql = True
+ tic = time.time()
+ response, keepalive = self.livestatus_broker.livestatus.handle_request(request)
+ tac = time.time()
+ elapsed3 = tac - tic
+ pyresponse = eval(response)
+ self.assert_(len(pyresponse) == should_be)
+ print "query 4 cache---------------------------------------------"
+ tic = time.time()
+ response, keepalive = self.livestatus_broker.livestatus.handle_request(request)
+ tac = time.time()
+ elapsed4 = tac - tic
+ pyresponse = eval(response)
+ self.assert_(len(pyresponse) == should_be)
+ print "elapsed1", elapsed1
+ print "elapsed2", elapsed2
+ print "elapsed3", elapsed3
+ print "elapsed4", elapsed4
+ self.assert_(elapsed2 < elapsed1 / 10)
+ self.assert_(elapsed3 < elapsed1)
+ self.assert_(elapsed4 < elapsed3 / 2)
+
+ #time.time = fake_time_time
+ #time.sleep = fake_time_sleep
+
+
if __name__ == '__main__':
#import cProfile
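The four timed queries at the end encode the performance contract: a warm cache must cut at least 90% off the cold run (elapsed2 < elapsed1 / 10), aggressive SQL must beat the conservative path (elapsed3 < elapsed1), and its cached repeat must at least halve again (elapsed4 < elapsed3 / 2). The tic/tac measurement, factored into a helper for clarity (a sketch; the test inlines it):

    import time

    def timed(fn, *args):
        # Wall-clock one call; returns (elapsed_seconds, result).
        tic = time.time()
        result = fn(*args)
        tac = time.time()
        return tac - tic, result

    # e.g.: elapsed1, (response, keepalive) = timed(handle_request, request)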
--
UNNAMED PROJECT
More information about the Pkg-nagios-changes mailing list