[Python-modules-commits] [elasticsearch-curator] 10/18: Import elasticsearch-curator_5.2.0.orig.tar.gz

Apollon Oikonomopoulos apoikos at moszumanska.debian.org
Tue Oct 17 08:17:16 UTC 2017


This is an automated email from the git hooks/post-receive script.

apoikos pushed a commit to branch master
in repository elasticsearch-curator.

commit ea25a657dc671aa8c45963ae04c6ed6daa1da5df
Author: Apollon Oikonomopoulos <apoikos at debian.org>
Date:   Tue Oct 17 10:56:17 2017 +0300

    Import elasticsearch-curator_5.2.0.orig.tar.gz
---
 .travis.yml                             |   2 +-
 CONTRIBUTORS                            |   4 +
 curator/_version.py                     |   2 +-
 curator/actions.py                      | 354 +++++++++++++++++++++++++++++++-
 curator/cli.py                          |  31 +--
 curator/defaults/filter_elements.py     |   6 +
 curator/defaults/filtertypes.py         |   1 +
 curator/defaults/option_defaults.py     |  44 +++-
 curator/defaults/settings.py            |   2 +
 curator/indexlist.py                    |  34 ++-
 curator/utils.py                        |  83 +++++++-
 curator/validators/options.py           |  15 ++
 docs/Changelog.rst                      |  43 ++++
 docs/actionclasses.rst                  |  31 ++-
 docs/asciidoc/actions.asciidoc          | 112 ++++++++++
 docs/asciidoc/examples.asciidoc         |  59 +++++-
 docs/asciidoc/filter_elements.asciidoc  |  86 ++++++++
 docs/asciidoc/filters.asciidoc          |   1 +
 docs/asciidoc/inc_kinds.asciidoc        |   2 +-
 docs/asciidoc/index.asciidoc            |   4 +-
 docs/asciidoc/options.asciidoc          | 289 ++++++++++++++++++++++++--
 test/integration/test_delete_indices.py | 164 +++++++++++++++
 test/integration/test_rollover.py       |  52 ++++-
 test/integration/testvars.py            |  17 ++
 test/unit/test_utils.py                 |  42 +++-
 test/unit/testvars.py                   |  11 +-
 26 files changed, 1438 insertions(+), 53 deletions(-)

diff --git a/.travis.yml b/.travis.yml
index da97633..8b7997d 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -12,7 +12,7 @@ env:
   - ES_VERSION=5.2.2
   - ES_VERSION=5.3.3
   - ES_VERSION=5.4.3
-  - ES_VERSION=5.5.1
+  - ES_VERSION=5.5.2
 
 os: linux
 
diff --git a/CONTRIBUTORS b/CONTRIBUTORS
index 1b9334f..d2ebbd7 100644
--- a/CONTRIBUTORS
+++ b/CONTRIBUTORS
@@ -79,3 +79,7 @@ Contributors:
 * (dtrv)
 * Christopher "Chief" Najewicz (chiefy)
 * Filipe Gonçalves (basex)
+* Sönke Liebau (soenkeliebau)
+* Timothy Schroder (tschroeder-zendesk)
+* Jared Carey (jpcarey)
+* Juraj Seffer (jurajseffer)
\ No newline at end of file
diff --git a/curator/_version.py b/curator/_version.py
index 1c66eec..9b2ff6c 100644
--- a/curator/_version.py
+++ b/curator/_version.py
@@ -1,2 +1,2 @@
-__version__ = '5.1.2'
+__version__ = '5.2.0'
 
diff --git a/curator/actions.py b/curator/actions.py
index 013f603..312d9e8 100644
--- a/curator/actions.py
+++ b/curator/actions.py
@@ -863,7 +863,7 @@ class Rollover(object):
         self.settings   = extra_settings
         #: Instance variable.
         #: Internal reference to `new_index`
-        self.new_index = new_index
+        self.new_index = parse_date_pattern(new_index) if new_index else new_index
         #: Instance variable.
         #: Internal reference to `wait_for_active_shards`
         self.wait_for_active_shards = wait_for_active_shards
@@ -1744,3 +1744,355 @@ class Restore(object):
                 )
         except Exception as e:
             report_failure(e)
+
+class Shrink(object):
+    def __init__(self, ilo, shrink_node='DETERMINISTIC', node_filters={},
+                number_of_shards=1, number_of_replicas=1,
+                shrink_prefix='', shrink_suffix='-shrink',
+                delete_after=True, post_allocation={},
+                wait_for_active_shards=1,
+                extra_settings={}, wait_for_completion=True, wait_interval=9,
+                max_wait=-1):
+        """
+        :arg ilo: A :class:`curator.indexlist.IndexList` object
+        :arg shrink_node: The node name to use as the shrink target, or
+            ``DETERMINISTIC``, which will use the values in ``node_filters`` to
+            determine which node will be the shrink node.
+        :arg node_filters: If the value of ``shrink_node`` is ``DETERMINISTIC``, 
+            the values in ``node_filters`` will be used while determining which 
+            node to allocate the shards on before performing the shrink.
+        :type node_filters: dict, representing the filters
+        :arg number_of_shards: The number of shards the shrunk index should have
+        :arg number_of_replicas: The number of replicas for the shrunk index
+        :arg shrink_prefix: Prepend this value to the name of the shrunk index
+        :arg shrink_suffix: Append this value to the name of the shrunk index (default: `-shrink`)
+        :arg delete_after: Whether to delete each index after shrinking. (default: `True`)
+        :type delete_after: bool
+        :arg post_allocation: If populated, the `allocation_type`, `key`, and 
+            `value` will be applied to the shrunk index to re-route it.
+        :type post_allocation: dict, with keys `allocation_type`, `key`, and `value`
+        :arg wait_for_active_shards: The number of shards expected to be active before returning.
+        :arg extra_settings:  Permitted root keys are `settings` and `aliases`.
+            See https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-shrink-index.html
+        :type extra_settings: dict
+        :arg wait_for_completion: Wait (or not) for the operation
+            to complete before returning.  You should not normally change this,
+            ever. (default: `True`)
+        :arg wait_interval: How long in seconds to wait between checks for
+            completion.
+        :arg max_wait: Maximum number of seconds to `wait_for_completion`
+        :type wait_for_completion: bool
+        """
+        self.loggit = logging.getLogger('curator.actions.shrink')
+        verify_index_list(ilo)
+        if not 'permit_masters' in node_filters:
+            node_filters['permit_masters'] = False
+        #: Instance variable. The Elasticsearch Client object derived from `ilo`
+        self.client           = ilo.client
+        #: Instance variable. Internal reference to `ilo`
+        self.index_list       = ilo
+        #: Instance variable. Internal reference to `shrink_node`
+        self.shrink_node      = shrink_node
+        #: Instance variable. Internal reference to `node_filters`
+        self.node_filters     = node_filters
+        #: Instance variable. Internal reference to `shrink_prefix`
+        self.shrink_prefix    = shrink_prefix
+        #: Instance variable. Internal reference to `shrink_suffix`
+        self.shrink_suffix    = shrink_suffix
+        #: Instance variable. Internal reference to `delete_after`
+        self.delete_after     = delete_after
+        #: Instance variable. Internal reference to `post_allocation`
+        self.post_allocation  = post_allocation
+        #: Instance variable. Internal reference to `wait_for_completion`
+        self.wfc              = wait_for_completion
+        #: Instance variable. How many seconds to wait between checks for completion.
+        self.wait_interval    = wait_interval
+        #: Instance variable. How long in seconds to `wait_for_completion` before returning with an exception. A value of -1 means wait forever.
+        self.max_wait         = max_wait
+        #: Instance variable. Internal reference to `number_of_shards`
+        self.number_of_shards = number_of_shards
+        self.wait_for_active_shards = wait_for_active_shards
+        self.shrink_node_name = None
+        self.body = {
+            'settings': {
+                'index.number_of_shards' : number_of_shards,
+                'index.number_of_replicas' : number_of_replicas,
+            }
+        }
+        if extra_settings:
+            self._merge_extra_settings(extra_settings)
+
+    def _merge_extra_settings(self, extra_settings):
+        self.loggit.debug(
+            'Adding extra_settings to shrink body: '
+            '{0}'.format(extra_settings)
+        )
+        # Pop these here, otherwise we could overwrite our default number of
+        # shards and replicas
+        if 'settings' in extra_settings:
+            settings = extra_settings.pop('settings')
+            try:
+                self.body['settings'].update(settings) 
+            except Exception as e:
+                raise ConfigurationError('Unable to apply extra settings "{0}" to shrink body'.format({'settings':settings}))
+        if extra_settings:
+            try: # Apply any remaining keys, should there be any.
+                self.body.update(extra_settings)
+            except Exception as e:
+                raise ConfigurationError('Unable to apply extra settings "{0}" to shrink body'.format(extra_settings))
+
+    def _data_node(self, node_id):
+        roles = node_roles(self.client, node_id)
+        name = node_id_to_name(self.client, node_id)
+        if not 'data' in roles:
+            self.loggit.info('Skipping node "{0}": non-data node'.format(name))
+            return False
+        if 'master' in roles and not self.node_filters['permit_masters']:
+            self.loggit.info('Skipping node "{0}": master node'.format(name))
+            return False
+        elif 'master' in roles and self.node_filters['permit_masters']:
+            self.loggit.warn('Not skipping node "{0}" which is a master node (not recommended), but permit_masters is True'.format(name))
+            return True
+        else: # It does have `data` as a role.
+            return True
+
+    def _exclude_node(self, name):
+        if 'exclude_nodes' in self.node_filters:
+            if name in self.node_filters['exclude_nodes']:
+                self.loggit.info('Excluding node "{0}" due to node_filters'.format(name))
+                return True
+        return False
+
+    def _shrink_target(self, name):
+        return '{0}{1}{2}'.format(self.shrink_prefix, name, self.shrink_suffix)
+
+    def qualify_single_node(self):
+        node_id = name_to_node_id(self.client, self.shrink_node)
+        if node_id:
+            self.shrink_node_id   = node_id
+            self.shrink_node_name = self.shrink_node
+        else:
+            raise ConfigurationError('Unable to find node named: "{0}"'.format(self.shrink_node))
+        if self._exclude_node(self.shrink_node):
+            raise ConfigurationError('Node "{0}" listed for exclusion'.format(self.shrink_node))
+        if not self._data_node(node_id):
+            raise ActionError('Node "{0}" is not usable as a shrink node'.format(self.shrink_node))
+        if not single_data_path(self.client, node_id):
+            raise ActionError(
+                'Node "{0}" has multiple data paths and cannot be used '
+                'for shrink operations.'.format(self.shrink_node)
+            )
+        self.shrink_node_avail = (
+            self.client.nodes.stats()['nodes'][node_id]['fs']['total']['available_in_bytes']
+        )
+
+    def most_available_node(self):
+        """
+        Determine which data node has the most available free space and also
+        meets the other node filter settings.
+        """
+        mvn_avail = 0
+        # mvn_total = 0
+        mvn_name = None
+        mvn_id = None
+        nodes = self.client.nodes.stats()['nodes']
+        for node_id in nodes:
+            name = self.client.nodes.stats()['nodes'][node_id]['name']
+            if self._exclude_node(name):
+                self.loggit.debug('Node "{0}" excluded by node filters'.format(name))
+                continue
+            if not self._data_node(node_id):
+                self.loggit.debug('Node "{0}" is not a data node'.format(name))
+                continue
+            if not single_data_path(self.client, node_id):
+                self.loggit.info('Node "{0}" has multiple data paths and will not be used for shrink operations.')
+                continue
+            value = nodes[node_id]['fs']['total']['available_in_bytes']
+            if value > mvn_avail:
+                mvn_name  = name
+                mvn_id    = node_id
+                mvn_avail = value
+                # mvn_total = nodes[node_id]['fs']['total']['total_in_bytes']
+        self.shrink_node_name  = mvn_name
+        self.shrink_node_id    = mvn_id
+        self.shrink_node_avail = mvn_avail
+        # self.shrink_node_total = mvn_total
+
+    def route_index(self, idx, allocation_type, key, value):
+        bkey = 'index.routing.allocation.{0}.{1}'.format(allocation_type, key)
+        routing = { bkey : value } 
+        try:
+            self.client.indices.put_settings(index=idx, body=routing)
+            wait_for_it(self.client, 'allocation', wait_interval=self.wait_interval, max_wait=self.max_wait)
+        except Exception as e:
+            report_failure(e)
+
+    def __log_action(self, error_msg, dry_run=False):
+        if not dry_run:
+            raise ActionError(error_msg)
+        else:
+            self.loggit.warn('DRY-RUN: {0}'.format(error_msg))
+
+    def _block_writes(self, idx):
+        block = { 'index.blocks.write': True }
+        self.client.indices.put_settings(index=idx, body=block)
+
+    def _unblock_writes(self, idx):
+        unblock = { 'index.blocks.write': False }
+        self.client.indices.put_settings(index=idx, body=unblock)
+
+    def _check_space(self, idx, dry_run=False):
+        # Disk watermark calculation is already baked into `available_in_bytes`
+        size = index_size(self.client, idx)
+        avail = self.shrink_node_avail
+        padded = (size * 2) + (32 * 1024)
+        if padded < self.shrink_node_avail:
+            self.loggit.debug('Sufficient space available for 2x the size of index "{0}".  Required: {1}, available: {2}'.format(idx, padded, self.shrink_node_avail))
+        else:
+            error_msg = ('Insufficient space available for 2x the size of index "{0}", shrinking will exceed space available. Required: {1}, available: {2}'.format(idx, padded, self.shrink_node_avail))
+            self.__log_action(error_msg, dry_run)
+
+    def _check_node(self):
+        if self.shrink_node != 'DETERMINISTIC':
+            if not self.shrink_node_name:
+                self.qualify_single_node()
+        else:
+            self.most_available_node()
+        # At this point, we should have the three shrink-node identifying 
+        # instance variables:
+        # - self.shrink_node_name
+        # - self.shrink_node_id
+        # - self.shrink_node_avail
+        # # - self.shrink_node_total - only if needed in the future
+
+    def _check_target_exists(self, idx, dry_run=False):
+        target = self._shrink_target(idx)
+        if self.client.indices.exists(target):
+            error_msg = 'Target index "{0}" already exists'.format(target)
+            self.__log_action(error_msg, dry_run)
+
+    def _check_doc_count(self, idx, dry_run=False):
+        max_docs = 2147483519
+        doc_count = self.client.indices.stats(idx)['indices'][idx]['primaries']['docs']['count']
+        if doc_count > (max_docs * self.number_of_shards):
+            error_msg = ('Too many documents ({0}) to fit in {1} shard(s). Maximum number of docs per shard is {2}'.format(doc_count, self.number_of_shards, max_docs))
+            self.__log_action(error_msg, dry_run)
+
+    def _check_shard_count(self, idx, src_shards, dry_run=False):
+        if self.number_of_shards >= src_shards:
+            error_msg = ('Target number of shards ({0}) must be less than current number of shards ({1}) in index "{2}"'.format(self.number_of_shards, src_shards, idx))
+            self.__log_action(error_msg, dry_run)
+
+    def _check_shard_factor(self, idx, src_shards, dry_run=False):
+        # Find the list of factors of src_shards
+        factors = [x for x in range(1,src_shards+1) if src_shards % x == 0]
+        # Pop the last one, because it will be the value of src_shards
+        factors.pop()
+        if not self.number_of_shards in factors:
+            error_msg = (
+                '"{0}" is not a valid factor of {1} shards.  Valid values are '
+                '{2}'.format(self.number_of_shards, src_shards, factors)
+            )
+            self.__log_action(error_msg, dry_run)
+
+    def _check_all_shards(self, idx):
+        shards = self.client.cluster.state(index=idx)['routing_table']['indices'][idx]['shards']
+        found = []
+        for shardnum in shards:
+            for shard_idx in range(0, len(shards[shardnum])):
+                if shards[shardnum][shard_idx]['node'] == self.shrink_node_id:
+                    found.append({'shard': shardnum, 'primary': shards[shardnum][shard_idx]['primary']})
+        if len(shards) != len(found):
+            self.loggit.debug('Found these shards on node "{0}": {1}'.format(self.shrink_node_name, found))
+            raise ActionError('Unable to shrink index "{0}" as not all shards were found on the designated shrink node ({1}): {2}'.format(idx, self.shrink_node_name, found))
+
+    def pre_shrink_check(self, idx, dry_run=False):
+        self.loggit.debug('BEGIN PRE_SHRINK_CHECK')
+        self.loggit.debug('Check that target exists')
+        self._check_target_exists(idx, dry_run)
+        self.loggit.debug('Check doc count constraints')
+        self._check_doc_count(idx, dry_run)
+        self.loggit.debug('Check shard count')
+        src_shards = int(self.client.indices.get(idx)[idx]['settings']['index']['number_of_shards'])
+        self._check_shard_count(idx, src_shards, dry_run)
+        self.loggit.debug('Check shard factor')
+        self._check_shard_factor(idx, src_shards, dry_run)
+        self.loggit.debug('Check node availability')
+        self._check_node()
+        self.loggit.debug('Check available disk space')
+        self._check_space(idx, dry_run)
+        self.loggit.debug('FINISH PRE_SHRINK_CHECK')
+
+    def do_dry_run(self):
+        """
+        Show what a regular run would do, but don't actually do it.
+        """
+        self.index_list.filter_closed()
+        self.index_list.empty_list_check()
+        try:
+            index_lists = chunk_index_list(self.index_list.indices)
+            for l in index_lists:
+                for idx in l: # Shrink can only be done one at a time...
+                    target = self._shrink_target(idx)
+                    self.pre_shrink_check(idx, dry_run=True)
+                    self.loggit.info('DRY-RUN: Moving shards to shrink node: "{0}"'.format(self.shrink_node_name))
+                    self.loggit.info('DRY-RUN: Shrinking index "{0}" to "{1}" with settings: {2}, wait_for_active_shards={3}'.format(idx, target, self.body, self.wait_for_active_shards))
+                    if self.post_allocation:
+                        self.loggit.info('DRY-RUN: Applying post-shrink allocation rule "{0}" to index "{1}"'.format('index.routing.allocation.{0}.{1}:{2}'.format(self.post_allocation['allocation_type'], self.post_allocation['key'], self.post_allocation['value']), target))
+                    if self.delete_after:
+                        self.loggit.info('DRY-RUN: Deleting source index "{0}"'.format(idx))
+        except Exception as e:
+            report_failure(e)
+
+    def do_action(self):
+        self.index_list.filter_closed()
+        self.index_list.empty_list_check()
+        try:
+            index_lists = chunk_index_list(self.index_list.indices)
+            for l in index_lists:
+                for idx in l: # Shrink can only be done one at a time...
+                    target = self._shrink_target(idx)
+                    self.loggit.info('Source index: {0} -- Target index: {1}'.format(idx, target))
+                    # Pre-check ensures disk space available for each pass of the loop
+                    self.pre_shrink_check(idx)
+                    # Route the index to the shrink node
+                    self.loggit.info('Moving shards to shrink node: "{0}"'.format(self.shrink_node_name))
+                    self.route_index(idx, 'require', '_name', self.shrink_node_name)
+                    # Ensure a copy of each shard is present
+                    self._check_all_shards(idx)
+                    # Block writes on index
+                    self._block_writes(idx)
+                    # Do final health check
+                    if not health_check(self.client, status='green'):
+                        raise ActionError('Unable to proceed with shrink action. Cluster health is not "green"')
+                    # Do the shrink
+                    self.loggit.info('Shrinking index "{0}" to "{1}" with settings: {2}, wait_for_active_shards={3}'.format(idx, target, self.body, self.wait_for_active_shards))
+                    self.client.indices.shrink(index=idx, target=target, body=self.body, wait_for_active_shards=self.wait_for_active_shards)
+                    # Wait for it to complete
+                    if self.wfc:
+                        self.loggit.debug('Wait for shards to complete allocation for index: {0}'.format(target))
+                        wait_for_it(self.client, 'shrink', wait_interval=self.wait_interval, max_wait=self.max_wait)
+                    self.loggit.info('Index "{0}" successfully shrunk to "{1}"'.format(idx, target))
+                    # Do post-shrink steps
+                    # Unblock writes on index (just in case)
+                    self._unblock_writes(idx)
+                    ## Post-allocation, if enabled
+                    if self.post_allocation:
+                        self.loggit.info('Applying post-shrink allocation rule "{0}" to index "{1}"'.format('index.routing.allocation.{0}.{1}:{2}'.format(self.post_allocation['allocation_type'], self.post_allocation['key'], self.post_allocation['value']), target))
+                        self.route_index(target, self.post_allocation['allocation_type'], self.post_allocation['key'], self.post_allocation['value'])
+                    ## Delete, if flagged
+                    if self.delete_after:
+                        self.loggit.info('Deleting source index "{0}"'.format(idx))
+                        self.client.indices.delete(index=idx)
+                    else: # Let's unset the routing we applied here.
+                        self.loggit.info('Unassigning routing for source index: "{0}"'.format(idx))
+                        self.route_index(idx, 'require', '_name', '')
+
+        except Exception as e:
+            # Just in case it fails after attempting to meet this condition
+            self._unblock_writes(idx)
+            report_failure(e)
+        
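
As a rough orientation for readers of this new action, here is a minimal sketch of how Shrink might be driven from the Python API, assuming the usual client/IndexList workflow; the host address and the 'logstash-' prefix filter are illustrative placeholders, not taken from this diff:

    import elasticsearch
    import curator

    # Build a client and an IndexList, then narrow it to the indices to shrink.
    client = elasticsearch.Elasticsearch(hosts=['localhost:9200'])
    ilo = curator.IndexList(client)
    ilo.filter_by_regex(kind='prefix', value='logstash-')

    # Let Curator pick the shrink node deterministically, excluding master nodes.
    shrink = curator.Shrink(
        ilo,
        shrink_node='DETERMINISTIC',
        node_filters={'permit_masters': False},
        number_of_shards=1,
        number_of_replicas=1,
        shrink_suffix='-shrink',
        delete_after=True,
    )
    shrink.do_dry_run()   # preview the per-index checks and steps
    # shrink.do_action()  # perform the shrink for real
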
diff --git a/curator/cli.py b/curator/cli.py
index 93d60b7..86fbff5 100644
--- a/curator/cli.py
+++ b/curator/cli.py
@@ -29,6 +29,7 @@ CLASS_MAP = {
     'restore' : Restore,
     'rollover' : Rollover,
     'snapshot' : Snapshot,
+    'shrink' : Shrink,
 }
 
 def process_action(client, config, **kwargs):
@@ -98,19 +99,9 @@ def process_action(client, config, **kwargs):
         logger.debug('Doing the action here.')
         action_obj.do_action()
 
-@click.command()
-@click.option('--config',
-    help="Path to configuration file. Default: ~/.curator/curator.yml",
-    type=click.Path(exists=True), default=settings.config_file()
-)
-@click.option('--dry-run', is_flag=True, help='Do not perform any changes.')
-@click.argument('action_file', type=click.Path(exists=True), nargs=1)
-@click.version_option(version=__version__)
-def cli(config, dry_run, action_file):
+def run(config, action_file, dry_run=False):
     """
-    Curator for Elasticsearch indices.
-
-    See http://elastic.co/guide/en/elasticsearch/client/curator/current
+    Actually run Curator: process the client configuration in ``config`` and
+    execute the actions defined in ``action_file``.  This is the API entry
+    point wrapped by ``cli``.
     """
     client_args = process_config(config)
     logger = logging.getLogger(__name__)
@@ -202,3 +193,19 @@ def cli(config, dry_run, action_file):
                     sys.exit(1)
         logger.info('Action ID: {0}, "{1}" completed.'.format(idx, action))
     logger.info('Job completed.')
+
+@click.command()
+@click.option('--config',
+    help="Path to configuration file. Default: ~/.curator/curator.yml",
+    type=click.Path(exists=True), default=settings.config_file()
+)
+@click.option('--dry-run', is_flag=True, help='Do not perform any changes.')
+@click.argument('action_file', type=click.Path(exists=True), nargs=1)
+@click.version_option(version=__version__)
+def cli(config, dry_run, action_file):
+    """
+    Curator for Elasticsearch indices.
+
+    See http://elastic.co/guide/en/elasticsearch/client/curator/current
+    """
+    run(config, action_file, dry_run)
\ No newline at end of file
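
The split of ``cli`` into ``cli`` and ``run`` (described in the changelog further down) means Curator can now be invoked from Python with just a client configuration file and an action file. A hedged sketch of that usage, with placeholder paths:

    from curator.cli import run

    # Paths are placeholders; point them at a real client config and action file.
    run('/path/to/curator.yml', '/path/to/actions.yml', dry_run=True)
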
diff --git a/curator/defaults/filter_elements.py b/curator/defaults/filter_elements.py
index f7acc3e..650b823 100644
--- a/curator/defaults/filter_elements.py
+++ b/curator/defaults/filter_elements.py
@@ -122,6 +122,12 @@ def unit_count(**kwargs):
     # filtertype if use_age is set to True.
     return { Required('unit_count'): Coerce(int) }
 
+def unit_count_pattern(**kwargs):
+    # This setting is used with the age filtertype to define whether the
+    # unit_count value is taken from the configuration or read from the
+    # index name via a regular expression.
+    return { Optional('unit_count_pattern'): Any(str, unicode) }
+
 def use_age(**kwargs):
     # Use of this setting requires the additional setting, source.
     return { Optional('use_age', default=False): Any(bool, All(Any(str, unicode), Boolean())) }
diff --git a/curator/defaults/filtertypes.py b/curator/defaults/filtertypes.py
index cc16d17..e75f780 100644
--- a/curator/defaults/filtertypes.py
+++ b/curator/defaults/filtertypes.py
@@ -48,6 +48,7 @@ def age(action, config):
         filter_elements.direction(),
         filter_elements.unit(),
         filter_elements.unit_count(),
+        filter_elements.unit_count_pattern(),
         filter_elements.epoch(),
         filter_elements.exclude(),
     ]
diff --git a/curator/defaults/option_defaults.py b/curator/defaults/option_defaults.py
index e335db0..13eac1c 100644
--- a/curator/defaults/option_defaults.py
+++ b/curator/defaults/option_defaults.py
@@ -27,6 +27,9 @@ def delay():
             )
     }
 
+def delete_after():
+    return { Optional('delete_after', default=True): Any(bool, All(Any(str, unicode), Boolean())) }
+
 def delete_aliases():
     return { Optional('delete_aliases', default=False): Any(bool, All(Any(str, unicode), Boolean())) }
 
@@ -70,7 +73,7 @@ def max_wait(action):
     value = -1
     # if action in ['allocation', 'cluster_routing', 'replicas']:
     #     value = -1
-    # elif action in ['restore', 'snapshot', 'reindex']:
+    # elif action in ['restore', 'snapshot', 'reindex', 'shrink']:
     #     value = -1
     return { Optional('max_wait', default=value): Any(-1, Coerce(int), None) }
 
@@ -93,9 +96,32 @@ def name(action):
 def new_index():
     return { Optional('new_index', default=None): Any(str, unicode) }
 
+def node_filters():
+    return { 
+        Optional('node_filters', default={}): {
+          Optional('permit_masters', default=False): Any(bool, All(Any(str, unicode), Boolean())),
+          Optional('exclude_nodes', default=[]): Any(list, None)
+        }
+    }
+
+def number_of_replicas():
+    return { Optional('number_of_replicas', default=1): All(Coerce(int), Range(min=0, max=10)) }
+
+def number_of_shards():
+    return { Optional('number_of_shards', default=1): All(Coerce(int), Range(min=1, max=99)) }
+
 def partial():
     return { Optional('partial', default=False): Any(bool, All(Any(str, unicode), Boolean())) }
 
+def post_allocation():
+    return { 
+        Optional('post_allocation', default={}): {
+          Required('allocation_type', default='require'): All(Any(str, unicode), Any('require', 'include', 'exclude')),
+          Required('key'): Any(str, unicode),
+          Required('value', default=None): Any(str, unicode, None)
+        }
+    }
+
 def preserve_existing():
     return { Optional('preserve_existing', default=False): Any(bool, All(Any(str, unicode), Boolean())) }
 
@@ -164,6 +190,7 @@ def request_body():
                     Optional('socket_timeout'): Any(str, unicode),
                     Optional('connect_timeout'): Any(str, unicode),
                 },
+                Optional('size'): Coerce(int),
                 Optional('type'): Any(Any(str, unicode), list),
                 Optional('query'): dict,
                 Optional('sort'): dict,
@@ -213,6 +240,15 @@ def cluster_routing_value():
             )
     }
 
+def shrink_node():
+    return { Required('shrink_node'): Any(str, unicode) }
+
+def shrink_prefix():
+    return { Optional('shrink_prefix', default=''): Any(str, unicode, None) }
+
+def shrink_suffix():
+    return { Optional('shrink_suffix', default='-shrink'): Any(str, unicode, None) }
+
 def skip_repo_fs_check():
     return { Optional('skip_repo_fs_check', default=False): Any(bool, All(Any(str, unicode), Boolean())) }
 
@@ -243,11 +279,11 @@ def value():
 
 def wait_for_active_shards(action):
     value = 0
-    if action == 'reindex':
+    if action in ['reindex', 'shrink']:
         value = 1
     return {
         Optional('wait_for_active_shards', default=value): Any(
-            Coerce(int), None)
+            Coerce(int), 'all', None)
     }
 
 def wait_for_completion(action):
@@ -262,7 +298,7 @@ def wait_interval(action):
     maxval = 30
     # if action in ['allocation', 'cluster_routing', 'replicas']:
     value = 3
-    if action in ['restore', 'snapshot', 'reindex']:
+    if action in ['restore', 'snapshot', 'reindex', 'shrink']:
         value = 9
     return { Optional('wait_interval', default=value): Any(All(
                 Coerce(int), Range(min=minval, max=maxval)), None) }
diff --git a/curator/defaults/settings.py b/curator/defaults/settings.py
index 8e618cb..b94e7fb 100644
--- a/curator/defaults/settings.py
+++ b/curator/defaults/settings.py
@@ -54,6 +54,7 @@ def index_actions():
         'reindex',
         'replicas',
         'rollover',
+        'shrink',
         'snapshot',
     ]
 
@@ -118,6 +119,7 @@ def structural_filter_elements():
         Optional('timestring'): Any(str, unicode, None),
         Optional('unit'): Any(str, unicode),
         Optional('unit_count'): Coerce(int),
+        Optional('unit_count_pattern'): Any(str, unicode),
         Optional('use_age'): Boolean(),
         Optional('value'): Any(int, float, str, unicode, bool),
         Optional('week_starts_on'): Any(str, unicode, None),
diff --git a/curator/indexlist.py b/curator/indexlist.py
index 3f82b5d..5c0c246 100644
--- a/curator/indexlist.py
+++ b/curator/indexlist.py
@@ -390,7 +390,7 @@ class IndexList(object):
 
     def filter_by_age(self, source='name', direction=None, timestring=None,
         unit=None, unit_count=None, field=None, stats_result='min_value',
-        epoch=None, exclude=False,
+        epoch=None, exclude=False, unit_count_pattern=False
         ):
         """
         Match `indices` by relative age calculations.
@@ -404,6 +404,8 @@ class IndexList(object):
             ``weeks``, ``months``, or ``years``.
         :arg unit_count: The number of ``unit`` (s). ``unit_count`` * ``unit`` will
             be calculated out to the relative number of seconds.
+        :arg unit_count_pattern: A regular expression whose capture group identifies
+            the value for ``unit_count``.
         :arg field: A timestamp field name.  Only used for ``field_stats`` based
             calculations.
         :arg stats_result: Either `min_value` or `max_value`.  Only used in
@@ -431,6 +433,12 @@ class IndexList(object):
             source=source, timestring=timestring, field=field,
             stats_result=stats_result
         )
+        if unit_count_pattern:
+            try:
+                unit_count_matcher = re.compile(unit_count_pattern)
+            except:
+                # We got an illegal regex, so won't be able to match anything
+                unit_count_matcher = None
         for index in self.working_list():
             try:
                 age = int(self.index_info[index]['age'][self.age_keyfield])
@@ -445,10 +453,30 @@ class IndexList(object):
                 )
                 # Because time adds to epoch, smaller numbers are actually older
                 # timestamps.
+                if unit_count_pattern:
+                    self.loggit.debug('Unit_count_pattern is set, trying to match pattern to index "{0}"'.format(index))
+                    unit_count_from_index = get_unit_count_from_name(index, unit_count_matcher)
+                    if unit_count_from_index:
+                        self.loggit.debug('Pattern matched, applying unit_count of "{0}"'.format(unit_count_from_index))
+                        adjustedPoR = get_point_of_reference(unit, unit_count_from_index, epoch)
+                    elif unit_count == -1:
+                        # Unable to match pattern and unit_count is -1, meaning no fallback, so this
+                        # index is removed from the list
+                        self.loggit.debug('Unable to match pattern and no fallback value set. Removing index "{0}" from actionable list'.format(index))
+                        exclude = True
+                        adjustedPoR = PoR # necessary to avoid exception if the first index is excluded
+                    else:
+                        # Unable to match the pattern and unit_count is set, so fall back to using unit_count
+                        # for determining whether to keep this index in the list
+                        self.loggit.debug('Unable to match pattern, using fallback value of "{0}"'.format(unit_count))
+                        adjustedPoR = PoR
+                else:
+                    adjustedPoR = PoR
                 if direction == 'older':
-                    agetest = age < PoR
+                    agetest = age < adjustedPoR
                 else:
-                    agetest = age > PoR
+                    agetest = age > adjustedPoR
                 self.__excludify(agetest, exclude, index, msg)
             except KeyError:
                 self.loggit.debug(
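
To make the new ``unit_count_pattern`` behaviour concrete, here is a short sketch of calling ``filter_by_age`` with it, assuming index names that embed a retention count such as ``logs-30-2017.10.17`` (the naming scheme and regex are assumptions for illustration only):

    import elasticsearch
    import curator

    client = elasticsearch.Elasticsearch(hosts=['localhost:9200'])
    ilo = curator.IndexList(client)
    # Derive unit_count from the first capture group of the pattern; fall back
    # to unit_count=30 for index names that do not match.
    ilo.filter_by_age(
        source='name', direction='older', timestring='%Y.%m.%d',
        unit='days', unit_count=30, unit_count_pattern=r'^logs-(\d+)-',
    )
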
diff --git a/curator/utils.py b/curator/utils.py
index c6f3ddd..e0127b7 100644
--- a/curator/utils.py
+++ b/curator/utils.py
@@ -348,7 +348,19 @@ def get_point_of_reference(unit, count, epoch=None):
         epoch = time.time()
     epoch = fix_epoch(epoch)
     return epoch - multiplier * count
-   
+
+def get_unit_count_from_name(index_name, pattern):
+    if pattern is None:
+        return None
+    match = pattern.search(index_name)
+    if match:
+        try:
+            return int(match.group(1))
+        except Exception:
+            return None
+    else:
+        return None
+
 def date_range(unit, range_from, range_to, epoch=None, week_starts_on='sunday'):
     """
     Get the epoch start time and end time of a range of ``unit``s, reckoning the 
@@ -610,12 +622,12 @@ def check_version(client):
         logger.error(
             'Elasticsearch version {0} incompatible '
             'with this version of Curator '
-            '({0})'.format(".".join(map(str,version_number)), __version__)
+            '({1})'.format(".".join(map(str,version_number)), __version__)
         )
         raise CuratorException(
             'Elasticsearch version {0} incompatible '
             'with this version of Curator '
-            '({0})'.format(".".join(map(str,version_number)), __version__)
+            '({1})'.format(".".join(map(str,version_number)), __version__)
         )
 
 def check_master(client, master_only=False):
@@ -1565,6 +1577,10 @@ def wait_for_it(
             'function':task_check,
             'args':{'task_id':task_id},
         },
+        'shrink':{
+            'function': health_check,
+            'args': {'status':'green'},
+        },
     }
     wait_actions = list(action_map.keys())
 
@@ -1622,8 +1638,8 @@ def wait_for_it(
         else:
             logger.debug(
                 'Action "{0}" not yet complete, {1} total seconds elapsed. '
-                'Waiting {1} seconds before checking '
-                'again.'.format(action, wait_interval))
+                'Waiting {2} seconds before checking '
+                'again.'.format(action, elapsed, wait_interval))
             time.sleep(wait_interval)
 
     logger.debug('Result: {0}'.format(result))
@@ -1631,4 +1647,59 @@ def wait_for_it(
         raise ActionTimeout(
             'Action "{0}" failed to complete in the max_wait period of '
             '{1} seconds'.format(action, max_wait)
-        )
\ No newline at end of file
+        )
+
+def node_roles(client, node_id):
+    """
+    Return the list of roles assigned to the node identified by ``node_id``
+
+    :arg client: An :class:`elasticsearch.Elasticsearch` client object
+    :rtype: list
+    """
+    return client.nodes.info()['nodes'][node_id]['roles']
+
+def index_size(client, idx):
+    return client.indices.stats(index=idx)['indices'][idx]['total']['store']['size_in_bytes']
+
+def single_data_path(client, node_id):
+    """
+    In order for a shrink to work, it should be on a single filesystem, as 
+    shards cannot span filesystems.  Return `True` if the node has a single
+    filesystem, and `False` otherwise.
+
+    :arg client: An :class:`elasticsearch.Elasticsearch` client object
+    :rtype: bool
+    """
+    return len(client.nodes.stats()['nodes'][node_id]['fs']['data']) == 1 
+
+
+def name_to_node_id(client, name):
+    """
+    Return the node_id of the node identified by ``name``
+
+    :arg client: An :class:`elasticsearch.Elasticsearch` client object
+    :rtype: str
+    """
+    stats = client.nodes.stats()
+    for node in stats['nodes']:
+        if stats['nodes'][node]['name'] == name:
+            logger.debug('Found node_id "{0}" for name "{1}".'.format(node, name))
+            return node
+    logger.error('No node_id found matching name: "{0}"'.format(name))
+    return None
+
+def node_id_to_name(client, node_id):
+    """
+    Return the name of the node identified by ``node_id``
+
+    :arg client: An :class:`elasticsearch.Elasticsearch` client object
+    :rtype: str
+    """
+    stats = client.nodes.stats()
+    name = None
+    if node_id in stats['nodes']:
+        name = stats['nodes'][node_id]['name']
+    else:
+        logger.error('No node_id found matching: "{0}"'.format(node_id))
+    logger.debug('Name associated with node_id "{0}": {1}'.format(node_id, name))
+    return name
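
Taken together, these node helpers are what the Shrink action uses to vet a candidate node. A small sketch of how they combine, with 'data-node-1' as a hypothetical node name:

    import elasticsearch
    from curator.utils import (
        name_to_node_id, node_id_to_name, node_roles, single_data_path)

    client = elasticsearch.Elasticsearch(hosts=['localhost:9200'])
    node_id = name_to_node_id(client, 'data-node-1')
    if node_id and 'data' in node_roles(client, node_id) \
            and single_data_path(client, node_id):
        print('{0} looks usable as a shrink node'.format(
            node_id_to_name(client, node_id)))
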
diff --git a/curator/validators/options.py b/curator/validators/options.py
index a8ca520..d516a23 100644
--- a/curator/validators/options.py
+++ b/curator/validators/options.py
@@ -108,6 +108,21 @@ def action_specific(action):
             option_defaults.max_wait(action),
             option_defaults.skip_repo_fs_check(),
         ],
+        'shrink' : [
+            option_defaults.shrink_node(),
+            option_defaults.node_filters(),
+            option_defaults.number_of_shards(),
+            option_defaults.number_of_replicas(),
+            option_defaults.shrink_prefix(),
+            option_defaults.shrink_suffix(),
+            option_defaults.delete_after(),
+            option_defaults.post_allocation(),
+            option_defaults.wait_for_active_shards(action),
+            option_defaults.extra_settings(),
+            option_defaults.wait_for_completion(action),
+            option_defaults.wait_interval(action),
+            option_defaults.max_wait(action),
+        ],
     }
     return options[action]
 
diff --git a/docs/Changelog.rst b/docs/Changelog.rst
index c57199b..b9d66f3 100644
--- a/docs/Changelog.rst
+++ b/docs/Changelog.rst
@@ -3,6 +3,38 @@
 Changelog
 =========
 
+5.2.0 (1 September 2017)
+------------------------
+
+**New Features**
+
+  * Shrink action! Apologies to all who have patiently waited for this 
+    feature.  It's been a long time coming, but it is hopefully worth the 
+    wait.  There are a lot of checks and tests associated with this action,
+    as there are many conditions that have to be met in order for a shrink
+    to take place.  Curator will try its best to ensure that all of these
+    conditions are met so you can comfortably rest assured that shrink will
+    work properly unattended.  See the documentation for more information.
+  * The ``cli`` function has been split into ``cli`` and ``run`` functions.  
+    The behavior of ``cli`` will be indistinguishable from previous releases,
+    preserving API integrity.  The new ``run`` function allows lambda and other
+    users to `run` Curator from the API with only a client configuration file
+    and action file as arguments.  Requested in #1031 (untergeek)
+  * Allow use of time/date string interpolation for Rollover index naming.
+    Added in #1010 (tschroeder-zendesk)
+  * New ``unit_count_pattern`` allows you to derive the ``unit_count`` from 
+    the index name itself.  This involves regular expressions, so be sure to
+    do lots of testing in ``--dry-run`` mode before deploying to production.
+    Added by (soenkeliebau) in #997
+
+**Bug Fixes**
+
+  * Reindex ``request_body`` allows for two different ``size`` options.  One
+    limits the number of documents reindexed.  The other is for batch sizing.
+    The batch sizing option was missing from the schema validator.  This has
+    been corrected.  Reported in #1038 (untergeek)
+  * A few sundry logging and notification changes were made.
+
 5.1.2 (08 August 2017)
 ----------------------
 
@@ -51,8 +83,13 @@ Changelog
   * Fix/improve rST API documentation.
   * Thanks to many users who not only found and reported documentation issues,
     but also submitted corrections.
+
 
 5.1.1 (8 June 2017)
+-------------------
 
 **Bug Fixes**
 
@@ -60,6 +97,7 @@ Changelog
     I reverted the string-based comparison as before.
     
 5.1.0 (8 June 2017)
+-------------------
 
 **New Features**
 
@@ -89,6 +127,7 @@ Changelog
     to reflect this.
 
 5.0.4 (16 May 2017)
+-------------------
 
 **Bug Fixes**
 
@@ -97,6 +136,7 @@ Changelog
     #966.  (untergeek)
 
 5.0.3 (15 May 2017)
+-------------------
 
 **Bug Fixes**
 
@@ -106,6 +146,7 @@ Changelog
     Reported in #962. (untergeek)
 
 5.0.2 (4 May 2017)
+------------------
 
 **Bug Fixes**
 
@@ -126,6 +167,7 @@ Changelog
   * Document how to include file paths better. Fixes #944. (untergeek)
 
 5.0.1 (10 April 2017)
+---------------------
 
 **Bug Fixes**
 
@@ -139,6 +181,7 @@ Changelog
   * Address date matching behavior better per #858 (untergeek)
 
 5.0.0 (5 April 2017)
+--------------------
 
 The full feature set of 5.0 (including alpha releases) is included here.
 
diff --git a/docs/actionclasses.rst b/docs/actionclasses.rst
index 7d5766d..24d7b2a 100644
--- a/docs/actionclasses.rst
+++ b/docs/actionclasses.rst
@@ -11,11 +11,17 @@ Action Classes
 * `Allocation`_
 * `Close`_
 * `ClusterRouting`_
+* `CreateIndex`_
 * `DeleteIndices`_
 * `DeleteSnapshots`_
 * `ForceMerge`_
+* `IndexSettings`_
 * `Open`_
+* `Reindex`_
 * `Replicas`_
+* `Restore`_
+* `Rollover`_
+* `Shrink`_
 * `Snapshot`_
 
 
@@ -39,6 +45,11 @@ ClusterRouting
 .. autoclass:: curator.actions.ClusterRouting
   :members:
 
+CreateIndex
+--------------
+.. autoclass:: curator.actions.CreateIndex
+  :members:
+
 DeleteIndices
 -------------
... 1110 lines suppressed ...

-- 
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/python-modules/packages/elasticsearch-curator.git


