[Python-modules-commits] [python-cassandra-driver] 01/05: Import python-cassandra-driver_3.7.0.orig.tar.gz

Sandro Tosi morph at moszumanska.debian.org
Sun Oct 2 18:39:34 UTC 2016


This is an automated email from the git hooks/post-receive script.

morph pushed a commit to branch master
in repository python-cassandra-driver.

commit 7119ff07d3a40cf67657df33b4e806cb55399a7b
Author: Sandro Tosi <morph at debian.org>
Date:   Sun Oct 2 13:53:32 2016 -0400

    Import python-cassandra-driver_3.7.0.orig.tar.gz
---
 PKG-INFO                              |   6 +-
 README.rst                            |   4 -
 cassandra/__init__.py                 |  32 +-
 cassandra/cluster.py                  | 884 +++++++++++++++++++++++++---------
 cassandra/concurrent.py               |  14 +-
 cassandra/connection.py               |  28 +-
 cassandra/cqlengine/columns.py        |  97 +++-
 cassandra/cqlengine/connection.py     | 283 +++++++----
 cassandra/cqlengine/functions.py      |   6 +-
 cassandra/cqlengine/management.py     | 193 +++++---
 cassandra/cqlengine/models.py         |  86 +++-
 cassandra/cqlengine/named.py          |  11 +-
 cassandra/cqlengine/query.py          | 179 ++++++-
 cassandra/cqlengine/statements.py     |   4 +-
 cassandra/cqlengine/usertype.py       |   9 +-
 cassandra/cqltypes.py                 |  24 +-
 cassandra/encoder.py                  |   3 +-
 cassandra/io/asyncorereactor.py       |   2 +
 cassandra/io/eventletreactor.py       |  14 -
 cassandra/io/geventreactor.py         |  18 +-
 cassandra/io/libevreactor.py          |  10 +-
 cassandra/metadata.py                 |  72 ++-
 cassandra/metrics.py                  |  35 +-
 cassandra/numpy_parser.pyx            |  30 +-
 cassandra/policies.py                 |  84 +++-
 cassandra/pool.py                     |  20 +-
 cassandra/protocol.py                 | 175 +++++--
 cassandra/query.py                    |  44 +-
 cassandra/row_parser.pyx              |   4 +-
 cassandra/type_codes.py               |   1 -
 cassandra/util.py                     |   4 +
 cassandra_driver.egg-info/PKG-INFO    |   6 +-
 cassandra_driver.egg-info/SOURCES.txt |   2 -
 cassandra_driver.egg-info/pbr.json    |   1 -
 setup.cfg                             |  10 -
 35 files changed, 1754 insertions(+), 641 deletions(-)

diff --git a/PKG-INFO b/PKG-INFO
index d1b5ebc..e076ef9 100644
--- a/PKG-INFO
+++ b/PKG-INFO
@@ -1,6 +1,6 @@
 Metadata-Version: 1.1
 Name: cassandra-driver
-Version: 3.4.1
+Version: 3.7.0
 Summary: Python driver for Cassandra
 Home-page: http://github.com/datastax/python-driver
 Author: Tyler Hobbs
@@ -78,10 +78,6 @@ Description: DataStax Python Driver for Apache Cassandra
         For IRC, use the #datastax-drivers channel on irc.freenode.net.  If you don't have an IRC client,
         you can use `freenode's web-based client <http://webchat.freenode.net/?channels=#datastax-drivers>`_.
         
-        Features to be Added
-        --------------------
-        * C extension for encoding/decoding messages
-        
         License
         -------
         Copyright 2013-2016 DataStax
diff --git a/README.rst b/README.rst
index 88d50da..b0726a5 100644
--- a/README.rst
+++ b/README.rst
@@ -70,10 +70,6 @@ and the IRC channel.
 For IRC, use the #datastax-drivers channel on irc.freenode.net.  If you don't have an IRC client,
 you can use `freenode's web-based client <http://webchat.freenode.net/?channels=#datastax-drivers>`_.
 
-Features to be Added
---------------------
-* C extension for encoding/decoding messages
-
 License
 -------
 Copyright 2013-2016 DataStax
diff --git a/cassandra/__init__.py b/cassandra/__init__.py
index 81172c8..8b348df 100644
--- a/cassandra/__init__.py
+++ b/cassandra/__init__.py
@@ -22,7 +22,7 @@ class NullHandler(logging.Handler):
 
 logging.getLogger('cassandra').addHandler(NullHandler())
 
-__version_info__ = (3, 4, 1)
+__version_info__ = (3, 7, 0)
 __version__ = '.'.join(map(str, __version_info__))
 
 
@@ -325,16 +325,34 @@ class CoordinationFailure(RequestExecutionException):
     The number of replicas that sent a failure message
     """
 
-    def __init__(self, summary_message, consistency=None, required_responses=None, received_responses=None, failures=None):
+    error_code_map = None
+    """
+    A map of inet addresses to error codes representing replicas that sent
+    a failure message.  Only set when `protocol_version` is 5 or higher.
+    """
+
+    def __init__(self, summary_message, consistency=None, required_responses=None,
+                 received_responses=None, failures=None, error_code_map=None):
         self.consistency = consistency
         self.required_responses = required_responses
         self.received_responses = received_responses
         self.failures = failures
-        Exception.__init__(self, summary_message + ' info=' +
-                           repr({'consistency': consistency_value_to_name(consistency),
-                                 'required_responses': required_responses,
-                                 'received_responses': received_responses,
-                                 'failures': failures}))
+        self.error_code_map = error_code_map
+
+        info_dict = {
+            'consistency': consistency_value_to_name(consistency),
+            'required_responses': required_responses,
+            'received_responses': received_responses,
+            'failures': failures
+        }
+
+        if error_code_map is not None:
+            # make error codes look like "0x002a"
+            formatted_map = dict((addr, '0x%04x' % err_code)
+                                 for (addr, err_code) in error_code_map.items())
+            info_dict['error_code_map'] = formatted_map
+
+        Exception.__init__(self, summary_message + ' info=' + repr(info_dict))
 
 
 class ReadFailure(CoordinationFailure):
diff --git a/cassandra/cluster.py b/cassandra/cluster.py
index d311988..d5fa3af 100644
--- a/cassandra/cluster.py
+++ b/cassandra/cluster.py
@@ -20,18 +20,19 @@ from __future__ import absolute_import
 
 import atexit
 from collections import defaultdict, Mapping
-from concurrent.futures import ThreadPoolExecutor
+from concurrent.futures import ThreadPoolExecutor, FIRST_COMPLETED, wait as wait_futures
+from copy import copy
+from functools import partial, wraps
+from itertools import groupby, count
 import logging
 from random import random
+import six
+from six.moves import filter, range, queue as Queue
 import socket
 import sys
 import time
 from threading import Lock, RLock, Thread, Event
 
-import six
-from six.moves import range
-from six.moves import queue as Queue
-
 import weakref
 from weakref import WeakValueDictionary
 try:
@@ -39,9 +40,6 @@ try:
 except ImportError:
     from cassandra.util import WeakSet  # NOQA
 
-from functools import partial, wraps
-from itertools import groupby, count
-
 from cassandra import (ConsistencyLevel, AuthenticationFailed,
                        OperationTimedOut, UnsupportedOperation,
                        SchemaTargetType, DriverException)
@@ -64,7 +62,8 @@ from cassandra.protocol import (QueryMessage, ResultMessage,
 from cassandra.metadata import Metadata, protect_name, murmur3
 from cassandra.policies import (TokenAwarePolicy, DCAwareRoundRobinPolicy, SimpleConvictionPolicy,
                                 ExponentialReconnectionPolicy, HostDistance,
-                                RetryPolicy, IdentityTranslator)
+                                RetryPolicy, IdentityTranslator, NoSpeculativeExecutionPlan,
+                                NoSpeculativeExecutionPolicy)
 from cassandra.pool import (Host, _ReconnectionHandler, _HostReconnectionHandler,
                             HostConnectionPool, HostConnection,
                             NoConnectionsAvailable)
@@ -184,16 +183,133 @@ def _shutdown_clusters():
 atexit.register(_shutdown_clusters)
 
 
-# murmur3 implementation required for TokenAware is only available for CPython
-import platform
-if platform.python_implementation() == 'CPython':
-    def default_lbp_factory():
-        if murmur3 is not None:
-            return TokenAwarePolicy(DCAwareRoundRobinPolicy())
-        return DCAwareRoundRobinPolicy()
-else:
-    def default_lbp_factory():
-        return DCAwareRoundRobinPolicy()
+def default_lbp_factory():
+    if murmur3 is not None:
+        return TokenAwarePolicy(DCAwareRoundRobinPolicy())
+    return DCAwareRoundRobinPolicy()
+
+
+class ExecutionProfile(object):
+    load_balancing_policy = None
+    """
+    An instance of :class:`.policies.LoadBalancingPolicy` or one of its subclasses.
+
+    Used in determining host distance for establishing connections, and routing requests.
+
+    Defaults to ``TokenAwarePolicy(DCAwareRoundRobinPolicy())`` if not specified (plain ``DCAwareRoundRobinPolicy()`` when murmur3 is unavailable)
+    """
+
+    retry_policy = None
+    """
+    An instance of :class:`.policies.RetryPolicy` used when :class:`.Statement` objects do not have a
+    :attr:`~.Statement.retry_policy` explicitly set.
+
+    Defaults to :class:`.RetryPolicy` if not specified
+    """
+
+    consistency_level = ConsistencyLevel.LOCAL_ONE
+    """
+    :class:`.ConsistencyLevel` used when not specified on a :class:`.Statement`.
+    """
+
+    serial_consistency_level = None
+    """
+    Serial :class:`.ConsistencyLevel` used when not specified on a :class:`.Statement` (for LWT conditional statements).
+    """
+
+    request_timeout = 10.0
+    """
+    Request timeout used when not overridden in :meth:`.Session.execute`
+    """
+
+    row_factory = staticmethod(named_tuple_factory)
+    """
+    A callable to format results, accepting ``(colnames, rows)`` where ``colnames`` is a list of column names, and
+    ``rows`` is a list of tuples, with each tuple representing a row of parsed values. Defaults to named tuples.
+
+    Some example implementations:
+
+        - :func:`cassandra.query.tuple_factory` - return a result row as a tuple
+        - :func:`cassandra.query.named_tuple_factory` - return a result row as a named tuple
+        - :func:`cassandra.query.dict_factory` - return a result row as a dict
+        - :func:`cassandra.query.ordered_dict_factory` - return a result row as an OrderedDict
+    """
+
+    speculative_execution_policy = None
+    """
+    An instance of :class:`.policies.SpeculativeExecutionPolicy`
+
+    Defaults to :class:`.NoSpeculativeExecutionPolicy` if not specified
+    """
+
+    def __init__(self, load_balancing_policy=None, retry_policy=None,
+                 consistency_level=ConsistencyLevel.LOCAL_ONE, serial_consistency_level=None,
+                 request_timeout=10.0, row_factory=named_tuple_factory, speculative_execution_policy=None):
+        self.load_balancing_policy = load_balancing_policy or default_lbp_factory()
+        self.retry_policy = retry_policy or RetryPolicy()
+        self.consistency_level = consistency_level
+        self.serial_consistency_level = serial_consistency_level
+        self.request_timeout = request_timeout
+        self.row_factory = row_factory
+        self.speculative_execution_policy = speculative_execution_policy or NoSpeculativeExecutionPolicy()
+
+
+class ProfileManager(object):
+
+    def __init__(self):
+        self.profiles = dict()
+
+    def distance(self, host):
+        distances = set(p.load_balancing_policy.distance(host) for p in self.profiles.values())
+        return HostDistance.LOCAL if HostDistance.LOCAL in distances else \
+            HostDistance.REMOTE if HostDistance.REMOTE in distances else \
+            HostDistance.IGNORED
+
+    def populate(self, cluster, hosts):
+        for p in self.profiles.values():
+            p.load_balancing_policy.populate(cluster, hosts)
+
+    def check_supported(self):
+        for p in self.profiles.values():
+            p.load_balancing_policy.check_supported()
+
+    def on_up(self, host):
+        for p in self.profiles.values():
+            p.load_balancing_policy.on_up(host)
+
+    def on_down(self, host):
+        for p in self.profiles.values():
+            p.load_balancing_policy.on_down(host)
+
+    def on_add(self, host):
+        for p in self.profiles.values():
+            p.load_balancing_policy.on_add(host)
+
+    def on_remove(self, host):
+        for p in self.profiles.values():
+            p.load_balancing_policy.on_remove(host)
+
+    @property
+    def default(self):
+        """
+        internal-only; no checks are done because this entry is populated on cluster init
+        """
+        return self.profiles[EXEC_PROFILE_DEFAULT]
+
+
+EXEC_PROFILE_DEFAULT = object()
+"""
+Key for the ``Cluster`` default execution profile, used when no other profile is selected in
+``Session.execute(execution_profile)``.
+
+Use this as the key in ``Cluster(execution_profiles)`` to override the default profile.
+"""
+
+
+class _ConfigMode(object):
+    UNCOMMITTED = 0
+    LEGACY = 1
+    PROFILES = 2
 
 
 class Cluster(object):
@@ -243,10 +359,11 @@ class Cluster(object):
     """
     The maximum version of the native protocol to use.
 
-    The driver will automatically downgrade version based on a negotiation with
-    the server, but it is most efficient to set this to the maximum supported
-    by your version of Cassandra. Setting this will also prevent conflicting
-    versions negotiated if your cluster is upgraded.
+    If not set in the constructor, the driver will automatically downgrade
+    version based on a negotiation with the server, but it is most efficient
+    to set this to the maximum supported by your version of Cassandra.
+    Setting this will also prevent conflicting versions negotiated if your
+    cluster is upgraded.
 
     Version 2 of the native protocol adds support for lightweight transactions,
     batch operations, and automatic query paging. The v2 protocol is
@@ -275,6 +392,14 @@ class Cluster(object):
     +-------------------+-------------------+
     | 2.2               | 1, 2, 3, 4        |
     +-------------------+-------------------+
+    | 3.x               | 3, 4              |
+    +-------------------+-------------------+
+    """
+
+    allow_beta_protocol_version = False
+    """
+    Setting this to ``True`` injects a flag in all messages that makes the server accept and use a "beta" protocol version.
+    Used for testing new protocol features incrementally before the new version is complete.
     """
 
     compression = True
@@ -326,20 +451,34 @@ class Cluster(object):
 
         self._auth_provider = value
 
-    load_balancing_policy = None
-    """
-    An instance of :class:`.policies.LoadBalancingPolicy` or
-    one of its subclasses.
+    _load_balancing_policy = None
+    @property
+    def load_balancing_policy(self):
+        """
+        An instance of :class:`.policies.LoadBalancingPolicy` or
+        one of its subclasses.
 
-    .. versionchanged:: 2.6.0
+        .. versionchanged:: 2.6.0
 
-    Defaults to :class:`~.TokenAwarePolicy` (:class:`~.DCAwareRoundRobinPolicy`).
-    when using CPython (where the murmur3 extension is available). :class:`~.DCAwareRoundRobinPolicy`
-    otherwise. Default local DC will be chosen from contact points.
+        Defaults to :class:`~.TokenAwarePolicy` (:class:`~.DCAwareRoundRobinPolicy`).
+        when using CPython (where the murmur3 extension is available). :class:`~.DCAwareRoundRobinPolicy`
+        otherwise. Default local DC will be chosen from contact points.
 
-    **Please see** :class:`~.DCAwareRoundRobinPolicy` **for a discussion on default behavior with respect to
-    DC locality and remote nodes.**
-    """
+        **Please see** :class:`~.DCAwareRoundRobinPolicy` **for a discussion on default behavior with respect to
+        DC locality and remote nodes.**
+        """
+        return self._load_balancing_policy
+
+    @load_balancing_policy.setter
+    def load_balancing_policy(self, lbp):
+        if self._config_mode == _ConfigMode.PROFILES:
+            raise ValueError("Cannot set Cluster.load_balancing_policy while using Configuration Profiles. Set this in a profile instead.")
+        self._load_balancing_policy = lbp
+        self._config_mode = _ConfigMode.LEGACY
+
+    @property
+    def _default_load_balancing_policy(self):
+        return self.profile_manager.default.load_balancing_policy
 
     reconnection_policy = ExponentialReconnectionPolicy(1.0, 600.0)
     """
@@ -348,12 +487,22 @@ class Cluster(object):
     a max delay of ten minutes.
     """
 
-    default_retry_policy = RetryPolicy()
-    """
-    A default :class:`.policies.RetryPolicy` instance to use for all
-    :class:`.Statement` objects which do not have a :attr:`~.Statement.retry_policy`
-    explicitly set.
-    """
+    _default_retry_policy = RetryPolicy()
+    @property
+    def default_retry_policy(self):
+        """
+        A default :class:`.policies.RetryPolicy` instance to use for all
+        :class:`.Statement` objects which do not have a :attr:`~.Statement.retry_policy`
+        explicitly set.
+        """
+        return self._default_retry_policy
+
+    @default_retry_policy.setter
+    def default_retry_policy(self, policy):
+        if self._config_mode == _ConfigMode.PROFILES:
+            raise ValueError("Cannot set Cluster.default_retry_policy while using Configuration Profiles. Set this in a profile instead.")
+        self._default_retry_policy = policy
+        self._config_mode = _ConfigMode.LEGACY
 
     conviction_policy_factory = SimpleConvictionPolicy
     """
@@ -570,6 +719,9 @@ class Cluster(object):
     def token_metadata_enabled(self, enabled):
         self.control_connection._token_meta_enabled = bool(enabled)
 
+    profile_manager = None
+    _config_mode = _ConfigMode.UNCOMMITTED
+
     sessions = None
     control_connection = None
     scheduler = None
@@ -579,6 +731,8 @@ class Cluster(object):
     _prepared_statements = None
     _prepared_statement_lock = None
     _idle_heartbeat = None
+    _protocol_version_explicit = False
+    _discount_down_events = True
 
     _user_types = None
     """
@@ -602,7 +756,7 @@ class Cluster(object):
                  ssl_options=None,
                  sockopts=None,
                  cql_version=None,
-                 protocol_version=4,
+                 protocol_version=_NOT_SET,
                  executor_threads=2,
                  max_schema_agreement_wait=10,
                  control_connection_timeout=2.0,
@@ -615,7 +769,9 @@ class Cluster(object):
                  address_translator=None,
                  status_event_refresh_window=2,
                  prepare_on_all_hosts=True,
-                 reprepare_on_up=True):
+                 reprepare_on_up=True,
+                 execution_profiles=None,
+                 allow_beta_protocol_version=False):
         """
         ``executor_threads`` defines the number of threads in a pool for handling asynchronous tasks such as
         extablishing connection pools or refreshing metadata.
@@ -636,7 +792,12 @@ class Cluster(object):
                                         for endpoint in socket.getaddrinfo(a, self.port, socket.AF_UNSPEC, socket.SOCK_STREAM)]
 
         self.compression = compression
-        self.protocol_version = protocol_version
+
+        if protocol_version is not _NOT_SET:
+            self.protocol_version = protocol_version
+            self._protocol_version_explicit = True
+        self.allow_beta_protocol_version = allow_beta_protocol_version
+
         self.auth_provider = auth_provider
 
         if load_balancing_policy is not None:
@@ -644,7 +805,7 @@ class Cluster(object):
                 raise TypeError("load_balancing_policy should not be a class, it should be an instance of that class")
             self.load_balancing_policy = load_balancing_policy
         else:
-            self.load_balancing_policy = default_lbp_factory()
+            self._load_balancing_policy = default_lbp_factory()  # set internal attribute to avoid committing to legacy config mode
 
         if reconnection_policy is not None:
             if isinstance(reconnection_policy, type):
@@ -669,6 +830,24 @@ class Cluster(object):
         if connection_class is not None:
             self.connection_class = connection_class
 
+        self.profile_manager = ProfileManager()
+        self.profile_manager.profiles[EXEC_PROFILE_DEFAULT] = ExecutionProfile(self.load_balancing_policy,
+                                                                               self.default_retry_policy,
+                                                                               Session._default_consistency_level,
+                                                                               Session._default_serial_consistency_level,
+                                                                               Session._default_timeout,
+                                                                               Session._row_factory)
+        # legacy mode if either of these is not default
+        if load_balancing_policy or default_retry_policy:
+            if execution_profiles:
+                raise ValueError("Clusters constructed with execution_profiles should not specify legacy parameters "
+                                 "load_balancing_policy or default_retry_policy. Configure this in a profile instead.")
+            self._config_mode = _ConfigMode.LEGACY
+        else:
+            if execution_profiles:
+                self.profile_manager.profiles.update(execution_profiles)
+                self._config_mode = _ConfigMode.PROFILES
+
         self.metrics_enabled = metrics_enabled
         self.ssl_options = ssl_options
         self.sockopts = sockopts
@@ -789,6 +968,39 @@ class Cluster(object):
             session.user_type_registered(keyspace, user_type, klass)
         UserType.evict_udt_class(keyspace, user_type)
 
+    def add_execution_profile(self, name, profile, pool_wait_timeout=5):
+        """
+        Adds an :class:`.ExecutionProfile` to the cluster. This makes it available for use by ``name`` in :meth:`.Session.execute`
+        and :meth:`.Session.execute_async`. Raises ``TypeError`` for a non-profile argument, and ``ValueError`` if ``name`` already exists or legacy parameters are set.
+
+        Normally profiles will be injected at cluster initialization via ``Cluster(execution_profiles)``. This method
+        provides a way of adding them dynamically.
+
+        Adding a new profile updates the connection pools according to the specified ``load_balancing_policy``. By default,
+        this method will wait up to five seconds for the pool creation to complete, so the profile can be used immediately
+        upon return. This behavior can be controlled using ``pool_wait_timeout`` (see
+        `concurrent.futures.wait <https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.wait>`_
+        for timeout semantics); :class:`~cassandra.OperationTimedOut` is raised if pools are still pending after that wait.
+        """
+        if not isinstance(profile, ExecutionProfile):
+            raise TypeError("profile must be an instance of ExecutionProfile")
+        if self._config_mode == _ConfigMode.LEGACY:
+            raise ValueError("Cannot add execution profiles when legacy parameters are set explicitly. TODO: link to doc")
+        if name in self.profile_manager.profiles:
+            raise ValueError("Profile %s already exists" % (name,))
+        self.profile_manager.profiles[name] = profile
+        profile.load_balancing_policy.populate(self, self.metadata.all_hosts())
+        # on_up after populate allows things like DCA LBP to choose default local dc
+        for host in filter(lambda h: h.is_up, self.metadata.all_hosts()):
+            profile.load_balancing_policy.on_up(host)
+        futures = set()
+        for session in self.sessions:
+            futures.update(session.update_created_pools())
+        _, not_done = wait_futures(futures, pool_wait_timeout)
+        if not_done:
+            raise OperationTimedOut("Failed to create all new connection pools in the %ss timeout." % (pool_wait_timeout,))
+
+
     def get_min_requests_per_connection(self, host_distance):
         return self._min_requests_per_connection[host_distance]
 
@@ -921,10 +1133,14 @@ class Cluster(object):
         kwargs_dict.setdefault('cql_version', self.cql_version)
         kwargs_dict.setdefault('protocol_version', self.protocol_version)
         kwargs_dict.setdefault('user_type_map', self._user_types)
+        kwargs_dict.setdefault('allow_beta_protocol_version', self.allow_beta_protocol_version)
 
         return kwargs_dict
 
     def protocol_downgrade(self, host_addr, previous_version):
+        if self._protocol_version_explicit:
+            raise DriverException("ProtocolError returned from server while using explicitly set client protocol_version %d" % (previous_version,))
+
         new_version = previous_version - 1
         if new_version < self.protocol_version:
             if new_version >= MIN_SUPPORTED_VERSION:
@@ -935,7 +1151,7 @@ class Cluster(object):
             else:
                 raise DriverException("Cannot downgrade protocol version (%d) below minimum supported version: %d" % (new_version, MIN_SUPPORTED_VERSION))
 
-    def connect(self, keyspace=None):
+    def connect(self, keyspace=None, wait_for_all_pools=False):
         """
         Creates and returns a new :class:`~.Session` object.  If `keyspace`
         is specified, that keyspace will be the default keyspace for
@@ -957,11 +1173,18 @@ class Cluster(object):
                         for listener in self.listeners:
                             listener.on_add(host)
 
-                self.load_balancing_policy.populate(
+                self.profile_manager.populate(
                     weakref.proxy(self), self.metadata.all_hosts())
 
                 try:
                     self.control_connection.connect()
+
+                    # we set all contact points up for connecting, but we won't infer state after this
+                    for address in self.contact_points_resolved:
+                        h = self.metadata.get_host(address)
+                        if h and self.profile_manager.distance(h) == HostDistance.IGNORED:
+                            h.is_up = None
+
                     log.debug("Control connection created")
                 except Exception:
                     log.exception("Control connection failed to connect, "
@@ -969,15 +1192,15 @@ class Cluster(object):
                     self.shutdown()
                     raise
 
-                self.load_balancing_policy.check_supported()
+                self.profile_manager.check_supported()  # todo: rename this method
 
                 if self.idle_heartbeat_interval:
                     self._idle_heartbeat = ConnectionHeartbeat(self.idle_heartbeat_interval, self.get_connection_holders)
                 self._is_setup = True
 
-        session = self._new_session()
-        if keyspace:
-            session.set_keyspace(keyspace)
+        session = self._new_session(keyspace)
+        if wait_for_all_pools:
+            wait_futures(session._initial_connect_futures)
         return session
 
     def get_connection_holders(self):
@@ -1021,8 +1244,8 @@ class Cluster(object):
     def __exit__(self, *args):
         self.shutdown()
 
-    def _new_session(self):
-        session = Session(self, self.metadata.all_hosts())
+    def _new_session(self, keyspace):
+        session = Session(self, self.metadata.all_hosts(), keyspace)
         self._session_register_user_types(session)
         self.sessions.add(session)
         return session
@@ -1033,7 +1256,7 @@ class Cluster(object):
                 session.user_type_registered(keyspace, udt_name, klass)
 
     def _cleanup_failed_on_up_handling(self, host):
-        self.load_balancing_policy.on_down(host)
+        self.profile_manager.on_down(host)
         self.control_connection.on_down(host)
         for session in self.sessions:
             session.remove_pool(host)
@@ -1113,8 +1336,8 @@ class Cluster(object):
             for session in self.sessions:
                 session.remove_pool(host)
 
-            log.debug("Signalling to load balancing policy that host %s is up", host)
-            self.load_balancing_policy.on_up(host)
+            log.debug("Signalling to load balancing policies that host %s is up", host)
+            self.profile_manager.on_up(host)
 
             log.debug("Signalling to control connection that host %s is up", host)
             self.control_connection.on_up(host)
@@ -1142,13 +1365,14 @@ class Cluster(object):
         else:
             if not have_future:
                 with host.lock:
+                    host.set_up()
                     host._currently_handling_node_up = False
 
         # for testing purposes
         return futures
 
     def _start_reconnector(self, host, is_host_addition):
-        if self.load_balancing_policy.distance(host) == HostDistance.IGNORED:
+        if self.profile_manager.distance(host) == HostDistance.IGNORED:
             return
 
         schedule = self.reconnection_policy.new_schedule()
@@ -1180,14 +1404,27 @@ class Cluster(object):
             return
 
         with host.lock:
-            if (not host.is_up and not expect_host_to_be_down) or host.is_currently_reconnecting():
-                return
+            was_up = host.is_up
+
+            # ignore down signals if we have open pools to the host
+            # this is to avoid closing pools when a control connection host became isolated
+            if self._discount_down_events and self.profile_manager.distance(host) != HostDistance.IGNORED:
+                connected = False
+                for session in self.sessions:
+                    pool_states = session.get_pool_state()
+                    pool_state = pool_states.get(host)
+                    if pool_state:
+                        connected |= pool_state['open_count'] > 0
+                if connected:
+                    return
 
             host.set_down()
+            if (not was_up and not expect_host_to_be_down) or host.is_currently_reconnecting():
+                return
 
         log.warning("Host %s has been marked down", host)
 
-        self.load_balancing_policy.on_down(host)
+        self.profile_manager.on_down(host)
         self.control_connection.on_down(host)
         for session in self.sessions:
             session.on_down(host)
@@ -1203,12 +1440,12 @@ class Cluster(object):
 
         log.debug("Handling new host %r and notifying listeners", host)
 
-        distance = self.load_balancing_policy.distance(host)
+        distance = self.profile_manager.distance(host)
         if distance != HostDistance.IGNORED:
             self._prepare_all_queries(host)
             log.debug("Done preparing queries for new host %r", host)
 
-        self.load_balancing_policy.on_add(host)
+        self.profile_manager.on_add(host)
         self.control_connection.on_add(host, refresh_nodes)
 
         if distance == HostDistance.IGNORED:
@@ -1273,7 +1510,7 @@ class Cluster(object):
 
         log.debug("Removing host %s", host)
         host.set_down()
-        self.load_balancing_policy.on_remove(host)
+        self.profile_manager.on_remove(host)
         for session in self.sessions:
             session.on_remove(host)
         for listener in self.listeners:
@@ -1360,6 +1597,14 @@ class Cluster(object):
             return SchemaTargetType.KEYSPACE
         return None
 
+    def get_control_connection_host(self):
+        """
+        Returns the control connection host metadata.
+        """
+        connection = self.control_connection._connection
+        host = connection.host if connection else None
+        return self.metadata.get_host(host) if host else None
+
     def refresh_schema_metadata(self, max_schema_agreement_wait=None):
         """
         Synchronously refresh all schema metadata.
@@ -1442,13 +1687,15 @@ class Cluster(object):
                                                       schema_agreement_wait=max_schema_agreement_wait, force=True):
             raise DriverException("User Aggregate metadata was not refreshed. See log for details.")
 
-    def refresh_nodes(self):
+    def refresh_nodes(self, force_token_rebuild=False):
         """
         Synchronously refresh the node list and token metadata
 
+        `force_token_rebuild` can be used to rebuild the token map metadata, even if no new nodes are discovered.
+
         An Exception is raised if node refresh fails for any reason.
         """
-        if not self.control_connection.refresh_node_list_and_token_map():
+        if not self.control_connection.refresh_node_list_and_token_map(force_token_rebuild):
             raise DriverException("Node list was not refreshed. See log for details.")
 
     def set_meta_refresh_enabled(self, enabled):
@@ -1533,54 +1780,85 @@ class Session(object):
     keyspace = None
     is_shutdown = False
 
-    row_factory = staticmethod(named_tuple_factory)
-    """
-    The format to return row results in.  By default, each
-    returned row will be a named tuple.  You can alternatively
-    use any of the following:
+    _row_factory = staticmethod(named_tuple_factory)
+    @property
+    def row_factory(self):
+        """
+        The format to return row results in.  By default, each
+        returned row will be a named tuple.  You can alternatively
+        use any of the following:
 
-      - :func:`cassandra.query.tuple_factory` - return a result row as a tuple
-      - :func:`cassandra.query.named_tuple_factory` - return a result row as a named tuple
-      - :func:`cassandra.query.dict_factory` - return a result row as a dict
-      - :func:`cassandra.query.ordered_dict_factory` - return a result row as an OrderedDict
+          - :func:`cassandra.query.tuple_factory` - return a result row as a tuple
+          - :func:`cassandra.query.named_tuple_factory` - return a result row as a named tuple
+          - :func:`cassandra.query.dict_factory` - return a result row as a dict
+          - :func:`cassandra.query.ordered_dict_factory` - return a result row as an OrderedDict
 
-    """
+        """
+        return self._row_factory
 
-    default_timeout = 10.0
-    """
-    A default timeout, measured in seconds, for queries executed through
-    :meth:`.execute()` or :meth:`.execute_async()`.  This default may be
-    overridden with the `timeout` parameter for either of those methods.
+    @row_factory.setter
+    def row_factory(self, rf):
+        self._validate_set_legacy_config('row_factory', rf)
 
-    Setting this to :const:`None` will cause no timeouts to be set by default.
+    _default_timeout = 10.0
 
-    Please see :meth:`.ResponseFuture.result` for details on the scope and
-    effect of this timeout.
+    @property
+    def default_timeout(self):
+        """
+        A default timeout, measured in seconds, for queries executed through
+        :meth:`.execute()` or :meth:`.execute_async()`.  This default may be
+        overridden with the `timeout` parameter for either of those methods.
 
-    .. versionadded:: 2.0.0
-    """
+        Setting this to :const:`None` will cause no timeouts to be set by default.
 
-    default_consistency_level = ConsistencyLevel.LOCAL_ONE
-    """
-    The default :class:`~ConsistencyLevel` for operations executed through
-    this session.  This default may be overridden by setting the
-    :attr:`~.Statement.consistency_level` on individual statements.
+        Please see :meth:`.ResponseFuture.result` for details on the scope and
+        effect of this timeout.
 
-    .. versionadded:: 1.2.0
+        .. versionadded:: 2.0.0
+        """
+        return self._default_timeout
 
-    .. versionchanged:: 3.0.0
+    @default_timeout.setter
+    def default_timeout(self, timeout):
+        self._validate_set_legacy_config('default_timeout', timeout)
 
-        default changed from ONE to LOCAL_ONE
-    """
+    _default_consistency_level = ConsistencyLevel.LOCAL_ONE
 
-    default_serial_consistency_level = None
-    """
-    The default :class:`~ConsistencyLevel` for serial phase of  conditional updates executed through
-    this session.  This default may be overridden by setting the
-    :attr:`~.Statement.serial_consistency_level` on individual statements.
+    @property
+    def default_consistency_level(self):
+        """
+        The default :class:`~ConsistencyLevel` for operations executed through
+        this session.  This default may be overridden by setting the
+        :attr:`~.Statement.consistency_level` on individual statements.
 
-    Only valid for ``protocol_version >= 2``.
-    """
+        .. versionadded:: 1.2.0
+
+        .. versionchanged:: 3.0.0
+
+            default changed from ONE to LOCAL_ONE
+        """
+        return self._default_consistency_level
+
+    @default_consistency_level.setter
+    def default_consistency_level(self, cl):
+        self._validate_set_legacy_config('default_consistency_level', cl)
+
+    _default_serial_consistency_level = None
+
+    @property
+    def default_serial_consistency_level(self):
+        """
+        The default :class:`~ConsistencyLevel` for serial phase of conditional updates executed through
+        this session.  This default may be overridden by setting the
+        :attr:`~.Statement.serial_consistency_level` on individual statements.
+
+        Only valid for ``protocol_version >= 2``.
+        """
+        return self._default_serial_consistency_level
+
+    @default_serial_consistency_level.setter
+    def default_serial_consistency_level(self, cl):
+        self._validate_set_legacy_config('default_serial_consistency_level', cl)
 
     max_trace_wait = 2.0
     """
@@ -1654,32 +1932,36 @@ class Session(object):
 
     _lock = None
     _pools = None
-    _load_balancer = None
+    _profile_manager = None
     _metrics = None
+    _request_init_callbacks = None
 
-    def __init__(self, cluster, hosts):
+    def __init__(self, cluster, hosts, keyspace=None):
         self.cluster = cluster
         self.hosts = hosts
+        self.keyspace = keyspace
 
         self._lock = RLock()
         self._pools = {}
-        self._load_balancer = cluster.load_balancing_policy
+        self._profile_manager = cluster.profile_manager
         self._metrics = cluster.metrics
+        self._request_init_callbacks = []
         self._protocol_version = self.cluster.protocol_version
 
         self.encoder = Encoder()
 
         # create connection pools in parallel
-        futures = []
+        self._initial_connect_futures = set()
         for host in hosts:
             future = self.add_or_renew_pool(host, is_host_addition=False)
-            if future is not None:
-                futures.append(future)
+            if future:
+                self._initial_connect_futures.add(future)
 
-        for future in futures:
-            future.result()
+        futures = wait_futures(self._initial_connect_futures, return_when=FIRST_COMPLETED)
+        while futures.not_done and not any(f.result() for f in futures.done):
+            futures = wait_futures(futures.not_done, return_when=FIRST_COMPLETED)
 
-    def execute(self, query, parameters=None, timeout=_NOT_SET, trace=False, custom_payload=None):
+    def execute(self, query, parameters=None, timeout=_NOT_SET, trace=False, custom_payload=None, execution_profile=EXEC_PROFILE_DEFAULT, paging_state=None):
         """
         Execute the given query and synchronously wait for the response.
 
@@ -1706,28 +1988,24 @@ class Session(object):
         `custom_payload` is a :ref:`custom_payload` dict to be passed to the server.
         If `query` is a Statement with its own custom_payload. The message payload
         will be a union of the two, with the values specified here taking precedence.
+
+        `execution_profile` is the execution profile to use for this request. It can be a key to a profile configured
+        via :meth:`Cluster.add_execution_profile` or an instance (from :meth:`Session.execution_profile_clone_update`,
+        for example).
+
+        `paging_state` is an optional paging state, reused from a previous :class:`ResultSet`.
         """
-        return self.execute_async(query, parameters, trace, custom_payload, timeout).result()
+        return self.execute_async(query, parameters, trace, custom_payload, timeout, execution_profile, paging_state).result()
 
-    def execute_async(self, query, parameters=None, trace=False, custom_payload=None, timeout=_NOT_SET):
+    def execute_async(self, query, parameters=None, trace=False, custom_payload=None, timeout=_NOT_SET, execution_profile=EXEC_PROFILE_DEFAULT, paging_state=None):
         """
         Execute the given query and return a :class:`~.ResponseFuture` object
         which callbacks may be attached to for asynchronous response
         delivery.  You may also call :meth:`~.ResponseFuture.result()`
-        on the :class:`.ResponseFuture` to syncronously block for results at
+        on the :class:`.ResponseFuture` to synchronously block for results at
         any time.
 
-        If `trace` is set to :const:`True`, you may get the query trace descriptors using
-        :meth:`.ResponseFuture.get_query_trace()` or :meth:`.ResponseFuture.get_all_query_traces()`
-        on the future result.
-
-        `custom_payload` is a :ref:`custom_payload` dict to be passed to the server.
-        If `query` is a Statement with its own custom_payload. The message payload
-        will be a union of the two, with the values specified here taking precedence.
-
-        If the server sends a custom payload in the response message,
-        the dict can be obtained following :meth:`.ResponseFuture.result` via
-        :attr:`.ResponseFuture.custom_payload`
+        See :meth:`Session.execute` for parameter definitions.
 
         Example usage::
 
@@ -1754,15 +2032,13 @@ class Session(object):
             ...     log.exception("Operation failed:")
 
         """
-        if timeout is _NOT_SET:
-            timeout = self.default_timeout
-
-        future = self._create_response_future(query, parameters, trace, custom_payload, timeout)
+        future = self._create_response_future(query, parameters, trace, custom_payload, timeout, execution_profile, paging_state)
         future._protocol_handler = self.client_protocol_handler
+        self._on_request(future)
         future.send_request()
         return future
 
-    def _create_response_future(self, query, parameters, trace, custom_payload, timeout):
+    def _create_response_future(self, query, parameters, trace, custom_payload, timeout, execution_profile=EXEC_PROFILE_DEFAULT, paging_state=None):
         """ Returns the ResponseFuture before calling send_request() on it """
 
         prepared_statement = None
@@ -1772,8 +2048,34 @@ class Session(object):
         elif isinstance(query, PreparedStatement):
             query = query.bind(parameters)
 
-        cl = query.consistency_level if query.consistency_level is not None else self.default_consistency_level
-        serial_cl = query.serial_consistency_level if query.serial_consistency_level is not None else self.default_serial_consistency_level
+        if self.cluster._config_mode == _ConfigMode.LEGACY:
+            if execution_profile is not EXEC_PROFILE_DEFAULT:
+                raise ValueError("Cannot specify execution_profile while using legacy parameters.")
+
+            if timeout is _NOT_SET:
+                timeout = self.default_timeout
+
+            cl = query.consistency_level if query.consistency_level is not None else self.default_consistency_level
... 4040 lines suppressed ...

-- 
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/python-modules/packages/python-cassandra-driver.git



More information about the Python-modules-commits mailing list