[med-svn] [Git][med-team/biomaj3][master] 4 commits: New upstream version 3.1.14

Olivier Sallou gitlab at salsa.debian.org
Tue Nov 12 11:00:02 GMT 2019



Olivier Sallou pushed to branch master at Debian Med / biomaj3


Commits:
1cf1bd06 by Olivier Sallou at 2019-11-12T10:29:01Z
New upstream version 3.1.14
- - - - -
5ef9d67d by Olivier Sallou at 2019-11-12T10:29:02Z
Update upstream source from tag 'upstream/3.1.14'

Update to upstream version '3.1.14'
with Debian dir efe2dea6ed9d5e510e9ef9d012fdd251224c849f
- - - - -
b84bc377 by Olivier Sallou at 2019-11-12T10:29:48Z
new upstream 3.1.14

- - - - -
11a9fbb0 by Olivier Sallou at 2019-11-12T10:40:36Z
move cli to suggests

- - - - -


13 changed files:

- .travis.yml
- CHANGES.txt
- README.md
- biomaj/bank.py
- biomaj/process/metaprocess.py
- biomaj/workflow.py
- config.yml
- debian/changelog
- debian/control
- global.properties.example
- requirements.txt
- setup.py
- tests/biomaj_tests.py


Changes:

=====================================
.travis.yml
=====================================
@@ -2,15 +2,20 @@ language: python
 sudo: false
 python:
 - '2.7'
-- '3.4'
-- '3.5'
 - '3.6'
+- '3.7'
+- '3.8'
 services:
 - mongodb
 - elasticsearch
 branches:
   except:
   - "/^feature.*$/"
+before_install:
+  - sudo apt-get install -y libgnutls-dev
+addons:
+  apt:
+    update: true
 install:
 - pip install flake8
 - pip install -r requirements.txt


=====================================
CHANGES.txt
=====================================
@@ -1,3 +1,22 @@
+3.1.14:
+  Add repair option
+3.1.13:
+  Add process name and status in logs
+  PR #116 update to use download 3.1.0
+3.1.12:
+  In case of multiple matches for the release regexp, try to determine the most recent one
+  #115 Correctly use save_as for release file name
+3.1.11:
+  Increase one log level
+  #110 Allow ftps and directftps protocols (needs biomaj-download 3.0.26 and biomaj-core 3.0.19)
+  #111 locked bank after bad update command
+  Ignore UTF-8 errors in release file
+  Add plugin support via the biomaj-plugins repo (https://github.com/genouest/biomaj-plugins) to get the release and the list of files to download from a plugin script.
+  Add support for protocol options in global and bank properties (options.names=x,y  options.x=val  options.y=val). Options may be ignored or used differently depending on the protocol used.
+3.1.10:
+  Allow using hardlinks when reusing files from previous releases
+3.1.9:
+  Fix remote.files recursion
 3.1.8:
   Fix uncompress when saved files contains subdirectory
 3.1.7:


=====================================
README.md
=====================================
@@ -49,42 +49,41 @@ Application Features
 ====================
 
 * Synchronisation:
- * Multiple remote protocols (ftp, sftp, http, local copy, etc.)
- * Data transfers integrity check
- * Release versioning using a incremental approach
- * Multi threading
- * Data extraction (gzip, tar, bzip)
- * Data tree directory normalisation
-
+  * Multiple remote protocols (ftp, ftps, http, local copy, etc.)
+  * Data transfers integrity check
+  * Release versioning using an incremental approach
+  * Multi threading
+  * Data extraction (gzip, tar, bzip)
+  * Data tree directory normalisation
+  * Plugins support for custom downloads
 
 * Pre & Post processing:
- * Advanced workflow description (D.A.G)
- * Post-process indexation for various bioinformatics software (blast, srs, fastacmd, readseq, etc.)
- * Easy integration of personal scripts for bank post-processing automation
-
+  * Advanced workflow description (D.A.G)
+  * Post-process indexation for various bioinformatics software (blast, srs, fastacmd, readseq, etc.)
+  * Easy integration of personal scripts for bank post-processing automation
 
 * Supervision:
- * Optional Administration web interface (biomaj-watcher)
- * CLI management
- * Mail alerts for the update cycle supervision
- * Prometheus and Influxdb optional integration
- * Optional consul supervision of processes
+  * Optional Administration web interface (biomaj-watcher)
+  * CLI management
+  * Mail alerts for the update cycle supervision
+  * Prometheus and Influxdb optional integration
+  * Optional consul supervision of processes
 
 
 * Scalability:
- * Monolithic (local install) or microservice architecture (remote access to a BioMAJ server)
- * Microservice installation allows per process scalability and supervision (number of process in charge of download, execution, etc.)
-
+  * Monolithic (local install) or microservice architecture (remote access to a BioMAJ server)
+  * Microservice installation allows per process scalability and supervision (number of process in charge of download, execution, etc.)
 
 * Remote access:
- * Optional FTP server providing authenticated or anonymous data access
+  * Optional FTP server providing authenticated or anonymous data access
 
 Dependencies
 ============
 
 Packages:
- * Debian: libcurl-dev, gcc
- * CentOs: libcurl-devel, openldap-devel, gcc
+
+* Debian: libcurl-dev, gcc
+* CentOS: libcurl-devel, openldap-devel, gcc
 
  Linux tools: tar, unzip, gunzip, bunzip
 
@@ -115,7 +114,6 @@ From packages:
 
     pip install biomaj biomaj-cli biomaj-daemon
 
-
 You should consider using a Python virtual environment (virtualenv) to install BioMAJ.
 
 In tools/examples, copy the global.properties and update it to match your local
@@ -123,13 +121,11 @@ installation.
 
 The tools/process contains example process files (python and shell).
 
-
 Docker
 ======
 
 You can use BioMAJ with Docker (genouest/biomaj)
 
-
     docker pull genouest/biomaj
     docker pull mongo
     docker run --name biomaj-mongodb -d mongo
@@ -147,6 +143,51 @@ No default bank property file or process are available in the container.
 
 Examples are available at https://github.com/genouest/biomaj-data
 
+
+Import bank templates
+=====================
+
+Once BioMAJ is installed, bank examples can be imported with the biomaj client:
+
+
+    # List available templates
+    biomaj-cli ... --data-list
+    # Import a bank template
+    biomaj-cli ... --data-import --bank alu
+    # then edit bank template in config directory if needed and launch bank update
+    biomaj-cli ... --update --bank alu
+
+Plugins
+=======
+
+BioMAJ supports Python plugins to manage custom downloads where the supported protocols
+are not enough (an HTTP page with an unformatted listing, access to protected pages, etc.).
+
+Examples of plugins, and how to configure them, are available in the [biomaj-plugins](https://github.com/genouest/biomaj-plugins) repository.
+
+Plugins can define a specific way to:
+
+* retrieve the release
+* list remote files to download
+* download remote files
+
+A plugin can define one or more of these features.
+
+Basically, one defines them in the bank property file:
+
+    # Location of plugins
+    plugins_dir=/opt/biomaj-plugins
+    # Use plugin to fetch release
+    release.plugin=github
+    # List of arguments of plugin function with key=value format, comma separated
+    release.plugin_args=repo=osallou/goterra-cli
+
+Plugins are used when the related workflow step is reached:
+
+* release.plugin <= returns the remote release
+* remote.plugin <= returns the list of files to download
+* download.plugin <= downloads the files from the list
+
 API documentation
 =================
 
@@ -172,7 +213,6 @@ Execute unit tests but disable ones needing network access
 
     nosetests -a '!network'
 
-
 Monitoring
 ==========
 
@@ -195,14 +235,12 @@ A-GPL v3+
 Remarks
 =======
 
-Biomaj uses libcurl, for sftp libcurl must be compiled with sftp support
-
 To delete elasticsearch index:
 
- curl -XDELETE 'http://localhost:9200/biomaj_test/'
+ curl -XDELETE 'http://localhost:9200/biomaj_test/'
 
 Credits
-======
+=======
 
 Special thanks to tuco at the Pasteur Institute for the intensive testing and new ideas.
 Thanks to the old BioMAJ team for the work they have done.
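
The plugins section added to this README describes a three-method contract
(release, list, download). As a rough illustration, here is a minimal sketch of
such a plugin; it matches the yapsy loading code added in biomaj/workflow.py
below, but the class name, file layout and return shapes are assumptions, not
the canonical biomaj-plugins API (yapsy also expects a companion
.yapsy-plugin info file next to the module):

    # Hypothetical /opt/biomaj-plugins/github.py -- a sketch only; see the
    # biomaj-plugins repository for real, supported examples.
    from yapsy.IPlugin import IPlugin

    class GithubPlugin(IPlugin):

        def name(self):
            # Matched against the release.plugin / remote.plugin property.
            return 'github'

        def configure(self, options):
            # Receives the parsed release.plugin_args key=value pairs.
            self.repo = options.get('repo')

        def release(self):
            # Return the remote release as a string (assumed contract).
            return '1.0.0'

        def list(self, release):
            # Return the files to download for this release (shape assumed
            # from how downloader.files_to_download is used in the workflow).
            return [{'name': '/archive/%s.tar.gz' % release}]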


=====================================
biomaj/bank.py
=====================================
@@ -13,6 +13,7 @@ from biomaj.mongo_connector import MongoConnector
 from biomaj.session import Session
 from biomaj.workflow import UpdateWorkflow
 from biomaj.workflow import RemoveWorkflow
+from biomaj.workflow import RepairWorkflow
 from biomaj.workflow import Workflow
 from biomaj.workflow import ReleaseCheckWorkflow
 from biomaj_core.config import BiomajConfig
@@ -502,8 +503,10 @@ class Bank(object):
         # Insert session
         if self.session.get('action') == 'update':
             action = 'last_update_session'
-        if self.session.get('action') == 'remove':
+        elif self.session.get('action') == 'remove':
             action = 'last_remove_session'
+        else:
+            action = 'last_update_session'
 
         cache_dir = self.config.get('cache.dir')
         download_files = self.session.get('download_files')
@@ -1112,6 +1115,74 @@ class Bank(object):
 
         return res
 
+    def repair(self):
+        """
+        Launch a bank repair
+
+        :return: bool
+        """
+        logging.warning('Bank:' + self.name + ':Repair')
+        start_time = datetime.now()
+        start_time = time.mktime(start_time.timetuple())
+
+        if not self.is_owner():
+            logging.error('Not authorized, bank owned by ' + self.bank['properties']['owner'])
+            raise Exception('Not authorized, bank owned by ' + self.bank['properties']['owner'])
+
+        self.run_depends = False
+
+        self.controls()
+        if self.options.get_option('release'):
+            logging.info('Bank:' + self.name + ':Release:' + self.options.get_option('release'))
+            s = self.get_session_from_release(self.options.get_option('release'))
+            # No session in prod
+            if s is None:
+                logging.error('Release does not exist: ' + self.options.get_option('release'))
+                return False
+            self.load_session(UpdateWorkflow.FLOW, s)
+        else:
+            logging.info('Bank:' + self.name + ':Release:latest')
+            self.load_session(UpdateWorkflow.FLOW)
+        self.session.set('action', 'update')
+        res = self.start_repair()
+        self.session.set('workflow_status', res)
+        self.save_session()
+        try:
+            self.__stats()
+        except Exception:
+            logging.exception('Failed to send stats')
+        end_time = datetime.now()
+        end_time = time.mktime(end_time.timetuple())
+        self.history.insert({
+            'bank': self.name,
+            'error': not res,
+            'start': start_time,
+            'end': end_time,
+            'action': 'repair',
+            'updated': self.session.get('update')
+        })
+        return res
+
+    def start_repair(self):
+        """
+        Start a repair workflow
+        """
+        workflow = RepairWorkflow(self)
+        if self.options and self.options.get_option('redis_host'):
+            redis_client = redis.StrictRedis(
+                host=self.options.get_option('redis_host'),
+                port=self.options.get_option('redis_port'),
+                db=self.options.get_option('redis_db'),
+                decode_responses=True
+            )
+            workflow.redis_client = redis_client
+            workflow.redis_prefix = self.options.get_option('redis_prefix')
+            if redis_client.get(self.options.get_option('redis_prefix') + ':' + self.name + ':action:cancel'):
+                logging.warn('Cancel requested, stopping update')
+                redis_client.delete(self.options.get_option('redis_prefix') + ':' + self.name + ':action:cancel')
+                return False
+        return workflow.start()
+
     def update(self, depends=False):
         """
         Launch a bank update
@@ -1165,7 +1236,9 @@ class Bank(object):
                         if not reset:
                             logging.info("Process %s not found in %s" % (str(proc), task['name']))
                             return False
-
+            if not set_to_false:
+                logging.error('No task found named %s' % (self.options.get_option('from_task')))
+                return False
         self.session.set('action', 'update')
         res = self.start_update()
         self.session.set('workflow_status', res)
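
For context, a minimal sketch of how the new repair entry point can be driven
from Python; the bank name and configuration path are illustrative assumptions
(the supported route is the biomaj-cli tooling):

    # Sketch only: assumes global.properties is readable and a bank named
    # 'alu' is defined in the configuration directory.
    from biomaj_core.config import BiomajConfig
    from biomaj.bank import Bank

    BiomajConfig.load_config('/etc/biomaj/global.properties')
    bank = Bank('alu')
    ok = bank.repair()  # replays the update workflow with skip_all=True
    print('repair status: %s' % ok)

A running update or repair can also be cancelled through Redis; the key layout
below is taken from start_repair above, while the host and prefix values are
assumptions matching the defaults in global.properties.example:

    import redis

    r = redis.StrictRedis(host='127.0.0.1', port=6379, db=0,
                          decode_responses=True)
    r.set('biomaj:alu:action:cancel', '1')  # next workflow start will abort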


=====================================
biomaj/process/metaprocess.py
=====================================
@@ -262,8 +262,11 @@ class MetaProcess(threading.Thread):
                     processes_status[bprocess] = res
                     self.set_progress(bmaj_process.name, res)
                     if not res:
+                        logging.info("PROC:META:RUN:PROCESS:ERROR:" + bmaj_process.name)
                         self.global_status = False
                         break
+                    else:
+                        logging.info("PROC:META:RUN:PROCESS:OK:" + bmaj_process.name)
                     if not self.simulate:
                         if self._lock:
                             self._lock.acquire()


=====================================
biomaj/workflow.py
=====================================
@@ -16,7 +16,7 @@ import sys
 from biomaj_core.utils import Utils
 from biomaj_download.downloadclient import DownloadClient
 from biomaj_download.message import downmessage_pb2
-from biomaj_download.download.http import HTTPParse
+from biomaj_download.download.curl import HTTPParse
 from biomaj_download.download.localcopy import LocalDownload
 
 from biomaj.mongo_connector import MongoConnector
@@ -24,6 +24,7 @@ from biomaj.options import Options
 from biomaj.process.processfactory import RemoveProcessFactory, PreProcessFactory, PostProcessFactory
 
 from biomaj_zipkin.zipkin import Zipkin
+from yapsy.PluginManager import PluginManager
 
 
 class Workflow(object):
@@ -334,6 +335,28 @@ class UpdateWorkflow(Workflow):
         logging.debug('New workflow')
         self.session._session['update'] = True
 
+    def _get_plugin(self, name, plugin_args):
+        options = {}
+        plugins_dir = self.bank.config.get('plugins_dir')
+        if not plugins_dir:
+            return None
+        if plugin_args:
+            for plugin_arg in plugin_args.split(','):
+                arg = plugin_arg.split('=', 1)
+                options[arg[0].strip()] = arg[1].strip()
+        requested_plugin = None
+        simplePluginManager = PluginManager()
+        simplePluginManager.setPluginPlaces([plugins_dir])
+        simplePluginManager.collectPlugins()
+        logging.info('Load plugins from %s' % (plugins_dir))
+        for pluginInfo in simplePluginManager.getAllPlugins():
+            if pluginInfo.plugin_object.name() == name:
+                requested_plugin = pluginInfo.plugin_object
+        if requested_plugin is None:
+            return None
+        requested_plugin.configure(options)
+        return requested_plugin
+
     def wf_init(self):
         err = super(UpdateWorkflow, self).wf_init()
         if not err:
@@ -431,6 +454,7 @@ class UpdateWorkflow(Workflow):
         """
         logging.info('Workflow:wf_copydepends')
         deps = self.bank.get_dependencies()
+        cf = self.session.config
         for dep in deps:
             if self.bank.config.get(dep + '.files.move'):
                 logging.info('Worflow:wf_depends:Files:Move:' + self.bank.config.get(dep + '.files.move'))
@@ -443,7 +467,10 @@ class UpdateWorkflow(Workflow):
                     logging.error('Could not find a session update for bank ' + dep)
                     return False
                 # b = self.bank.get_bank(dep, no_log=True)
-                locald = LocalDownload(bdir)
+                locald = LocalDownload(
+                    bdir,
+                    use_hardlinks=cf.get_bool("use_hardlinks", default=False)
+                )
                 (file_list, dir_list) = locald.list()
                 locald.match(self.bank.config.get(dep + '.files.move').split(), file_list, dir_list)
                 bankdepdir = self.bank.session.get_full_release_directory() + "/" + dep
@@ -484,11 +511,38 @@ class UpdateWorkflow(Workflow):
             MongoConnector.banks.update({'name': self.bank.name},
                                         info)
 
+    def __findLastRelease(self, releases):
+        '''
+        Try to find the most recent release in the releases input array
+        '''
+        release = releases[0]
+        releaseElts = re.split(r'\.|-', release)
+        logging.debug('found a release %s' % (release))
+        for rel in releases:
+            if rel == release:
+                continue
+            logging.debug('compare next release %s' % (rel))
+            relElts = re.split(r'\.|-', rel)
+            index = 0
+            for relElt in relElts:
+                logging.debug("compare release major,minor,etc. : %s >? %s" % (relElt, releaseElts[index]))
+                try:
+                    if int(relElt) > int(releaseElts[index]):
+                        release = rel
+                        logging.debug("found newer release %s" % (rel))
+                        break
+                except ValueError:
+                    pass
+                finally:
+                    index += 1
+        return release
+
     def wf_release(self):
         """
         Find current release on remote
         """
         logging.info('Workflow:wf_release')
+        release = None
         cf = self.session.config
         if cf.get('ref.release') and self.bank.depends:
             # Bank is a computed bank and we ask to set release to the same
@@ -531,6 +585,28 @@ class UpdateWorkflow(Workflow):
         self.session.previous_release = self.session.get('previous_release')
 
         logging.info('Workflow:wf_release:previous_session:' + str(self.session.previous_release))
+
+        if self.session.config.get('release.plugin'):
+            # A plugin is configured to determine the remote release
+            logging.info('Workflow:wf_release:plugin:' + str(self.session.config.get('release.plugin')))
+            plugin = self._get_plugin(self.session.config.get('release.plugin'), self.session.config.get('release.plugin_args'))
+            if plugin is None:
+                logging.error("Could not load plugin")
+                return False
+            try:
+                plugin_release = plugin.release()
+                logging.info('Workflow:wf_release:plugin:%s:%s' % (self.session.config.get('release.plugin'), plugin_release))
+                self.session.set('release', plugin_release)
+                self.session.set('remoterelease', plugin_release)
+                if self.session.previous_release == self.session.get('release') and not self.session.config.get_bool('release.control', default=False):
+                    logging.info('Workflow:wf_release:same_as_previous_session')
+                    return self.no_need_to_update()
+                else:
+                    return True
+            except Exception as e:
+                logging.exception("Plugin failed to get a release %s" % (str(e)))
+                return False
+
         if self.session.get('release'):
             # Release already set from a previous run or another bank
             logging.info('Workflow:wf_release:session:' + str(self.session.get('release')))
@@ -603,7 +679,7 @@ class UpdateWorkflow(Workflow):
 
             save_as = None
             method = 'GET'
-            if protocol == 'directhttp' or protocol == 'directhttps' or protocol == 'directftp':
+            if protocol in ['directftp', 'directftps', 'directhttp', 'directhttps']:
                 keys = cf.get('url.params')
                 if keys is not None:
                     params = {}
@@ -631,6 +707,19 @@ class UpdateWorkflow(Workflow):
                     params['protocol'] = str(cf.get('irods.protocol')).strip()
                     params['zone'] = str(cf.get('irods.zone')).strip()
 
+            # Protocol options: as for params, a field contains the name
+            # of the options (options.names) and the values are in another
+            # field named options.<option_name>.
+            protocol_options = {}
+            option_names = cf.get('options.names')
+            if option_names is not None:
+                option_names = option_names.split(',')
+                for option_name in option_names:
+                    option_name = option_name.strip()
+                    param = cf.get('options.' + option_name)
+                    protocol_options[option_name] = param.strip()
+                logging.debug("Protocol options: " + str(protocol_options))
+
             release_downloader = dserv.get_handler(
                 protocol,
                 server,
@@ -643,10 +732,11 @@ class UpdateWorkflow(Workflow):
                 proxy_auth=proxy_auth,
                 save_as=save_as,
                 timeout_download=cf.get('timeout.download'),
-                offline_dir=self.session.get_offline_directory()
+                offline_dir=self.session.get_offline_directory(),
+                protocol_options=protocol_options
             )
 
-            if protocol == 'directhttp' or protocol == 'directhttps' or protocol == 'directftp':
+            if protocol in ['directftp', 'directftps', 'directhttp', 'directhttps']:
                 release_downloader.set_files_to_download(remotes)
             # """"""""""""""""""""""""
 
@@ -684,10 +774,13 @@ class UpdateWorkflow(Workflow):
                 tmp_dir = tempfile.mkdtemp('biomaj')
                 rel_files = release_downloader.download(tmp_dir)
                 rel_file = None
+                rel_file_name = rel_files[0]['name']
+                if 'save_as' in rel_files[0] and rel_files[0]['save_as']:
+                    rel_file_name = rel_files[0]['save_as']
                 if (sys.version_info > (3, 0)):
-                    rel_file = open(tmp_dir + '/' + rel_files[0]['name'], encoding='utf-8')
+                    rel_file = open(tmp_dir + '/' + rel_file_name, encoding='utf-8', errors='ignore')
                 else:
-                    rel_file = open(tmp_dir + '/' + rel_files[0]['name'])
+                    rel_file = open(tmp_dir + '/' + rel_file_name)
                 rel_content = rel_file.read()
                 rel_file.close()
                 shutil.rmtree(tmp_dir)
@@ -696,11 +789,18 @@ class UpdateWorkflow(Workflow):
                     logging.error('release.regexp defined but does not match any file content')
                     self._close_download_service(dserv)
                     return False
+                rels = re.findall(cf.get('release.regexp'), rel_content)
+                if len(rels) == 1:
+                    release = rels[0]
+                else:
+                    release = self.__findLastRelease(rels)
+                '''
                 # If regexp contains matching group, else take whole match
                 if len(rel.groups()) > 0:
                     release = rel.group(1)
                 else:
                     release = rel.group(0)
+                '''
 
             release_downloader.close()
             self._close_download_service(dserv)
@@ -974,6 +1074,7 @@ class UpdateWorkflow(Workflow):
         logging.info("Workflow:wf_download:DownloadSession:" + str(session))
 
         use_remote_list = False
+        use_plugin = False
 
         http_parse = HTTPParse(
             cf.get('http.parse.dir.line'),
@@ -1058,6 +1159,19 @@ class UpdateWorkflow(Workflow):
                 if cf.get('remote.file.' + str(i) + '.name'):
                     save_as = cf.get('remote.file.' + str(i) + '.name')
 
+                # Protocol options: as for params, a field contains the name
+                # of the options (options.names) and the values are in another
+                # field named options.<option_name>.
+                protocol_options = {}
+                option_names = cf.get('remote.file.' + str(i) + '.options.names')
+                if option_names is not None:
+                    option_names = option_names.split(',')
+                    for option_name in option_names:
+                        option_name = option_name.strip()
+                        param = cf.get('remote.file.' + str(i) + '.options.' + option_name)
+                        protocol_options[option_name] = param.strip()
+                    logging.debug("Protocol options: " + str(protocol_options))
+
                 subdownloader = dserv.get_handler(
                     protocol,
                     server,
@@ -1070,7 +1184,8 @@ class UpdateWorkflow(Workflow):
                     proxy_auth=proxy_auth,
                     save_as=save_as,
                     timeout_download=cf.get('timeout.download'),
-                    offline_dir=self.session.get_offline_directory()
+                    offline_dir=self.session.get_offline_directory(),
+                    protocol_options=protocol_options
                 )
                 subdownloader.set_files_to_download(remotes)
 
@@ -1094,7 +1209,7 @@ class UpdateWorkflow(Workflow):
 
             remote_dir = cf.get('remote.dir')
 
-            if protocol == 'directhttp' or protocol == 'directhttps' or protocol == 'directftp':
+            if protocol in ['directftp', 'directftps', 'directhttp', 'directhttps']:
                 keys = cf.get('url.params')
                 if keys is not None:
                     params = {}
@@ -1120,6 +1235,19 @@ class UpdateWorkflow(Workflow):
 
             save_as = cf.get('target.name')
 
+            # Protocol options: as for params, a field contains the name
+            # of the options (options.names) and the values are in another
+            # field named options.<option_name>.
+            protocol_options = {}
+            option_names = cf.get('options.names')
+            if option_names is not None:
+                option_names = option_names.split(',')
+                for option_name in option_names:
+                    option_name = option_name.strip()
+                    param = cf.get('options.' + option_name)
+                    protocol_options[option_name] = param.strip()
+                logging.debug("Protocol options: " + str(protocol_options))
+
             downloader = dserv.get_handler(
                 protocol,
                 server,
@@ -1132,10 +1260,11 @@ class UpdateWorkflow(Workflow):
                 proxy_auth=proxy_auth,
                 save_as=save_as,
                 timeout_download=cf.get('timeout.download'),
-                offline_dir=self.session.get_offline_directory()
+                offline_dir=self.session.get_offline_directory(),
+                protocol_options=protocol_options
             )
 
-            if protocol == 'directhttp' or protocol == 'directhttps' or protocol == 'directftp':
+            if protocol in ['directftp', 'directftps', 'directhttp', 'directhttps']:
                 downloader.set_files_to_download(remotes)
 
             remote_list = cf.get('remote.list', default=None)
@@ -1144,6 +1273,13 @@ class UpdateWorkflow(Workflow):
                 downloader.files_to_download = self._get_list_from_file(remote_list)
                 use_remote_list = True
 
+            remote_plugin = cf.get('remote.plugin', default=None)
+            if remote_plugin is not None:
+                logging.info("Use list from plugin %s" % (remote_plugin))
+                plugin = self._get_plugin(self.session.config.get('remote.plugin'), self.session.config.get('remote.plugin_args'))
+                downloader.files_to_download = plugin.list(self.session.get('release'))
+                use_plugin = True
+
             downloaders.append(downloader)
 
         self._close_download_service(dserv)
@@ -1160,6 +1296,10 @@ class UpdateWorkflow(Workflow):
                 if not downloader.files_to_download:
                     self.session.set('remoterelease', self.session.previous_release)
                     return self.no_need_to_update()
+            elif use_plugin:
+                if not downloader.files_to_download:
+                    self.session.set('remoterelease', self.session.previous_release)
+                    return self.no_need_to_update()
             else:
                 (file_list, dir_list) = downloader.list()
                 downloader.match(cf.get('remote.files', default='.*').split(), file_list, dir_list)
@@ -1169,6 +1309,8 @@ class UpdateWorkflow(Workflow):
                 if 'save_as' not in f or not f['save_as']:
                     f['save_as'] = f['name']
                     for p in cf.get('remote.files', default='.*').split():
+                        if p == '.*' or p == '**/*':
+                            continue
                         if p.startswith('^'):
                             p = p.replace('^', '^/')
                         else:
@@ -1183,7 +1325,6 @@ class UpdateWorkflow(Workflow):
         self.session.set('download_files', downloader.files_to_download)
         self.session._session['stats']['nb_downloaded_files'] = len(files_to_download)
         logging.info('Workflow:wf_download:nb_files_to_download:%d' % (len(files_to_download)))
-
         if self.session.get('release') and self.session.config.get_bool('release.control', default=False):
             if self.session.previous_release == self.session.get('remoterelease'):
                 if self.is_previous_release_content_identical():
@@ -1334,7 +1475,10 @@ class UpdateWorkflow(Workflow):
             logging.debug('Workflow:wf_download:Copy files from ' + last_production_dir)
             for downloader in downloaders:
                 copied_files += downloader.files_to_copy
-                Utils.copy_files(downloader.files_to_copy, offline_dir)
+                Utils.copy_files(
+                    downloader.files_to_copy, offline_dir,
+                    use_hardlinks=cf.get_bool('use_hardlinks', default=False)
+                )
 
         downloader.close()
 
@@ -1352,7 +1496,7 @@ class UpdateWorkflow(Workflow):
                 redis_prefix=self.redis_prefix
             )
             if pool_size:
-                logging.debug('Set rate limiting: %s' % (str(pool_size)))
+                logging.info('Set rate limiting: %s' % (str(pool_size)))
                 dserv.set_rate_limiting(int(pool_size))
 
         else:
@@ -1376,6 +1520,7 @@ class UpdateWorkflow(Workflow):
                 message.bank = self.name
                 message.session = session
                 message.local_dir = offline_dir
+                message.protocol_options.update(downloader.protocol_options)
                 remote_file = downmessage_pb2.DownloadFile.RemoteFile()
                 protocol = downloader.protocol
                 remote_file.protocol = downmessage_pb2.DownloadFile.Protocol.Value(protocol.upper())
@@ -1452,6 +1597,19 @@ class UpdateWorkflow(Workflow):
         logging.info("Workflow:wf_download:Download:Waiting")
         download_error = False
         try:
+            download_plugin = cf.get('download.plugin', default=None)
+            if download_plugin is not None:
+                self.downloaded_files = copied_files
+                logging.info("Use download from plugin %s" % (download_plugin))
+                plugin = self._get_plugin(self.session.config.get('download.plugin'), self.session.config.get('remote.plugin_args'))
+                if not hasattr(plugin, 'download'):
+                    logging.error('Workflow:wf_download:Plugin:%s:Error: plugin does not have a download method' % (download_plugin))
+                    return False
+                res = plugin.download(self.session.get('release'), downloader.files_to_download, offline_dir=offline_dir)
+                self._close_download_service(dserv)
+                self.downloaded_files += downloader.files_to_download
+                return res
+
             download_error = dserv.wait_for_download()
         except Exception as e:
             self._close_download_service(dserv)
@@ -1561,7 +1719,7 @@ class UpdateWorkflow(Workflow):
             self.session.get_release_directory(),
             'flat'
         )
-
+        # We use move=True so there is no need to try to use hardlinks here
         local_files = Utils.copy_files_with_regexp(from_dir, to_dir, regexp, True)
         self.session._session['files'] = local_files
         if len(self.session._session['files']) == 0:
@@ -1829,3 +1987,43 @@ class ReleaseCheckWorkflow(UpdateWorkflow):
 
     def wf_progress(self, task, status):
         return
+
+
+class RepairWorkflow(UpdateWorkflow):
+    """
+    Workflow to repair a bank, i.e. unlock the bank and set all steps to OK (after a manual repair for example)
+    """
+
+    def __init__(self, bank):
+        """
+        Instantiate a new workflow
+
+        :param bank: bank on which to apply the workflow
+        :type bank: Bank
+        """
+        self.wf_unlock(bank)
+        UpdateWorkflow.__init__(self, bank)
+        self.skip_all = True
+
+    def wf_unlock(self, bank):
+        logging.info('Workflow:wf_unlock')
+        data_dir = bank.session.config.get('data.dir')
+        lock_dir = bank.session.config.get('lock.dir', default=data_dir)
+        lock_file = os.path.join(lock_dir, bank.name + '.lock')
+        maintenance_lock_file = os.path.join(lock_dir, 'biomaj.lock')
+        if os.path.exists(maintenance_lock_file):
+            logging.error('Biomaj is in maintenance')
+            return False
+        if os.path.exists(lock_file):
+            os.remove(lock_file)
+        return True
+
+    def start(self):
+        if self.session.get('release') is None:
+            logging.warn('cannot repair bank, no known release')
+            return True
+        Workflow.start(self)
+        self.wf_over()
+        return True
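
The __findLastRelease helper above splits each candidate release on '.' and '-'
and keeps the candidate whose numeric component is larger. Note that the
upstream loop keeps scanning later components even when an earlier one is
already smaller; the standalone sketch below stops at the first differing
numeric component, which is the usual version-comparison semantic (sample
values are illustrative):

    import re

    def find_last_release(releases):
        """Return the most recent looking release from a list of strings."""
        release = releases[0]
        for rel in releases[1:]:
            cand = re.split(r'\.|-', rel)
            best = re.split(r'\.|-', release)
            for x, y in zip(cand, best):
                try:
                    if int(x) == int(y):
                        continue  # identical component, compare the next one
                    if int(x) > int(y):
                        release = rel
                    break  # first differing numeric component decides
                except ValueError:
                    break  # non-numeric component, keep the current choice
        return release

    print(find_last_release(['3.1.9', '3.1.14', '3.1.2']))  # -> 3.1.14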


=====================================
config.yml
=====================================
@@ -1,5 +1,5 @@
 biomaj:
-    global_properties: '/pasteur/services/policy01/banques/biomaj3/global.properties'
+    global_properties: '/etc/biomaj/global.properties'
 
 rabbitmq:
     host: '127.0.0.1'


=====================================
debian/changelog
=====================================
@@ -1,3 +1,9 @@
+biomaj3 (3.1.14-1) unstable; urgency=medium
+
+  * New upstream release 
+
+ -- Olivier Sallou <osallou at debian.org>  Tue, 12 Nov 2019 10:29:09 +0000
+
 biomaj3 (3.1.8-1) unstable; urgency=medium
 
   * New upstream release 


=====================================
debian/control
=====================================
@@ -29,9 +29,9 @@ Architecture: all
 Depends: ${misc:Depends},
          ${python3:Depends},
          unzip
-Recommends: ${python3:Recommends},
-            python3-biomaj3-cli
+Recommends: ${python3:Recommends}
 Suggests: ${python3:Suggests},
+          python3-biomaj3-cli,
           python3-gunicorn,
           mongodb,
           redis-server


=====================================
global.properties.example
=====================================
@@ -31,6 +31,17 @@ redis.port=6379
 redis.db=0
 redis.prefix=biomaj
 
+# options to send to downloaders (protocol dependent)
+# options.names=option1,option2
+# options.option1=value1
+# options.option2=value2
+#
+# Example supported options for ftp/http
+# tcp_keepalive: number of seconds for curl keep alive (maps to curl option TCP_KEEPALIVE, TCP_KEEPINTVL and TCP_KEEPIDLE)
+# ssl_verifyhost: boolean, checks or skips SSL host verification (maps to curl option SSL_VERIFYHOST)
+# ssl_verifypeer: boolean, checks or skips SSL peer verification (maps to curl option SSL_VERIFYPEER)
+# ssl_server_cert: path to certificate (maps to curl option CAINFO).
+
 
 # Influxdb configuration (optional)
 # User and db must be manually created in influxdb before use
@@ -102,6 +113,9 @@ bank.num.threads=4
 #Number of threads to use for downloading and processing
 files.num.threads=4
 
+#Timeout for a download, in seconds (applies to each file, not to the download as a whole)
+timeout.download=3600
+
 #to keep more than one release increase this value
 keep.old.version=0
 
@@ -136,6 +150,9 @@ ftp.active.mode=false
 # Bank default access
 visibility.default=public
 
+# use hard links instead of copy
+# use_hardlinks=0
+
 [loggers]
 keys = root, biomaj
 


=====================================
requirements.txt
=====================================
@@ -1,6 +1,6 @@
-biomaj_core
+biomaj_core>=3.0.19
 biomaj_user
-biomaj_download>=3.0.20
+biomaj_download>=3.1.0
 biomaj_process>=3.0.12
 biomaj_cli
 mock
@@ -15,3 +15,4 @@ elasticsearch
 requests
 redis
 influxdb
+Yapsy==1.12.2


=====================================
setup.py
=====================================
@@ -31,11 +31,12 @@ except UnicodeDecodeError:
 config = {
     'description': 'BioMAJ',
     'long_description': README + '\n\n' + CHANGES,
+    'long_description_content_type': 'text/markdown',
     'author': 'Olivier Sallou',
     'url': 'http://biomaj.genouest.org',
     'download_url': 'http://biomaj.genouest.org',
     'author_email': 'olivier.sallou at irisa.fr',
-    'version': '3.1.8',
+    'version': '3.1.14',
      'classifiers': [
         # How mature is this project? Common values are
         #   3 - Alpha
@@ -72,7 +73,8 @@ config = {
                          'requests',
                          'redis',
                          'elasticsearch',
-                         'influxdb'
+                         'influxdb',
+                         'Yapsy==1.12.2'
                          ],
     'tests_require': ['nose', 'mock'],
     'test_suite': 'nose.collector',


=====================================
tests/biomaj_tests.py
=====================================
@@ -21,12 +21,6 @@ from biomaj.workflow import Workflow
 from biomaj.workflow import UpdateWorkflow
 from biomaj.workflow import ReleaseCheckWorkflow
 from biomaj_core.utils import Utils
-from biomaj_download.download.ftp import FTPDownload
-from biomaj_download.download.direct import DirectFTPDownload
-from biomaj_download.download.direct import DirectHttpDownload
-from biomaj_download.download.http import HTTPDownload
-from biomaj_download.download.localcopy  import LocalDownload
-from biomaj_download.download.downloadthreads import DownloadThread
 from biomaj_core.config import BiomajConfig
 from biomaj.process.processfactory import PostProcessFactory
 from biomaj.process.processfactory import PreProcessFactory
@@ -103,7 +97,11 @@ class UtilsForTest():
       os.chmod(to_file, stat.S_IRWXU)
 
     # Manage local bank test, use bank test subdir as remote
-    properties = ['multi.properties', 'computederror.properties', 'error.properties', 'local.properties', 'localprocess.properties', 'testhttp.properties', 'computed.properties', 'computed2.properties', 'sub1.properties', 'sub2.properties']
+    properties = ['multi.properties', 'computederror.properties',
+                  'error.properties', 'local.properties',
+                  'localprocess.properties', 'testhttp.properties',
+                  'computed.properties', 'computed2.properties',
+                  'sub1.properties', 'sub2.properties']
     for prop in properties:
       from_file = os.path.join(curdir, prop)
       to_file = os.path.join(self.conf_dir, prop)
@@ -421,6 +419,58 @@ class TestBiomajFunctional(unittest.TestCase):
     b.update()
     self.assertTrue(b.session.get('update'))
 
+  def test_update_hardlinks(self):
+    """
+    Update a bank twice with hard links. Files copied from previous release
+    must be links.
+    """
+    b = Bank('local')
+    b.config.set('keep.old.version', '3')
+    b.config.set('use_hardlinks', '1')
+    # Create a file in bank dir (which is the source dir) so we can manipulate
+    # it. The pattern is taken into account by the bank configuration.
+    # Note that this file is created in the source tree so we remove it after
+    # or if this test fails in between.
+    tmp_remote_file = b.config.get('remote.dir') + 'test.safe_to_del'
+    if os.path.exists(tmp_remote_file):
+        os.remove(tmp_remote_file)
+    open(tmp_remote_file, "w")
+    # First update
+    b.update()
+    self.assertTrue(b.session.get('update'))
+    old_release = b.session.get_full_release_directory()
+    # Touch tmp_remote_file to force update. We set the date to tomorrow so we
+    # are sure that a new release will be detected.
+    tomorrow = time.time() + 3660 * 24  # 3660s for safety (leap second, etc.)
+    os.utime(tmp_remote_file, (tomorrow, tomorrow))
+    # Second update
+    try:
+        b.update()
+        self.assertTrue(b.session.get('update'))
+        new_release = b.session.get_full_release_directory()
+        # Test that files in both releases are links to the same file.
+        # We can't use tmp_remote_file because it's the source of update and we
+        # can't use test.fasta.gz because it is uncompressed and then not the
+        # same file.
+        for f in ['test2.fasta', 'test_100.txt']:
+            file_old_release = os.path.join(old_release, 'flat', f)
+            file_new_release = os.path.join(new_release, 'flat', f)
+            try:
+                self.assertTrue(os.path.samefile(file_old_release, file_new_release))
+            except AssertionError:
+                msg = "In %s: copy worked but hardlinks were not used." % self.id()
+                logging.info(msg)
+        # Test that no links are done for tmp_remote_file
+        file_old_release = os.path.join(old_release, 'flat', 'test.safe_to_del')
+        file_new_release = os.path.join(new_release, 'flat', 'test.safe_to_del')
+        self.assertFalse(os.path.samefile(file_old_release, file_new_release))
+    except Exception:
+        raise
+    finally:
+        # Remove file
+        if os.path.exists(tmp_remote_file):
+            os.remove(tmp_remote_file)
+
   def test_fromscratch_update(self):
       """
      Try updating twice; the second time, the bank should be updated (forced with fromscratch)
@@ -736,6 +786,7 @@ class TestBiomajFunctional(unittest.TestCase):
   def test_multi(self):
     b = Bank('multi')
     res = b.update()
+    self.assertTrue(res)
     with open(os.path.join(b.session.get_full_release_directory(),'flat/test1.json'), 'r') as content_file:
       content = content_file.read()
       my_json = json.loads(content)
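
The hardlink assertions in test_update_hardlinks above rely on
os.path.samefile, which compares device and inode numbers; a self-contained
illustration (throwaway temp paths, nothing BioMAJ-specific):

    import os
    import tempfile

    d = tempfile.mkdtemp()
    src = os.path.join(d, 'a.txt')
    open(src, 'w').close()
    link = os.path.join(d, 'b.txt')
    os.link(src, link)  # hard link, as done when use_hardlinks=1
    print(os.path.samefile(src, link))  # True: both names share one inode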



View it on GitLab: https://salsa.debian.org/med-team/biomaj3/compare/04403ed8b6c68d7e6fe930bb5d1e812014fea884...11a9fbb0c5c426e54e60d453a9e28b765fc51635
