[Python-modules-commits] [elasticsearch-curator] 10/18: Import elasticsearch-curator_5.2.0.orig.tar.gz
Apollon Oikonomopoulos
apoikos at moszumanska.debian.org
Tue Oct 17 08:17:16 UTC 2017
This is an automated email from the git hooks/post-receive script.
apoikos pushed a commit to branch master
in repository elasticsearch-curator.
commit ea25a657dc671aa8c45963ae04c6ed6daa1da5df
Author: Apollon Oikonomopoulos <apoikos at debian.org>
Date: Tue Oct 17 10:56:17 2017 +0300
Import elasticsearch-curator_5.2.0.orig.tar.gz
---
.travis.yml | 2 +-
CONTRIBUTORS | 4 +
curator/_version.py | 2 +-
curator/actions.py | 354 +++++++++++++++++++++++++++++++-
curator/cli.py | 31 +--
curator/defaults/filter_elements.py | 6 +
curator/defaults/filtertypes.py | 1 +
curator/defaults/option_defaults.py | 44 +++-
curator/defaults/settings.py | 2 +
curator/indexlist.py | 34 ++-
curator/utils.py | 83 +++++++-
curator/validators/options.py | 15 ++
docs/Changelog.rst | 43 ++++
docs/actionclasses.rst | 31 ++-
docs/asciidoc/actions.asciidoc | 112 ++++++++++
docs/asciidoc/examples.asciidoc | 59 +++++-
docs/asciidoc/filter_elements.asciidoc | 86 ++++++++
docs/asciidoc/filters.asciidoc | 1 +
docs/asciidoc/inc_kinds.asciidoc | 2 +-
docs/asciidoc/index.asciidoc | 4 +-
docs/asciidoc/options.asciidoc | 289 ++++++++++++++++++++++++--
test/integration/test_delete_indices.py | 164 +++++++++++++++
test/integration/test_rollover.py | 52 ++++-
test/integration/testvars.py | 17 ++
test/unit/test_utils.py | 42 +++-
test/unit/testvars.py | 11 +-
26 files changed, 1438 insertions(+), 53 deletions(-)
diff --git a/.travis.yml b/.travis.yml
index da97633..8b7997d 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -12,7 +12,7 @@ env:
- ES_VERSION=5.2.2
- ES_VERSION=5.3.3
- ES_VERSION=5.4.3
- - ES_VERSION=5.5.1
+ - ES_VERSION=5.5.2
os: linux
diff --git a/CONTRIBUTORS b/CONTRIBUTORS
index 1b9334f..d2ebbd7 100644
--- a/CONTRIBUTORS
+++ b/CONTRIBUTORS
@@ -79,3 +79,7 @@ Contributors:
* (dtrv)
* Christopher "Chief" Najewicz (chiefy)
* Filipe Gonçalves (basex)
+* Sönke Liebau (soenkeliebau)
+* Timothy Schroder (tschroeder-zendesk)
+* Jared Carey (jpcarey)
+* Juraj Seffer (jurajseffer)
\ No newline at end of file
diff --git a/curator/_version.py b/curator/_version.py
index 1c66eec..9b2ff6c 100644
--- a/curator/_version.py
+++ b/curator/_version.py
@@ -1,2 +1,2 @@
-__version__ = '5.1.2'
+__version__ = '5.2.0'
diff --git a/curator/actions.py b/curator/actions.py
index 013f603..312d9e8 100644
--- a/curator/actions.py
+++ b/curator/actions.py
@@ -863,7 +863,7 @@ class Rollover(object):
self.settings = extra_settings
#: Instance variable.
#: Internal reference to `new_index`
- self.new_index = new_index
+ self.new_index = parse_date_pattern(new_index) if new_index else new_index
#: Instance variable.
#: Internal reference to `wait_for_active_shards`
self.wait_for_active_shards = wait_for_active_shards
@@ -1744,3 +1744,355 @@ class Restore(object):
)
except Exception as e:
report_failure(e)
+
+class Shrink(object):
+ def __init__(self, ilo, shrink_node='DETERMINISTIC', node_filters={},
+ number_of_shards=1, number_of_replicas=1,
+ shrink_prefix='', shrink_suffix='-shrink',
+ delete_after=True, post_allocation={},
+ wait_for_active_shards=1,
+ extra_settings={}, wait_for_completion=True, wait_interval=9,
+ max_wait=-1):
+ """
+ :arg ilo: A :class:`curator.indexlist.IndexList` object
+ :arg shrink_node: The node name to use as the shrink target, or
+ ``DETERMINISTIC``, which will use the values in ``node_filters`` to
+ determine which node will be the shrink node.
+ :arg node_filters: If the value of ``shrink_node`` is ``DETERMINISTIC``,
+ the values in ``node_filters`` will be used while determining which
+ node to allocate the shards on before performing the shrink.
+ :type node_filters: dict, representing the filters
+ :arg number_of_shards: The number of shards the shrunk index should have
+ :arg number_of_replicas: The number of replicas for the shrunk index
+ :arg shrink_prefix: Prepend the shrunk index with this value
+ :arg shrink_suffix: Append the value to the shrunk index (default: `-shrink`)
+ :arg delete_after: Whether to delete each index after shrinking. (default: `True`)
+ :type delete_after: bool
+ :arg post_allocation: If populated, the `allocation_type`, `key`, and
+ `value` will be applied to the shrunk index to re-route it.
+ :type post_allocation: dict, with keys `allocation_type`, `key`, and `value`
+ :arg wait_for_active_shards: The number of shards expected to be active before returning.
+ :arg extra_settings: Permitted root keys are `settings` and `aliases`.
+ See https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-shrink-index.html
+ :type extra_settings: dict
+ :arg wait_for_active_shards: Wait for active shards before returning.
+ :arg wait_for_completion: Wait (or not) for the operation
+ to complete before returning. You should not normally change this,
+ ever. (default: `True`)
+ :arg wait_interval: How long in seconds to wait between checks for
+ completion.
+ :arg max_wait: Maximum number of seconds to `wait_for_completion`
+ :type wait_for_completion: bool
+ """
+ self.loggit = logging.getLogger('curator.actions.shrink')
+ verify_index_list(ilo)
+ if not 'permit_masters' in node_filters:
+ node_filters['permit_masters'] = False
+ #: Instance variable. The Elasticsearch Client object derived from `ilo`
+ self.client = ilo.client
+ #: Instance variable. Internal reference to `ilo`
+ self.index_list = ilo
+ #: Instance variable. Internal reference to `shrink_node`
+ self.shrink_node = shrink_node
+ #: Instance variable. Internal reference to `node_filters`
+ self.node_filters = node_filters
+ #: Instance variable. Internal reference to `shrink_prefix`
+ self.shrink_prefix = shrink_prefix
+ #: Instance variable. Internal reference to `shrink_suffix`
+ self.shrink_suffix = shrink_suffix
+ #: Instance variable. Internal reference to `delete_after`
+ self.delete_after = delete_after
+ #: Instance variable. Internal reference to `post_allocation`
+ self.post_allocation = post_allocation
+ #: Instance variable. Internal reference to `wait_for_completion`
+ self.wfc = wait_for_completion
+ #: Instance variable. How many seconds to wait between checks for completion.
+ self.wait_interval = wait_interval
+ #: Instance variable. How long in seconds to `wait_for_completion` before returning with an exception. A value of -1 means wait forever.
+ self.max_wait = max_wait
+ #: Instance variable. Internal reference to `number_of_shards`
+ self.number_of_shards = number_of_shards
+ self.wait_for_active_shards = wait_for_active_shards
+ self.shrink_node_name = None
+ self.body = {
+ 'settings': {
+ 'index.number_of_shards' : number_of_shards,
+ 'index.number_of_replicas' : number_of_replicas,
+ }
+ }
+ if extra_settings:
+ self._merge_extra_settings(extra_settings)
+
+ def _merge_extra_settings(self, extra_settings):
+ self.loggit.debug(
+ 'Adding extra_settings to shrink body: '
+ '{0}'.format(extra_settings)
+ )
+ # Pop these here, otherwise we could overwrite our default number of
+ # shards and replicas
+ if 'settings' in extra_settings:
+ settings = extra_settings.pop('settings')
+ try:
+ self.body['settings'].update(settings)
+ except Exception as e:
+ raise ConfigurationError('Unable to apply extra settings "{0}" to shrink body'.format({'settings':settings}))
+ if extra_settings:
+ try: # Apply any remaining keys, should there be any.
+ self.body.update(extra_settings)
+ except Exception as e:
+ raise ConfigurationError('Unable to apply extra settings "{0}" to shrink body'.format(extra_settings))
+
+ def _data_node(self, node_id):
+ roles = node_roles(self.client, node_id)
+ name = node_id_to_name(self.client, node_id)
+ if not 'data' in roles:
+ self.loggit.info('Skipping node "{0}": non-data node'.format(name))
+ return False
+ if 'master' in roles and not self.node_filters['permit_masters']:
+ self.loggit.info('Skipping node "{0}": master node'.format(name))
+ return False
+ elif 'master' in roles and self.node_filters['permit_masters']:
+ self.loggit.warn('Not skipping node "{0}" which is a master node (not recommended), but permit_masters is True'.format(name))
+ return True
+ else: # It does have `data` as a role.
+ return True
+
+ def _exclude_node(self, name):
+ if 'exclude_nodes' in self.node_filters:
+ if name in self.node_filters['exclude_nodes']:
+ self.loggit.info('Excluding node "{0}" due to node_filters'.format(name))
+ return True
+ return False
+
+ def _shrink_target(self, name):
+ return '{0}{1}{2}'.format(self.shrink_prefix, name, self.shrink_suffix)
+
+ def qualify_single_node(self):
+ node_id = name_to_node_id(self.client, self.shrink_node)
+ if node_id:
+ self.shrink_node_id = node_id
+ self.shrink_node_name = self.shrink_node
+ else:
+ raise ConfigurationError('Unable to find node named: "{0}"'.format(self.shrink_node))
+ if self._exclude_node(self.shrink_node):
+ raise ConfigurationError('Node "{0}" listed for exclusion'.format(self.shrink_node))
+ if not self._data_node(node_id):
+ raise ActionError('Node "{0}" is not usable as a shrink node'.format(self.shrink_node))
+ if not single_data_path(self.client, node_id):
+ raise ActionError(
+ 'Node "{0}" has multiple data paths and cannot be used '
+ 'for shrink operations.'
+ )
+ self.shrink_node_avail = (
+ self.client.nodes.stats()['nodes'][node_id]['fs']['total']['available_in_bytes']
+ )
+
+ def most_available_node(self):
+ """
+ Determine which data node name has the most available free space, and
+ meets the other node filters settings.
+
+ :arg client: An :class:`elasticsearch.Elasticsearch` client object
+ """
+ mvn_avail = 0
+ # mvn_total = 0
+ mvn_name = None
+ mvn_id = None
+ nodes = self.client.nodes.stats()['nodes']
+ for node_id in nodes:
+ name = self.client.nodes.stats()['nodes'][node_id]['name']
+ if self._exclude_node(name):
+ self.loggit.debug('Node "{0}" excluded by node filters'.format(name))
+ continue
+ if not self._data_node(node_id):
+ self.loggit.debug('Node "{0}" is not a data node'.format(name))
+ continue
+ if not single_data_path(self.client, node_id):
+ self.loggit.info('Node "{0}" has multiple data paths and will not be used for shrink operations.')
+ continue
+ value = nodes[node_id]['fs']['total']['available_in_bytes']
+ if value > mvn_avail:
+ mvn_name = name
+ mvn_id = node_id
+ mvn_avail = value
+ # mvn_total = nodes[node_id]['fs']['total']['total_in_bytes']
+ self.shrink_node_name = mvn_name
+ self.shrink_node_id = mvn_id
+ self.shrink_node_avail = mvn_avail
+ # self.shrink_node_total = mvn_total
+
+ def route_index(self, idx, allocation_type, key, value):
+ bkey = 'index.routing.allocation.{0}.{1}'.format(allocation_type, key)
+ routing = { bkey : value }
+ try:
+ self.client.indices.put_settings(index=idx, body=routing)
+ wait_for_it(self.client, 'allocation', wait_interval=self.wait_interval, max_wait=self.max_wait)
+ except Exception as e:
+ report_failure(e)
+
+ def __log_action(self, error_msg, dry_run=False):
+ if not dry_run:
+ raise ActionError(error_msg)
+ else:
+ self.loggit.warn('DRY-RUN: {0}'.format(error_msg))
+
+ def _block_writes(self, idx):
+ block = { 'index.blocks.write': True }
+ self.client.indices.put_settings(index=idx, body=block)
+
+ def _unblock_writes(self, idx):
+ unblock = { 'index.blocks.write': False }
+ self.client.indices.put_settings(index=idx, body=unblock)
+
+ def _check_space(self, idx, dry_run=False):
+ # Disk watermark calculation is already baked into `available_in_bytes`
+ size = index_size(self.client, idx)
+ avail = self.shrink_node_avail
+ padded = (size * 2) + (32 * 1024)
+ if padded < self.shrink_node_avail:
+ self.loggit.debug('Sufficient space available for 2x the size of index "{0}". Required: {1}, available: {2}'.format(idx, padded, self.shrink_node_avail))
+ else:
+ error_msg = ('Insufficient space available for 2x the size of index "{0}", shrinking will exceed space available. Required: {1}, available: {2}'.format(idx, padded, self.shrink_node_avail))
+ self.__log_action(error_msg, dry_run)
+
+ def _check_node(self):
+ if self.shrink_node != 'DETERMINISTIC':
+ if not self.shrink_node_name:
+ self.qualify_single_node()
+ else:
+ self.most_available_node()
+ # At this point, we should have the three shrink-node identifying
+ # instance variables:
+ # - self.shrink_node_name
+ # - self.shrink_node_id
+ # - self.shrink_node_avail
+ # # - self.shrink_node_total - only if needed in the future
+
+ def _check_target_exists(self, idx, dry_run=False):
+ target = self._shrink_target(idx)
+ if self.client.indices.exists(target):
+ error_msg = 'Target index "{0}" already exists'.format(target)
+ self.__log_action(error_msg, dry_run)
+
+ def _check_doc_count(self, idx, dry_run=False):
+ max_docs = 2147483519
+ doc_count = self.client.indices.stats(idx)['indices'][idx]['primaries']['docs']['count']
+ if doc_count > (max_docs * self.number_of_shards):
+ error_msg = ('Too many documents ({0}) to fit in {1} shard(s). Maximum number of docs per shard is {2}'.format(doc_count, self.number_of_shards, max_docs))
+ self.__log_action(error_msg, dry_run)
+
+ def _check_shard_count(self, idx, src_shards, dry_run=False):
+ if self.number_of_shards >= src_shards:
+ error_msg = ('Target number of shards ({0}) must be less than current number of shards ({1}) in index "{2}"'.format(self.number_of_shards, src_shards, idx))
+ self.__log_action(error_msg, dry_run)
+
+ def _check_shard_factor(self, idx, src_shards, dry_run=False):
+ # Find the list of factors of src_shards
+ factors = [x for x in range(1,src_shards+1) if src_shards % x == 0]
+ # Pop the last one, because it will be the value of src_shards
+ factors.pop()
+ if not self.number_of_shards in factors:
+ error_msg = (
+ '"{0}" is not a valid factor of {1} shards. Valid values are '
+ '{2}'.format(self.number_of_shards, src_shards, factors)
+ )
+ self.__log_action(error_msg, dry_run)
+
+ def _check_all_shards(self, idx):
+ shards = self.client.cluster.state(index=idx)['routing_table']['indices'][idx]['shards']
+ found = []
+ for shardnum in shards:
+ for shard_idx in range(0, len(shards[shardnum])):
+ if shards[shardnum][shard_idx]['node'] == self.shrink_node_id:
+ found.append({'shard': shardnum, 'primary': shards[shardnum][shard_idx]['primary']})
+ if len(shards) != len(found):
+ self.loggit.debug('Found these shards on node "{0}": {1}'.format(self.shrink_node_name, found))
+ raise ActionError('Unable to shrink index "{0}" as not all shards were found on the designated shrink node ({1}): {2}'.format(idx, self.shrink_node_name, found))
+
+ def pre_shrink_check(self, idx, dry_run=False):
+ self.loggit.debug('BEGIN PRE_SHRINK_CHECK')
+ self.loggit.debug('Check that target exists')
+ self._check_target_exists(idx, dry_run)
+ self.loggit.debug('Check doc count constraints')
+ self._check_doc_count(idx, dry_run)
+ self.loggit.debug('Check shard count')
+ src_shards = int(self.client.indices.get(idx)[idx]['settings']['index']['number_of_shards'])
+ self._check_shard_count(idx, src_shards, dry_run)
+ self.loggit.debug('Check shard factor')
+ self._check_shard_factor(idx, src_shards, dry_run)
+ self.loggit.debug('Check node availability')
+ self._check_node()
+ self.loggit.debug('Check available disk space')
+ self._check_space(idx, dry_run)
+ self.loggit.debug('FINISH PRE_SHRINK_CHECK')
+
+ def do_dry_run(self):
+ """
+ Show what a regular run would do, but don't actually do it.
+ """
+ self.index_list.filter_closed()
+ self.index_list.empty_list_check()
+ try:
+ index_lists = chunk_index_list(self.index_list.indices)
+ for l in index_lists:
+ for idx in l: # Shrink can only be done one at a time...
+ target = self._shrink_target(idx)
+ self.pre_shrink_check(idx, dry_run=True)
+ self.loggit.info('DRY-RUN: Moving shards to shrink node: "{0}"'.format(self.shrink_node_name))
+ self.loggit.info('DRY-RUN: Shrinking index "{0}" to "{1}" with settings: {2}, wait_for_active_shards={3}'.format(idx, target, self.body, self.wait_for_active_shards))
+ if self.post_allocation:
+ self.loggit.info('DRY-RUN: Applying post-shrink allocation rule "{0}" to index "{1}"'.format('index.routing.allocation.{0}.{1}:{2}'.format(self.post_allocation['allocation_type'], self.post_allocation['key'], self.post_allocation['value']), target))
+ if self.delete_after:
+ self.loggit.info('DRY-RUN: Deleting source index "{0}"'.format(idx))
+ except Exception as e:
+ report_failure(e)
+
+ def do_action(self):
+ self.index_list.filter_closed()
+ self.index_list.empty_list_check()
+ try:
+ index_lists = chunk_index_list(self.index_list.indices)
+ for l in index_lists:
+ for idx in l: # Shrink can only be done one at a time...
+ target = self._shrink_target(idx)
+ self.loggit.info('Source index: {0} -- Target index: {1}'.format(idx, target))
+ # Pre-check ensures disk space available for each pass of the loop
+ self.pre_shrink_check(idx)
+ # Route the index to the shrink node
+ self.loggit.info('Moving shards to shrink node: "{0}"'.format(self.shrink_node_name))
+ self.route_index(idx, 'require', '_name', self.shrink_node_name)
+ # Ensure a copy of each shard is present
+ self._check_all_shards(idx)
+ # Block writes on index
+ self._block_writes(idx)
+ # Do final health check
+ if not health_check(self.client, status='green'):
+ raise ActionError('Unable to proceed with shrink action. Cluster health is not "green"')
+ # Do the shrink
+ self.loggit.info('Shrinking index "{0}" to "{1}" with settings: {2}, wait_for_active_shards={3}'.format(idx, target, self.body, self.wait_for_active_shards))
+ self.client.indices.shrink(index=idx, target=target, body=self.body, wait_for_active_shards=self.wait_for_active_shards)
+ # Wait for it to complete
+ if self.wfc:
+ self.loggit.debug('Wait for shards to complete allocation for index: {0}'.format(target))
+ wait_for_it(self.client, 'shrink', wait_interval=self.wait_interval, max_wait=self.max_wait)
+ self.loggit.info('Index "{0}" successfully shrunk to "{1}"'.format(idx, target))
+ # Do post-shrink steps
+ # Unblock writes on index (just in case)
+ self._unblock_writes(idx)
+ ## Post-allocation, if enabled
+ if self.post_allocation:
+ self.loggit.info('Applying post-shrink allocation rule "{0}" to index "{1}"'.format('index.routing.allocation.{0}.{1}:{2}'.format(self.post_allocation['allocation_type'], self.post_allocation['key'], self.post_allocation['value']), target))
+ self.route_index(target, self.post_allocation['allocation_type'], self.post_allocation['key'], self.post_allocation['value'])
+ ## Delete, if flagged
+ if self.delete_after:
+ self.loggit.info('Deleting source index "{0}"'.format(idx))
+ self.client.indices.delete(index=idx)
+ else: # Let's unset the routing we applied here.
+ self.loggit.info('Unassigning routing for source index: "{0}"'.format(idx))
+ self.route_index(idx, 'require', '_name', '')
+
+ except Exception as e:
+ # Just in case it fails after attempting to meet this condition
+ self._unblock_writes(idx)
+ report_failure(e)
+
diff --git a/curator/cli.py b/curator/cli.py
index 93d60b7..86fbff5 100644
--- a/curator/cli.py
+++ b/curator/cli.py
@@ -29,6 +29,7 @@ CLASS_MAP = {
'restore' : Restore,
'rollover' : Rollover,
'snapshot' : Snapshot,
+ 'shrink' : Shrink,
}
def process_action(client, config, **kwargs):
@@ -98,19 +99,9 @@ def process_action(client, config, **kwargs):
logger.debug('Doing the action here.')
action_obj.do_action()
-@click.command()
-@click.option('--config',
- help="Path to configuration file. Default: ~/.curator/curator.yml",
- type=click.Path(exists=True), default=settings.config_file()
-)
-@click.option('--dry-run', is_flag=True, help='Do not perform any changes.')
-@click.argument('action_file', type=click.Path(exists=True), nargs=1)
-@click.version_option(version=__version__)
-def cli(config, dry_run, action_file):
+def run(config, action_file, dry_run=False):
"""
- Curator for Elasticsearch indices.
-
- See http://elastic.co/guide/en/elasticsearch/client/curator/current
+ Actually run.
"""
client_args = process_config(config)
logger = logging.getLogger(__name__)
@@ -202,3 +193,19 @@ def cli(config, dry_run, action_file):
sys.exit(1)
logger.info('Action ID: {0}, "{1}" completed.'.format(idx, action))
logger.info('Job completed.')
+
+@click.command()
+@click.option('--config',
+ help="Path to configuration file. Default: ~/.curator/curator.yml",
+ type=click.Path(exists=True), default=settings.config_file()
+)
+@click.option('--dry-run', is_flag=True, help='Do not perform any changes.')
+@click.argument('action_file', type=click.Path(exists=True), nargs=1)
+@click.version_option(version=__version__)
+def cli(config, dry_run, action_file):
+ """
+ Curator for Elasticsearch indices.
+
+ See http://elastic.co/guide/en/elasticsearch/client/curator/current
+ """
+ run(config, action_file, dry_run)
\ No newline at end of file
diff --git a/curator/defaults/filter_elements.py b/curator/defaults/filter_elements.py
index f7acc3e..650b823 100644
--- a/curator/defaults/filter_elements.py
+++ b/curator/defaults/filter_elements.py
@@ -122,6 +122,12 @@ def unit_count(**kwargs):
# filtertype if use_age is set to True.
return { Required('unit_count'): Coerce(int) }
+def unit_count_pattern(**kwargs):
+ # This setting is used with the age filtertype to define, whether
+ # the unit_count value is taken from the configuration or read from
+ # the index name via a regular expression
+ return { Optional('unit_count_pattern'): Any(str, unicode) }
+
def use_age(**kwargs):
# Use of this setting requires the additional setting, source.
return { Optional('use_age', default=False): Any(bool, All(Any(str, unicode), Boolean())) }
diff --git a/curator/defaults/filtertypes.py b/curator/defaults/filtertypes.py
index cc16d17..e75f780 100644
--- a/curator/defaults/filtertypes.py
+++ b/curator/defaults/filtertypes.py
@@ -48,6 +48,7 @@ def age(action, config):
filter_elements.direction(),
filter_elements.unit(),
filter_elements.unit_count(),
+ filter_elements.unit_count_pattern(),
filter_elements.epoch(),
filter_elements.exclude(),
]
diff --git a/curator/defaults/option_defaults.py b/curator/defaults/option_defaults.py
index e335db0..13eac1c 100644
--- a/curator/defaults/option_defaults.py
+++ b/curator/defaults/option_defaults.py
@@ -27,6 +27,9 @@ def delay():
)
}
+def delete_after():
+ return { Optional('delete_after', default=True): Any(bool, All(Any(str, unicode), Boolean())) }
+
def delete_aliases():
return { Optional('delete_aliases', default=False): Any(bool, All(Any(str, unicode), Boolean())) }
@@ -70,7 +73,7 @@ def max_wait(action):
value = -1
# if action in ['allocation', 'cluster_routing', 'replicas']:
# value = -1
- # elif action in ['restore', 'snapshot', 'reindex']:
+ # elif action in ['restore', 'snapshot', 'reindex', 'shrink']:
# value = -1
return { Optional('max_wait', default=value): Any(-1, Coerce(int), None) }
@@ -93,9 +96,32 @@ def name(action):
def new_index():
return { Optional('new_index', default=None): Any(str, unicode) }
+def node_filters():
+ return {
+ Optional('node_filters', default={}): {
+ Optional('permit_masters', default=False): Any(bool, All(Any(str, unicode), Boolean())),
+ Optional('exclude_nodes', default=[]): Any(list, None)
+ }
+ }
+
+def number_of_replicas():
+ return { Optional('number_of_replicas', default=1): All(Coerce(int), Range(min=0, max=10)) }
+
+def number_of_shards():
+ return { Optional('number_of_shards', default=1): All(Coerce(int), Range(min=1, max=99)) }
+
def partial():
return { Optional('partial', default=False): Any(bool, All(Any(str, unicode), Boolean())) }
+def post_allocation():
+ return {
+ Optional('post_allocation', default={}): {
+ Required('allocation_type', default='require'): All(Any(str, unicode), Any('require', 'include', 'exclude')),
+ Required('key'): Any(str, unicode),
+ Required('value', default=None): Any(str, unicode, None)
+ }
+ }
+
def preserve_existing():
return { Optional('preserve_existing', default=False): Any(bool, All(Any(str, unicode), Boolean())) }
@@ -164,6 +190,7 @@ def request_body():
Optional('socket_timeout'): Any(str, unicode),
Optional('connect_timeout'): Any(str, unicode),
},
+ Optional('size'): Coerce(int),
Optional('type'): Any(Any(str, unicode), list),
Optional('query'): dict,
Optional('sort'): dict,
@@ -213,6 +240,15 @@ def cluster_routing_value():
)
}
+def shrink_node():
+ return { Required('shrink_node'): Any(str, unicode) }
+
+def shrink_prefix():
+ return { Optional('shrink_prefix', default=''): Any(str, unicode, None) }
+
+def shrink_suffix():
+ return { Optional('shrink_suffix', default='-shrink'): Any(str, unicode, None) }
+
def skip_repo_fs_check():
return { Optional('skip_repo_fs_check', default=False): Any(bool, All(Any(str, unicode), Boolean())) }
@@ -243,11 +279,11 @@ def value():
def wait_for_active_shards(action):
value = 0
- if action == 'reindex':
+ if action in ['reindex', 'shrink']:
value = 1
return {
Optional('wait_for_active_shards', default=value): Any(
- Coerce(int), None)
+ Coerce(int), 'all', None)
}
def wait_for_completion(action):
@@ -262,7 +298,7 @@ def wait_interval(action):
maxval = 30
# if action in ['allocation', 'cluster_routing', 'replicas']:
value = 3
- if action in ['restore', 'snapshot', 'reindex']:
+ if action in ['restore', 'snapshot', 'reindex', 'shrink']:
value = 9
return { Optional('wait_interval', default=value): Any(All(
Coerce(int), Range(min=minval, max=maxval)), None) }
diff --git a/curator/defaults/settings.py b/curator/defaults/settings.py
index 8e618cb..b94e7fb 100644
--- a/curator/defaults/settings.py
+++ b/curator/defaults/settings.py
@@ -54,6 +54,7 @@ def index_actions():
'reindex',
'replicas',
'rollover',
+ 'shrink',
'snapshot',
]
@@ -118,6 +119,7 @@ def structural_filter_elements():
Optional('timestring'): Any(str, unicode, None),
Optional('unit'): Any(str, unicode),
Optional('unit_count'): Coerce(int),
+ Optional('unit_count_pattern'): Any(str, unicode),
Optional('use_age'): Boolean(),
Optional('value'): Any(int, float, str, unicode, bool),
Optional('week_starts_on'): Any(str, unicode, None),
diff --git a/curator/indexlist.py b/curator/indexlist.py
index 3f82b5d..5c0c246 100644
--- a/curator/indexlist.py
+++ b/curator/indexlist.py
@@ -390,7 +390,7 @@ class IndexList(object):
def filter_by_age(self, source='name', direction=None, timestring=None,
unit=None, unit_count=None, field=None, stats_result='min_value',
- epoch=None, exclude=False,
+ epoch=None, exclude=False, unit_count_pattern=False
):
"""
Match `indices` by relative age calculations.
@@ -404,6 +404,8 @@ class IndexList(object):
``weeks``, ``months``, or ``years``.
:arg unit_count: The number of ``unit`` (s). ``unit_count`` * ``unit`` will
be calculated out to the relative number of seconds.
+ :arg unit_count_pattern: A regular expression whose capture group identifies
+ the value for ``unit_count``.
:arg field: A timestamp field name. Only used for ``field_stats`` based
calculations.
:arg stats_result: Either `min_value` or `max_value`. Only used in
@@ -431,6 +433,12 @@ class IndexList(object):
source=source, timestring=timestring, field=field,
stats_result=stats_result
)
+ if unit_count_pattern:
+ try:
+ unit_count_matcher = re.compile(unit_count_pattern)
+ except:
+ # We got an illegal regex, so won't be able to match anything
+ unit_count_matcher = None
for index in self.working_list():
try:
age = int(self.index_info[index]['age'][self.age_keyfield])
@@ -445,10 +453,30 @@ class IndexList(object):
)
# Because time adds to epoch, smaller numbers are actually older
# timestamps.
+ if unit_count_pattern:
+ self.loggit.debug('Unit_count_pattern is set, trying to match pattern to index "{0}"'.format(index))
+ unit_count_from_index = get_unit_count_from_name(index, unit_count_matcher)
+ if unit_count_from_index:
+ self.loggit.debug('Pattern matched, applying unit_count of "{0}"'.format(unit_count_from_index))
+ adjustedPoR = get_point_of_reference(unit, unit_count_from_index, epoch)
+ test = 0
+ elif unit_count == -1:
+ # Unable to match pattern and unit_count is -1, meaning no fallback, so this
+ # index is removed from the list
+ self.loggit.debug('Unable to match pattern and no fallback value set. Removing index "{0}" from actionable list'.format(index))
+ exclude = True
+ adjustedPoR = PoR # necessary to avoid exception if the first index is excluded
+ else:
+ # Unable to match the pattern and unit_count is set, so fall back to using unit_count
+ # for determining whether to keep this index in the list
+ self.loggit.debug('Unable to match pattern using fallback value of "{0}"'.format(unit_count))
+ adjustedPoR = PoR
+ else:
+ adjustedPoR = PoR
if direction == 'older':
- agetest = age < PoR
+ agetest = age < adjustedPoR
else:
- agetest = age > PoR
+ agetest = age > adjustedPoR
self.__excludify(agetest, exclude, index, msg)
except KeyError:
self.loggit.debug(
diff --git a/curator/utils.py b/curator/utils.py
index c6f3ddd..e0127b7 100644
--- a/curator/utils.py
+++ b/curator/utils.py
@@ -348,7 +348,19 @@ def get_point_of_reference(unit, count, epoch=None):
epoch = time.time()
epoch = fix_epoch(epoch)
return epoch - multiplier * count
-
+
+def get_unit_count_from_name(index_name, pattern):
+ if (pattern == None):
+ return None
+ match = pattern.search(index_name)
+ if match:
+ try:
+ return int(match.group(1))
+ except Exception:
+ return None
+ else:
+ return None
+
def date_range(unit, range_from, range_to, epoch=None, week_starts_on='sunday'):
"""
Get the epoch start time and end time of a range of ``unit``s, reckoning the
@@ -610,12 +622,12 @@ def check_version(client):
logger.error(
'Elasticsearch version {0} incompatible '
'with this version of Curator '
- '({0})'.format(".".join(map(str,version_number)), __version__)
+ '({1})'.format(".".join(map(str,version_number)), __version__)
)
raise CuratorException(
'Elasticsearch version {0} incompatible '
'with this version of Curator '
- '({0})'.format(".".join(map(str,version_number)), __version__)
+ '({1})'.format(".".join(map(str,version_number)), __version__)
)
def check_master(client, master_only=False):
@@ -1565,6 +1577,10 @@ def wait_for_it(
'function':task_check,
'args':{'task_id':task_id},
},
+ 'shrink':{
+ 'function': health_check,
+ 'args': {'status':'green'},
+ },
}
wait_actions = list(action_map.keys())
@@ -1622,8 +1638,8 @@ def wait_for_it(
else:
logger.debug(
'Action "{0}" not yet complete, {1} total seconds elapsed. '
- 'Waiting {1} seconds before checking '
- 'again.'.format(action, wait_interval))
+ 'Waiting {2} seconds before checking '
+ 'again.'.format(action, elapsed, wait_interval))
time.sleep(wait_interval)
logger.debug('Result: {0}'.format(result))
@@ -1631,4 +1647,59 @@ def wait_for_it(
raise ActionTimeout(
'Action "{0}" failed to complete in the max_wait period of '
'{1} seconds'.format(action, max_wait)
- )
\ No newline at end of file
+ )
+
+def node_roles(client, node_id):
+ """
+ Return the list of roles assigned to the node identified by ``node_id``
+
+ :arg client: An :class:`elasticsearch.Elasticsearch` client object
+ :rtype: list
+ """
+ return client.nodes.info()['nodes'][node_id]['roles']
+
+def index_size(client, idx):
+ return client.indices.stats(index=idx)['indices'][idx]['total']['store']['size_in_bytes']
+
+def single_data_path(client, node_id):
+ """
+ In order for a shrink to work, it should be on a single filesystem, as
+ shards cannot span filesystems. Return `True` if the node has a single
+ filesystem, and `False` otherwise.
+
+ :arg client: An :class:`elasticsearch.Elasticsearch` client object
+ :rtype: bool
+ """
+ return len(client.nodes.stats()['nodes'][node_id]['fs']['data']) == 1
+
+
+def name_to_node_id(client, name):
+ """
+ Return the node_id of the node identified by ``name``
+
+ :arg client: An :class:`elasticsearch.Elasticsearch` client object
+ :rtype: str
+ """
+ stats = client.nodes.stats()
+ for node in stats['nodes']:
+ if stats['nodes'][node]['name'] == name:
+ logger.debug('Found node_id "{0}" for name "{1}".'.format(node, name))
+ return node
+ logger.error('No node_id found matching name: "{0}"'.format(name))
+ return None
+
+def node_id_to_name(client, node_id):
+ """
+ Return the name of the node identified by ``node_id``
+
+ :arg client: An :class:`elasticsearch.Elasticsearch` client object
+ :rtype: str
+ """
+ stats = client.nodes.stats()
+ name = None
+ if node_id in stats['nodes']:
+ name = stats['nodes'][node_id]['name']
+ else:
+ logger.error('No node_id found matching: "{0}"'.format(node_id))
+ logger.debug('Name associated with node_id "{0}": {1}'.format(node_id, name))
+ return name
diff --git a/curator/validators/options.py b/curator/validators/options.py
index a8ca520..d516a23 100644
--- a/curator/validators/options.py
+++ b/curator/validators/options.py
@@ -108,6 +108,21 @@ def action_specific(action):
option_defaults.max_wait(action),
option_defaults.skip_repo_fs_check(),
],
+ 'shrink' : [
+ option_defaults.shrink_node(),
+ option_defaults.node_filters(),
+ option_defaults.number_of_shards(),
+ option_defaults.number_of_replicas(),
+ option_defaults.shrink_prefix(),
+ option_defaults.shrink_suffix(),
+ option_defaults.delete_after(),
+ option_defaults.post_allocation(),
+ option_defaults.wait_for_active_shards(action),
+ option_defaults.extra_settings(),
+ option_defaults.wait_for_completion(action),
+ option_defaults.wait_interval(action),
+ option_defaults.max_wait(action),
+ ],
}
return options[action]
diff --git a/docs/Changelog.rst b/docs/Changelog.rst
index c57199b..b9d66f3 100644
--- a/docs/Changelog.rst
+++ b/docs/Changelog.rst
@@ -3,6 +3,38 @@
Changelog
=========
+5.2.0 (1 September 2017)
+------------------------
+
+**New Features**
+
+ * Shrink action! Apologies to all who have patiently waited for this
+ feature. It's been a long time coming, but it is hopefully worth the
+ wait. There are a lot of checks and tests associated with this action,
+ as there are many conditions that have to be met in order for a shrink
+ to take place. Curator will try its best to ensure that all of these
+ conditions are met so you can comfortably rest assured that shrink will
+ work properly unattended. See the documentation for more information.
+ * The ``cli`` function has been split into ``cli`` and ``run`` functions.
+ The behavior of ``cli`` will be indistinguishable from previous releases,
+ preserving API integrity. The new ``run`` function allows lambda and other
+ users to `run` Curator from the API with only a client configuration file
+ and action file as arguments. Requested in #1031 (untergeek)
+ * Allow use of time/date string interpolation for Rollover index naming.
+ Added in #1010 (tschroeder-zendesk)
+ * New ``unit_count_pattern`` allows you to derive the ``unit_count`` from
+ the index name itself. This involves regular expressions, so be sure to
+ do lots of testing in ``--dry-run`` mode before deploying to production.
+ Added by (soenkeliebau) in #997
+
+**Bug Fixes**
+
+ * Reindex ``request_body`` allows for 2 different ``size`` options. One
+ limits the number of documents reindexed. The other is for batch sizing.
+ The batch sizing option was missing from the schema validator. This has
+ been corrected. Reported in #1038 (untergeek)
+ * A few sundry logging and notification changes were made.
+
5.1.2 (08 August 2017)
----------------------
@@ -51,8 +83,13 @@ Changelog
* Fix/improve rST API documentation.
* Thanks to many users who not only found and reported documentation issues,
but also submitted corrections.
+
5.1.1 (8 June 2017)
+-------------------
**Bug Fixes**
@@ -60,6 +97,7 @@ Changelog
I reverted the string-based comparison as before.
5.1.0 (8 June 2017)
+-------------------
**New Features**
@@ -89,6 +127,7 @@ Changelog
to reflect this.
5.0.4 (16 May 2017)
+-------------------
**Bug Fixes**
@@ -97,6 +136,7 @@ Changelog
#966. (untergeek)
5.0.3 (15 May 2017)
+-------------------
**Bug Fixes**
@@ -106,6 +146,7 @@ Changelog
Reported in #962. (untergeek)
5.0.2 (4 May 2017)
+------------------
**Bug Fixes**
@@ -126,6 +167,7 @@ Changelog
* Document how to include file paths better. Fixes #944. (untergeek)
5.0.1 (10 April 2017)
+---------------------
**Bug Fixes**
@@ -139,6 +181,7 @@ Changelog
* Address date matching behavior better per #858 (untergeek)
5.0.0 (5 April 2017)
+--------------------
The full feature set of 5.0 (including alpha releases) is included here.
diff --git a/docs/actionclasses.rst b/docs/actionclasses.rst
index 7d5766d..24d7b2a 100644
--- a/docs/actionclasses.rst
+++ b/docs/actionclasses.rst
@@ -11,11 +11,17 @@ Action Classes
* `Allocation`_
* `Close`_
* `ClusterRouting`_
+* `CreateIndex`_
* `DeleteIndices`_
* `DeleteSnapshots`_
* `ForceMerge`_
+* `IndexSettings`_
* `Open`_
+* `Reindex`_
* `Replicas`_
+* `Restore`_
+* `Rollover`_
+* `Shrink`_
* `Snapshot`_
@@ -39,6 +45,11 @@ ClusterRouting
.. autoclass:: curator.actions.ClusterRouting
:members:
+CreateIndex
+-----------
+.. autoclass:: curator.actions.CreateIndex
+ :members:
+
DeleteIndices
-------------
... 1110 lines suppressed ...
--
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/python-modules/packages/elasticsearch-curator.git
More information about the Python-modules-commits
mailing list