[Pkg-nagios-changes] [SCM] UNNAMED PROJECT branch, debian/master, updated. 810edbdd3feedbfe37f4a65bee50b57b2f60fa2a
Gerhard Lausser
gerhard.lausser at consol.de
Tue Feb 28 22:18:57 UTC 2012
The following commit has been merged in the debian/master branch:
commit ca85edb5981870ea0aa61d2c2bae2a90af5c082f
Author: Gerhard Lausser <gerhard.lausser at consol.de>
Date: Mon Feb 6 02:47:28 2012 +0100
Add a lfu to the livestatus cache
diff --git a/shinken/modules/livestatus_broker/livestatus_query_cache.py b/shinken/modules/livestatus_broker/livestatus_query_cache.py
index 6286267..f9faa30 100644
--- a/shinken/modules/livestatus_broker/livestatus_query_cache.py
+++ b/shinken/modules/livestatus_broker/livestatus_query_cache.py
@@ -24,6 +24,12 @@
import time
import re
+import collections
+import functools
+from itertools import ifilterfalse
+from heapq import nsmallest
+from operator import itemgetter
+
from livestatus_query import LiveStatusQuery
from livestatus_wait_query import LiveStatusWaitQuery
@@ -34,12 +40,51 @@ CACHE_GLOBAL_STATS_WITH_STATETYPE = 1
CACHE_HOST_STATS = 2
CACHE_SERVICE_STATS = 3
+class Counter(dict):
+ 'Mapping where default values are zero'
+ def __missing__(self, key):
+ return 0
+
+class LFU(object):
+ def __init__(self, maxsize=100):
+ self.storage = {}
+ self.maxsize = maxsize
+ self.hits = self.misses = 0
+ self.use_count = Counter() # times each key has been accessed
+ self.kwd_mark = object() # separate positional and keyword args
+
+ def clear(self):
+ self.storage = {}
+ self.use_count = Counter()
+ self.hits = self.misses = 0
+
+ def get(self, key):
+ self.use_count[key] += 1
+ try:
+ result = self.storage[key]
+ self.hits += 1
+ except KeyError:
+ result = []
+ self.misses += 1
+ return result
+
+ def put(self, key, data):
+ self.storage[key] = data
+ if len(self.storage) > self.maxsize:
+ for key, _ in nsmallest(maxsize // 10, self.use_count.iteritems(), key=itemgetter(1)):
+ del self.storage[key], self.use_count[key]
+ pass
+
class LiveStatusQueryCache:
"""A class describing a livestatus query cache."""
def __init__(self):
- self.categories = [{}, {}, {}]
+ self.categories = []
+ # CACHE_GLOBAL_STATS
+ self.categories.append(LFU())
+ # CACHE_GLOBAL_STATS_WITH_STATETYPE
+ self.categories.append(LFU())
self.enabled = True
def disable(self):
@@ -53,7 +98,7 @@ class LiveStatusQueryCache:
"""
try:
print "i wipe sub-cache", category
- self.categories[category] = {}
+ self.categories[category].clear()
except Exception:
pass
@@ -62,48 +107,139 @@ class LiveStatusQueryCache:
return
for cat in range(len(self.categories)):
print "WIPEOUT CAT", cat
- self.categories[cat] = {}
+ self.categories[cat].clear()
def get_cached_query(self, data):
if not self.enabled:
return (False, [])
print "I SEARCH THE CACHE FOR", data
cacheable, category, key = self.strip_query(data)
- if self.categories[category].get(key, []):
+ if self.categories[category].get(key):
print "CACHE HIT"
- return (cacheable, self.categories[category].get(key, []))
+ return (cacheable, self.categories[category].get(key))
- def strip_query(self, data):
- cacheable = True
- category = CACHE_GLOBAL_STATS
- for line in [l.strip() for l in data.splitlines()]:
+ def split_command(self, line, splits=1):
+ """Create a list from the words of a line"""
+ return line.split(' ', splits)
+
+ def split_option(self, line, splits=1):
+ """Like split_commands, but converts numbers to int data type"""
+ x = map (lambda i: (i.isdigit() and int(i)) or i, [token.strip() for token in re.split(r"[\s]+", line, splits)])
+ return x
+
+ def split_option_with_columns(self, line):
+ """Split a line in a command and a list of words"""
+ cmd, columns = self.split_option(line)
+ return cmd, [c for c in re.compile(r'\s+').split(columns)]
+
+ def strip_table_from_column(self, column):
+ """Cut off the table name, because it is possible
+ to say service_state instead of state"""
+ bygroupmatch = re.compile('(\w+)by.*group').search(self.table)
+ if bygroupmatch:
+ return re.sub(re.sub('s$', '', bygroupmatch.group(1)) + '_', '', column, 1)
+ else:
+ return re.sub(re.sub('s$', '', self.table) + '_', '', column, 1)
+
+
+ def prepare_data(self, data):
+ """
+ Reformat the lines of a query so that they are a list of tuples
+ where the first element is the keyword
+ """
+ formal_line = []
+ for line in data.splitlines():
+ line = line.strip()
+ # Tools like NagVis send KEYWORK:option, and we prefer to have
+ # a space following the :
if ':' in line and not ' ' in line:
line = line.replace(':', ': ')
keyword = line.split(' ')[0].rstrip(':')
- if len(line) == 0:
- pass
- if keyword in ('Localtime'):
- # i dont't like this Localtime, but we usually can ignore it
- pass
- elif keyword == 'Columns':
- _, columns = re.split(r"[\s]+", line, 1)
- if [c for c in re.compile(r'\s+').split(columns) if c.endswith('state_type')]:
- category = CACHE_GLOBAL_STATS_WITH_STATETYPE
- elif keyword == 'Stats':
- pass
+ if keyword == 'GET':
+ formal_line.append((keyword, self.split_command(line)[1]))
+ elif keyword == 'Columns': # Get the names of the desired columns
+ _, columns = self.split_option_with_columns(line)
+ formal_line.append((keyword, columns))
+ elif keyword == 'ResponseHeader':
+ _, responseheader = self.split_option(line)
+ formal_line.append((keyword, responseheader))
+ elif keyword == 'OutputFormat':
+ _, outputformat = self.split_option(line)
+ formal_line.append((keyword, outputformat))
+ elif keyword == 'KeepAlive':
+ _, keepalive = self.split_option(line)
+ formal_line.append((keyword, keepalive))
+ elif keyword == 'ColumnHeaders':
+ _, columnheaders = self.split_option(line)
+ formal_line.append((keyword, columnheaders))
+ elif keyword == 'Limit':
+ _, limit = self.split_option(line)
+ formal_line.append((keyword, limit))
elif keyword == 'Filter':
try:
_, attribute, operator, reference = re.split(r"[\s]+", line, 3)
except:
_, attribute, operator = re.split(r"[\s]+", line, 2)
reference = ''
- if attribute == 'time':
- # That's a showstopper. We can't cache time-critical
- # informations, because within one second a lot of things
- # can change.
- cacheable = False
+ formal_line.append((keyword, attribute, operator, reference))
+ elif keyword == 'And':
+ _, andnum = self.split_option(line)
+ formal_line.append((keyword, andnum))
+ elif keyword == 'Or':
+ _, ornum = self.split_option(line)
+ formal_line.append((keyword, ornum))
+ elif keyword == 'StatsGroupBy':
+ _, columns = self.split_option_with_columns(line)
+ formal_line.append((keyword, columns))
+ elif keyword == 'Stats':
+ try:
+ _, attribute, operator, reference = self.split_option(line, 3)
+ if attribute in ['sum', 'min', 'max', 'avg', 'std'] and reference.startswith('as '):
+ attribute, operator = operator, attribute
+ elif attribute in ['sum', 'min', 'max', 'avg', 'std'] and reference == '=':
+ attribute, operator = operator, attribute
+ reference = ''
+ except:
+ _, attribute, operator = self.split_option(line, 3)
+ if attribute in ['sum', 'min', 'max', 'avg', 'std']:
+ attribute, operator = operator, attribute
+ reference = ''
+ formal_line.append((keyword, attribute, operator, reference))
+ elif keyword == 'StatsAnd':
+ _, andnum = self.split_option(line)
+ formal_line.append((keyword, andnum))
+ elif keyword == 'StatsOr':
+ _, ornum = self.split_option(line)
+ formal_line.append((keyword, ornum))
+ elif keyword == 'Separators':
+ _, sep1, sep2, sep3, sep4 = line.split(' ', 5)
+ formal_line.append((keyword, sep1, sep2, sep3, sep4))
+ elif keyword == 'Localtime':
+ _, client_localtime = self.split_option(line)
+ # NO # formal_line.append((keyword, client_localtime))
+ else:
+ print "Received a line of input which i can't handle : '%s'" % line
+ formal_line.append((keyword, 'Received a line of input which i can\'t handle: %s' % line))
+ return formal_line
+
+
+
+ def strip_query(self, data):
+ cacheable = True
+ category = CACHE_GLOBAL_STATS
+ formal_data = self.prepare_data(data)
+ if 'Columns' in [f[0] for f in formal_data]:
+ if [c for c in [f[1] for f in formal_data if f[0] == 'Columns'][0] if c.endswith('state_type')]:
+ category = CACHE_GLOBAL_STATS_WITH_STATETYPE
+ if 'Filter' in [f[0] for f in formal_data]:
+ if 'time' in [f[1] for f in formal_data if f[0] == 'Filter']:
+ # That's a showstopper. We can't cache time-critical
+ # informations, because within one second a lot of things
+ # can change.
+ cacheable = False
+
if cacheable:
- return (cacheable, category, hash(data))
+ return (cacheable, category, hash(str(formal_data)))
else:
return (False, None, None)
@@ -123,11 +259,16 @@ class LiveStatusQueryCache:
if not self.enabled:
return
- cacheable, category, key = self.strip_query(data)
+ _, category, key = self.strip_query(data)
print "I PUT IN THE CACHE FOR", key
- self.categories[category][key] = result
+ self.categories[category].put(key, result)
+ print self.prepare_data(data)
- def judge_situation(self, brok, obj):
+ def impact_assessment(self, brok, obj):
+ """
+ Find out if there are changes to the object which will affect
+ the validity of cached data.
+ """
if not self.enabled:
return
try:
diff --git a/shinken/modules/livestatus_broker/livestatus_regenerator.py b/shinken/modules/livestatus_broker/livestatus_regenerator.py
index a985ff3..ac49dbc 100644
--- a/shinken/modules/livestatus_broker/livestatus_regenerator.py
+++ b/shinken/modules/livestatus_broker/livestatus_regenerator.py
@@ -122,8 +122,4 @@ class LiveStatusRegenerator(Regenerator):
self.cache = cache
def before_after_hook(self, brok, obj):
- self.cache.judge_situation(brok, obj)
-
-
-
-
+ self.cache.impact_assessment(brok, obj)
diff --git a/test/test_livestatus_cache.py b/test/test_livestatus_cache.py
index f30846d..8565b4b 100644
--- a/test/test_livestatus_cache.py
+++ b/test/test_livestatus_cache.py
@@ -23,6 +23,7 @@ class TestConfigBig(TestConfig):
self.setup_with_file('etc/nagios_5r_100h_2000s.cfg')
self.testid = str(os.getpid() + random.randint(1, 1000))
self.init_livestatus()
+ self.livestatus_broker.query_cache.enabled = True
print "Cleaning old broks?"
self.sched.fill_initial_broks()
self.update_broker()
--
UNNAMED PROJECT
More information about the Pkg-nagios-changes
mailing list