[Python-modules-commits] [python-cassandra-driver] 01/05: Import python-cassandra-driver_3.7.0.orig.tar.gz
Sandro Tosi
morph at moszumanska.debian.org
Sun Oct 2 18:39:34 UTC 2016
This is an automated email from the git hooks/post-receive script.
morph pushed a commit to branch master
in repository python-cassandra-driver.
commit 7119ff07d3a40cf67657df33b4e806cb55399a7b
Author: Sandro Tosi <morph at debian.org>
Date: Sun Oct 2 13:53:32 2016 -0400
Import python-cassandra-driver_3.7.0.orig.tar.gz
---
PKG-INFO | 6 +-
README.rst | 4 -
cassandra/__init__.py | 32 +-
cassandra/cluster.py | 884 +++++++++++++++++++++++++---------
cassandra/concurrent.py | 14 +-
cassandra/connection.py | 28 +-
cassandra/cqlengine/columns.py | 97 +++-
cassandra/cqlengine/connection.py | 283 +++++++----
cassandra/cqlengine/functions.py | 6 +-
cassandra/cqlengine/management.py | 193 +++++---
cassandra/cqlengine/models.py | 86 +++-
cassandra/cqlengine/named.py | 11 +-
cassandra/cqlengine/query.py | 179 ++++++-
cassandra/cqlengine/statements.py | 4 +-
cassandra/cqlengine/usertype.py | 9 +-
cassandra/cqltypes.py | 24 +-
cassandra/encoder.py | 3 +-
cassandra/io/asyncorereactor.py | 2 +
cassandra/io/eventletreactor.py | 14 -
cassandra/io/geventreactor.py | 18 +-
cassandra/io/libevreactor.py | 10 +-
cassandra/metadata.py | 72 ++-
cassandra/metrics.py | 35 +-
cassandra/numpy_parser.pyx | 30 +-
cassandra/policies.py | 84 +++-
cassandra/pool.py | 20 +-
cassandra/protocol.py | 175 +++++--
cassandra/query.py | 44 +-
cassandra/row_parser.pyx | 4 +-
cassandra/type_codes.py | 1 -
cassandra/util.py | 4 +
cassandra_driver.egg-info/PKG-INFO | 6 +-
cassandra_driver.egg-info/SOURCES.txt | 2 -
cassandra_driver.egg-info/pbr.json | 1 -
setup.cfg | 10 -
35 files changed, 1754 insertions(+), 641 deletions(-)
diff --git a/PKG-INFO b/PKG-INFO
index d1b5ebc..e076ef9 100644
--- a/PKG-INFO
+++ b/PKG-INFO
@@ -1,6 +1,6 @@
Metadata-Version: 1.1
Name: cassandra-driver
-Version: 3.4.1
+Version: 3.7.0
Summary: Python driver for Cassandra
Home-page: http://github.com/datastax/python-driver
Author: Tyler Hobbs
@@ -78,10 +78,6 @@ Description: DataStax Python Driver for Apache Cassandra
For IRC, use the #datastax-drivers channel on irc.freenode.net. If you don't have an IRC client,
you can use `freenode's web-based client <http://webchat.freenode.net/?channels=#datastax-drivers>`_.
- Features to be Added
- --------------------
- * C extension for encoding/decoding messages
-
License
-------
Copyright 2013-2016 DataStax
diff --git a/README.rst b/README.rst
index 88d50da..b0726a5 100644
--- a/README.rst
+++ b/README.rst
@@ -70,10 +70,6 @@ and the IRC channel.
For IRC, use the #datastax-drivers channel on irc.freenode.net. If you don't have an IRC client,
you can use `freenode's web-based client <http://webchat.freenode.net/?channels=#datastax-drivers>`_.
-Features to be Added
---------------------
-* C extension for encoding/decoding messages
-
License
-------
Copyright 2013-2016 DataStax
diff --git a/cassandra/__init__.py b/cassandra/__init__.py
index 81172c8..8b348df 100644
--- a/cassandra/__init__.py
+++ b/cassandra/__init__.py
@@ -22,7 +22,7 @@ class NullHandler(logging.Handler):
logging.getLogger('cassandra').addHandler(NullHandler())
-__version_info__ = (3, 4, 1)
+__version_info__ = (3, 7, 0)
__version__ = '.'.join(map(str, __version_info__))
@@ -325,16 +325,34 @@ class CoordinationFailure(RequestExecutionException):
The number of replicas that sent a failure message
"""
- def __init__(self, summary_message, consistency=None, required_responses=None, received_responses=None, failures=None):
+ error_code_map = None
+ """
+ A map of inet addresses to error codes representing replicas that sent
+ a failure message. Only set when `protocol_version` is 5 or higher.
+ """
+
+ def __init__(self, summary_message, consistency=None, required_responses=None,
+ received_responses=None, failures=None, error_code_map=None):
self.consistency = consistency
self.required_responses = required_responses
self.received_responses = received_responses
self.failures = failures
- Exception.__init__(self, summary_message + ' info=' +
- repr({'consistency': consistency_value_to_name(consistency),
- 'required_responses': required_responses,
- 'received_responses': received_responses,
- 'failures': failures}))
+ self.error_code_map = error_code_map
+
+ info_dict = {
+ 'consistency': consistency_value_to_name(consistency),
+ 'required_responses': required_responses,
+ 'received_responses': received_responses,
+ 'failures': failures
+ }
+
+ if error_code_map is not None:
+ # make error codes look like "0x002a"
+ formatted_map = dict((addr, '0x%04x' % err_code)
+ for (addr, err_code) in error_code_map.items())
+ info_dict['error_code_map'] = formatted_map
+
+ Exception.__init__(self, summary_message + ' info=' + repr(info_dict))
class ReadFailure(CoordinationFailure):
diff --git a/cassandra/cluster.py b/cassandra/cluster.py
index d311988..d5fa3af 100644
--- a/cassandra/cluster.py
+++ b/cassandra/cluster.py
@@ -20,18 +20,19 @@ from __future__ import absolute_import
import atexit
from collections import defaultdict, Mapping
-from concurrent.futures import ThreadPoolExecutor
+from concurrent.futures import ThreadPoolExecutor, FIRST_COMPLETED, wait as wait_futures
+from copy import copy
+from functools import partial, wraps
+from itertools import groupby, count
import logging
from random import random
+import six
+from six.moves import filter, range, queue as Queue
import socket
import sys
import time
from threading import Lock, RLock, Thread, Event
-import six
-from six.moves import range
-from six.moves import queue as Queue
-
import weakref
from weakref import WeakValueDictionary
try:
@@ -39,9 +40,6 @@ try:
except ImportError:
from cassandra.util import WeakSet # NOQA
-from functools import partial, wraps
-from itertools import groupby, count
-
from cassandra import (ConsistencyLevel, AuthenticationFailed,
OperationTimedOut, UnsupportedOperation,
SchemaTargetType, DriverException)
@@ -64,7 +62,8 @@ from cassandra.protocol import (QueryMessage, ResultMessage,
from cassandra.metadata import Metadata, protect_name, murmur3
from cassandra.policies import (TokenAwarePolicy, DCAwareRoundRobinPolicy, SimpleConvictionPolicy,
ExponentialReconnectionPolicy, HostDistance,
- RetryPolicy, IdentityTranslator)
+ RetryPolicy, IdentityTranslator, NoSpeculativeExecutionPlan,
+ NoSpeculativeExecutionPolicy)
from cassandra.pool import (Host, _ReconnectionHandler, _HostReconnectionHandler,
HostConnectionPool, HostConnection,
NoConnectionsAvailable)
@@ -184,16 +183,133 @@ def _shutdown_clusters():
atexit.register(_shutdown_clusters)
-# murmur3 implementation required for TokenAware is only available for CPython
-import platform
-if platform.python_implementation() == 'CPython':
- def default_lbp_factory():
- if murmur3 is not None:
- return TokenAwarePolicy(DCAwareRoundRobinPolicy())
- return DCAwareRoundRobinPolicy()
-else:
- def default_lbp_factory():
- return DCAwareRoundRobinPolicy()
+def default_lbp_factory():
+ if murmur3 is not None:
+ return TokenAwarePolicy(DCAwareRoundRobinPolicy())
+ return DCAwareRoundRobinPolicy()
+
+
+class ExecutionProfile(object):
+ load_balancing_policy = None
+ """
+ An instance of :class:`.policies.LoadBalancingPolicy` or one of its subclasses.
+
+ Used in determining host distance for establishing connections, and routing requests.
+
+ Defaults to ``TokenAwarePolicy(DCAwareRoundRobinPolicy())`` if not specified
+ """
+
+ retry_policy = None
+ """
+ A :class:`.policies.RetryPolicy` instance used when :class:`.Statement` objects do not have a
+ :attr:`~.Statement.retry_policy` explicitly set.
+
+ Defaults to :class:`.RetryPolicy` if not specified
+ """
+
+ consistency_level = ConsistencyLevel.LOCAL_ONE
+ """
+ :class:`.ConsistencyLevel` used when not specified on a :class:`.Statement`.
+ """
+
+ serial_consistency_level = None
+ """
+ Serial :class:`.ConsistencyLevel` used when not specified on a :class:`.Statement` (for LWT conditional statements).
+ """
+
+ request_timeout = 10.0
+ """
+ Request timeout used when not overridden in :meth:`.Session.execute`
+ """
+
+ row_factory = staticmethod(named_tuple_factory)
+ """
+ A callable to format results, accepting ``(colnames, rows)`` where ``colnames`` is a list of column names, and
+ ``rows`` is a list of tuples, with each tuple representing a row of parsed values.
+
+ Some example implementations:
+
+ - :func:`cassandra.query.tuple_factory` - return a result row as a tuple
+ - :func:`cassandra.query.named_tuple_factory` - return a result row as a named tuple
+ - :func:`cassandra.query.dict_factory` - return a result row as a dict
+ - :func:`cassandra.query.ordered_dict_factory` - return a result row as an OrderedDict
+ """
+
+ speculative_execution_policy = None
+ """
+ An instance of :class:`.policies.SpeculativeExecutionPolicy`
+
+ Defaults to :class:`.NoSpeculativeExecutionPolicy` if not specified
+ """
+
+ def __init__(self, load_balancing_policy=None, retry_policy=None,
+ consistency_level=ConsistencyLevel.LOCAL_ONE, serial_consistency_level=None,
+ request_timeout=10.0, row_factory=named_tuple_factory, speculative_execution_policy=None):
+ self.load_balancing_policy = load_balancing_policy or default_lbp_factory()
+ self.retry_policy = retry_policy or RetryPolicy()
+ self.consistency_level = consistency_level
+ self.serial_consistency_level = serial_consistency_level
+ self.request_timeout = request_timeout
+ self.row_factory = row_factory
+ self.speculative_execution_policy = speculative_execution_policy or NoSpeculativeExecutionPolicy()
+
+
+class ProfileManager(object):
+
+ def __init__(self):
+ self.profiles = dict()
+
+ def distance(self, host):
+ distances = set(p.load_balancing_policy.distance(host) for p in self.profiles.values())
+ return HostDistance.LOCAL if HostDistance.LOCAL in distances else \
+ HostDistance.REMOTE if HostDistance.REMOTE in distances else \
+ HostDistance.IGNORED
+
+ def populate(self, cluster, hosts):
+ for p in self.profiles.values():
+ p.load_balancing_policy.populate(cluster, hosts)
+
+ def check_supported(self):
+ for p in self.profiles.values():
+ p.load_balancing_policy.check_supported()
+
+ def on_up(self, host):
+ for p in self.profiles.values():
+ p.load_balancing_policy.on_up(host)
+
+ def on_down(self, host):
+ for p in self.profiles.values():
+ p.load_balancing_policy.on_down(host)
+
+ def on_add(self, host):
+ for p in self.profiles.values():
+ p.load_balancing_policy.on_add(host)
+
+ def on_remove(self, host):
+ for p in self.profiles.values():
+ p.load_balancing_policy.on_remove(host)
+
+ @property
+ def default(self):
+ """
+ internal-only; no checks are done because this entry is populated on cluster init
+ """
+ return self.profiles[EXEC_PROFILE_DEFAULT]
+
+
+EXEC_PROFILE_DEFAULT = object()
+"""
+Key for the ``Cluster`` default execution profile, used when no other profile is selected in
+``Session.execute(execution_profile)``.
+
+Use this as the key in ``Cluster(execution_profiles)`` to override the default profile.
+"""
+
+
+class _ConfigMode(object):
+ UNCOMMITTED = 0
+ LEGACY = 1
+ PROFILES = 2
class Cluster(object):
@@ -243,10 +359,11 @@ class Cluster(object):
"""
The maximum version of the native protocol to use.
- The driver will automatically downgrade version based on a negotiation with
- the server, but it is most efficient to set this to the maximum supported
- by your version of Cassandra. Setting this will also prevent conflicting
- versions negotiated if your cluster is upgraded.
+ If not set in the constructor, the driver will automatically downgrade
+ version based on a negotiation with the server, but it is most efficient
+ to set this to the maximum supported by your version of Cassandra.
+ Setting this will also prevent conflicting versions negotiated if your
+ cluster is upgraded.
Version 2 of the native protocol adds support for lightweight transactions,
batch operations, and automatic query paging. The v2 protocol is
@@ -275,6 +392,14 @@ class Cluster(object):
+-------------------+-------------------+
| 2.2 | 1, 2, 3, 4 |
+-------------------+-------------------+
+ | 3.x | 3, 4 |
+ +-------------------+-------------------+
+ """
+
+ allow_beta_protocol_version = False
+ """
+ Setting this to true injects a flag in all messages that makes the server accept and use a "beta" protocol version.
+ Used for testing new protocol features incrementally before the new version is complete.
"""
compression = True
@@ -326,20 +451,34 @@ class Cluster(object):
self._auth_provider = value
- load_balancing_policy = None
- """
- An instance of :class:`.policies.LoadBalancingPolicy` or
- one of its subclasses.
+ _load_balancing_policy = None
+ @property
+ def load_balancing_policy(self):
+ """
+ An instance of :class:`.policies.LoadBalancingPolicy` or
+ one of its subclasses.
- .. versionchanged:: 2.6.0
+ .. versionchanged:: 2.6.0
- Defaults to :class:`~.TokenAwarePolicy` (:class:`~.DCAwareRoundRobinPolicy`).
- when using CPython (where the murmur3 extension is available). :class:`~.DCAwareRoundRobinPolicy`
- otherwise. Default local DC will be chosen from contact points.
+ Defaults to :class:`~.TokenAwarePolicy` (:class:`~.DCAwareRoundRobinPolicy`)
+ when using CPython (where the murmur3 extension is available). :class:`~.DCAwareRoundRobinPolicy`
+ otherwise. Default local DC will be chosen from contact points.
- **Please see** :class:`~.DCAwareRoundRobinPolicy` **for a discussion on default behavior with respect to
- DC locality and remote nodes.**
- """
+ **Please see** :class:`~.DCAwareRoundRobinPolicy` **for a discussion on default behavior with respect to
+ DC locality and remote nodes.**
+ """
+ return self._load_balancing_policy
+
+ @load_balancing_policy.setter
+ def load_balancing_policy(self, lbp):
+ if self._config_mode == _ConfigMode.PROFILES:
+ raise ValueError("Cannot set Cluster.load_balancing_policy while using Configuration Profiles. Set this in a profile instead.")
+ self._load_balancing_policy = lbp
+ self._config_mode = _ConfigMode.LEGACY
+
+ @property
+ def _default_load_balancing_policy(self):
+ return self.profile_manager.default.load_balancing_policy
reconnection_policy = ExponentialReconnectionPolicy(1.0, 600.0)
"""
@@ -348,12 +487,22 @@ class Cluster(object):
a max delay of ten minutes.
"""
- default_retry_policy = RetryPolicy()
- """
- A default :class:`.policies.RetryPolicy` instance to use for all
- :class:`.Statement` objects which do not have a :attr:`~.Statement.retry_policy`
- explicitly set.
- """
+ _default_retry_policy = RetryPolicy()
+ @property
+ def default_retry_policy(self):
+ """
+ A default :class:`.policies.RetryPolicy` instance to use for all
+ :class:`.Statement` objects which do not have a :attr:`~.Statement.retry_policy`
+ explicitly set.
+ """
+ return self._default_retry_policy
+
+ @default_retry_policy.setter
+ def default_retry_policy(self, policy):
+ if self._config_mode == _ConfigMode.PROFILES:
+ raise ValueError("Cannot set Cluster.default_retry_policy while using Configuration Profiles. Set this in a profile instead.")
+ self._default_retry_policy = policy
+ self._config_mode = _ConfigMode.LEGACY
conviction_policy_factory = SimpleConvictionPolicy
"""
@@ -570,6 +719,9 @@ class Cluster(object):
def token_metadata_enabled(self, enabled):
self.control_connection._token_meta_enabled = bool(enabled)
+ profile_manager = None
+ _config_mode = _ConfigMode.UNCOMMITTED
+
sessions = None
control_connection = None
scheduler = None
@@ -579,6 +731,8 @@ class Cluster(object):
_prepared_statements = None
_prepared_statement_lock = None
_idle_heartbeat = None
+ _protocol_version_explicit = False
+ _discount_down_events = True
_user_types = None
"""
@@ -602,7 +756,7 @@ class Cluster(object):
ssl_options=None,
sockopts=None,
cql_version=None,
- protocol_version=4,
+ protocol_version=_NOT_SET,
executor_threads=2,
max_schema_agreement_wait=10,
control_connection_timeout=2.0,
@@ -615,7 +769,9 @@ class Cluster(object):
address_translator=None,
status_event_refresh_window=2,
prepare_on_all_hosts=True,
- reprepare_on_up=True):
+ reprepare_on_up=True,
+ execution_profiles=None,
+ allow_beta_protocol_version=False):
"""
``executor_threads`` defines the number of threads in a pool for handling asynchronous tasks such as
extablishing connection pools or refreshing metadata.
@@ -636,7 +792,12 @@ class Cluster(object):
for endpoint in socket.getaddrinfo(a, self.port, socket.AF_UNSPEC, socket.SOCK_STREAM)]
self.compression = compression
- self.protocol_version = protocol_version
+
+ if protocol_version is not _NOT_SET:
+ self.protocol_version = protocol_version
+ self._protocol_version_explicit = True
+ self.allow_beta_protocol_version = allow_beta_protocol_version
+
self.auth_provider = auth_provider
if load_balancing_policy is not None:
@@ -644,7 +805,7 @@ class Cluster(object):
raise TypeError("load_balancing_policy should not be a class, it should be an instance of that class")
self.load_balancing_policy = load_balancing_policy
else:
- self.load_balancing_policy = default_lbp_factory()
+ self._load_balancing_policy = default_lbp_factory() # set internal attribute to avoid committing to legacy config mode
if reconnection_policy is not None:
if isinstance(reconnection_policy, type):
@@ -669,6 +830,24 @@ class Cluster(object):
if connection_class is not None:
self.connection_class = connection_class
+ self.profile_manager = ProfileManager()
+ self.profile_manager.profiles[EXEC_PROFILE_DEFAULT] = ExecutionProfile(self.load_balancing_policy,
+ self.default_retry_policy,
+ Session._default_consistency_level,
+ Session._default_serial_consistency_level,
+ Session._default_timeout,
+ Session._row_factory)
+ # legacy mode if either of these is not default
+ if load_balancing_policy or default_retry_policy:
+ if execution_profiles:
+ raise ValueError("Clusters constructed with execution_profiles should not specify legacy parameters "
+ "load_balancing_policy or default_retry_policy. Configure this in a profile instead.")
+ self._config_mode = _ConfigMode.LEGACY
+ else:
+ if execution_profiles:
+ self.profile_manager.profiles.update(execution_profiles)
+ self._config_mode = _ConfigMode.PROFILES
+
self.metrics_enabled = metrics_enabled
self.ssl_options = ssl_options
self.sockopts = sockopts
@@ -789,6 +968,39 @@ class Cluster(object):
session.user_type_registered(keyspace, user_type, klass)
UserType.evict_udt_class(keyspace, user_type)
+ def add_execution_profile(self, name, profile, pool_wait_timeout=5):
+ """
+ Adds an :class:`.ExecutionProfile` to the cluster. This makes it available for use by ``name`` in :meth:`.Session.execute`
+ and :meth:`.Session.execute_async`. Raises ``ValueError`` if a profile is already registered under ``name``.
+
+ Normally profiles will be injected at cluster initialization via ``Cluster(execution_profiles)``. This method
+ provides a way of adding them dynamically.
+
+ Adding a new profile updates the connection pools according to the specified ``load_balancing_policy``. By default,
+ this method will wait up to five seconds for the pool creation to complete, so the profile can be used immediately
+ upon return. This behavior can be controlled using ``pool_wait_timeout`` (see
+ `concurrent.futures.wait <https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.wait>`_
+ for timeout semantics); ``OperationTimedOut`` is raised if pools are still pending when the wait ends.
+ """
+ if not isinstance(profile, ExecutionProfile):
+ raise TypeError("profile must be an instance of ExecutionProfile")
+ if self._config_mode == _ConfigMode.LEGACY:
+ raise ValueError("Cannot add execution profiles when legacy parameters are set explicitly. TODO: link to doc")
+ if name in self.profile_manager.profiles:
+ raise ValueError("Profile %s already exists" % (name,))
+ self.profile_manager.profiles[name] = profile
+ profile.load_balancing_policy.populate(self, self.metadata.all_hosts())
+ # on_up after populate allows things like DCA LBP to choose default local dc
+ for host in filter(lambda h: h.is_up, self.metadata.all_hosts()):
+ profile.load_balancing_policy.on_up(host)
+ futures = set()
+ for session in self.sessions:
+ futures.update(session.update_created_pools())
+ _, not_done = wait_futures(futures, pool_wait_timeout)
+ if not_done:
+ raise OperationTimedOut("Failed to create all new connection pools in the %ss timeout." % (pool_wait_timeout,))
+
+
def get_min_requests_per_connection(self, host_distance):
return self._min_requests_per_connection[host_distance]
@@ -921,10 +1133,14 @@ class Cluster(object):
kwargs_dict.setdefault('cql_version', self.cql_version)
kwargs_dict.setdefault('protocol_version', self.protocol_version)
kwargs_dict.setdefault('user_type_map', self._user_types)
+ kwargs_dict.setdefault('allow_beta_protocol_version', self.allow_beta_protocol_version)
return kwargs_dict
def protocol_downgrade(self, host_addr, previous_version):
+ if self._protocol_version_explicit:
+ raise DriverException("ProtocolError returned from server while using explicitly set client protocol_version %d" % (previous_version,))
+
new_version = previous_version - 1
if new_version < self.protocol_version:
if new_version >= MIN_SUPPORTED_VERSION:
@@ -935,7 +1151,7 @@ class Cluster(object):
else:
raise DriverException("Cannot downgrade protocol version (%d) below minimum supported version: %d" % (new_version, MIN_SUPPORTED_VERSION))
- def connect(self, keyspace=None):
+ def connect(self, keyspace=None, wait_for_all_pools=False):
"""
Creates and returns a new :class:`~.Session` object. If `keyspace`
is specified, that keyspace will be the default keyspace for
@@ -957,11 +1173,18 @@ class Cluster(object):
for listener in self.listeners:
listener.on_add(host)
- self.load_balancing_policy.populate(
+ self.profile_manager.populate(
weakref.proxy(self), self.metadata.all_hosts())
try:
self.control_connection.connect()
+
+ # we set all contact points up for connecting, but we won't infer state after this
+ for address in self.contact_points_resolved:
+ h = self.metadata.get_host(address)
+ if h and self.profile_manager.distance(h) == HostDistance.IGNORED:
+ h.is_up = None
+
log.debug("Control connection created")
except Exception:
log.exception("Control connection failed to connect, "
@@ -969,15 +1192,15 @@ class Cluster(object):
self.shutdown()
raise
- self.load_balancing_policy.check_supported()
+ self.profile_manager.check_supported() # todo: rename this method
if self.idle_heartbeat_interval:
self._idle_heartbeat = ConnectionHeartbeat(self.idle_heartbeat_interval, self.get_connection_holders)
self._is_setup = True
- session = self._new_session()
- if keyspace:
- session.set_keyspace(keyspace)
+ session = self._new_session(keyspace)
+ if wait_for_all_pools:
+ wait_futures(session._initial_connect_futures)
return session
def get_connection_holders(self):
@@ -1021,8 +1244,8 @@ class Cluster(object):
def __exit__(self, *args):
self.shutdown()
- def _new_session(self):
- session = Session(self, self.metadata.all_hosts())
+ def _new_session(self, keyspace):
+ session = Session(self, self.metadata.all_hosts(), keyspace)
self._session_register_user_types(session)
self.sessions.add(session)
return session
@@ -1033,7 +1256,7 @@ class Cluster(object):
session.user_type_registered(keyspace, udt_name, klass)
def _cleanup_failed_on_up_handling(self, host):
- self.load_balancing_policy.on_down(host)
+ self.profile_manager.on_down(host)
self.control_connection.on_down(host)
for session in self.sessions:
session.remove_pool(host)
@@ -1113,8 +1336,8 @@ class Cluster(object):
for session in self.sessions:
session.remove_pool(host)
- log.debug("Signalling to load balancing policy that host %s is up", host)
- self.load_balancing_policy.on_up(host)
+ log.debug("Signalling to load balancing policies that host %s is up", host)
+ self.profile_manager.on_up(host)
log.debug("Signalling to control connection that host %s is up", host)
self.control_connection.on_up(host)
@@ -1142,13 +1365,14 @@ class Cluster(object):
else:
if not have_future:
with host.lock:
+ host.set_up()
host._currently_handling_node_up = False
# for testing purposes
return futures
def _start_reconnector(self, host, is_host_addition):
- if self.load_balancing_policy.distance(host) == HostDistance.IGNORED:
+ if self.profile_manager.distance(host) == HostDistance.IGNORED:
return
schedule = self.reconnection_policy.new_schedule()
@@ -1180,14 +1404,27 @@ class Cluster(object):
return
with host.lock:
- if (not host.is_up and not expect_host_to_be_down) or host.is_currently_reconnecting():
- return
+ was_up = host.is_up
+
+ # ignore down signals if we have open pools to the host
+ # this is to avoid closing pools when a control connection host became isolated
+ if self._discount_down_events and self.profile_manager.distance(host) != HostDistance.IGNORED:
+ connected = False
+ for session in self.sessions:
+ pool_states = session.get_pool_state()
+ pool_state = pool_states.get(host)
+ if pool_state:
+ connected |= pool_state['open_count'] > 0
+ if connected:
+ return
host.set_down()
+ if (not was_up and not expect_host_to_be_down) or host.is_currently_reconnecting():
+ return
log.warning("Host %s has been marked down", host)
- self.load_balancing_policy.on_down(host)
+ self.profile_manager.on_down(host)
self.control_connection.on_down(host)
for session in self.sessions:
session.on_down(host)
@@ -1203,12 +1440,12 @@ class Cluster(object):
log.debug("Handling new host %r and notifying listeners", host)
- distance = self.load_balancing_policy.distance(host)
+ distance = self.profile_manager.distance(host)
if distance != HostDistance.IGNORED:
self._prepare_all_queries(host)
log.debug("Done preparing queries for new host %r", host)
- self.load_balancing_policy.on_add(host)
+ self.profile_manager.on_add(host)
self.control_connection.on_add(host, refresh_nodes)
if distance == HostDistance.IGNORED:
@@ -1273,7 +1510,7 @@ class Cluster(object):
log.debug("Removing host %s", host)
host.set_down()
- self.load_balancing_policy.on_remove(host)
+ self.profile_manager.on_remove(host)
for session in self.sessions:
session.on_remove(host)
for listener in self.listeners:
@@ -1360,6 +1597,14 @@ class Cluster(object):
return SchemaTargetType.KEYSPACE
return None
+ def get_control_connection_host(self):
+ """
+ Returns the control connection host metadata.
+ """
+ connection = self.control_connection._connection
+ host = connection.host if connection else None
+ return self.metadata.get_host(host) if host else None
+
def refresh_schema_metadata(self, max_schema_agreement_wait=None):
"""
Synchronously refresh all schema metadata.
@@ -1442,13 +1687,15 @@ class Cluster(object):
schema_agreement_wait=max_schema_agreement_wait, force=True):
raise DriverException("User Aggregate metadata was not refreshed. See log for details.")
- def refresh_nodes(self):
+ def refresh_nodes(self, force_token_rebuild=False):
"""
Synchronously refresh the node list and token metadata
+ `force_token_rebuild` can be used to rebuild the token map metadata, even if no new nodes are discovered.
+
An Exception is raised if node refresh fails for any reason.
"""
- if not self.control_connection.refresh_node_list_and_token_map():
+ if not self.control_connection.refresh_node_list_and_token_map(force_token_rebuild):
raise DriverException("Node list was not refreshed. See log for details.")
def set_meta_refresh_enabled(self, enabled):
@@ -1533,54 +1780,85 @@ class Session(object):
keyspace = None
is_shutdown = False
- row_factory = staticmethod(named_tuple_factory)
- """
- The format to return row results in. By default, each
- returned row will be a named tuple. You can alternatively
- use any of the following:
+ _row_factory = staticmethod(named_tuple_factory)
+ @property
+ def row_factory(self):
+ """
+ The format to return row results in. By default, each
+ returned row will be a named tuple. You can alternatively
+ use any of the following:
- - :func:`cassandra.query.tuple_factory` - return a result row as a tuple
- - :func:`cassandra.query.named_tuple_factory` - return a result row as a named tuple
- - :func:`cassandra.query.dict_factory` - return a result row as a dict
- - :func:`cassandra.query.ordered_dict_factory` - return a result row as an OrderedDict
+ - :func:`cassandra.query.tuple_factory` - return a result row as a tuple
+ - :func:`cassandra.query.named_tuple_factory` - return a result row as a named tuple
+ - :func:`cassandra.query.dict_factory` - return a result row as a dict
+ - :func:`cassandra.query.ordered_dict_factory` - return a result row as an OrderedDict
- """
+ """
+ return self._row_factory
- default_timeout = 10.0
- """
- A default timeout, measured in seconds, for queries executed through
- :meth:`.execute()` or :meth:`.execute_async()`. This default may be
- overridden with the `timeout` parameter for either of those methods.
+ @row_factory.setter
+ def row_factory(self, rf):
+ self._validate_set_legacy_config('row_factory', rf)
- Setting this to :const:`None` will cause no timeouts to be set by default.
+ _default_timeout = 10.0
- Please see :meth:`.ResponseFuture.result` for details on the scope and
- effect of this timeout.
+ @property
+ def default_timeout(self):
+ """
+ A default timeout, measured in seconds, for queries executed through
+ :meth:`.execute()` or :meth:`.execute_async()`. This default may be
+ overridden with the `timeout` parameter for either of those methods.
- .. versionadded:: 2.0.0
- """
+ Setting this to :const:`None` will cause no timeouts to be set by default.
- default_consistency_level = ConsistencyLevel.LOCAL_ONE
- """
- The default :class:`~ConsistencyLevel` for operations executed through
- this session. This default may be overridden by setting the
- :attr:`~.Statement.consistency_level` on individual statements.
+ Please see :meth:`.ResponseFuture.result` for details on the scope and
+ effect of this timeout.
- .. versionadded:: 1.2.0
+ .. versionadded:: 2.0.0
+ """
+ return self._default_timeout
- .. versionchanged:: 3.0.0
+ @default_timeout.setter
+ def default_timeout(self, timeout):
+ self._validate_set_legacy_config('default_timeout', timeout)
- default changed from ONE to LOCAL_ONE
- """
+ _default_consistency_level = ConsistencyLevel.LOCAL_ONE
- default_serial_consistency_level = None
- """
- The default :class:`~ConsistencyLevel` for serial phase of conditional updates executed through
- this session. This default may be overridden by setting the
- :attr:`~.Statement.serial_consistency_level` on individual statements.
+ @property
+ def default_consistency_level(self):
+ """
+ The default :class:`~ConsistencyLevel` for operations executed through
+ this session. This default may be overridden by setting the
+ :attr:`~.Statement.consistency_level` on individual statements.
- Only valid for ``protocol_version >= 2``.
- """
+ .. versionadded:: 1.2.0
+
+ .. versionchanged:: 3.0.0
+
+ default changed from ONE to LOCAL_ONE
+ """
+ return self._default_consistency_level
+
+ @default_consistency_level.setter
+ def default_consistency_level(self, cl):
+ self._validate_set_legacy_config('default_consistency_level', cl)
+
+ _default_serial_consistency_level = None
+
+ @property
+ def default_serial_consistency_level(self):
+ """
+ The default :class:`~ConsistencyLevel` for serial phase of conditional updates executed through
+ this session. This default may be overridden by setting the
+ :attr:`~.Statement.serial_consistency_level` on individual statements.
+
+ Only valid for ``protocol_version >= 2``.
+ """
+ return self._default_serial_consistency_level
+
+ @default_serial_consistency_level.setter
+ def default_serial_consistency_level(self, cl):
+ self._validate_set_legacy_config('default_serial_consistency_level', cl)
max_trace_wait = 2.0
"""
@@ -1654,32 +1932,36 @@ class Session(object):
_lock = None
_pools = None
- _load_balancer = None
+ _profile_manager = None
_metrics = None
+ _request_init_callbacks = None
- def __init__(self, cluster, hosts):
+ def __init__(self, cluster, hosts, keyspace=None):
self.cluster = cluster
self.hosts = hosts
+ self.keyspace = keyspace
self._lock = RLock()
self._pools = {}
- self._load_balancer = cluster.load_balancing_policy
+ self._profile_manager = cluster.profile_manager
self._metrics = cluster.metrics
+ self._request_init_callbacks = []
self._protocol_version = self.cluster.protocol_version
self.encoder = Encoder()
# create connection pools in parallel
- futures = []
+ self._initial_connect_futures = set()
for host in hosts:
future = self.add_or_renew_pool(host, is_host_addition=False)
- if future is not None:
- futures.append(future)
+ if future:
+ self._initial_connect_futures.add(future)
- for future in futures:
- future.result()
+ futures = wait_futures(self._initial_connect_futures, return_when=FIRST_COMPLETED)
+ while futures.not_done and not any(f.result() for f in futures.done):
+ futures = wait_futures(futures.not_done, return_when=FIRST_COMPLETED)
- def execute(self, query, parameters=None, timeout=_NOT_SET, trace=False, custom_payload=None):
+ def execute(self, query, parameters=None, timeout=_NOT_SET, trace=False, custom_payload=None, execution_profile=EXEC_PROFILE_DEFAULT, paging_state=None):
"""
Execute the given query and synchronously wait for the response.
@@ -1706,28 +1988,24 @@ class Session(object):
`custom_payload` is a :ref:`custom_payload` dict to be passed to the server.
If `query` is a Statement with its own custom_payload. The message payload
will be a union of the two, with the values specified here taking precedence.
+
+ `execution_profile` is the execution profile to use for this request. It can be a key to a profile configured
+ via :meth:`Cluster.add_execution_profile` or an instance (from :meth:`Session.execution_profile_clone_update`,
+ for example).
+
+ `paging_state` is an optional paging state, reused from a previous :class:`ResultSet`.
"""
- return self.execute_async(query, parameters, trace, custom_payload, timeout).result()
+ return self.execute_async(query, parameters, trace, custom_payload, timeout, execution_profile, paging_state).result()
- def execute_async(self, query, parameters=None, trace=False, custom_payload=None, timeout=_NOT_SET):
+ def execute_async(self, query, parameters=None, trace=False, custom_payload=None, timeout=_NOT_SET, execution_profile=EXEC_PROFILE_DEFAULT, paging_state=None):
"""
Execute the given query and return a :class:`~.ResponseFuture` object
which callbacks may be attached to for asynchronous response
delivery. You may also call :meth:`~.ResponseFuture.result()`
- on the :class:`.ResponseFuture` to syncronously block for results at
+ on the :class:`.ResponseFuture` to synchronously block for results at
any time.
- If `trace` is set to :const:`True`, you may get the query trace descriptors using
- :meth:`.ResponseFuture.get_query_trace()` or :meth:`.ResponseFuture.get_all_query_traces()`
- on the future result.
-
- `custom_payload` is a :ref:`custom_payload` dict to be passed to the server.
- If `query` is a Statement with its own custom_payload. The message payload
- will be a union of the two, with the values specified here taking precedence.
-
- If the server sends a custom payload in the response message,
- the dict can be obtained following :meth:`.ResponseFuture.result` via
- :attr:`.ResponseFuture.custom_payload`
+ See :meth:`Session.execute` for parameter definitions.
Example usage::
@@ -1754,15 +2032,13 @@ class Session(object):
... log.exception("Operation failed:")
"""
- if timeout is _NOT_SET:
- timeout = self.default_timeout
-
- future = self._create_response_future(query, parameters, trace, custom_payload, timeout)
+ future = self._create_response_future(query, parameters, trace, custom_payload, timeout, execution_profile, paging_state)
future._protocol_handler = self.client_protocol_handler
+ self._on_request(future)
future.send_request()
return future
- def _create_response_future(self, query, parameters, trace, custom_payload, timeout):
+ def _create_response_future(self, query, parameters, trace, custom_payload, timeout, execution_profile=EXEC_PROFILE_DEFAULT, paging_state=None):
""" Returns the ResponseFuture before calling send_request() on it """
prepared_statement = None
@@ -1772,8 +2048,34 @@ class Session(object):
elif isinstance(query, PreparedStatement):
query = query.bind(parameters)
- cl = query.consistency_level if query.consistency_level is not None else self.default_consistency_level
- serial_cl = query.serial_consistency_level if query.serial_consistency_level is not None else self.default_serial_consistency_level
+ if self.cluster._config_mode == _ConfigMode.LEGACY:
+ if execution_profile is not EXEC_PROFILE_DEFAULT:
+ raise ValueError("Cannot specify execution_profile while using legacy parameters.")
+
+ if timeout is _NOT_SET:
+ timeout = self.default_timeout
+
+ cl = query.consistency_level if query.consistency_level is not None else self.default_consistency_level
... 4040 lines suppressed ...
--
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/python-modules/packages/python-cassandra-driver.git
More information about the Python-modules-commits
mailing list