[Python-modules-commits] [python-async] 02/08: Imported Upstream version 0.6.2
Takaki Taniguchi
takaki at moszumanska.debian.org
Wed Nov 25 12:54:35 UTC 2015
This is an automated email from the git hooks/post-receive script.
takaki pushed a commit to branch master
in repository python-async.
commit 1f57e7b0c2ec2e7fb4060f697afe0251ea22b8a9
Author: TANIGUCHI Takaki <takaki at asis.media-as.org>
Date: Wed Nov 25 21:46:04 2015 +0900
Imported Upstream version 0.6.2
---
LICENSE | 30 +
MANIFEST.in | 13 +
PKG-INFO | 19 +-
README | 38 --
__init__.py | 42 --
async.egg-info/PKG-INFO | 25 +
async.egg-info/SOURCES.txt | 25 +
async.egg-info/dependency_links.txt | 1 +
async.egg-info/top_level.txt | 1 +
async.egg-info/zip-safe | 1 +
async/__init__.py | 10 +
async/channel.py | 388 ++++++++++++
async/graph.py | 130 +++++
async/pool.py | 514 ++++++++++++++++
async/task.py | 243 ++++++++
async/test/__init__.py | 4 +
async/test/lib.py | 20 +
async/test/task.py | 205 +++++++
async/test/test_channel.py | 110 ++++
async/test/test_example.py | 45 ++
async/test/test_graph.py | 86 +++
async/test/test_performance.py | 57 ++
async/test/test_pool.py | 427 ++++++++++++++
async/test/test_thread.py | 57 ++
async/thread.py | 218 +++++++
async/util.py | 283 +++++++++
channel.py | 381 ------------
graph.py | 126 ----
mod/__init__.py | 0
mod/zlibmodule.c | 1104 -----------------------------------
pool.py | 508 ----------------
setup.cfg | 5 +
setup.py | 135 +++--
task.py | 242 --------
test/__init__.py | 0
test/lib.py | 6 -
test/mod/__init__.py | 0
test/mod/test_zlib.py | 70 ---
test/task.py | 202 -------
test/test_channel.py | 98 ----
test/test_example.py | 44 --
test/test_graph.py | 80 ---
test/test_performance.py | 51 --
test/test_pool.py | 482 ---------------
test/test_task.py | 15 -
test/test_thread.py | 44 --
thread.py | 206 -------
util.py | 279 ---------
48 files changed, 2989 insertions(+), 4081 deletions(-)
diff --git a/LICENSE b/LICENSE
new file mode 100644
index 0000000..710010f
--- /dev/null
+++ b/LICENSE
@@ -0,0 +1,30 @@
+Copyright (C) 2010, 2011 Sebastian Thiel and contributors
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions
+are met:
+
+* Redistributions of source code must retain the above copyright
+notice, this list of conditions and the following disclaimer.
+
+* Redistributions in binary form must reproduce the above copyright
+notice, this list of conditions and the following disclaimer in the
+documentation and/or other materials provided with the distribution.
+
+* Neither the name of the async project nor the names of
+its contributors may be used to endorse or promote products derived
+from this software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
+TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
+NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
diff --git a/MANIFEST.in b/MANIFEST.in
new file mode 100644
index 0000000..5986d6a
--- /dev/null
+++ b/MANIFEST.in
@@ -0,0 +1,13 @@
+include VERSION
+include LICENSE
+include CHANGES
+include AUTHORS
+include README.rst
+
+graft async/test
+
+global-exclude .git*
+global-exclude *.pyc
+global-exclude *.so
+global-exclude *.dll
+global-exclude *.o
diff --git a/PKG-INFO b/PKG-INFO
index 4d84fe1..841d1af 100644
--- a/PKG-INFO
+++ b/PKG-INFO
@@ -1,6 +1,6 @@
-Metadata-Version: 1.0
+Metadata-Version: 1.1
Name: async
-Version: 0.6.1
+Version: 0.6.2
Summary: Async Framework
Home-page: http://gitorious.org/git-python/async
Author: Sebastian Thiel
@@ -8,3 +8,18 @@ Author-email: byronimo at gmail.com
License: BSD License
Description: Async is a framework to process interdependent tasks in a pool of workers
Platform: UNKNOWN
+Classifier: Development Status :: 7 - Inactive
+Classifier: Environment :: Console
+Classifier: Intended Audience :: Developers
+Classifier: License :: OSI Approved :: BSD License
+Classifier: Operating System :: OS Independent
+Classifier: Operating System :: POSIX
+Classifier: Operating System :: Microsoft :: Windows
+Classifier: Operating System :: MacOS :: MacOS X
+Classifier: Programming Language :: Python
+Classifier: Programming Language :: Python :: 2
+Classifier: Programming Language :: Python :: 2.6
+Classifier: Programming Language :: Python :: 2.7
+Classifier: Programming Language :: Python :: 3
+Classifier: Programming Language :: Python :: 3.3
+Classifier: Programming Language :: Python :: 3.4
diff --git a/README b/README
deleted file mode 100644
index 265f465..0000000
--- a/README
+++ /dev/null
@@ -1,38 +0,0 @@
-async
-=====
-Async aims to make writing asyncronous processing easier. It provides a task-graph
-with interdependent tasks that communicate using blocking channels, allowing
-to delay actual computations until items are requested.
-Tasks will automatically be distributed among 0 or more threads for the actual computation.
-
-Even though the GIL effectively prevents true concurrency, operations which block,
-such as file IO, can be sped up with it already. In conjuction with
-custom c extensions which release the GIL, true concurrency can be obtained as well.
-
-REQUIREMENTS
-============
-
-* Python Nose - for running the tests
-
-SOURCE
-======
-The source is available in a git repository at gitorious and github:
-
-git://gitorious.org/git-python/async.git
-git://github.com/Byron/async.git
-
-Run the tests with
- cd async
- nosetests
-
-MAILING LIST
-============
-http://groups.google.com/group/git-python
-
-ISSUE TRACKER
-=============
-http://byronimo.lighthouseapp.com/projects/51787-gitpython
-
-LICENSE
-=======
-New BSD License
diff --git a/__init__.py b/__init__.py
deleted file mode 100644
index 1d1da11..0000000
--- a/__init__.py
+++ /dev/null
@@ -1,42 +0,0 @@
-"""Initialize the multi-processing package"""
-
-#{ Initialization
-def _init_atexit():
- """Setup an at-exit job to be sure our workers are shutdown correctly before
- the interpreter quits"""
- import atexit
- import thread
- atexit.register(thread.do_terminate_threads)
-
-def _init_signals():
- """Assure we shutdown our threads correctly when being interrupted"""
- import signal
- import thread
- import sys
-
- prev_handler = signal.getsignal(signal.SIGINT)
- def thread_interrupt_handler(signum, frame):
- thread.do_terminate_threads()
- if callable(prev_handler):
- prev_handler(signum, frame)
- raise KeyboardInterrupt()
- # END call previous handler
- # END signal handler
- try:
- signal.signal(signal.SIGINT, thread_interrupt_handler)
- except ValueError:
- # happens if we don't try it from the main thread
- print >> sys.stderr, "Failed to setup thread-interrupt handler. This is usually not critical"
- # END exception handling
-
-
-#} END init
-
-_init_atexit()
-_init_signals()
-
-
-# initial imports
-from task import *
-from pool import *
-from channel import *
diff --git a/async.egg-info/PKG-INFO b/async.egg-info/PKG-INFO
new file mode 100644
index 0000000..841d1af
--- /dev/null
+++ b/async.egg-info/PKG-INFO
@@ -0,0 +1,25 @@
+Metadata-Version: 1.1
+Name: async
+Version: 0.6.2
+Summary: Async Framework
+Home-page: http://gitorious.org/git-python/async
+Author: Sebastian Thiel
+Author-email: byronimo at gmail.com
+License: BSD License
+Description: Async is a framework to process interdependent tasks in a pool of workers
+Platform: UNKNOWN
+Classifier: Development Status :: 7 - Inactive
+Classifier: Environment :: Console
+Classifier: Intended Audience :: Developers
+Classifier: License :: OSI Approved :: BSD License
+Classifier: Operating System :: OS Independent
+Classifier: Operating System :: POSIX
+Classifier: Operating System :: Microsoft :: Windows
+Classifier: Operating System :: MacOS :: MacOS X
+Classifier: Programming Language :: Python
+Classifier: Programming Language :: Python :: 2
+Classifier: Programming Language :: Python :: 2.6
+Classifier: Programming Language :: Python :: 2.7
+Classifier: Programming Language :: Python :: 3
+Classifier: Programming Language :: Python :: 3.3
+Classifier: Programming Language :: Python :: 3.4
diff --git a/async.egg-info/SOURCES.txt b/async.egg-info/SOURCES.txt
new file mode 100644
index 0000000..0937551
--- /dev/null
+++ b/async.egg-info/SOURCES.txt
@@ -0,0 +1,25 @@
+AUTHORS
+LICENSE
+MANIFEST.in
+setup.py
+async/__init__.py
+async/channel.py
+async/graph.py
+async/pool.py
+async/task.py
+async/thread.py
+async/util.py
+async.egg-info/PKG-INFO
+async.egg-info/SOURCES.txt
+async.egg-info/dependency_links.txt
+async.egg-info/top_level.txt
+async.egg-info/zip-safe
+async/test/__init__.py
+async/test/lib.py
+async/test/task.py
+async/test/test_channel.py
+async/test/test_example.py
+async/test/test_graph.py
+async/test/test_performance.py
+async/test/test_pool.py
+async/test/test_thread.py
\ No newline at end of file
diff --git a/async.egg-info/dependency_links.txt b/async.egg-info/dependency_links.txt
new file mode 100644
index 0000000..8b13789
--- /dev/null
+++ b/async.egg-info/dependency_links.txt
@@ -0,0 +1 @@
+
diff --git a/async.egg-info/top_level.txt b/async.egg-info/top_level.txt
new file mode 100644
index 0000000..4812b6f
--- /dev/null
+++ b/async.egg-info/top_level.txt
@@ -0,0 +1 @@
+async
diff --git a/async.egg-info/zip-safe b/async.egg-info/zip-safe
new file mode 100644
index 0000000..8b13789
--- /dev/null
+++ b/async.egg-info/zip-safe
@@ -0,0 +1 @@
+
diff --git a/async/__init__.py b/async/__init__.py
new file mode 100644
index 0000000..3f337af
--- /dev/null
+++ b/async/__init__.py
@@ -0,0 +1,10 @@
+# Copyright (C) 2010, 2011 Sebastian Thiel (byronimo at gmail.com) and contributors
+#
+# This module is part of async and is released under
+# the New BSD License: http://www.opensource.org/licenses/bsd-license.php
+"""Initialize the multi-processing package"""
+
+# initial imports
+from .task import *
+from .pool import *
+from .channel import *
diff --git a/async/channel.py b/async/channel.py
new file mode 100644
index 0000000..3f496c8
--- /dev/null
+++ b/async/channel.py
@@ -0,0 +1,388 @@
+# Copyright (C) 2010, 2011 Sebastian Thiel (byronimo at gmail.com) and contributors
+#
+# This module is part of async and is released under
+# the New BSD License: http://www.opensource.org/licenses/bsd-license.php
+"""Contains a queue based channel implementation"""
+try:
+ from queue import Empty
+except ImportError:
+ from Queue import Empty
+
+from .util import (
+ AsyncQueue,
+ SyncQueue,
+ ReadOnly
+ )
+
+import threading
+import sys
+
+__all__ = ( 'Channel', 'SerialChannel', 'Writer', 'ChannelWriter', 'CallbackChannelWriter',
+ 'Reader', 'ChannelReader', 'CallbackChannelReader', 'mkchannel', 'ReadOnly',
+ 'IteratorReader', 'CallbackReaderMixin', 'CallbackWriterMixin')
+
+#{ Classes
+class Channel(object):
+ """A channel is similar to a file like object. It has a write end as well as one or
+ more read ends. If Data is in the channel, it can be read, if not the read operation
+ will block until data becomes available.
+ If the channel is closed, any read operation will result in an exception
+
+ This base class is not instantiated directly, but instead serves as constructor
+    for reader/writer pairs.
+
+ Create a new channel """
+ __slots__ = 'queue'
+
+ # The queue to use to store the actual data
+ QueueCls = AsyncQueue
+
+ def __init__(self):
+ """initialize this instance with a queue holding the channel contents"""
+ self.queue = self.QueueCls()
+
+
+class SerialChannel(Channel):
+ """A slightly faster version of a Channel, which sacrificed thead-safety for performance"""
+ QueueCls = SyncQueue
+
+
+class Writer(object):
+ """A writer is an object providing write access to a possibly blocking reading device"""
+ __slots__ = tuple()
+
+ #{ Interface
+
+ def __init__(self, device):
+ """Initialize the instance with the device to write to"""
+
+ def write(self, item, block=True, timeout=None):
+ """Write the given item into the device
+ :param block: True if the device may block until space for the item is available
+ :param timeout: The time in seconds to wait for the device to become ready
+ in blocking mode"""
+ raise NotImplementedError()
+
+ def size(self):
+ """:return: number of items already in the device, they could be read with a reader"""
+ raise NotImplementedError()
+
+ def close(self):
+ """Close the channel. Multiple close calls on a closed channel are no
+ an error"""
+ raise NotImplementedError()
+
+ def closed(self):
+ """:return: True if the channel was closed"""
+ raise NotImplementedError()
+
+ #} END interface
+
+
+class ChannelWriter(Writer):
+ """The write end of a channel, a file-like interface for a channel"""
+ __slots__ = ('channel', '_put')
+
+ def __init__(self, channel):
+ """Initialize the writer to use the given channel"""
+ self.channel = channel
+ self._put = self.channel.queue.put
+
+ #{ Interface
+ def write(self, item, block=False, timeout=None):
+ return self._put(item, block, timeout)
+
+ def size(self):
+ return self.channel.queue.qsize()
+
+ def close(self):
+ """Close the channel. Multiple close calls on a closed channel are no
+ an error"""
+ self.channel.queue.set_writable(False)
+
+ def closed(self):
+ """:return: True if the channel was closed"""
+ return not self.channel.queue.writable()
+ #} END interface
+
+
+class CallbackWriterMixin(object):
+ """The write end of a channel which allows you to setup a callback to be
+ called after an item was written to the channel"""
+ # slots don't work with mixin's :(
+ # __slots__ = ('_pre_cb')
+
+ def __init__(self, *args):
+ super(CallbackWriterMixin, self).__init__(*args)
+ self._pre_cb = None
+
+ def set_pre_cb(self, fun = lambda item: item):
+ """
+ Install a callback to be called before the given item is written.
+ It returns a possibly altered item which will be written to the channel
+ instead, making it useful for pre-write item conversions.
+ Providing None uninstalls the current method.
+
+ :return: the previously installed function or None
+ :note: Must be thread-safe if the channel is used in multiple threads"""
+ prev = self._pre_cb
+ self._pre_cb = fun
+ return prev
+
+ def write(self, item, block=True, timeout=None):
+ if self._pre_cb:
+ item = self._pre_cb(item)
+ super(CallbackWriterMixin, self).write(item, block, timeout)
+
+
+class CallbackChannelWriter(CallbackWriterMixin, ChannelWriter):
+ """Implements a channel writer with callback functionality"""
+ pass
+
+
+class Reader(object):
+ """Allows reading from a device"""
+ __slots__ = tuple()
+
+ #{ Interface
+ def __init__(self, device):
+ """Initialize the instance with the device to read from"""
+
+ #{ Iterator protocol
+
+ def __iter__(self):
+ return self
+
+ def __next__(self):
+ """Implements the iterator protocol, iterating individual items"""
+ items = self.read(1)
+ if items:
+ return items[0]
+ raise StopIteration
+
+ def next(self):
+ """Support the Python 2 iterator syntax"""
+ return self.__next__()
+
+ #} END iterator protocol
+
+
+ #{ Interface
+
+ def read(self, count=0, block=True, timeout=None):
+ """
+ read a list of items read from the device. The list, as a sequence
+ of items, is similar to the string of characters returned when reading from
+ file like objects.
+
+ :param count: given amount of items to read. If < 1, all items will be read
+ :param block: if True, the call will block until an item is available
+ :param timeout: if positive and block is True, it will block only for the
+ given amount of seconds, returning the items it received so far.
+ The timeout is applied to each read item, not for the whole operation.
+ :return: single item in a list if count is 1, or a list of count items.
+ If the device was empty and count was 1, an empty list will be returned.
+ If count was greater 1, a list with less than count items will be
+ returned.
+ If count was < 1, a list with all items that could be read will be
+ returned."""
+ raise NotImplementedError()
+
+ #} END interface
+
+
+class ChannelReader(Reader):
+ """Allows reading from a channel. The reader is thread-safe if the channel is as well"""
+ __slots__ = 'channel'
+
+ def __init__(self, channel):
+ """Initialize this instance from its parent write channel"""
+ self.channel = channel
+
+ #{ Interface
+
+ def read(self, count=0, block=True, timeout=None):
+ # if the channel is closed for writing, we never block
+ # NOTE: is handled by the queue
+ # We don't check for a closed state here has it costs time - most of
+ # the time, it will not be closed, and will bail out automatically once
+ # it gets closed
+
+
+ # in non-blocking mode, its all not a problem
+ out = list()
+ queue = self.channel.queue
+ if not block:
+ # be as fast as possible in non-blocking mode, hence
+ # its a bit 'unrolled'
+ try:
+ if count == 1:
+ out.append(queue.get(False))
+ elif count < 1:
+ while True:
+ out.append(queue.get(False))
+ # END for each item
+ else:
+ for i in range(count):
+ out.append(queue.get(False))
+ # END for each item
+ # END handle count
+ except Empty:
+ pass
+ # END handle exceptions
+ else:
+ # to get everything into one loop, we set the count accordingly
+ if count == 0:
+ count = sys.maxsize
+ # END handle count
+
+ i = 0
+ while i < count:
+ try:
+ out.append(queue.get(block, timeout))
+ i += 1
+ except Empty:
+ # here we are only if
+ # someone woke us up to inform us about the queue that changed
+ # its writable state
+ # The following branch checks for closed channels, and pulls
+ # as many items as we need and as possible, before
+ # leaving the loop.
+ if not queue.writable():
+ try:
+ while i < count:
+ out.append(queue.get(False, None))
+ i += 1
+ # END count loop
+ except Empty:
+ break # out of count loop
+ # END handle absolutely empty queue
+ # END handle closed channel
+
+ # if we are here, we woke up and the channel is not closed
+ # Either the queue became writable again, which currently shouldn't
+ # be able to happen in the channel, or someone read with a timeout
+ # that actually timed out.
+ # As it timed out, which is the only reason we are here,
+ # we have to abort
+ break
+ # END ignore empty
+
+ # END for each item
+ # END handle blocking
+ return out
+
+ #} END interface
+
+
+class CallbackReaderMixin(object):
+ """A channel which sends a callback before items are read from the channel"""
+ # unfortunately, slots can only use direct inheritance, have to turn it off :(
+ # __slots__ = "_pre_cb"
+
+ def __init__(self, *args):
+ super(CallbackReaderMixin, self).__init__(*args)
+ self._pre_cb = None
+ self._post_cb = None
+
+ def set_pre_cb(self, fun = lambda count: None):
+ """
+ Install a callback to call with the item count to be read before any
+ item is actually read from the channel.
+ Exceptions will be propagated.
+ If a function is not provided, the call is effectively uninstalled.
+
+ :return: the previously installed callback or None
+ :note: The callback must be threadsafe if the channel is used by multiple threads."""
+ prev = self._pre_cb
+ self._pre_cb = fun
+ return prev
+
+ def set_post_cb(self, fun = lambda items: items):
+ """
+ Install a callback to call after items have been read, but before
+ they are returned to the caller. The callback may adjust the items and/or the list.
+ If no function is provided, the callback is uninstalled
+
+ :return: the previously installed function"""
+ prev = self._post_cb
+ self._post_cb = fun
+ return prev
+
+ def read(self, count=0, block=True, timeout=None):
+ if self._pre_cb:
+ self._pre_cb(count)
+ items = super(CallbackReaderMixin, self).read(count, block, timeout)
+
+ if self._post_cb:
+ items = self._post_cb(items)
+ return items
+
+
+
+class CallbackChannelReader(CallbackReaderMixin, ChannelReader):
+ """Implements a channel reader with callback functionality"""
+ pass
+
+
+class IteratorReader(Reader):
+ """A Reader allowing to read items from an iterator, instead of a channel.
+    Reads will never block. It's thread-safe"""
+ __slots__ = ("_empty", '_iter', '_lock')
+
+ # the type of the lock to use when reading from the iterator
+ lock_type = threading.Lock
+
+ def __init__(self, iterator):
+ self._empty = False
+ if not hasattr(iterator, 'next') and not (hasattr(iterator, "__next__")):
+ raise ValueError("Iterator %r needs a next() function" % iterator)
+ self._iter = iterator
+ self._lock = self.lock_type()
+
+ def read(self, count=0, block=True, timeout=None):
+ """Non-Blocking implementation of read"""
+ # not threadsafe, but worst thing that could happen is that
+ # we try to get items one more time
+ if self._empty:
+ return list()
+ # END early abort
+
+ self._lock.acquire()
+ try:
+ if count == 0:
+ self._empty = True
+ return list(self._iter)
+ else:
+ out = list()
+ it = self._iter
+ for i in range(count):
+ try:
+ out.append(next(it))
+ except StopIteration:
+ self._empty = True
+ break
+ # END handle empty iterator
+ # END for each item to take
+ return out
+ # END handle count
+ finally:
+ self._lock.release()
+ # END handle locking
+
+
+#} END classes
+
+#{ Constructors
+def mkchannel(ctype = Channel, wtype = ChannelWriter, rtype = ChannelReader):
+ """
+ Create a channel, with a reader and a writer
+ :return: tuple(reader, writer)
+ :param ctype: Channel to instantiate
+ :param wctype: The type of the write channel to instantiate
+ :param rctype: The type of the read channel to instantiate"""
+ c = ctype()
+ wc = wtype(c)
+ rc = rtype(c)
+ return wc, rc
+#} END constructors
diff --git a/async/graph.py b/async/graph.py
new file mode 100644
index 0000000..eb5b6c5
--- /dev/null
+++ b/async/graph.py
@@ -0,0 +1,130 @@
+# Copyright (C) 2010, 2011 Sebastian Thiel (byronimo at gmail.com) and contributors
+#
+# This module is part of async and is released under
+# the New BSD License: http://www.opensource.org/licenses/bsd-license.php
+"""Simplistic implementation of a graph"""
+
+__all__ = ('Node', 'Graph')
+
+class Node(object):
+ """A Node in the graph. They know their neighbours, and have an id which should
+ resolve into a string"""
+ __slots__ = ('in_nodes', 'out_nodes', 'id')
+
+ def __init__(self, id=None):
+ self.id = id
+ self.in_nodes = list()
+ self.out_nodes = list()
+
+ def __str__(self):
+ return str(self.id)
+
+ def __repr__(self):
+ return "%s(%s)" % (type(self).__name__, self.id)
+
+
+class Graph(object):
+ """A simple graph implementation, keeping nodes and providing basic access and
+ editing functions. The performance is only suitable for small graphs of not
+ more than 10 nodes !"""
+ __slots__ = "nodes"
+
+ def __init__(self):
+ self.nodes = list()
+
+ def __del__(self):
+ """Deletes bidericational dependencies"""
+ for node in self.nodes:
+ node.in_nodes = None
+ node.out_nodes = None
+ # END cleanup nodes
+
+ # otherwise the nodes would keep floating around
+
+
+ def add_node(self, node):
+ """Add a new node to the graph
+ :return: the newly added node"""
+ self.nodes.append(node)
+ return node
+
+ def remove_node(self, node):
+ """Delete a node from the graph
+ :return: self"""
+ try:
+ del(self.nodes[self.nodes.index(node)])
+ except ValueError:
+ return self
+ # END ignore if it doesn't exist
+
+ # clear connections
+ for outn in node.out_nodes:
+ del(outn.in_nodes[outn.in_nodes.index(node)])
+ for inn in node.in_nodes:
+ del(inn.out_nodes[inn.out_nodes.index(node)])
+ node.out_nodes = list()
+ node.in_nodes = list()
+ return self
+
+ def add_edge(self, u, v):
+ """Add an undirected edge between the given nodes u and v.
+
+ :return: self
+ :raise ValueError: If the new edge would create a cycle"""
+ if u is v:
+ raise ValueError("Cannot connect a node with itself")
+
+ # are they already connected ?
+ if u in v.in_nodes and v in u.out_nodes or \
+ v in u.in_nodes and u in v.out_nodes:
+ return self
+ # END handle connection exists
+
+ # cycle check - if we can reach any of the two by following either ones
+ # history, its a cycle
+ for start, end in ((u, v), (v,u)):
+ if not start.in_nodes:
+ continue
+ nodes = start.in_nodes[:]
+ seen = set()
+ # depth first search - its faster
+ while nodes:
+ n = nodes.pop()
+ if n in seen:
+ continue
+ seen.add(n)
+ if n is end:
+ raise ValueError("Connecting u with v would create a cycle")
+ nodes.extend(n.in_nodes)
+ # END while we are searching
+ # END for each direction to look
+
+ # connection is valid, set it up
+ u.out_nodes.append(v)
+ v.in_nodes.append(u)
+
+ return self
+
+ def input_inclusive_dfirst_reversed(self, node):
+ """Return all input nodes of the given node, depth first,
+ It will return the actual input node last, as it is required
+ like that by the pool"""
+ stack = [node]
+ seen = set()
+
+ # depth first
+ out = list()
+ while stack:
+ n = stack.pop()
+ if n in seen:
+ continue
+ seen.add(n)
+ out.append(n)
+
+ # only proceed in that direction if visitor is fine with it
+ stack.extend(n.in_nodes)
+ # END call visitor
+ # END while walking
+ out.reverse()
+ return out
+
diff --git a/async/pool.py b/async/pool.py
new file mode 100644
index 0000000..4063bea
--- /dev/null
+++ b/async/pool.py
@@ -0,0 +1,514 @@
+# Copyright (C) 2010, 2011 Sebastian Thiel (byronimo at gmail.com) and contributors
+#
+# This module is part of async and is released under
+# the New BSD License: http://www.opensource.org/licenses/bsd-license.php
+"""Implementation of a thread-pool working with channels"""
+from .thread import WorkerThread
+from threading import Lock
+
+from .util import AsyncQueue
+from .graph import Graph
+from .channel import (
+ ChannelWriter,
+ Channel,
+ SerialChannel,
+ CallbackChannelReader
+ )
+
+import sys
+import weakref
+import logging
+from functools import reduce
+
+log = logging.root
+
+
+__all__ = ('PoolReader', 'Pool', 'ThreadPool')
+
+
+class PoolReader(CallbackChannelReader):
+ """A reader designed to read from channels which take part in pools
+ It acts like a handle to the underlying task in the pool."""
+ __slots__ = ('_task_ref', '_pool_ref')
+
+ def __init__(self, channel, task, pool):
+ CallbackChannelReader.__init__(self, channel)
+ self._task_ref = weakref.ref(task)
+ self._pool_ref = weakref.ref(pool)
+
+ def __del__(self):
+ """Assures that our task will be deleted if we were the last reader"""
+ task = self._task_ref()
+ if task is None:
+ return
+
+ pool = self._pool_ref()
+ if pool is None:
+ return
+
+ # if this is the last reader to the wc we just handled, there
+ # is no way anyone will ever read from the task again. If so,
+ # delete the task in question, it will take care of itself and orphans
+ # it might leave
+ # 1 is ourselves, + 1 for the call + 1, and 3 magical ones which
+ # I can't explain, but appears to be normal in the destructor
+ # On the caller side, getrefcount returns 2, as expected
+ # When just calling remove_task,
+        # it has no way of knowing that the write channel is about to diminish.
+ # which is why we pass the info as a private kwarg - not nice, but
+ # okay for now
+ if sys.getrefcount(self) < 6:
+ pool.remove_task(task, _from_destructor_ = True)
+ # END handle refcount based removal of task
+
+ #{ Internal
+ def _read(self, count=0, block=True, timeout=None):
+ return CallbackChannelReader.read(self, count, block, timeout)
+
+
+ def pool_ref(self):
+ """:return: reference to the pool we belong to"""
+ return self._pool_ref
+
+ def task_ref(self):
+ """:return: reference to the task producing our items"""
+ return self._task_ref
+
+ #} END internal
+
+ #{ Interface
+
+ def task(self):
+ """:return: task we read from
+        :raise ValueError: If the instance is not attached to a task"""
+ task = self._task_ref()
+ if task is None:
+ raise ValueError("PoolReader is not associated with at task anymore")
+ return task
+
+ def pool(self):
+ """:return: pool our task belongs to
+ :raise ValueError: if the instance does not belong to a pool"""
+ pool = self._pool_ref()
+ if pool is None:
+ raise ValueError("PoolReader is not associated with a pool anymore")
+ return pool
+
+
+ #} END interface
+
+ def read(self, count=0, block=True, timeout=None):
+ """Read an item that was processed by one of our threads
+ :note: Triggers task dependency handling needed to provide the necessary input"""
+ # NOTE: we always queue the operation that would give us count items
+ # as tracking the scheduled items or testing the channels size
+        # is inherently unsafe depending on the design of the task network
+ # If we put on tasks onto the queue for every request, we are sure
+ # to always produce enough items, even if the task.min_count actually
+        # provided enough - it's better to have some possibly empty task runs
+        # than having an empty queue that blocks.
+
+ # if the user tries to use us to read from a done task, we will never
+ # compute as all produced items are already in the channel
+ task = self._task_ref()
+ if task is None:
+ return list()
+ # END abort if task was deleted
+
+ skip_compute = task.is_done() or task.error()
+
+ ########## prepare ##############################
+ if not skip_compute:
+ self._pool_ref()._prepare_channel_read(task, count)
+ # END prepare pool scheduling
+
+
+ ####### read data ########
+ ##########################
+ # read actual items, tasks were setup to put their output into our channel ( as well )
+ items = CallbackChannelReader.read(self, count, block, timeout)
... 6435 lines suppressed ...
--
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/python-modules/packages/python-async.git
More information about the Python-modules-commits
mailing list