[Python-modules-commits] [python-async] 02/08: Imported Upstream version 0.6.2

Takaki Taniguchi takaki at moszumanska.debian.org
Wed Nov 25 12:54:35 UTC 2015


This is an automated email from the git hooks/post-receive script.

takaki pushed a commit to branch master
in repository python-async.

commit 1f57e7b0c2ec2e7fb4060f697afe0251ea22b8a9
Author: TANIGUCHI Takaki <takaki at asis.media-as.org>
Date:   Wed Nov 25 21:46:04 2015 +0900

    Imported Upstream version 0.6.2
---
 LICENSE                             |   30 +
 MANIFEST.in                         |   13 +
 PKG-INFO                            |   19 +-
 README                              |   38 --
 __init__.py                         |   42 --
 async.egg-info/PKG-INFO             |   25 +
 async.egg-info/SOURCES.txt          |   25 +
 async.egg-info/dependency_links.txt |    1 +
 async.egg-info/top_level.txt        |    1 +
 async.egg-info/zip-safe             |    1 +
 async/__init__.py                   |   10 +
 async/channel.py                    |  388 ++++++++++++
 async/graph.py                      |  130 +++++
 async/pool.py                       |  514 ++++++++++++++++
 async/task.py                       |  243 ++++++++
 async/test/__init__.py              |    4 +
 async/test/lib.py                   |   20 +
 async/test/task.py                  |  205 +++++++
 async/test/test_channel.py          |  110 ++++
 async/test/test_example.py          |   45 ++
 async/test/test_graph.py            |   86 +++
 async/test/test_performance.py      |   57 ++
 async/test/test_pool.py             |  427 ++++++++++++++
 async/test/test_thread.py           |   57 ++
 async/thread.py                     |  218 +++++++
 async/util.py                       |  283 +++++++++
 channel.py                          |  381 ------------
 graph.py                            |  126 ----
 mod/__init__.py                     |    0
 mod/zlibmodule.c                    | 1104 -----------------------------------
 pool.py                             |  508 ----------------
 setup.cfg                           |    5 +
 setup.py                            |  135 +++--
 task.py                             |  242 --------
 test/__init__.py                    |    0
 test/lib.py                         |    6 -
 test/mod/__init__.py                |    0
 test/mod/test_zlib.py               |   70 ---
 test/task.py                        |  202 -------
 test/test_channel.py                |   98 ----
 test/test_example.py                |   44 --
 test/test_graph.py                  |   80 ---
 test/test_performance.py            |   51 --
 test/test_pool.py                   |  482 ---------------
 test/test_task.py                   |   15 -
 test/test_thread.py                 |   44 --
 thread.py                           |  206 -------
 util.py                             |  279 ---------
 48 files changed, 2989 insertions(+), 4081 deletions(-)

diff --git a/LICENSE b/LICENSE
new file mode 100644
index 0000000..710010f
--- /dev/null
+++ b/LICENSE
@@ -0,0 +1,30 @@
+Copyright (C) 2010, 2011 Sebastian Thiel and contributors
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without 
+modification, are permitted provided that the following conditions 
+are met:
+
+* Redistributions of source code must retain the above copyright 
+notice, this list of conditions and the following disclaimer.
+
+* Redistributions in binary form must reproduce the above copyright 
+notice, this list of conditions and the following disclaimer in the 
+documentation and/or other materials provided with the distribution.
+
+* Neither the name of the async project nor the names of 
+its contributors may be used to endorse or promote products derived 
+from this software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 
+"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 
+LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 
+A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 
+OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 
+SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED 
+TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 
+PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 
+LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 
+NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 
+SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
diff --git a/MANIFEST.in b/MANIFEST.in
new file mode 100644
index 0000000..5986d6a
--- /dev/null
+++ b/MANIFEST.in
@@ -0,0 +1,13 @@
+include VERSION
+include LICENSE
+include CHANGES
+include AUTHORS
+include README.rst
+
+graft async/test
+
+global-exclude .git*
+global-exclude *.pyc
+global-exclude *.so
+global-exclude *.dll
+global-exclude *.o
diff --git a/PKG-INFO b/PKG-INFO
index 4d84fe1..841d1af 100644
--- a/PKG-INFO
+++ b/PKG-INFO
@@ -1,6 +1,6 @@
-Metadata-Version: 1.0
+Metadata-Version: 1.1
 Name: async
-Version: 0.6.1
+Version: 0.6.2
 Summary: Async Framework
 Home-page: http://gitorious.org/git-python/async
 Author: Sebastian Thiel
@@ -8,3 +8,18 @@ Author-email: byronimo at gmail.com
 License: BSD License
 Description: Async is a framework to process interdependent tasks in a pool of workers
 Platform: UNKNOWN
+Classifier: Development Status :: 7 - Inactive
+Classifier: Environment :: Console
+Classifier: Intended Audience :: Developers
+Classifier: License :: OSI Approved :: BSD License
+Classifier: Operating System :: OS Independent
+Classifier: Operating System :: POSIX
+Classifier: Operating System :: Microsoft :: Windows
+Classifier: Operating System :: MacOS :: MacOS X
+Classifier: Programming Language :: Python
+Classifier: Programming Language :: Python :: 2
+Classifier: Programming Language :: Python :: 2.6
+Classifier: Programming Language :: Python :: 2.7
+Classifier: Programming Language :: Python :: 3
+Classifier: Programming Language :: Python :: 3.3
+Classifier: Programming Language :: Python :: 3.4
diff --git a/README b/README
deleted file mode 100644
index 265f465..0000000
--- a/README
+++ /dev/null
@@ -1,38 +0,0 @@
-async
-=====
-Async aims to make writing asyncronous processing easier. It provides a task-graph 
-with interdependent tasks that communicate using blocking channels, allowing 
-to delay actual computations until items are requested.
-Tasks will automatically be distributed among 0 or more threads for the actual computation.
-
-Even though the GIL effectively prevents true concurrency, operations which block, 
-such as file IO, can be sped up with it already. In conjuction with 
-custom c extensions which release the GIL, true concurrency can be obtained as well.
-
-REQUIREMENTS
-============
-
-* Python Nose - for running the tests
-
-SOURCE
-======
-The source is available in a git repository at gitorious and github:
-
-git://gitorious.org/git-python/async.git
-git://github.com/Byron/async.git
-
-Run the tests with 
- cd async
- nosetests
-
-MAILING LIST
-============
-http://groups.google.com/group/git-python
-
-ISSUE TRACKER
-=============
-http://byronimo.lighthouseapp.com/projects/51787-gitpython
-
-LICENSE
-=======
-New BSD License
diff --git a/__init__.py b/__init__.py
deleted file mode 100644
index 1d1da11..0000000
--- a/__init__.py
+++ /dev/null
@@ -1,42 +0,0 @@
-"""Initialize the multi-processing package"""
-
-#{ Initialization
-def _init_atexit():
-	"""Setup an at-exit job to be sure our workers are shutdown correctly before
-	the interpreter quits"""
-	import atexit
-	import thread
-	atexit.register(thread.do_terminate_threads)
-	
-def _init_signals():
-	"""Assure we shutdown our threads correctly when being interrupted"""
-	import signal
-	import thread
-	import sys
-	
-	prev_handler = signal.getsignal(signal.SIGINT)
-	def thread_interrupt_handler(signum, frame):
-		thread.do_terminate_threads()
-		if callable(prev_handler):
-			prev_handler(signum, frame)
-			raise KeyboardInterrupt()
-		# END call previous handler
-	# END signal handler
-	try:
-		signal.signal(signal.SIGINT, thread_interrupt_handler)
-	except ValueError:
-		# happens if we don't try it from the main thread
-		print >> sys.stderr, "Failed to setup thread-interrupt handler. This is usually not critical"
-	# END exception handling
-
-
-#} END init
-
-_init_atexit()
-_init_signals()
-
-
-# initial imports
-from task import *
-from pool import *
-from channel import *
diff --git a/async.egg-info/PKG-INFO b/async.egg-info/PKG-INFO
new file mode 100644
index 0000000..841d1af
--- /dev/null
+++ b/async.egg-info/PKG-INFO
@@ -0,0 +1,25 @@
+Metadata-Version: 1.1
+Name: async
+Version: 0.6.2
+Summary: Async Framework
+Home-page: http://gitorious.org/git-python/async
+Author: Sebastian Thiel
+Author-email: byronimo at gmail.com
+License: BSD License
+Description: Async is a framework to process interdependent tasks in a pool of workers
+Platform: UNKNOWN
+Classifier: Development Status :: 7 - Inactive
+Classifier: Environment :: Console
+Classifier: Intended Audience :: Developers
+Classifier: License :: OSI Approved :: BSD License
+Classifier: Operating System :: OS Independent
+Classifier: Operating System :: POSIX
+Classifier: Operating System :: Microsoft :: Windows
+Classifier: Operating System :: MacOS :: MacOS X
+Classifier: Programming Language :: Python
+Classifier: Programming Language :: Python :: 2
+Classifier: Programming Language :: Python :: 2.6
+Classifier: Programming Language :: Python :: 2.7
+Classifier: Programming Language :: Python :: 3
+Classifier: Programming Language :: Python :: 3.3
+Classifier: Programming Language :: Python :: 3.4
diff --git a/async.egg-info/SOURCES.txt b/async.egg-info/SOURCES.txt
new file mode 100644
index 0000000..0937551
--- /dev/null
+++ b/async.egg-info/SOURCES.txt
@@ -0,0 +1,25 @@
+AUTHORS
+LICENSE
+MANIFEST.in
+setup.py
+async/__init__.py
+async/channel.py
+async/graph.py
+async/pool.py
+async/task.py
+async/thread.py
+async/util.py
+async.egg-info/PKG-INFO
+async.egg-info/SOURCES.txt
+async.egg-info/dependency_links.txt
+async.egg-info/top_level.txt
+async.egg-info/zip-safe
+async/test/__init__.py
+async/test/lib.py
+async/test/task.py
+async/test/test_channel.py
+async/test/test_example.py
+async/test/test_graph.py
+async/test/test_performance.py
+async/test/test_pool.py
+async/test/test_thread.py
\ No newline at end of file
diff --git a/async.egg-info/dependency_links.txt b/async.egg-info/dependency_links.txt
new file mode 100644
index 0000000..8b13789
--- /dev/null
+++ b/async.egg-info/dependency_links.txt
@@ -0,0 +1 @@
+
diff --git a/async.egg-info/top_level.txt b/async.egg-info/top_level.txt
new file mode 100644
index 0000000..4812b6f
--- /dev/null
+++ b/async.egg-info/top_level.txt
@@ -0,0 +1 @@
+async
diff --git a/async.egg-info/zip-safe b/async.egg-info/zip-safe
new file mode 100644
index 0000000..8b13789
--- /dev/null
+++ b/async.egg-info/zip-safe
@@ -0,0 +1 @@
+
diff --git a/async/__init__.py b/async/__init__.py
new file mode 100644
index 0000000..3f337af
--- /dev/null
+++ b/async/__init__.py
@@ -0,0 +1,10 @@
+# Copyright (C) 2010, 2011 Sebastian Thiel (byronimo at gmail.com) and contributors
+#
+# This module is part of async and is released under
+# the New BSD License: http://www.opensource.org/licenses/bsd-license.php
+"""Initialize the multi-processing package"""
+
+# initial imports
+from .task import *
+from .pool import *
+from .channel import *
diff --git a/async/channel.py b/async/channel.py
new file mode 100644
index 0000000..3f496c8
--- /dev/null
+++ b/async/channel.py
@@ -0,0 +1,388 @@
+# Copyright (C) 2010, 2011 Sebastian Thiel (byronimo at gmail.com) and contributors
+#
+# This module is part of async and is released under
+# the New BSD License: http://www.opensource.org/licenses/bsd-license.php
+"""Contains a queue based channel implementation"""
+try:
+    from queue import Empty
+except ImportError:
+    from Queue import Empty
+
+from .util import (
+        AsyncQueue,
+        SyncQueue,
+        ReadOnly
+        )
+
+import threading
+import sys
+
+__all__ = ( 'Channel', 'SerialChannel', 'Writer', 'ChannelWriter', 'CallbackChannelWriter',
+            'Reader', 'ChannelReader', 'CallbackChannelReader', 'mkchannel', 'ReadOnly',
+            'IteratorReader', 'CallbackReaderMixin', 'CallbackWriterMixin')
+
+#{ Classes
+class Channel(object):
+    """A channel is similar to a file like object. It has a write end as well as one or
+    more read ends. If Data is in the channel, it can be read, if not the read operation
+    will block until data becomes available.
+    If the channel is closed, any read operation will result in an exception
+
+    This base class is not instantiated directly, but instead serves as constructor
+    for reader/writer pairs.
+
+    Create a new channel """
+    __slots__ = 'queue'
+
+    # The queue to use to store the actual data
+    QueueCls = AsyncQueue
+
+    def __init__(self):
+        """initialize this instance with a queue holding the channel contents"""
+        self.queue = self.QueueCls()
+
+
+class SerialChannel(Channel):
+    """A slightly faster version of a Channel, which sacrifices thread-safety for performance"""
+    QueueCls = SyncQueue
+
+
+class Writer(object):
+    """A writer is an object providing write access to a possibly blocking reading device"""
+    __slots__ = tuple()
+
+    #{ Interface
+
+    def __init__(self, device):
+        """Initialize the instance with the device to write to"""
+
+    def write(self, item, block=True, timeout=None):
+        """Write the given item into the device
+        :param block: True if the device may block until space for the item is available
+        :param timeout: The time in seconds to wait for the device to become ready
+        in blocking mode"""
+        raise NotImplementedError()
+
+    def size(self):
+        """:return: number of items already in the device, they could be read with a reader"""
+        raise NotImplementedError()
+
+    def close(self):
+        """Close the channel. Multiple close calls on a closed channel are not
+        an error"""
+        raise NotImplementedError()
+
+    def closed(self):
+        """:return: True if the channel was closed"""
+        raise NotImplementedError()
+
+    #} END interface
+
+
+class ChannelWriter(Writer):
+    """The write end of a channel, a file-like interface for a channel"""
+    __slots__ = ('channel', '_put')
+
+    def __init__(self, channel):
+        """Initialize the writer to use the given channel"""
+        self.channel = channel
+        self._put = self.channel.queue.put
+
+    #{ Interface
+    def write(self, item, block=False, timeout=None):
+        return self._put(item, block, timeout)
+
+    def size(self):
+        return self.channel.queue.qsize()
+
+    def close(self):
+        """Close the channel. Multiple close calls on a closed channel are not
+        an error"""
+        self.channel.queue.set_writable(False)
+
+    def closed(self):
+        """:return: True if the channel was closed"""
+        return not self.channel.queue.writable()
+    #} END interface
+
+
+class CallbackWriterMixin(object):
+    """The write end of a channel which allows you to set up a callback to be
+    called before an item is written to the channel"""
+    # slots don't work with mixin's :(
+    # __slots__ = ('_pre_cb')
+
+    def __init__(self, *args):
+        super(CallbackWriterMixin, self).__init__(*args)
+        self._pre_cb = None
+
+    def set_pre_cb(self, fun = lambda item: item):
+        """
+        Install a callback to be called before the given item is written.
+        It returns a possibly altered item which will be written to the channel
+        instead, making it useful for pre-write item conversions.
+        Providing None uninstalls the current method.
+
+        :return: the previously installed function or None
+        :note: Must be thread-safe if the channel is used in multiple threads"""
+        prev = self._pre_cb
+        self._pre_cb = fun
+        return prev
+
+    def write(self, item, block=True, timeout=None):
+        if self._pre_cb:
+            item = self._pre_cb(item)
+        super(CallbackWriterMixin, self).write(item, block, timeout)
+
+
+class CallbackChannelWriter(CallbackWriterMixin, ChannelWriter):
+    """Implements a channel writer with callback functionality"""
+    pass
+
+
+class Reader(object):
+    """Allows reading from a device"""
+    __slots__ = tuple()
+
+    #{ Interface
+    def __init__(self, device):
+        """Initialize the instance with the device to read from"""
+
+    #{ Iterator protocol
+
+    def __iter__(self):
+        return self
+
+    def __next__(self):
+        """Implements the iterator protocol, iterating individual items"""
+        items = self.read(1)
+        if items:
+            return items[0]
+        raise StopIteration
+
+    def next(self):
+        """Support the Python 2 iterator syntax"""
+        return self.__next__()
+
+    #} END iterator protocol
+
+
+    #{ Interface
+
+    def read(self, count=0, block=True, timeout=None):
+        """
+        read a list of items read from the device. The list, as a sequence
+        of items, is similar to the string of characters returned when reading from
+        file like objects.
+
+        :param count: given amount of items to read. If < 1, all items will be read
+        :param block: if True, the call will block until an item is available
+        :param timeout: if positive and block is True, it will block only for the
+            given amount of seconds, returning the items it received so far.
+            The timeout is applied to each read item, not for the whole operation.
+        :return: single item in a list if count is 1, or a list of count items.
+            If the device was empty and count was 1, an empty list will be returned.
+            If count was greater 1, a list with less than count items will be
+            returned.
+            If count was < 1, a list with all items that could be read will be
+            returned."""
+        raise NotImplementedError()
+
+    #} END interface
+
+
+class ChannelReader(Reader):
+    """Allows reading from a channel. The reader is thread-safe if the channel is as well"""
+    __slots__ = 'channel'
+
+    def __init__(self, channel):
+        """Initialize this instance from its parent write channel"""
+        self.channel = channel
+
+    #{ Interface
+
+    def read(self, count=0, block=True, timeout=None):
+        # if the channel is closed for writing, we never block
+        # NOTE: is handled by the queue
+        # We don't check for a closed state here as it costs time - most of
+        # the time, it will not be closed, and will bail out automatically once
+        # it gets closed
+
+
+        # in non-blocking mode, its all not a problem
+        out = list()
+        queue = self.channel.queue
+        if not block:
+            # be as fast as possible in non-blocking mode, hence
+            # its a bit 'unrolled'
+            try:
+                if count == 1:
+                    out.append(queue.get(False))
+                elif count < 1:
+                    while True:
+                        out.append(queue.get(False))
+                    # END for each item
+                else:
+                    for i in range(count):
+                        out.append(queue.get(False))
+                    # END for each item
+                # END handle count
+            except Empty:
+                pass
+            # END handle exceptions
+        else:
+            # to get everything into one loop, we set the count accordingly
+            if count == 0:
+                count = sys.maxsize
+            # END handle count
+
+            i = 0
+            while i < count:
+                try:
+                    out.append(queue.get(block, timeout))
+                    i += 1
+                except Empty:
+                    # here we are only if
+                    # someone woke us up to inform us about the queue that changed
+                    # its writable state
+                    # The following branch checks for closed channels, and pulls
+                    # as many items as we need and as possible, before
+                    # leaving the loop.
+                    if not queue.writable():
+                        try:
+                            while i < count:
+                                out.append(queue.get(False, None))
+                                i += 1
+                            # END count loop
+                        except Empty:
+                            break   # out of count loop
+                        # END handle absolutely empty queue
+                    # END handle closed channel
+
+                    # if we are here, we woke up and the channel is not closed
+                    # Either the queue became writable again, which currently shouldn't
+                    # be able to happen in the channel, or someone read with a timeout
+                    # that actually timed out.
+                    # As it timed out, which is the only reason we are here,
+                    # we have to abort
+                    break
+                # END ignore empty
+
+            # END for each item
+        # END handle blocking
+        return out
+
+    #} END interface
+
+
+class CallbackReaderMixin(object):
+    """A channel which sends a callback before items are read from the channel"""
+    # unfortunately, slots can only use direct inheritance, have to turn it off :(
+    # __slots__ = "_pre_cb"
+
+    def __init__(self, *args):
+        super(CallbackReaderMixin, self).__init__(*args)
+        self._pre_cb = None
+        self._post_cb = None
+
+    def set_pre_cb(self, fun = lambda count: None):
+        """
+        Install a callback to call with the item count to be read before any
+        item is actually read from the channel.
+        Exceptions will be propagated.
+        If a function is not provided, the call is effectively uninstalled.
+
+        :return: the previously installed callback or None
+        :note: The callback must be threadsafe if the channel is used by multiple threads."""
+        prev = self._pre_cb
+        self._pre_cb = fun
+        return prev
+
+    def set_post_cb(self, fun = lambda items: items):
+        """
+        Install a callback to call after items have been read, but before
+        they are returned to the caller. The callback may adjust the items and/or the list.
+        If no function is provided, the callback is uninstalled
+
+        :return: the previously installed function"""
+        prev = self._post_cb
+        self._post_cb = fun
+        return prev
+
+    def read(self, count=0, block=True, timeout=None):
+        if self._pre_cb:
+            self._pre_cb(count)
+        items = super(CallbackReaderMixin, self).read(count, block, timeout)
+
+        if self._post_cb:
+            items = self._post_cb(items)
+        return items
+
+
+
+class CallbackChannelReader(CallbackReaderMixin, ChannelReader):
+    """Implements a channel reader with callback functionality"""
+    pass
+
+
+class IteratorReader(Reader):
+    """A Reader allowing to read items from an iterator, instead of a channel.
+    Reads will never block. It's thread-safe"""
+    __slots__ = ("_empty", '_iter', '_lock')
+
+    # the type of the lock to use when reading from the iterator
+    lock_type = threading.Lock
+
+    def __init__(self, iterator):
+        self._empty = False
+        if not hasattr(iterator, 'next') and not (hasattr(iterator, "__next__")):
+            raise ValueError("Iterator %r needs a next() function" % iterator)
+        self._iter = iterator
+        self._lock = self.lock_type()
+
+    def read(self, count=0, block=True, timeout=None):
+        """Non-Blocking implementation of read"""
+        # not threadsafe, but worst thing that could happen is that
+        # we try to get items one more time
+        if self._empty:
+            return list()
+        # END early abort
+
+        self._lock.acquire()
+        try:
+            if count == 0:
+                self._empty = True
+                return list(self._iter)
+            else:
+                out = list()
+                it = self._iter
+                for i in range(count):
+                    try:
+                        out.append(next(it))
+                    except StopIteration:
+                        self._empty = True
+                        break
+                    # END handle empty iterator
+                # END for each item to take
+                return out
+            # END handle count
+        finally:
+            self._lock.release()
+        # END handle locking
+
+
+#} END classes
+
+#{ Constructors
+def mkchannel(ctype = Channel, wtype = ChannelWriter, rtype = ChannelReader):
+    """
+    Create a channel, with a reader and a writer
+    :return: tuple(writer, reader)
+    :param ctype: Channel to instantiate
+    :param wtype: The type of the write channel to instantiate
+    :param rtype: The type of the read channel to instantiate"""
+    c = ctype()
+    wc = wtype(c)
+    rc = rtype(c)
+    return wc, rc
+#} END constructors
diff --git a/async/graph.py b/async/graph.py
new file mode 100644
index 0000000..eb5b6c5
--- /dev/null
+++ b/async/graph.py
@@ -0,0 +1,130 @@
+# Copyright (C) 2010, 2011 Sebastian Thiel (byronimo at gmail.com) and contributors
+#
+# This module is part of async and is released under
+# the New BSD License: http://www.opensource.org/licenses/bsd-license.php
+"""Simplistic implementation of a graph"""
+
+__all__ = ('Node', 'Graph')
+
+class Node(object):
+    """A Node in the graph. They know their neighbours, and have an id which should 
+    resolve into a string"""
+    __slots__ = ('in_nodes', 'out_nodes', 'id')
+    
+    def __init__(self, id=None):
+        self.id = id
+        self.in_nodes = list()
+        self.out_nodes = list()
+        
+    def __str__(self):
+        return str(self.id)
+        
+    def __repr__(self):
+        return "%s(%s)" % (type(self).__name__, self.id)
+    
+    
+class Graph(object):
+    """A simple graph implementation, keeping nodes and providing basic access and 
+    editing functions. The performance is only suitable for small graphs of not 
+    more than 10 nodes !"""
+    __slots__ = "nodes"
+    
+    def __init__(self):
+        self.nodes = list()
+
+    def __del__(self):
+        """Deletes bidirectional dependencies"""
+        for node in self.nodes:
+            node.in_nodes = None
+            node.out_nodes = None
+        # END cleanup nodes
+        
+        # otherwise the nodes would keep floating around
+    
+
+    def add_node(self, node):
+        """Add a new node to the graph
+        :return: the newly added node"""
+        self.nodes.append(node)
+        return node
+    
+    def remove_node(self, node):
+        """Delete a node from the graph
+        :return: self"""
+        try:
+            del(self.nodes[self.nodes.index(node)])
+        except ValueError:
+            return self
+        # END ignore if it doesn't exist
+        
+        # clear connections
+        for outn in node.out_nodes:
+            del(outn.in_nodes[outn.in_nodes.index(node)])
+        for inn in node.in_nodes:
+            del(inn.out_nodes[inn.out_nodes.index(node)])
+        node.out_nodes = list()
+        node.in_nodes = list()
+        return self
+    
+    def add_edge(self, u, v):
+        """Add a directed edge from node u to node v.
+        
+        :return: self
+        :raise ValueError: If the new edge would create a cycle"""
+        if u is v:
+            raise ValueError("Cannot connect a node with itself")
+        
+        # are they already connected ?
+        if  u in v.in_nodes and v in u.out_nodes or \
+            v in u.in_nodes and u in v.out_nodes:
+            return self
+        # END handle connection exists
+        
+        # cycle check - if we can reach any of the two by following either ones 
+        # history, its a cycle
+        for start, end in ((u, v), (v,u)):
+            if not start.in_nodes: 
+                continue
+            nodes = start.in_nodes[:]
+            seen = set()
+            # depth first search - its faster
+            while nodes:
+                n = nodes.pop()
+                if n in seen:
+                    continue
+                seen.add(n)
+                if n is end:
+                    raise ValueError("Connecting u with v would create a cycle")
+                nodes.extend(n.in_nodes)
+            # END while we are searching
+        # END for each direction to look
+        
+        # connection is valid, set it up
+        u.out_nodes.append(v)
+        v.in_nodes.append(u)
+        
+        return self
+    
+    def input_inclusive_dfirst_reversed(self, node):
+        """Return all input nodes of the given node, depth first,
+        It will return the actual input node last, as it is required
+        like that by the pool"""
+        stack = [node]
+        seen = set()
+        
+        # depth first
+        out = list()
+        while stack:
+            n = stack.pop()
+            if n in seen:
+                continue
+            seen.add(n)
+            out.append(n)
+            
+            # only proceed in that direction if visitor is fine with it
+            stack.extend(n.in_nodes)
+            # END call visitor
+        # END while walking
+        out.reverse()
+        return out
+        
diff --git a/async/pool.py b/async/pool.py
new file mode 100644
index 0000000..4063bea
--- /dev/null
+++ b/async/pool.py
@@ -0,0 +1,514 @@
+# Copyright (C) 2010, 2011 Sebastian Thiel (byronimo at gmail.com) and contributors
+#
+# This module is part of async and is released under
+# the New BSD License: http://www.opensource.org/licenses/bsd-license.php
+"""Implementation of a thread-pool working with channels"""
+from .thread import WorkerThread
+from threading import Lock
+
+from .util import AsyncQueue
+from .graph import Graph
+from .channel import (
+        ChannelWriter,
+        Channel,
+        SerialChannel,
+        CallbackChannelReader
+    )
+
+import sys
+import weakref
+import logging
+from functools import reduce
+
+log = logging.root
+
+
+__all__ = ('PoolReader', 'Pool', 'ThreadPool')
+
+
+class PoolReader(CallbackChannelReader):
+    """A reader designed to read from channels which take part in pools
+    It acts like a handle to the underlying task in the pool."""
+    __slots__ = ('_task_ref', '_pool_ref')
+
+    def __init__(self, channel, task, pool):
+        CallbackChannelReader.__init__(self, channel)
+        self._task_ref = weakref.ref(task)
+        self._pool_ref = weakref.ref(pool)
+
+    def __del__(self):
+        """Assures that our task will be deleted if we were the last reader"""
+        task = self._task_ref()
+        if task is None:
+            return
+
+        pool = self._pool_ref()
+        if pool is None:
+            return
+
+        # if this is the last reader to the wc we just handled, there
+        # is no way anyone will ever read from the task again. If so,
+        # delete the task in question, it will take care of itself and orphans
+        # it might leave
+        # 1 is ourselves, + 1 for the call + 1, and 3 magical ones which
+        # I can't explain, but appears to be normal in the destructor
+        # On the caller side, getrefcount returns 2, as expected
+        # When just calling remove_task,
+        # it has no way of knowing that the write channel is about to diminish.
+        # which is why we pass the info as a private kwarg  - not nice, but
+        # okay for now
+        if sys.getrefcount(self) < 6:
+            pool.remove_task(task, _from_destructor_ = True)
+        # END handle refcount based removal of task
+
+    #{ Internal
+    def _read(self, count=0, block=True, timeout=None):
+        return CallbackChannelReader.read(self, count, block, timeout)
+
+
+    def pool_ref(self):
+        """:return: reference to the pool we belong to"""
+        return self._pool_ref
+
+    def task_ref(self):
+        """:return: reference to the task producing our items"""
+        return self._task_ref
+
+    #} END internal
+
+    #{ Interface
+
+    def task(self):
+        """:return: task we read from
+        :raise ValueError: If the instance is not attached to a task"""
+        task = self._task_ref()
+        if task is None:
+            raise ValueError("PoolReader is not associated with at task anymore")
+        return task
+
+    def pool(self):
+        """:return: pool our task belongs to
+        :raise ValueError: if the instance does not belong to a pool"""
+        pool = self._pool_ref()
+        if pool is None:
+            raise ValueError("PoolReader is not associated with a pool anymore")
+        return pool
+
+
+    #} END interface
+
+    def read(self, count=0, block=True, timeout=None):
+        """Read an item that was processed by one of our threads
+        :note: Triggers task dependency handling needed to provide the necessary input"""
+        # NOTE: we always queue the operation that would give us count items
+        # as tracking the scheduled items or testing the channels size
+        # is inherently unsafe depending on the design of the task network
+        # If we put on tasks onto the queue for every request, we are sure
+        # to always produce enough items, even if the task.min_count actually
+        # provided enough - its better to have some possibly empty task runs
+        # than having an empty queue that blocks.
+
+        # if the user tries to use us to read from a done task, we will never
+        # compute as all produced items are already in the channel
+        task = self._task_ref()
+        if task is None:
+            return list()
+        # END abort if task was deleted
+
+        skip_compute = task.is_done() or task.error()
+
+        ########## prepare ##############################
+        if not skip_compute:
+            self._pool_ref()._prepare_channel_read(task, count)
+        # END prepare pool scheduling
+
+
+        ####### read data ########
+        ##########################
+        # read actual items, tasks were setup to put their output into our channel ( as well )
+        items = CallbackChannelReader.read(self, count, block, timeout)
... 6435 lines suppressed ...

-- 
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/python-modules/packages/python-async.git



More information about the Python-modules-commits mailing list