[med-svn] [Git][med-team/biomaj3][upstream] New upstream version 3.1.14
Olivier Sallou
gitlab at salsa.debian.org
Tue Nov 12 11:00:11 GMT 2019
Olivier Sallou pushed to branch upstream at Debian Med / biomaj3
Commits:
1cf1bd06 by Olivier Sallou at 2019-11-12T10:29:01Z
New upstream version 3.1.14
- - - - -
11 changed files:
- .travis.yml
- CHANGES.txt
- README.md
- biomaj/bank.py
- biomaj/process/metaprocess.py
- biomaj/workflow.py
- config.yml
- global.properties.example
- requirements.txt
- setup.py
- tests/biomaj_tests.py
Changes:
=====================================
.travis.yml
=====================================
@@ -2,15 +2,20 @@ language: python
sudo: false
python:
- '2.7'
-- '3.4'
-- '3.5'
- '3.6'
+- '3.7'
+- '3.8'
services:
- mongodb
- elasticsearch
branches:
except:
- "/^feature.*$/"
+before_install:
+ - sudo apt-get install -y libgnutls-dev
+addons:
+ apt:
+ update: true
install:
- pip install flake8
- pip install -r requirements.txt
=====================================
CHANGES.txt
=====================================
@@ -1,3 +1,22 @@
+3.1.14:
+ Add repair option
+3.1.13:
+ Add process name and status in logs
+ PR #116 update to use download 3.1.0
+3.1.12:
+ In case of multiple matches for release regexp, try to determine most recent one
+ #115 Correctly use save_as for release file name
+3.1.11:
+ Increase one log level
+ #110 Allow ftps and directftps protocols (needs biomaj-download 3.0.26 and biomaj-core 3.0.19)
+ #111 locked bank after bad update command
+ Ignore UTF-8 errors in release file
+ Add plugin support via biomaj-plugins repo (https://github.com/genouest/biomaj-plugins) to get release and list of files to download from a plugin script.
+ Add support for protocol options in global and bank properties (options.names=x,y options.x=val options.y=val). Options may be ignored or used differently depending on used protocol.
+3.1.10:
+ Allow to use hardlinks when reusing files from previous releases
+3.1.9:
+ Fix remote.files recursion
3.1.8:
Fix uncompress when saved files contains subdirectory
3.1.7:
=====================================
README.md
=====================================
@@ -49,42 +49,41 @@ Application Features
====================
* Synchronisation:
- * Multiple remote protocols (ftp, sftp, http, local copy, etc.)
- * Data transfers integrity check
- * Release versioning using a incremental approach
- * Multi threading
- * Data extraction (gzip, tar, bzip)
- * Data tree directory normalisation
-
+ * Multiple remote protocols (ftp, ftps, http, local copy, etc.)
+  * Data transfer integrity checks
+  * Release versioning using an incremental approach
+ * Multi threading
+ * Data extraction (gzip, tar, bzip)
+ * Data tree directory normalisation
+ * Plugins support for custom downloads
* Pre &Post processing :
- * Advanced workflow description (D.A.G)
- * Post-process indexation for various bioinformatics software (blast, srs, fastacmd, readseq, etc.)
- * Easy integration of personal scripts for bank post-processing automation
-
+ * Advanced workflow description (D.A.G)
+ * Post-process indexation for various bioinformatics software (blast, srs, fastacmd, readseq, etc.)
+ * Easy integration of personal scripts for bank post-processing automation
* Supervision:
- * Optional Administration web interface (biomaj-watcher)
- * CLI management
- * Mail alerts for the update cycle supervision
- * Prometheus and Influxdb optional integration
- * Optional consul supervision of processes
+ * Optional Administration web interface (biomaj-watcher)
+ * CLI management
+ * Mail alerts for the update cycle supervision
+ * Prometheus and Influxdb optional integration
+ * Optional consul supervision of processes
* Scalability:
- * Monolithic (local install) or microservice architecture (remote access to a BioMAJ server)
- * Microservice installation allows per process scalability and supervision (number of process in charge of download, execution, etc.)
-
+ * Monolithic (local install) or microservice architecture (remote access to a BioMAJ server)
+ * Microservice installation allows per process scalability and supervision (number of process in charge of download, execution, etc.)
* Remote access:
- * Optional FTP server providing authenticated or anonymous data access
+ * Optional FTP server providing authenticated or anonymous data access
Dependencies
============
Packages:
- * Debian: libcurl-dev, gcc
- * CentOs: libcurl-devel, openldap-devel, gcc
+
+* Debian: libcurl-dev, gcc
+* CentOs: libcurl-devel, openldap-devel, gcc
Linux tools: tar, unzip, gunzip, bunzip
@@ -115,7 +114,6 @@ From packages:
pip install biomaj biomaj-cli biomaj-daemon
-
You should consider using a Python virtual environment (virtualenv) to install BioMAJ.
In tools/examples, copy the global.properties and update it to match your local
@@ -123,13 +121,11 @@ installation.
The tools/process contains example process files (python and shell).
-
Docker
======
You can use BioMAJ with Docker (genouest/biomaj)
-
docker pull genouest/biomaj
docker pull mongo
docker run --name biomaj-mongodb -d mongo
@@ -147,6 +143,51 @@ No default bank property file or process are available in the container.
Examples are available at https://github.com/genouest/biomaj-data
+
+Import bank templates
+=====================
+
+Once BioMAJ is installed, it is possible to import some bank examples with the biomaj client:
+
+
+ # List available templates
+ biomaj-cli ... --data-list
+ # Import a bank template
+ biomaj-cli ... --data-import --bank alu
+ # then edit bank template in config directory if needed and launch bank update
+ biomaj-cli ... --update --bank alu
+
+Plugins
+=======
+
+BioMAJ supports Python plugins to manage custom downloads when the supported protocols
+are not enough (an http page with an unformatted listing, access to protected pages, etc.).
+
+Examples of plugins, and how to configure them, are available in the [biomaj-plugins](https://github.com/genouest/biomaj-plugins) repository.
+
+Plugins can define a specific way to:
+
+* retrieve the release
+* list remote files to download
+* download remote files
+
+A plugin can define one or more of those features.
+
+Basically, plugins are declared in the bank property file:
+
+ # Location of plugins
+ plugins_dir=/opt/biomaj-plugins
+ # Use plugin to fetch release
+ release.plugin=github
+ # List of arguments of plugin function with key=value format, comma separated
+ release.plugin_args=repo=osallou/goterra-cli
+
+Plugins are used when related workflow step is used:
+
+* release.plugin <= returns remote release
+* remote.plugin <= returns list of files to download
+* download.plugin <= download files from list of files
+
API documentation
=================
@@ -172,7 +213,6 @@ Execute unit tests but disable ones needing network access
nosetests -a '!network'
-
Monitoring
==========
@@ -195,14 +235,12 @@ A-GPL v3+
Remarks
=======
-Biomaj uses libcurl, for sftp libcurl must be compiled with sftp support
-
To delete elasticsearch index:
- curl -XDELETE 'http://localhost:9200/biomaj_test/'
+ curl -XDELETE 'http://localhost:9200/biomaj_test/'
Credits
-======
+=======
Special thanks for tuco at Pasteur Institute for the intensive testing and new ideas.
Thanks to the old BioMAJ team for the work they have done.
=====================================
biomaj/bank.py
=====================================
@@ -13,6 +13,7 @@ from biomaj.mongo_connector import MongoConnector
from biomaj.session import Session
from biomaj.workflow import UpdateWorkflow
from biomaj.workflow import RemoveWorkflow
+from biomaj.workflow import RepairWorkflow
from biomaj.workflow import Workflow
from biomaj.workflow import ReleaseCheckWorkflow
from biomaj_core.config import BiomajConfig
@@ -502,8 +503,10 @@ class Bank(object):
# Insert session
if self.session.get('action') == 'update':
action = 'last_update_session'
- if self.session.get('action') == 'remove':
+ elif self.session.get('action') == 'remove':
action = 'last_remove_session'
+ else:
+ action = 'last_update_session'
cache_dir = self.config.get('cache.dir')
download_files = self.session.get('download_files')
@@ -1112,6 +1115,74 @@ class Bank(object):
return res
+ def repair(self):
+ """
+ Launch a bank repair
+
+ :return: bool
+ """
+ logging.warning('Bank:' + self.name + ':Repair')
+ start_time = datetime.now()
+ start_time = time.mktime(start_time.timetuple())
+
+ if not self.is_owner():
+ logging.error('Not authorized, bank owned by ' + self.bank['properties']['owner'])
+ raise Exception('Not authorized, bank owned by ' + self.bank['properties']['owner'])
+
+ self.run_depends = False
+
+ self.controls()
+ if self.options.get_option('release'):
+ logging.info('Bank:' + self.name + ':Release:' + self.options.get_option('release'))
+ s = self.get_session_from_release(self.options.get_option('release'))
+ # No session in prod
+ if s is None:
+ logging.error('Release does not exist: ' + self.options.get_option('release'))
+ return False
+ self.load_session(UpdateWorkflow.FLOW, s)
+ else:
+ logging.info('Bank:' + self.name + ':Release:latest')
+ self.load_session(UpdateWorkflow.FLOW)
+ self.session.set('action', 'update')
+ res = self.start_repair()
+ self.session.set('workflow_status', res)
+ self.save_session()
+ try:
+ self.__stats()
+ except Exception:
+ logging.exception('Failed to send stats')
+ end_time = datetime.now()
+ end_time = time.mktime(end_time.timetuple())
+ self.history.insert({
+ 'bank': self.name,
+ 'error': not res,
+ 'start': start_time,
+ 'end': end_time,
+ 'action': 'repair',
+ 'updated': self.session.get('update')
+ })
+ return res
+
+ def start_repair(self):
+ """
+ Start a repair workflow
+ """
+ workflow = RepairWorkflow(self)
+ if self.options and self.options.get_option('redis_host'):
+ redis_client = redis.StrictRedis(
+ host=self.options.get_option('redis_host'),
+ port=self.options.get_option('redis_port'),
+ db=self.options.get_option('redis_db'),
+ decode_responses=True
+ )
+ workflow.redis_client = redis_client
+ workflow.redis_prefix = self.options.get_option('redis_prefix')
+ if redis_client.get(self.options.get_option('redis_prefix') + ':' + self.name + ':action:cancel'):
+ logging.warning('Cancel requested, stopping repair')
+ redis_client.delete(self.options.get_option('redis_prefix') + ':' + self.name + ':action:cancel')
+ return False
+ return workflow.start()
+
def update(self, depends=False):
"""
Launch a bank update
@@ -1165,7 +1236,9 @@ class Bank(object):
if not reset:
logging.info("Process %s not found in %s" % (str(proc), task['name']))
return False
-
+ if not set_to_false:
+ logging.error('No task found named %s' % (self.options.get_option('from_task')))
+ return False
self.session.set('action', 'update')
res = self.start_update()
self.session.set('workflow_status', res)
=====================================
biomaj/process/metaprocess.py
=====================================
@@ -262,8 +262,11 @@ class MetaProcess(threading.Thread):
processes_status[bprocess] = res
self.set_progress(bmaj_process.name, res)
if not res:
+ logging.info("PROC:META:RUN:PROCESS:ERROR:" + bmaj_process.name)
self.global_status = False
break
+ else:
+ logging.info("PROC:META:RUN:PROCESS:OK:" + bmaj_process.name)
if not self.simulate:
if self._lock:
self._lock.acquire()
=====================================
biomaj/workflow.py
=====================================
@@ -16,7 +16,7 @@ import sys
from biomaj_core.utils import Utils
from biomaj_download.downloadclient import DownloadClient
from biomaj_download.message import downmessage_pb2
-from biomaj_download.download.http import HTTPParse
+from biomaj_download.download.curl import HTTPParse
from biomaj_download.download.localcopy import LocalDownload
from biomaj.mongo_connector import MongoConnector
@@ -24,6 +24,7 @@ from biomaj.options import Options
from biomaj.process.processfactory import RemoveProcessFactory, PreProcessFactory, PostProcessFactory
from biomaj_zipkin.zipkin import Zipkin
+from yapsy.PluginManager import PluginManager
class Workflow(object):
@@ -334,6 +335,28 @@ class UpdateWorkflow(Workflow):
logging.debug('New workflow')
self.session._session['update'] = True
+ def _get_plugin(self, name, plugin_args):
+ options = {}
+ plugins_dir = self.bank.config.get('plugins_dir')
+ if not plugins_dir:
+ return None
+ if plugin_args:
+ for plugin_arg in plugin_args.split(','):
+ arg = plugin_arg.split('=', 1)
+ options[arg[0].strip()] = arg[1].strip()
+ requested_plugin = None
+ simplePluginManager = PluginManager()
+ simplePluginManager.setPluginPlaces([plugins_dir])
+ simplePluginManager.collectPlugins()
+ logging.info('Load plugins from %s' % (plugins_dir))
+ for pluginInfo in simplePluginManager.getAllPlugins():
+ if pluginInfo.plugin_object.name() == name:
+ requested_plugin = pluginInfo.plugin_object
+ if requested_plugin is None:
+ return None
+ requested_plugin.configure(options)
+ return requested_plugin
+
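The plugin_args string is parsed by simple comma and key=value splitting, on the first '=' only; a standalone equivalent of the parsing done in _get_plugin above (helper name is mine):

```python
def parse_plugin_args(plugin_args):
    # Mirrors the parsing in _get_plugin: comma-separated key=value
    # pairs, split on the first '=' so values may themselves contain '='
    options = {}
    if plugin_args:
        for plugin_arg in plugin_args.split(','):
            key, value = plugin_arg.split('=', 1)
            options[key.strip()] = value.strip()
    return options
```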
def wf_init(self):
err = super(UpdateWorkflow, self).wf_init()
if not err:
@@ -431,6 +454,7 @@ class UpdateWorkflow(Workflow):
"""
logging.info('Workflow:wf_copydepends')
deps = self.bank.get_dependencies()
+ cf = self.session.config
for dep in deps:
if self.bank.config.get(dep + '.files.move'):
logging.info('Worflow:wf_depends:Files:Move:' + self.bank.config.get(dep + '.files.move'))
@@ -443,7 +467,10 @@ class UpdateWorkflow(Workflow):
logging.error('Could not find a session update for bank ' + dep)
return False
# b = self.bank.get_bank(dep, no_log=True)
- locald = LocalDownload(bdir)
+ locald = LocalDownload(
+ bdir,
+ use_hardlinks=cf.get_bool("use_hardlinks", default=False)
+ )
(file_list, dir_list) = locald.list()
locald.match(self.bank.config.get(dep + '.files.move').split(), file_list, dir_list)
bankdepdir = self.bank.session.get_full_release_directory() + "/" + dep
@@ -484,11 +511,38 @@ class UpdateWorkflow(Workflow):
MongoConnector.banks.update({'name': self.bank.name},
info)
+ def __findLastRelease(self, releases):
+ '''
+ Try to find the most recent release in the releases input array
+ '''
+ release = releases[0]
+ releaseElts = re.split(r'\.|-', release)
+ logging.debug('found a release %s' % (release))
+ for rel in releases:
+ if rel == release:
+ continue
+ logging.debug('compare next release %s' % (rel))
+ relElts = re.split(r'\.|-', rel)
+ index = 0
+ for relElt in relElts:
+ logging.debug("compare release major,minor,etc. : %s >? %s" % (relElt, releaseElts[index]))
+ try:
+ if int(relElt) > int(releaseElts[index]):
+ release = rel
+ logging.debug("found newer release %s" % (rel))
+ break
+ except ValueError:
+ pass
+ finally:
+ index += 1
+ return release
+
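As a standalone sketch, the most-recent-release selection can be written as below; note this variant also stops when a numeric component is smaller and re-bases the comparison on the new best candidate, so e.g. '2.9' cannot overtake '3.10' on its second component (the function name and the stricter comparison are mine):

```python
import re

def find_last_release(releases):
    # Pick the most recent release by comparing numeric components
    # split on '.' or '-' (e.g. '3.10' > '3.9' > '2.9')
    best = releases[0]
    best_elts = re.split(r'\.|-', best)
    for rel in releases[1:]:
        elts = re.split(r'\.|-', rel)
        for index, elt in enumerate(elts):
            if index >= len(best_elts):
                break
            try:
                if int(elt) > int(best_elts[index]):
                    # Strictly newer on this component: new best
                    best, best_elts = rel, elts
                    break
                elif int(elt) < int(best_elts[index]):
                    # Strictly older: stop comparing this candidate
                    break
            except ValueError:
                # Non-numeric component: skip it
                continue
    return best
```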
def wf_release(self):
"""
Find current release on remote
"""
logging.info('Workflow:wf_release')
+ release = None
cf = self.session.config
if cf.get('ref.release') and self.bank.depends:
# Bank is a computed bank and we ask to set release to the same
@@ -531,6 +585,28 @@ class UpdateWorkflow(Workflow):
self.session.previous_release = self.session.get('previous_release')
logging.info('Workflow:wf_release:previous_session:' + str(self.session.previous_release))
+
+ if self.session.config.get('release.plugin'):
+ # A plugin is configured to determine the remote release
+ logging.info('Workflow:wf_release:plugin:' + str(self.session.config.get('release.plugin')))
+ plugin = self._get_plugin(self.session.config.get('release.plugin'), self.session.config.get('release.plugin_args'))
+ if plugin is None:
+ logging.error("Could not load plugin")
+ return False
+ try:
+ plugin_release = plugin.release()
+ logging.info('Workflow:wf_release:plugin:%s:%s' % (self.session.config.get('release.plugin'), plugin_release))
+ self.session.set('release', plugin_release)
+ self.session.set('remoterelease', plugin_release)
+ if self.session.previous_release == self.session.get('release') and not self.session.config.get_bool('release.control', default=False):
+ logging.info('Workflow:wf_release:same_as_previous_session')
+ return self.no_need_to_update()
+ else:
+ return True
+ except Exception as e:
+ logging.exception("Plugin failed to get a release %s" % (str(e)))
+ return False
+
if self.session.get('release'):
# Release already set from a previous run or an other bank
logging.info('Workflow:wf_release:session:' + str(self.session.get('release')))
@@ -603,7 +679,7 @@ class UpdateWorkflow(Workflow):
save_as = None
method = 'GET'
- if protocol == 'directhttp' or protocol == 'directhttps' or protocol == 'directftp':
+ if protocol in ['directftp', 'directftps', 'directhttp', 'directhttps']:
keys = cf.get('url.params')
if keys is not None:
params = {}
@@ -631,6 +707,19 @@ class UpdateWorkflow(Workflow):
params['protocol'] = str(cf.get('irods.protocol')).strip()
params['zone'] = str(cf.get('irods.zone')).strip()
+ # Protocol options: as for params, a field contains the name
+ # of the options (options.names) and the values are in another
+ # field named options.<option_name>.
+ protocol_options = {}
+ option_names = cf.get('options.names')
+ if option_names is not None:
+ option_names = option_names.split(',')
+ for option_name in option_names:
+ option_name = option_name.strip()
+ param = cf.get('options.' + option_name)
+ protocol_options[option_name] = param.strip()
+ logging.debug("Protocol options: " + str(protocol_options))
+
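The options.names convention can be exercised with anything exposing a get(key) method (a plain dict works for illustration); a sketch of the lookup performed above, with a prefix parameter to cover the per-file remote.file.<i>.options.* variant (helper name is mine):

```python
def get_protocol_options(cf, prefix=''):
    # cf is anything with a .get(key) returning str or None, mirroring
    # the options.names / options.<option_name> property convention
    protocol_options = {}
    option_names = cf.get(prefix + 'options.names')
    if option_names is not None:
        for option_name in option_names.split(','):
            option_name = option_name.strip()
            value = cf.get(prefix + 'options.' + option_name)
            if value is not None:
                protocol_options[option_name] = value.strip()
    return protocol_options
```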
release_downloader = dserv.get_handler(
protocol,
server,
@@ -643,10 +732,11 @@ class UpdateWorkflow(Workflow):
proxy_auth=proxy_auth,
save_as=save_as,
timeout_download=cf.get('timeout.download'),
- offline_dir=self.session.get_offline_directory()
+ offline_dir=self.session.get_offline_directory(),
+ protocol_options=protocol_options
)
- if protocol == 'directhttp' or protocol == 'directhttps' or protocol == 'directftp':
+ if protocol in ['directftp', 'directftps', 'directhttp', 'directhttps']:
release_downloader.set_files_to_download(remotes)
# """"""""""""""""""""""""
@@ -684,10 +774,13 @@ class UpdateWorkflow(Workflow):
tmp_dir = tempfile.mkdtemp('biomaj')
rel_files = release_downloader.download(tmp_dir)
rel_file = None
+ rel_file_name = rel_files[0]['name']
+ if 'save_as' in rel_files[0] and rel_files[0]['save_as']:
+ rel_file_name = rel_files[0]['save_as']
if (sys.version_info > (3, 0)):
- rel_file = open(tmp_dir + '/' + rel_files[0]['name'], encoding='utf-8')
+ rel_file = open(tmp_dir + '/' + rel_file_name, encoding='utf-8', errors='ignore')
else:
- rel_file = open(tmp_dir + '/' + rel_files[0]['name'])
+ rel_file = open(tmp_dir + '/' + rel_file_name)
rel_content = rel_file.read()
rel_file.close()
shutil.rmtree(tmp_dir)
@@ -696,11 +789,18 @@ class UpdateWorkflow(Workflow):
logging.error('release.regexp defined but does not match any file content')
self._close_download_service(dserv)
return False
+ rels = re.findall(cf.get('release.regexp'), rel_content)
+ if len(rels) == 1:
+ release = rels[0]
+ else:
+ release = self.__findLastRelease(rels)
+ '''
# If regexp contains matching group, else take whole match
if len(rel.groups()) > 0:
release = rel.group(1)
else:
release = rel.group(0)
+ '''
release_downloader.close()
self._close_download_service(dserv)
@@ -974,6 +1074,7 @@ class UpdateWorkflow(Workflow):
logging.info("Workflow:wf_download:DownloadSession:" + str(session))
use_remote_list = False
+ use_plugin = False
http_parse = HTTPParse(
cf.get('http.parse.dir.line'),
@@ -1058,6 +1159,19 @@ class UpdateWorkflow(Workflow):
if cf.get('remote.file.' + str(i) + '.name'):
save_as = cf.get('remote.file.' + str(i) + '.name')
+ # Protocol options: as for params, a field contains the name
+ # of the options (options.names) and the values are in another
+ # field named options.<option_name>.
+ protocol_options = {}
+ option_names = cf.get('remote.file.' + str(i) + '.options.names')
+ if option_names is not None:
+ option_names = option_names.split(',')
+ for option_name in option_names:
+ option_name = option_name.strip()
+ param = cf.get('remote.file.' + str(i) + '.options.' + option_name)
+ protocol_options[option_name] = param.strip()
+ logging.debug("Protocol options: " + str(protocol_options))
+
subdownloader = dserv.get_handler(
protocol,
server,
@@ -1070,7 +1184,8 @@ class UpdateWorkflow(Workflow):
proxy_auth=proxy_auth,
save_as=save_as,
timeout_download=cf.get('timeout.download'),
- offline_dir=self.session.get_offline_directory()
+ offline_dir=self.session.get_offline_directory(),
+ protocol_options=protocol_options
)
subdownloader.set_files_to_download(remotes)
@@ -1094,7 +1209,7 @@ class UpdateWorkflow(Workflow):
remote_dir = cf.get('remote.dir')
- if protocol == 'directhttp' or protocol == 'directhttps' or protocol == 'directftp':
+ if protocol in ['directftp', 'directftps', 'directhttp', 'directhttps']:
keys = cf.get('url.params')
if keys is not None:
params = {}
@@ -1120,6 +1235,19 @@ class UpdateWorkflow(Workflow):
save_as = cf.get('target.name')
+ # Protocol options: as for params, a field contains the name
+ # of the options (options.names) and the values are in another
+ # field named options.<option_name>.
+ protocol_options = {}
+ option_names = cf.get('options.names')
+ if option_names is not None:
+ option_names = option_names.split(',')
+ for option_name in option_names:
+ option_name = option_name.strip()
+ param = cf.get('options.' + option_name)
+ protocol_options[option_name] = param.strip()
+ logging.debug("Protocol options: " + str(protocol_options))
+
downloader = dserv.get_handler(
protocol,
server,
@@ -1132,10 +1260,11 @@ class UpdateWorkflow(Workflow):
proxy_auth=proxy_auth,
save_as=save_as,
timeout_download=cf.get('timeout.download'),
- offline_dir=self.session.get_offline_directory()
+ offline_dir=self.session.get_offline_directory(),
+ protocol_options=protocol_options
)
- if protocol == 'directhttp' or protocol == 'directhttps' or protocol == 'directftp':
+ if protocol in ['directftp', 'directftps', 'directhttp', 'directhttps']:
downloader.set_files_to_download(remotes)
remote_list = cf.get('remote.list', default=None)
@@ -1144,6 +1273,13 @@ class UpdateWorkflow(Workflow):
downloader.files_to_download = self._get_list_from_file(remote_list)
use_remote_list = True
+ remote_plugin = cf.get('remote.plugin', default=None)
+ if remote_plugin is not None:
+ logging.info("Use list from plugin %s" % (remote_plugin))
+ plugin = self._get_plugin(self.session.config.get('remote.plugin'), self.session.config.get('remote.plugin_args'))
+ downloader.files_to_download = plugin.list(self.session.get('release'))
+ use_plugin = True
+
downloaders.append(downloader)
self._close_download_service(dserv)
@@ -1160,6 +1296,10 @@ class UpdateWorkflow(Workflow):
if not downloader.files_to_download:
self.session.set('remoterelease', self.session.previous_release)
return self.no_need_to_update()
+ elif use_plugin:
+ if not downloader.files_to_download:
+ self.session.set('remoterelease', self.session.previous_release)
+ return self.no_need_to_update()
else:
(file_list, dir_list) = downloader.list()
downloader.match(cf.get('remote.files', default='.*').split(), file_list, dir_list)
@@ -1169,6 +1309,8 @@ class UpdateWorkflow(Workflow):
if 'save_as' not in f or not f['save_as']:
f['save_as'] = f['name']
for p in cf.get('remote.files', default='.*').split():
+ if p == '.*' or p == '**/*':
+ continue
if p.startswith('^'):
p = p.replace('^', '^/')
else:
@@ -1183,7 +1325,6 @@ class UpdateWorkflow(Workflow):
self.session.set('download_files', downloader.files_to_download)
self.session._session['stats']['nb_downloaded_files'] = len(files_to_download)
logging.info('Workflow:wf_download:nb_files_to_download:%d' % (len(files_to_download)))
-
if self.session.get('release') and self.session.config.get_bool('release.control', default=False):
if self.session.previous_release == self.session.get('remoterelease'):
if self.is_previous_release_content_identical():
@@ -1334,7 +1475,10 @@ class UpdateWorkflow(Workflow):
logging.debug('Workflow:wf_download:Copy files from ' + last_production_dir)
for downloader in downloaders:
copied_files += downloader.files_to_copy
- Utils.copy_files(downloader.files_to_copy, offline_dir)
+ Utils.copy_files(
+ downloader.files_to_copy, offline_dir,
+ use_hardlinks=cf.get_bool('use_hardlinks', default=False)
+ )
downloader.close()
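The use_hardlinks flag ultimately relies on os.link, which only works within a single filesystem; a minimal per-file sketch with a copy fallback (helper name and fallback policy are mine, not the Utils.copy_files implementation):

```python
import os
import shutil

def link_or_copy(src, dst):
    # Try to hardlink dst to src (cheap, shares the inode); fall back
    # to a real copy when linking fails, e.g. across filesystems
    if os.path.exists(dst):
        os.remove(dst)
    try:
        os.link(src, dst)
    except OSError:
        shutil.copy2(src, dst)
```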
@@ -1352,7 +1496,7 @@ class UpdateWorkflow(Workflow):
redis_prefix=self.redis_prefix
)
if pool_size:
- logging.debug('Set rate limiting: %s' % (str(pool_size)))
+ logging.info('Set rate limiting: %s' % (str(pool_size)))
dserv.set_rate_limiting(int(pool_size))
else:
@@ -1376,6 +1520,7 @@ class UpdateWorkflow(Workflow):
message.bank = self.name
message.session = session
message.local_dir = offline_dir
+ message.protocol_options.update(downloader.protocol_options)
remote_file = downmessage_pb2.DownloadFile.RemoteFile()
protocol = downloader.protocol
remote_file.protocol = downmessage_pb2.DownloadFile.Protocol.Value(protocol.upper())
@@ -1452,6 +1597,19 @@ class UpdateWorkflow(Workflow):
logging.info("Workflow:wf_download:Download:Waiting")
download_error = False
try:
+ download_plugin = cf.get('download.plugin', default=None)
+ if download_plugin is not None:
+ self.downloaded_files = copied_files
+ logging.info("Use download from plugin %s" % (download_plugin))
+ plugin = self._get_plugin(self.session.config.get('download.plugin'), self.session.config.get('remote.plugin_args'))
+ if not hasattr(plugin, 'download'):
+ logging.error('Workflow:wf_download:Plugin:%s:Error: plugin does not have a download method' % (download_plugin))
+ return False
+ res = plugin.download(self.session.get('release'), downloader.files_to_download, offline_dir=offline_dir)
+ self._close_download_service(dserv)
+ self.downloaded_files += downloader.files_to_download
+ return res
+
download_error = dserv.wait_for_download()
except Exception as e:
self._close_download_service(dserv)
@@ -1561,7 +1719,7 @@ class UpdateWorkflow(Workflow):
self.session.get_release_directory(),
'flat'
)
-
+ # We use move=True so there is no need to try to use hardlinks here
local_files = Utils.copy_files_with_regexp(from_dir, to_dir, regexp, True)
self.session._session['files'] = local_files
if len(self.session._session['files']) == 0:
@@ -1829,3 +1987,43 @@ class ReleaseCheckWorkflow(UpdateWorkflow):
def wf_progress(self, task, status):
return
+
+
+class RepairWorkflow(UpdateWorkflow):
+ """
+ Workflow to repair a bank, i.e. unlock the bank and set all steps to OK (after a manual repair for example)
+ """
+
+ def __init__(self, bank):
+ """
+ Instantiate a new workflow
+
+ :param bank: bank on which to apply the workflow
+ :type bank: Bank
+ """
+ self.wf_unlock(bank)
+ UpdateWorkflow.__init__(self, bank)
+ self.skip_all = True
+
+ def wf_unlock(self, bank):
+ logging.info('Workflow:wf_unlock')
+ data_dir = bank.session.config.get('data.dir')
+ lock_dir = bank.session.config.get('lock.dir', default=data_dir)
+ lock_file = os.path.join(lock_dir, bank.name + '.lock')
+ maintenance_lock_file = os.path.join(lock_dir, 'biomaj.lock')
+ if os.path.exists(maintenance_lock_file):
+ logging.error('Biomaj is in maintenance')
+ return False
+ if os.path.exists(lock_file):
+ os.remove(lock_file)
+ return True
+
+ def start(self):
+ if self.session.get('release') is None:
+ logging.warning('Cannot repair bank, no known release')
+ return True
+ Workflow.start(self)
+ self.wf_over()
+ return True
=====================================
config.yml
=====================================
@@ -1,5 +1,5 @@
biomaj:
- global_properties: '/pasteur/services/policy01/banques/biomaj3/global.properties'
+ global_properties: '/etc/biomaj/global.properties'
rabbitmq:
host: '127.0.0.1'
=====================================
global.properties.example
=====================================
@@ -31,6 +31,17 @@ redis.port=6379
redis.db=0
redis.prefix=biomaj
+# options to send to downloaders (protocol dependent)
+# options.names=option1,option2
+# options.option1=value1
+# options.option2=value2
+#
+# Example supported options for ftp/http
+# tcp_keepalive: number of seconds for curl keep alive (maps to curl option TCP_KEEPALIVE, TCP_KEEPINTVL and TCP_KEEPIDLE)
+# ssl_verifyhost: boolean, check or skips SSL host (maps to curl option SSL_VERIFYHOST)
+# ssl_verifypeer: boolean, SSL checks (maps to curl option SSL_VERIFYPEER)
+# ssl_server_cert: path to certificate (maps to curl option CAINFO).
+
# Influxdb configuration (optional)
# User and db must be manually created in influxdb before use
@@ -102,6 +113,9 @@ bank.num.threads=4
#Number of threads to use for downloading and processing
files.num.threads=4
+#Timeout for a download, in seconds (applies for each file, not for the whole downloads)
+timeout.download=3600
+
#to keep more than one release increase this value
keep.old.version=0
@@ -136,6 +150,9 @@ ftp.active.mode=false
# Bank default access
visibility.default=public
+# use hard links instead of copy
+# use_hardlinks=0
+
[loggers]
keys = root, biomaj
=====================================
requirements.txt
=====================================
@@ -1,6 +1,6 @@
-biomaj_core
+biomaj_core>=3.0.19
biomaj_user
-biomaj_download>=3.0.20
+biomaj_download>=3.1.0
biomaj_process>=3.0.12
biomaj_cli
mock
@@ -15,3 +15,4 @@ elasticsearch
requests
redis
influxdb
+Yapsy==1.12.2
=====================================
setup.py
=====================================
@@ -31,11 +31,12 @@ except UnicodeDecodeError:
config = {
'description': 'BioMAJ',
'long_description': README + '\n\n' + CHANGES,
+ 'long_description_content_type': 'text/markdown',
'author': 'Olivier Sallou',
'url': 'http://biomaj.genouest.org',
'download_url': 'http://biomaj.genouest.org',
'author_email': 'olivier.sallou at irisa.fr',
- 'version': '3.1.8',
+ 'version': '3.1.14',
'classifiers': [
# How mature is this project? Common values are
# 3 - Alpha
@@ -72,7 +73,8 @@ config = {
'requests',
'redis',
'elasticsearch',
- 'influxdb'
+ 'influxdb',
+ 'Yapsy==1.12.2'
],
'tests_require': ['nose', 'mock'],
'test_suite': 'nose.collector',
=====================================
tests/biomaj_tests.py
=====================================
@@ -21,12 +21,6 @@ from biomaj.workflow import Workflow
from biomaj.workflow import UpdateWorkflow
from biomaj.workflow import ReleaseCheckWorkflow
from biomaj_core.utils import Utils
-from biomaj_download.download.ftp import FTPDownload
-from biomaj_download.download.direct import DirectFTPDownload
-from biomaj_download.download.direct import DirectHttpDownload
-from biomaj_download.download.http import HTTPDownload
-from biomaj_download.download.localcopy import LocalDownload
-from biomaj_download.download.downloadthreads import DownloadThread
from biomaj_core.config import BiomajConfig
from biomaj.process.processfactory import PostProcessFactory
from biomaj.process.processfactory import PreProcessFactory
@@ -103,7 +97,11 @@ class UtilsForTest():
os.chmod(to_file, stat.S_IRWXU)
# Manage local bank test, use bank test subdir as remote
- properties = ['multi.properties', 'computederror.properties', 'error.properties', 'local.properties', 'localprocess.properties', 'testhttp.properties', 'computed.properties', 'computed2.properties', 'sub1.properties', 'sub2.properties']
+ properties = ['multi.properties', 'computederror.properties',
+ 'error.properties', 'local.properties',
+ 'localprocess.properties', 'testhttp.properties',
+ 'computed.properties', 'computed2.properties',
+ 'sub1.properties', 'sub2.properties']
for prop in properties:
from_file = os.path.join(curdir, prop)
to_file = os.path.join(self.conf_dir, prop)
@@ -421,6 +419,58 @@ class TestBiomajFunctional(unittest.TestCase):
b.update()
self.assertTrue(b.session.get('update'))
+ def test_update_hardlinks(self):
+ """
+ Update a bank twice with hard links. Files copied from previous release
+ must be links.
+ """
+ b = Bank('local')
+ b.config.set('keep.old.version', '3')
+ b.config.set('use_hardlinks', '1')
+ # Create a file in bank dir (which is the source dir) so we can manipulate
+ # it. The pattern is taken into account by the bank configuration.
+ # Note that this file is created in the source tree so we remove it after
+ # or if this test fails in between.
+ tmp_remote_file = b.config.get('remote.dir') + 'test.safe_to_del'
+ if os.path.exists(tmp_remote_file):
+ os.remove(tmp_remote_file)
+ open(tmp_remote_file, "w")
+ # First update
+ b.update()
+ self.assertTrue(b.session.get('update'))
+ old_release = b.session.get_full_release_directory()
+ # Touch tmp_remote_file to force update. We set the date to tomorrow so we
+ # are sure that a new release will be detected.
+ tomorrow = time.time() + 3660 * 24 # 3660s for safety (leap second, etc.)
+ os.utime(tmp_remote_file, (tomorrow, tomorrow))
+ # Second update
+ try:
+ b.update()
+ self.assertTrue(b.session.get('update'))
+ new_release = b.session.get_full_release_directory()
+ # Test that files in both releases are links to the same file.
+ # We can't use tmp_remote_file because it's the source of update and we
+ # can't use test.fasta.gz because it is uncompressed and then not the
+ # same file.
+ for f in ['test2.fasta', 'test_100.txt']:
+ file_old_release = os.path.join(old_release, 'flat', f)
+ file_new_release = os.path.join(new_release, 'flat', f)
+ try:
+ self.assertTrue(os.path.samefile(file_old_release, file_new_release))
+ except AssertionError:
+ msg = "In %s: copy worked but hardlinks were not used." % self.id()
+ logging.info(msg)
+ # Test that no links are done for tmp_remote_file
+ file_old_release = os.path.join(old_release, 'flat', 'test.safe_to_del')
+ file_new_release = os.path.join(new_release, 'flat', 'test.safe_to_del')
+ self.assertFalse(os.path.samefile(file_old_release, file_new_release))
+ except Exception:
+ raise
+ finally:
+ # Remove file
+ if os.path.exists(tmp_remote_file):
+ os.remove(tmp_remote_file)
+
def test_fromscratch_update(self):
"""
Try updating twice, at second time, bank should be updated (force with fromscratch)
@@ -736,6 +786,7 @@ class TestBiomajFunctional(unittest.TestCase):
def test_multi(self):
b = Bank('multi')
res = b.update()
+ self.assertTrue(res)
with open(os.path.join(b.session.get_full_release_directory(),'flat/test1.json'), 'r') as content_file:
content = content_file.read()
my_json = json.loads(content)
View it on GitLab: https://salsa.debian.org/med-team/biomaj3/commit/1cf1bd0650107a6cf53d0174f13146f982afdacb