[Secure-testing-commits] r14614 - lib/python

Florian Weimer fw at alioth.debian.org
Thu May 6 13:57:39 UTC 2010


Author: fw
Date: 2010-05-06 13:57:35 +0000 (Thu, 06 May 2010)
New Revision: 14614

Added:
   lib/python/xpickle.py
Log:
lib/python/xpickle.py: pickle helper


Added: lib/python/xpickle.py
===================================================================
--- lib/python/xpickle.py	                        (rev 0)
+++ lib/python/xpickle.py	2010-05-06 13:57:35 UTC (rev 14614)
@@ -0,0 +1,124 @@
+# xpickle -- pickle helpers
+# Copyright (C) 2010 Florian Weimer <fw at deneb.enyo.de>
+# 
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+# 
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+# 
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+
+from __future__ import with_statement
+
+import errno
+import os.path
+import cPickle as pickle
+import tempfile
+
+EXTENSION = '.xpck'  # suffix appended to a path to name its pickle cache
+
+def safeunlink(path):
+    """Removes the file at path.  A missing file (ENOENT) is not an
+    error; any other OSError is propagated to the caller unchanged."""
+    try:
+        os.unlink(path)
+    except OSError, e:
+        if e.errno != errno.ENOENT:
+            raise  # bare raise keeps the original traceback
+
+def replacefile(path, action):
+    """Calls the action to replace the file at path.
+
+    The action is called with two arguments, the path to the temporary
+    file, and a file object open on it for binary writing.  On success,
+    the temporary file is renamed as the original file, atomically
+    replacing it (on failure it is removed).  Returns the action's result."""
+    t_fd, t_name = tempfile.mkstemp(suffix='.tmp', dir=os.path.dirname(path))
+    try:
+        t = os.fdopen(t_fd, "wb")  # binary mode: payloads may be pickles
+        try:
+            result = action(t_name, t)
+        finally:
+            t.close()
+        os.rename(t_name, path)
+        t_name = None  # renamed away; nothing left to clean up
+    finally:
+        if t_name is not None:
+            safeunlink(t_name)
+    return result
+    
+def _wraploader(typ, parser):
+    # Returns loader(path), which caches parser(path, file) results in the
+    # pickle file path + EXTENSION.  Top-most object in the pickle:
+    #   ((type, size, mtime, inode), payload)
+    # The first element must match typ and os.stat(path), else we reparse.
+    # NOTE: unpickling can run code; the cache directory must be trusted.
+
+    def safeload(path):
+        # A missing, truncated or corrupt cache counts as "no cache".
+        try:
+            with open(path + EXTENSION, 'rb') as f:  # pickles are binary
+                return (pickle.load(f), True)
+        except (EOFError, IOError, pickle.PickleError, AttributeError,
+                ImportError, IndexError, ValueError):
+            return (None, False)
+
+    def check(data, st):
+        try:
+            if data[0] == (typ, st.st_size, st.st_mtime, st.st_ino):
+                return (data[1], True)
+        except (IndexError, KeyError, TypeError):
+            pass
+        return (None, False)
+
+    def reparse(path, st):
+        with open(path) as f:
+            obj = parser(path, f)
+        data = pickle.dumps(
+            ((typ, st.st_size, st.st_mtime, st.st_ino), obj), -1)
+        replacefile(path + EXTENSION, lambda name, f: f.write(data))
+        return obj
+
+    def loader(path):
+        st = os.stat(path)
+        data, success = safeload(path)
+        if success:
+            obj, success = check(data, st)
+            if success:
+                return obj
+        return reparse(path, st)
+    loader.__doc__ = parser.__doc__
+    return loader
+
+def loader(file_type):
+    """Decorator factory adding disk-based memoization to a parser taking
+    (file name, file object); file_type is a string tag, e.g. a version."""
+    def decorate(f):
+        return _wraploader(file_type, f)
+    return decorate
+
+def _test():
+    # Self-test: first load parses, second hits the cache, growth reparses.
+    payload = "foo bar baz\n"
+    with tempfile.NamedTemporaryFile() as tmp:
+        try:
+            tmp.write(payload)
+            tmp.flush()
+            read_cached = _wraploader("foo", lambda p, f: f.read())
+            assert read_cached(tmp.name) == payload
+            assert read_cached(tmp.name) == payload
+            tmp.write(payload)
+            tmp.flush()
+            assert read_cached(tmp.name) == payload * 2
+        finally:
+            safeunlink(tmp.name + EXTENSION)
+
+if __name__ == "__main__":
+    _test()  # run the self-test when executed as a script




More information about the Secure-testing-commits mailing list