[Pkg-nagios-changes] [SCM] UNNAMED PROJECT branch, debian/master, updated. 810edbdd3feedbfe37f4a65bee50b57b2f60fa2a

Gerhard Lausser gerhard.lausser at consol.de
Tue Feb 28 22:18:57 UTC 2012


The following commit has been merged in the debian/master branch:
commit ca85edb5981870ea0aa61d2c2bae2a90af5c082f
Author: Gerhard Lausser <gerhard.lausser at consol.de>
Date:   Mon Feb 6 02:47:28 2012 +0100

    Add a lfu to the livestatus cache

diff --git a/shinken/modules/livestatus_broker/livestatus_query_cache.py b/shinken/modules/livestatus_broker/livestatus_query_cache.py
index 6286267..f9faa30 100644
--- a/shinken/modules/livestatus_broker/livestatus_query_cache.py
+++ b/shinken/modules/livestatus_broker/livestatus_query_cache.py
@@ -24,6 +24,12 @@
 
 import time
 import re
+import collections
+import functools
+from itertools import ifilterfalse
+from heapq import nsmallest
+from operator import itemgetter
+
 
 from livestatus_query import LiveStatusQuery
 from livestatus_wait_query import LiveStatusWaitQuery
@@ -34,12 +40,51 @@ CACHE_GLOBAL_STATS_WITH_STATETYPE = 1
 CACHE_HOST_STATS = 2
 CACHE_SERVICE_STATS = 3
 
+class Counter(dict):
+    'Mapping where default values are zero'
+    def __missing__(self, key):
+        return 0
+
+class LFU(object):
+    def __init__(self, maxsize=100):
+        self.storage = {}
+        self.maxsize = maxsize
+        self.hits = self.misses = 0
+        self.use_count = Counter()           # times each key has been accessed
+        self.kwd_mark = object()             # separate positional and keyword args
+
+    def clear(self):
+        self.storage = {}
+        self.use_count = Counter()
+        self.hits = self.misses = 0
+
+    def get(self, key):
+        self.use_count[key] += 1
+        try:
+            result = self.storage[key]
+            self.hits += 1
+        except KeyError:
+            result = []
+            self.misses += 1
+        return result
+
+    def put(self, key, data):
+        self.storage[key] = data
+        if len(self.storage) > self.maxsize:
+            for key, _ in nsmallest(maxsize // 10, self.use_count.iteritems(), key=itemgetter(1)):
+                del self.storage[key], self.use_count[key]
+        pass
+
 class LiveStatusQueryCache:
 
     """A class describing a livestatus query cache."""
 
     def __init__(self):
-        self.categories = [{}, {}, {}]
+        self.categories = []
+        # CACHE_GLOBAL_STATS
+        self.categories.append(LFU())
+        # CACHE_GLOBAL_STATS_WITH_STATETYPE
+        self.categories.append(LFU())
         self.enabled = True
 
     def disable(self):
@@ -53,7 +98,7 @@ class LiveStatusQueryCache:
         """
         try:
             print "i wipe sub-cache", category
-            self.categories[category] = {}
+            self.categories[category].clear()
         except Exception:
             pass
 
@@ -62,48 +107,139 @@ class LiveStatusQueryCache:
             return
         for cat in range(len(self.categories)):
             print "WIPEOUT CAT", cat
-            self.categories[cat] = {}
+            self.categories[cat].clear()
 
     def get_cached_query(self, data):
         if not self.enabled:
             return (False, [])
         print "I SEARCH THE CACHE FOR", data
         cacheable, category, key = self.strip_query(data)
-        if self.categories[category].get(key, []):
+        if self.categories[category].get(key):
             print "CACHE HIT"
-        return (cacheable, self.categories[category].get(key, []))
+        return (cacheable, self.categories[category].get(key))
 
-    def strip_query(self, data):
-        cacheable = True
-        category = CACHE_GLOBAL_STATS
-        for line in [l.strip() for l in data.splitlines()]:
+    def split_command(self, line, splits=1):
+        """Create a list from the words of a line"""
+        return line.split(' ', splits)
+
+    def split_option(self, line, splits=1):
+        """Like split_command, but converts numbers to int data type"""
+        x = map (lambda i: (i.isdigit() and int(i)) or i, [token.strip() for token in re.split(r"[\s]+", line, splits)])
+        return x
+
+    def split_option_with_columns(self, line):
+        """Split a line in a command and a list of words"""
+        cmd, columns = self.split_option(line)
+        return cmd, [c for c in re.compile(r'\s+').split(columns)]
+
+    def strip_table_from_column(self, column):
+        """Cut off the table name, because it is possible
+        to say service_state instead of state"""
+        bygroupmatch = re.compile('(\w+)by.*group').search(self.table)
+        if bygroupmatch:
+            return re.sub(re.sub('s$', '', bygroupmatch.group(1)) + '_', '', column, 1)
+        else:
+            return re.sub(re.sub('s$', '', self.table) + '_', '', column, 1)
+
+
+    def prepare_data(self, data):
+        """
+        Reformat the lines of a query so that they are a list of tuples
+        where the first element is the keyword
+        """
+        formal_line = []
+        for line in data.splitlines():
+            line = line.strip()
+            # Tools like NagVis send KEYWORD:option, and we prefer to have
+            # a space following the :
             if ':' in line and not ' ' in line:
                 line = line.replace(':', ': ')
             keyword = line.split(' ')[0].rstrip(':')
-            if len(line) == 0:
-                pass
-            if keyword in ('Localtime'):
-                # i dont't like this Localtime, but we usually can ignore it
-                pass
-            elif keyword == 'Columns':
-                _, columns = re.split(r"[\s]+", line, 1)
-                if [c for c in re.compile(r'\s+').split(columns) if c.endswith('state_type')]:
-                    category = CACHE_GLOBAL_STATS_WITH_STATETYPE
-            elif keyword == 'Stats':
-                pass
+            if keyword == 'GET':
+                formal_line.append((keyword, self.split_command(line)[1]))
+            elif keyword == 'Columns': # Get the names of the desired columns
+                _, columns = self.split_option_with_columns(line)
+                formal_line.append((keyword, columns))
+            elif keyword == 'ResponseHeader':
+                _, responseheader = self.split_option(line)
+                formal_line.append((keyword, responseheader))
+            elif keyword == 'OutputFormat':
+                _, outputformat = self.split_option(line)
+                formal_line.append((keyword, outputformat))
+            elif keyword == 'KeepAlive':
+                _, keepalive = self.split_option(line)
+                formal_line.append((keyword, keepalive))
+            elif keyword == 'ColumnHeaders':
+                _, columnheaders = self.split_option(line)
+                formal_line.append((keyword, columnheaders))
+            elif keyword == 'Limit':
+                _, limit = self.split_option(line)
+                formal_line.append((keyword, limit))
             elif keyword == 'Filter':
                 try:
                     _, attribute, operator, reference = re.split(r"[\s]+", line, 3)
                 except:
                     _, attribute, operator = re.split(r"[\s]+", line, 2)
                     reference = ''
-                if attribute == 'time':
-                    # That's a showstopper. We can't cache time-critical
-                    # informations, because within one second a lot of things
-                    # can change.
-                    cacheable = False
+                formal_line.append((keyword, attribute, operator, reference))
+            elif keyword == 'And':
+                _, andnum = self.split_option(line)
+                formal_line.append((keyword, andnum))
+            elif keyword == 'Or':
+                _, ornum = self.split_option(line)
+                formal_line.append((keyword, ornum))
+            elif keyword == 'StatsGroupBy':
+                _, columns = self.split_option_with_columns(line)
+                formal_line.append((keyword, columns))
+            elif keyword == 'Stats':
+                try:
+                    _, attribute, operator, reference = self.split_option(line, 3)
+                    if attribute in ['sum', 'min', 'max', 'avg', 'std'] and reference.startswith('as '):
+                        attribute, operator = operator, attribute
+                    elif attribute in ['sum', 'min', 'max', 'avg', 'std'] and reference == '=':
+                        attribute, operator = operator, attribute
+                        reference = ''
+                except:
+                    _, attribute, operator = self.split_option(line, 3)
+                    if attribute in ['sum', 'min', 'max', 'avg', 'std']:
+                        attribute, operator = operator, attribute
+                    reference = ''
+                formal_line.append((keyword, attribute, operator, reference))
+            elif keyword == 'StatsAnd':
+                _, andnum = self.split_option(line)
+                formal_line.append((keyword, andnum))
+            elif keyword == 'StatsOr':
+                _, ornum = self.split_option(line)
+                formal_line.append((keyword, ornum))
+            elif keyword == 'Separators':
+                _, sep1, sep2, sep3, sep4 = line.split(' ', 5)
+                formal_line.append((keyword, sep1, sep2, sep3, sep4))
+            elif keyword == 'Localtime':
+                _, client_localtime = self.split_option(line)
+                # NO # formal_line.append((keyword, client_localtime))
+            else:
+                print "Received a line of input which i can't handle : '%s'" % line
+                formal_line.append((keyword, 'Received a line of input which i can\'t handle: %s' % line))
+        return formal_line
+
+
+
+    def strip_query(self, data):
+        cacheable = True
+        category = CACHE_GLOBAL_STATS
+        formal_data = self.prepare_data(data)
+        if 'Columns' in [f[0] for f in formal_data]:
+            if [c for c in [f[1] for f in formal_data if f[0] == 'Columns'][0] if c.endswith('state_type')]:
+                category = CACHE_GLOBAL_STATS_WITH_STATETYPE
+        if 'Filter' in [f[0] for f in formal_data]:
+            if 'time' in [f[1] for f in formal_data if f[0] == 'Filter']:
+                # That's a showstopper. We can't cache time-critical
+                # information, because within one second a lot of things
+                # can change.
+                cacheable = False
+     
         if cacheable:
-            return (cacheable, category, hash(data))
+            return (cacheable, category, hash(str(formal_data)))
         else:
             return (False, None, None)
 
@@ -123,11 +259,16 @@ class LiveStatusQueryCache:
 
         if not self.enabled:
             return
-        cacheable, category, key = self.strip_query(data)
+        _, category, key = self.strip_query(data)
         print "I PUT IN THE CACHE FOR", key
-        self.categories[category][key] = result
+        self.categories[category].put(key, result)
+        print self.prepare_data(data)
 
-    def judge_situation(self, brok, obj):
+    def impact_assessment(self, brok, obj):
+        """
+        Find out if there are changes to the object which will affect
+        the validity of cached data.
+        """
         if not self.enabled:
             return
         try:
diff --git a/shinken/modules/livestatus_broker/livestatus_regenerator.py b/shinken/modules/livestatus_broker/livestatus_regenerator.py
index a985ff3..ac49dbc 100644
--- a/shinken/modules/livestatus_broker/livestatus_regenerator.py
+++ b/shinken/modules/livestatus_broker/livestatus_regenerator.py
@@ -122,8 +122,4 @@ class LiveStatusRegenerator(Regenerator):
         self.cache = cache
 
     def before_after_hook(self, brok, obj):
-        self.cache.judge_situation(brok, obj)
-
-
-
-
+        self.cache.impact_assessment(brok, obj)
diff --git a/test/test_livestatus_cache.py b/test/test_livestatus_cache.py
index f30846d..8565b4b 100644
--- a/test/test_livestatus_cache.py
+++ b/test/test_livestatus_cache.py
@@ -23,6 +23,7 @@ class TestConfigBig(TestConfig):
         self.setup_with_file('etc/nagios_5r_100h_2000s.cfg')
         self.testid = str(os.getpid() + random.randint(1, 1000))
         self.init_livestatus()
+        self.livestatus_broker.query_cache.enabled = True
         print "Cleaning old broks?"
         self.sched.fill_initial_broks()
         self.update_broker()

-- 
UNNAMED PROJECT



More information about the Pkg-nagios-changes mailing list