[med-svn] [r-bioc-biocparallel] 01/05: New upstream version 1.12.0
Andreas Tille
tille at debian.org
Tue Nov 7 16:46:07 UTC 2017
This is an automated email from the git hooks/post-receive script.
tille pushed a commit to branch master
in repository r-bioc-biocparallel.
commit 72db2ab7a0574edae628c7e9a4e596594e4eb331
Author: Andreas Tille <tille at debian.org>
Date: Tue Nov 7 17:29:23 2017 +0100
New upstream version 1.12.0
---
DESCRIPTION | 17 ++-
NAMESPACE | 8 +-
NEWS | 13 +-
R/SnowParam-class.R | 10 +-
R/bploop.R | 13 +-
R/ipcmutex.R | 43 ++++++
R/progress.R | 36 +++--
R/register.R | 34 +++--
build/BiocParallel.pdf | Bin 212948 -> 217955 bytes
build/vignette.rds | Bin 289 -> 290 bytes
inst/doc/Errors_Logs_And_Debugging.R | 64 ++++-----
inst/doc/Errors_Logs_And_Debugging.pdf | Bin 149232 -> 180054 bytes
inst/doc/Introduction_To_BiocParallel.R | 81 ++++++-----
inst/doc/Introduction_To_BiocParallel.Rnw | 57 +++++++-
inst/doc/Introduction_To_BiocParallel.pdf | Bin 185989 -> 217744 bytes
inst/unitTests/test_ipcmutex.R | 24 ++++
man/ipcmutex.Rd | 160 +++++++++++++++++++++
src/ipcmutex.cpp | 224 +++++++++++++++++++++++++++++
vignettes/Introduction_To_BiocParallel.Rnw | 57 +++++++-
19 files changed, 728 insertions(+), 113 deletions(-)
diff --git a/DESCRIPTION b/DESCRIPTION
index 150348a..bde033b 100644
--- a/DESCRIPTION
+++ b/DESCRIPTION
@@ -1,7 +1,7 @@
Package: BiocParallel
Type: Package
Title: Bioconductor facilities for parallel evaluation
-Version: 1.10.1
+Version: 1.12.0
Authors@R: c(
person("Bioconductor Package Maintainer",
email="maintainer at bioconductor.org", role="cre"),
@@ -16,12 +16,14 @@ URL: https://github.com/Bioconductor/BiocParallel
BugReports: https://github.com/Bioconductor/BiocParallel/issues
biocViews: Infrastructure
License: GPL-2 | GPL-3
+SystemRequirements: C++11
Depends: methods
Imports: stats, utils, futile.logger, parallel, snow
Suggests: BiocGenerics, tools, foreach, BatchJobs, BBmisc, doParallel,
- Rmpi, GenomicRanges, RNAseqData.HNRNPC.bam.chr14, Rsamtools,
- GenomicAlignments, ShortRead, codetools, RUnit, BiocStyle,
- knitr
+ Rmpi, GenomicRanges, RNAseqData.HNRNPC.bam.chr14,
+ TxDb.Hsapiens.UCSC.hg19.knownGene, VariantAnnotation,
+ Rsamtools, GenomicAlignments, ShortRead, codetools, RUnit,
+ BiocStyle, knitr
Collate: AllGenerics.R BiocParallelParam-class.R bploop.R
ErrorHandling.R log.R bpbackend-methods.R bpisup-methods.R
bplapply-methods.R bpmapply-methods.R bpiterate-methods.R
@@ -30,10 +32,11 @@ Collate: AllGenerics.R BiocParallelParam-class.R bploop.R
bpaggregate-methods.R bpvalidate.R SnowParam-class.R
MulticoreParam-class.R register.R SerialParam-class.R
DoparParam-class.R SnowParam-utils.R BatchJobsParam-class.R
- progress.R utilities.R
+ progress.R ipcmutex.R utilities.R
+LinkingTo: BH
VignetteBuilder: knitr
-NeedsCompilation: no
-Packaged: 2017-05-02 23:22:15 UTC; biocbuild
+NeedsCompilation: yes
+Packaged: 2017-10-30 23:36:29 UTC; biocbuild
Author: Bioconductor Package Maintainer [cre],
Martin Morgan [aut],
Valerie Obenchain [aut],
diff --git a/NAMESPACE b/NAMESPACE
index 9c4ae00..0063fd3 100644
--- a/NAMESPACE
+++ b/NAMESPACE
@@ -1,3 +1,5 @@
+useDynLib("BiocParallel", .registration = TRUE)
+
import(methods)
importFrom(stats, setNames, terms)
@@ -38,7 +40,11 @@ export(
## helpers
bploop, # worker, manager loops
multicoreWorkers, snowWorkers, bpvalidate, bpok,
- bprunMPIslave
+ bprunMPIslave,
+
+ ## ipcmutex
+ ipcid, ipcremove, ipclock, ipctrylock, ipcunlock, ipclocked,
+ ipcyield, ipcvalue, ipcreset
)
### - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
diff --git a/NEWS b/NEWS
index c315f19..f497985 100644
--- a/NEWS
+++ b/NEWS
@@ -1,11 +1,20 @@
-CHANGES IN VERSION 1.10
+CHANGES IN VERSION 1.11
-----------------------
BUG FIXES
- o (v. 1.10.1) Change registered backend initialization to first
+ o (v. 1.11.1) Change registered backend initialization to first
invocation, rather than on load.
+ o (v. 1.11.8) Ensure registry is initialized before (public) use.
+ Issue #65
+
+NEW FEATURES
+
+ o (v. 1.11.2) bpiterate() gains a progress counter.
+
+ o (v. 1.11.5) ipclock(), etc: inter-process locks and counters
+
CHANGES IN VERSION 1.9
----------------------
diff --git a/R/SnowParam-class.R b/R/SnowParam-class.R
index 4ff13b1..63f8c66 100644
--- a/R/SnowParam-class.R
+++ b/R/SnowParam-class.R
@@ -243,6 +243,13 @@ setReplaceMethod("bpworkers", c("SnowParam", "numeric"),
x
})
+setReplaceMethod("bpworkers", c("SnowParam", "character"),
+ function(x, value)
+{
+ x$workers <- x$.clusterargs$spec <- value
+ x
+})
+
setMethod("bpRNGseed", "SnowParam",
function(x)
{
@@ -286,7 +293,8 @@ setMethod("bpstart", "SnowParam",
cargs$spec <- if (is.numeric(cargs$spec)) {
nnodes
} else cargs$spec[seq_len(nnodes)]
- cargs$snowlib <- find.package("BiocParallel")
+ if (is.null(cargs$snowlib))
+ cargs$snowlib <- find.package("BiocParallel")
if (!is.null(cargs$useRscript) && !cargs$useRscript)
cargs$scriptdir <- find.package("BiocParallel")
diff --git a/R/bploop.R b/R/bploop.R
index fb63cf8..cc3a105 100644
--- a/R/bploop.R
+++ b/R/bploop.R
@@ -15,11 +15,11 @@
}
.send_VALUE <-
- function(node, tag, value, success, time, log, gc, sout)
+ function(node, tag, value, success, time, log, sout)
{
data <- list(type = "VALUE", tag = tag,
value = value, success = success,
- time = time, log = log, gc = gc, sout = sout)
+ time = time, log = log, sout = sout)
parallel:::sendData(node, data)
TRUE
}
@@ -66,7 +66,6 @@ bploop <- function(manager, ...)
file <- textConnection("sout", "w", local=TRUE)
sink(file, type="message")
sink(file, type="output")
- gc(reset=TRUE)
t1 <- proc.time()
value <- tryCatch({
do.call(msg$data$fun, msg$data$args)
@@ -82,10 +81,9 @@ bploop <- function(manager, ...)
success <- !(inherits(value, "bperror") || !all(bpok(value)))
log <- .log_buffer_get()
- gc <- gc()
.send_VALUE(manager, msg$data$tag, value, success, t2 - t1,
- log, gc, sout)
+ log, sout)
}
}, interrupt = function(e) {
NULL
@@ -276,6 +274,10 @@ bploop.iterate <-
running <- logical(workers)
reducer <- .reducer(REDUCE, init, reduce.in.order)
+ progress <- .progress(active=bpprogressbar(BPPARAM), iterate=TRUE)
+ on.exit(progress$term(), TRUE)
+ progress$init()
+
## initial load
for (i in seq_len(workers)) {
value <- ITER()
@@ -293,6 +295,7 @@ bploop.iterate <-
## collect
d <- .recv1(cl, "bpiterate")
+ progress$step()
value <- d$value$value
njob <- d$value$tag
diff --git a/R/ipcmutex.R b/R/ipcmutex.R
new file mode 100644
index 0000000..62d967d
--- /dev/null
+++ b/R/ipcmutex.R
@@ -0,0 +1,43 @@
+## Utilities
+
+ipcid <- function(id) {
+ uuid <- .Call(.ipc_uuid)
+ if (!missing(id))
+ uuid <- paste(as.character(id), uuid, sep="-")
+ uuid
+}
+
+ipcremove <- function(id) {
+ invisible(.Call(.ipc_remove, id))
+}
+
+## Locks
+
+ipclocked <- function(id)
+ .Call(.ipc_locked, id)
+
+ipclock <- function(id) {
+ .Call(.ipc_lock, id)
+}
+
+ipctrylock <- function(id) {
+ .Call(.ipc_try_lock, id)
+}
+
+ipcunlock <- function(id) {
+ .Call(.ipc_unlock, id)
+}
+
+## Counters
+
+ipcyield <- function(id) {
+ .Call(.ipc_yield, id)
+}
+
+ipcvalue <- function(id) {
+ .Call(.ipc_value, id)
+}
+
+ipcreset <- function(id, n = 1) {
+ invisible(.Call(.ipc_reset, id, n))
+}
diff --git a/R/progress.R b/R/progress.R
index 43733f9..529ff5c 100644
--- a/R/progress.R
+++ b/R/progress.R
@@ -2,26 +2,36 @@
### progress bar
### -------------------------------------------------------------------------
-## derived from plyr::progress_text()
-.progress <- function(style = 3, active = TRUE, ...) {
- ntasks <- 0
- txt <- NULL
- max <- 0
+.progress <- function(style = 3, active = TRUE, iterate = FALSE, ...) {
if (active) {
- list(
- init = function(x) {
+ ntasks <- 0L
+ if (iterate) {
+ list(init = function(x) {
+ message("iteration: ", appendLF=FALSE)
+ }, step = function() {
+ ntasks <<- ntasks + 1L
+ erase <- paste(rep("\b", ceiling(log10(ntasks))), collapse="")
+ message(erase, ntasks, appendLF = FALSE)
+ }, term = function() {
+ message() # new line
+ })
+ } else {
+ ## derived from plyr::progress_text()
+ txt <- NULL
+ max <- 0
+ list(init = function(x) {
txt <<- txtProgressBar(max = x, style = style, ...)
setTxtProgressBar(txt, 0)
max <<- x
- },
- step = function() {
- ntasks <<- ntasks + 1
+ }, step = function() {
+ ntasks <<- ntasks + 1L
setTxtProgressBar(txt, ntasks)
if (ntasks == max) cat("\n")
- },
- term = function() close(txt)
- )
+ }, term = function() {
+ close(txt)
+ })
+ }
} else {
list(
init = function(x) NULL,
diff --git a/R/register.R b/R/register.R
index 4a89e21..8de7c63 100644
--- a/R/register.R
+++ b/R/register.R
@@ -18,27 +18,29 @@
invisible(registered())
},
registered = function(bpparamClass) {
- if (length(bpparams) == 0L)
- .registry_init()
if (missing(bpparamClass))
- bpparams
- else bpparams[[bpparamClass]]
+ .self$bpparams
+ else .self$bpparams[[bpparamClass]]
})
)$new() # Singleton
+.register <- .registry$register
+
+.registered <- .registry$registered
+
.registry_init <- function() {
multicore <- (parallel::detectCores() - 2L) > 1L
tryCatch({
if ((.Platform$OS.type == "windows") && multicore) {
- register(getOption("SnowParam", SnowParam()), TRUE)
- register(getOption("SerialParam", SerialParam()), FALSE)
+ .register(getOption("SnowParam", SnowParam()), TRUE)
+ .register(getOption("SerialParam", SerialParam()), FALSE)
} else if (multicore) {
## linux / mac
- register(getOption("MulticoreParam", MulticoreParam()), TRUE)
- register(getOption("SnowParam", SnowParam()), FALSE)
- register(getOption("SerialParam", SerialParam()), FALSE)
+ .register(getOption("MulticoreParam", MulticoreParam()), TRUE)
+ .register(getOption("SnowParam", SnowParam()), FALSE)
+ .register(getOption("SerialParam", SerialParam()), FALSE)
} else {
- register(getOption("SerialParam", SerialParam()), TRUE)
+ .register(getOption("SerialParam", SerialParam()), TRUE)
}
}, error=function(err) {
message("'BiocParallel' did not register default ",
@@ -47,9 +49,17 @@
})
}
-register <- .registry$register
+register <- function(BPPARAM, default = TRUE) {
+ if (length(.registry$bpparams) == 0L)
+ .registry_init()
+ .register(BPPARAM, default = default)
+}
-registered <- .registry$registered
+registered <- function(bpparamClass) {
+ if (length(.registry$bpparams) == 0L)
+ .registry_init()
+ .registered(bpparamClass)
+}
bpparam <- function(bpparamClass) {
if (missing(bpparamClass))
diff --git a/build/BiocParallel.pdf b/build/BiocParallel.pdf
index cb3f200..0aa50ac 100644
Binary files a/build/BiocParallel.pdf and b/build/BiocParallel.pdf differ
diff --git a/build/vignette.rds b/build/vignette.rds
index eae036f..7072b29 100644
Binary files a/build/vignette.rds and b/build/vignette.rds differ
diff --git a/inst/doc/Errors_Logs_And_Debugging.R b/inst/doc/Errors_Logs_And_Debugging.R
index 062e792..6b9b281 100644
--- a/inst/doc/Errors_Logs_And_Debugging.R
+++ b/inst/doc/Errors_Logs_And_Debugging.R
@@ -1,34 +1,34 @@
-## ----style, eval=TRUE, echo=FALSE, results="asis"---------------------------------------
+## ----style, eval=TRUE, echo=FALSE, results="asis"--------------------------
BiocStyle::latex()
-## ----biocLite, eval=FALSE---------------------------------------------------------------
+## ----biocLite, eval=FALSE--------------------------------------------------
# source("http://bioconductor.org/biocLite.R")
# biocLite("BiocParallel")
-## ----load-------------------------------------------------------------------------------
+## ----load------------------------------------------------------------------
library(BiocParallel)
-## ----errors_constructor-----------------------------------------------------------------
+## ----errors_constructor----------------------------------------------------
param <- SnowParam()
param
-## ----errors_stopOnError-----------------------------------------------------------------
+## ----errors_stopOnError----------------------------------------------------
param <- SnowParam(2, stop.on.error = TRUE)
param
bpstopOnError(param) <- FALSE
-## ----errors_6tasksA_stopOnError---------------------------------------------------------
+## ----errors_6tasksA_stopOnError--------------------------------------------
X <- list(1, "2", 3, 4, 5, 6)
param <- SnowParam(3, tasks = length(X), stop.on.error = TRUE)
-## ----errors_6tasksA_stopOnError_output--------------------------------------------------
+## ----errors_6tasksA_stopOnError_output-------------------------------------
res <- tryCatch({
bplapply(X, sqrt, BPPARAM = param)
}, error=identity)
res
attr(res, "result")
-## ----errors_6tasks_nonstopOnError-------------------------------------------------------
+## ----errors_6tasks_nonstopOnError------------------------------------------
X <- list("1", 2, 3, 4, 5, 6)
param <- SnowParam(3, tasks = length(X), stop.on.error = FALSE)
res <- tryCatch({
@@ -37,55 +37,55 @@ res <- tryCatch({
res
attr(res, "result")
-## ----error_bptry------------------------------------------------------------------------
+## ----error_bptry-----------------------------------------------------------
bptry({
bplapply(X, sqrt, BPPARAM=param)
})
-## ----errors_3tasksA_stopOnError---------------------------------------------------------
+## ----errors_3tasksA_stopOnError--------------------------------------------
X <- list(1, 2, "3", 4, 5, 6)
param <- SnowParam(3, stop.on.error = TRUE)
-## ----errors_3tasksA_stopOnError_output--------------------------------------------------
+## ----errors_3tasksA_stopOnError_output-------------------------------------
bptry(bplapply(X, sqrt, BPPARAM = param))
-## ----errors_bpok_bplapply---------------------------------------------------------------
+## ----errors_bpok_bplapply--------------------------------------------------
param <- SnowParam(2, stop.on.error=FALSE)
result <- bptry(bplapply(list(1, "2", 3), sqrt, BPPARAM=param))
-## ----errors_bpok------------------------------------------------------------------------
+## ----errors_bpok-----------------------------------------------------------
bpok(result)
-## ----errors_traceback-------------------------------------------------------------------
+## ----errors_traceback------------------------------------------------------
tail(attr(result[[which(!bpok(result))]], "traceback"))
-## ----redo_error-------------------------------------------------------------------------
+## ----redo_error------------------------------------------------------------
X <- list(1, "2", 3)
param <- SnowParam(2, stop.on.error=FALSE)
result <- bptry(bplapply(X, sqrt, BPPARAM=param))
result
-## ----errors_BPREDO_input----------------------------------------------------------------
+## ----errors_BPREDO_input---------------------------------------------------
X.redo <- list(1, 2, 3)
-## ----redo_run---------------------------------------------------------------------------
+## ----redo_run--------------------------------------------------------------
bplapply(X.redo, sqrt, BPREDO=result, BPPARAM=param)
-## ----logs_constructor-------------------------------------------------------------------
+## ----logs_constructor------------------------------------------------------
param <- SnowParam(stop.on.error=FALSE)
param
-## ----logs_accessors---------------------------------------------------------------------
+## ----logs_accessors--------------------------------------------------------
bplog(param) <- TRUE
bpthreshold(param) <- "TRACE"
param
-## ----logs_bplapply----------------------------------------------------------------------
+## ----logs_bplapply---------------------------------------------------------
tryCatch({
bplapply(list(1, "2", 3), sqrt, BPPARAM = param)
}, error=function(e) invisible(e))
-## ----logs_FUN---------------------------------------------------------------------------
+## ----logs_FUN--------------------------------------------------------------
FUN <- function(i) {
futile.logger::flog.debug(paste("value of 'i':", i))
@@ -100,21 +100,21 @@ FUN <- function(i) {
}
}
-## ----logs_FUN_WARN----------------------------------------------------------------------
+## ----logs_FUN_WARN---------------------------------------------------------
param <- SnowParam(2, log = TRUE, threshold = "WARN", stop.on.error=FALSE)
result <- bplapply(list(1, "2", integer()), FUN, BPPARAM = param)
simplify2array(result)
-## ----logs_FUN_DEBUG---------------------------------------------------------------------
+## ----logs_FUN_DEBUG--------------------------------------------------------
param <- SnowParam(2, log = TRUE, threshold = "DEBUG", stop.on.error=FALSE)
result <- bplapply(list(1, "2", integer()), FUN, BPPARAM = param)
simplify2array(result)
-## ----timeout_constructor----------------------------------------------------------------
+## ----timeout_constructor---------------------------------------------------
param <- SnowParam(timeout = 20, stop.on.error=FALSE)
param
-## ----timeout_setter---------------------------------------------------------------------
+## ----timeout_setter--------------------------------------------------------
param <- SnowParam(timeout = 2, stop.on.error=FALSE)
fun <- function(i) {
Sys.sleep(i)
@@ -122,13 +122,13 @@ fun <- function(i) {
}
bptry(bplapply(1:3, fun, BPPARAM = param))
-## ----debug_sqrtabs----------------------------------------------------------------------
+## ----debug_sqrtabs---------------------------------------------------------
fun1 <- function(x) {
v <- abs(x)
sapply(1:length(v), function(i) sqrt(v[i]))
}
-## ----debug_fun1_debug-------------------------------------------------------------------
+## ----debug_fun1_debug------------------------------------------------------
fun2 <- function(x) {
v <- abs(x)
futile.logger::flog.debug(
@@ -140,23 +140,23 @@ fun2 <- function(x) {
})
}
-## ----debug_param_debug------------------------------------------------------------------
+## ----debug_param_debug-----------------------------------------------------
param <- SnowParam(3, log = TRUE, threshold = "DEBUG")
-## ----debug_DEBUG------------------------------------------------------------------------
+## ----debug_DEBUG-----------------------------------------------------------
res <- bplapply(list(c(1,3), numeric(), 6), fun2, BPPARAM = param)
res
-## ----debug_sqrt-------------------------------------------------------------------------
+## ----debug_sqrt------------------------------------------------------------
res <- bptry({
bplapply(list(1, "2", 3), sqrt,
BPPARAM = SnowParam(3, stop.on.error=FALSE))
})
result
-## ----debug_sqrt_wrap--------------------------------------------------------------------
+## ----debug_sqrt_wrap-------------------------------------------------------
fun3 <- function(i) sqrt(i)
-## ----sessionInfo, results="asis"--------------------------------------------------------
+## ----sessionInfo, results="asis"-------------------------------------------
toLatex(sessionInfo())
diff --git a/inst/doc/Errors_Logs_And_Debugging.pdf b/inst/doc/Errors_Logs_And_Debugging.pdf
index 67eea69..b0d7573 100644
Binary files a/inst/doc/Errors_Logs_And_Debugging.pdf and b/inst/doc/Errors_Logs_And_Debugging.pdf differ
diff --git a/inst/doc/Introduction_To_BiocParallel.R b/inst/doc/Introduction_To_BiocParallel.R
index 84056ce..002ae9f 100644
--- a/inst/doc/Introduction_To_BiocParallel.R
+++ b/inst/doc/Introduction_To_BiocParallel.R
@@ -1,112 +1,129 @@
-## ----style, eval=TRUE, echo=FALSE, results="asis"---------------------------------------
+## ----style, eval=TRUE, echo=FALSE, results="asis"--------------------------
BiocStyle::latex()
-## ----biocLite, eval=FALSE---------------------------------------------------------------
+## ----setup, echo=FALSE-----------------------------------------------------
+suppressPackageStartupMessages({
+ library(BiocParallel)
+ library(BatchJobs)
+ library(VariantAnnotation)
+ library(GenomicAlignments)
+ library(RNAseqData.HNRNPC.bam.chr14)
+ library(TxDb.Hsapiens.UCSC.hg19.knownGene)
+})
+
+## ----biocLite, eval=FALSE--------------------------------------------------
# source("http://bioconductor.org/biocLite.R")
# biocLite("BiocParallel")
-## ----BiocParallel-----------------------------------------------------------------------
+## ----BiocParallel----------------------------------------------------------
library(BiocParallel)
-## ----quickstart_FUN---------------------------------------------------------------------
+## ----quickstart_FUN--------------------------------------------------------
FUN <- function(x) { round(sqrt(x), 4) }
-## ----quickstart_registry----------------------------------------------------------------
+## ----quickstart_registry---------------------------------------------------
registered()
-## ----configure_registry, eval=FALSE-----------------------------------------------------
+## ----configure_registry, eval=FALSE----------------------------------------
# options(MulticoreParam=quote(MulticoreParam(workers=4)))
-## ----quickstart_bplapply_default, eval=FALSE--------------------------------------------
+## ----quickstart_bplapply_default, eval=FALSE-------------------------------
# bplapply(1:4, FUN)
-## ----quickstart_snow--------------------------------------------------------------------
+## ----quickstart_snow-------------------------------------------------------
param <- SnowParam(workers = 2, type = "SOCK")
bplapply(1:4, FUN, BPPARAM = param)
-## ----BiocParallelParam_SerialParam------------------------------------------------------
+## ----BiocParallelParam_SerialParam-----------------------------------------
serialParam <- SerialParam()
serialParam
-## ----BiocParallelParam_MulticoreParam---------------------------------------------------
+## ----BiocParallelParam_MulticoreParam--------------------------------------
multicoreParam <- MulticoreParam(workers = 8)
multicoreParam
-## ----register_registered----------------------------------------------------------------
+## ----register_registered---------------------------------------------------
registered()
-## ----register_bpparam-------------------------------------------------------------------
+## ----register_bpparam------------------------------------------------------
bpparam()
-## ----register_BatchJobsParam------------------------------------------------------------
+## ----register_BatchJobsParam-----------------------------------------------
+default <- registered()
register(BatchJobsParam(workers = 10), default = TRUE)
-## ----register_BatchJobsParam2-----------------------------------------------------------
+## ----register_BatchJobsParam2----------------------------------------------
names(registered())
bpparam()
-## ----error-vignette, eval=FALSE---------------------------------------------------------
+## ----register_restore------------------------------------------------------
+for (param in rev(default))
+ register(param)
+
+## ----error-vignette, eval=FALSE--------------------------------------------
# browseVignettes("BiocParallel")
-## ----use_cases_data---------------------------------------------------------------------
+## ----use_cases_data--------------------------------------------------------
library(RNAseqData.HNRNPC.bam.chr14)
fls <- RNAseqData.HNRNPC.bam.chr14_BAMFILES
-## ----forking_gr, message=FALSE----------------------------------------------------------
+## ----forking_gr, message=FALSE---------------------------------------------
library(GenomicAlignments) ## for GenomicRanges and readGAlignments()
gr <- GRanges("chr14", IRanges((1000:3999)*5000, width=1000))
-## ----forking_param----------------------------------------------------------------------
+## ----forking_param---------------------------------------------------------
param <- ScanBamParam(which=range(gr))
-## ----forking_FUN------------------------------------------------------------------------
+## ----forking_FUN-----------------------------------------------------------
FUN <- function(fl, param) {
gal <- readGAlignments(fl, param = param)
sum(countOverlaps(gr, gal))
}
-## ----forking_default_multicore----------------------------------------------------------
+## ----forking_default_multicore---------------------------------------------
MulticoreParam()
-## ----cluster_FUN------------------------------------------------------------------------
+## ----cluster_FUN-----------------------------------------------------------
FUN <- function(fl, param, gr) {
- library(GenomicAlignments)
+ suppressPackageStartupMessages({
+ library(GenomicAlignments)
+ })
gal <- readGAlignments(fl, param = param)
sum(countOverlaps(gr, gal))
}
-## ----cluster_snow_param-----------------------------------------------------------------
+## ----cluster_snow_param----------------------------------------------------
snow <- SnowParam(workers = 2, type = "SOCK")
-## ----cluster_bplapply-------------------------------------------------------------------
+## ----cluster_bplapply------------------------------------------------------
bplapply(fls[1:3], FUN, BPPARAM = snow, param = param, gr = gr)
-## ----ad_hoc_sock_snow_param-------------------------------------------------------------
+## ----ad_hoc_sock_snow_param------------------------------------------------
hosts <- c("rhino01", "rhino01", "rhino02")
param <- SnowParam(workers = hosts, type = "SOCK")
-## ----cluster-MPI-work, eval=FALSE-------------------------------------------------------
+## ----cluster-MPI-work, eval=FALSE------------------------------------------
# library(BiocParallel)
# library(Rmpi)
# FUN <- function(i) system("hostname", intern=TRUE)
-## ----cluster-MPI, eval=FALSE------------------------------------------------------------
+## ----cluster-MPI, eval=FALSE-----------------------------------------------
# param <- SnowParam(mpi.universe.size() - 1, "MPI")
# register(param)
-## ----cluster-MPI-do, eval=FALSE---------------------------------------------------------
+## ----cluster-MPI-do, eval=FALSE--------------------------------------------
# xx <- bplapply(1:100, FUN)
# table(unlist(xx))
# mpi.quit()
-## ----cluster-MPI-bpstart, eval=FALSE----------------------------------------------------
+## ----cluster-MPI-bpstart, eval=FALSE---------------------------------------
# param <- bpstart(SnowParam(mpi.universe.size() - 1, "MPI"))
# register(param)
# xx <- bplapply(1:100, FUN)
# bpstop(param)
# mpi.quit()
-## ----cluster-BatchJobs, eval=FALSE------------------------------------------------------
+## ----cluster-BatchJobs, eval=FALSE-----------------------------------------
# ## define work to be done
# FUN <- function(i) system("hostname", intern=TRUE)
#
@@ -123,10 +140,10 @@ param <- SnowParam(workers = hosts, type = "SOCK")
# xx <- bplapply(1:100, FUN)
# table(unlist(xx))
-## ----devel-bplapply---------------------------------------------------------------------
+## ----devel-bplapply--------------------------------------------------------
system.time(x <- bplapply(1:3, function(i) { Sys.sleep(i); i }))
unlist(x)
-## ----sessionInfo, results="asis"--------------------------------------------------------
+## ----sessionInfo, results="asis"-------------------------------------------
toLatex(sessionInfo())
diff --git a/inst/doc/Introduction_To_BiocParallel.Rnw b/inst/doc/Introduction_To_BiocParallel.Rnw
index 2f76959..5b6c06b 100644
--- a/inst/doc/Introduction_To_BiocParallel.Rnw
+++ b/inst/doc/Introduction_To_BiocParallel.Rnw
@@ -10,6 +10,17 @@
BiocStyle::latex()
@
+<<setup, echo=FALSE>>=
+suppressPackageStartupMessages({
+ library(BiocParallel)
+ library(BatchJobs)
+ library(VariantAnnotation)
+ library(GenomicAlignments)
+ library(RNAseqData.HNRNPC.bam.chr14)
+ library(TxDb.Hsapiens.UCSC.hg19.knownGene)
+})
+@
+
\newcommand{\BiocParallel}{\Biocpkg{BiocParallel}}
\title{Introduction to \BiocParallel}
@@ -163,9 +174,22 @@ except \Rcode{MulticoreParam} which is Unix and Mac only.
default number of workers equals the value of the global option
\Rcode{mc.cores} (e.g., \Rcode{getOption("mc.cores")}) or, if that
is not set, the number of cores returned by
- \Rcode{parallel::detectCores() - 2}. Based on facilities originally
- implemented in the \CRANpkg{multicore} package and subsequently the
- \CRANpkg{parallel} package in base \R{}.
+ \Rcode{parallel::detectCores() - 2}.
+
+ \Rcode{MulticoreParam} uses 'forked' processes with 'copy-on-change'
+ semantics -- memory is only copied when it is changed. This makes it
+ very efficient to invoke compared to other back-ends. A subtle cost,
+ though, is that \R's garbage collector runs periodically, and
+ 'marks' memory as in use. This effectively triggers a copy of the
+ marked memory. \R's generational garbage collector is triggered at
+ difficult-to-predict times; the effect in a long-running forked
+ process is that the memory is eventually copied. See
+ \href{https://support.bioconductor.org/p/70196/#70509}{this post}
+ for additional details.
+
+ Based on facilities originally implemented in the
+ \CRANpkg{multicore} package and subsequently the \CRANpkg{parallel}
+ package in base \R{}.
\item{\Rcode{SnowParam}: }
@@ -234,6 +258,7 @@ bpparam()
Add a specialized instance with \Rcode{register}. When
\Rcode{default} is TRUE, the new instance becomes the default.
<<register_BatchJobsParam>>=
+default <- registered()
register(BatchJobsParam(workers = 10), default = TRUE)
@
@@ -244,6 +269,13 @@ names(registered())
bpparam()
@
+Restore the original registry
+
+<<register_restore>>=
+for (param in rev(default))
+ register(param)
+@
+
\subsection{Functions}
\subsubsection{Parallel looping, vectorized and aggregate operations}
@@ -321,6 +353,12 @@ Logging":
browseVignettes("BiocParallel")
@
+\subsubsection{Locks and counters}
+
+Inter-process (i.e., single machine) locks and counters are supported
+using \Rcode{ipclock()}, \Rcode{ipcyield()}, and friends. Use these to
+synchronize computation, e.g., allowing only a single process to write
+to a file at a time.
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
@@ -399,9 +437,13 @@ for use with the \Rpackage{foreach} package.
To re-run the counting example, FUN needs to modified such that `gr` is
passed as a formal argument and required libraries are loaded on each worker.
+(In general, this is not necessary for functions defined in a package name
+space, see Section~\ref{sec:developers}.)
<<cluster_FUN>>=
FUN <- function(fl, param, gr) {
- library(GenomicAlignments)
+ suppressPackageStartupMessages({
+ library(GenomicAlignments)
+ })
gal <- readGAlignments(fl, param = param)
sum(countOverlaps(gr, gal))
}
@@ -885,6 +927,7 @@ genomics problems in \R{}.
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
\section{For developers}
+\label{sec:developers}
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
@@ -920,6 +963,12 @@ Developers wishing to invoke back-ends other than
required packages, data, and functions are available and loaded on the
remote nodes.
+In \Rcode{bplapply()}, the environment of \Rcode{FUN} (other than the
+global environment) is serialized to the workers. A consequence is
+that, when \Rcode{FUN} is inside a package name space, other functions
+available in the name space are available to \Rcode{FUN} on the
+workers.
+
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
\section{\Rcode{sessionInfo()}}
diff --git a/inst/doc/Introduction_To_BiocParallel.pdf b/inst/doc/Introduction_To_BiocParallel.pdf
index e962245..748f296 100644
Binary files a/inst/doc/Introduction_To_BiocParallel.pdf and b/inst/doc/Introduction_To_BiocParallel.pdf differ
diff --git a/inst/unitTests/test_ipcmutex.R b/inst/unitTests/test_ipcmutex.R
new file mode 100644
index 0000000..ed2d2ae
--- /dev/null
+++ b/inst/unitTests/test_ipcmutex.R
@@ -0,0 +1,24 @@
+test_ipclock <- function()
+{
+    mutex_id <- ipcid()
+    on.exit(ipcremove(mutex_id))
+    stamps <- bplapply(seq_len(5), function(i, id) {
+        BiocParallel::ipclock(id)
+        Sys.sleep(.1)                    # sleep while holding the lock
+        stamp <- Sys.time()
+        BiocParallel::ipcunlock(id)
+        stamp
+    }, mutex_id, BPPARAM=SnowParam(2))
+    ## the lock should serialize the 5 tasks: first/last stamps differ > 0.4s
+    checkTrue(diff(range(unlist(stamps, use.names=FALSE))) > 0.4)
+}
+
+test_ipccounter <- function()
+{
+    counter_id <- ipcid()
+    on.exit(ipcremove(counter_id))
+    yielded <- bplapply(seq_len(5), function(i, id) {
+        BiocParallel::ipcyield(id)       # every task draws from one counter
+    }, counter_id, BPPARAM=SnowParam(2))
+    checkIdentical(sort(unlist(yielded, use.names=FALSE)), 1:5)
+}
diff --git a/man/ipcmutex.Rd b/man/ipcmutex.Rd
new file mode 100644
index 0000000..76ea5de
--- /dev/null
+++ b/man/ipcmutex.Rd
@@ -0,0 +1,160 @@
+\name{ipcmutex}
+\alias{ipclocked}
+\alias{ipclock}
+\alias{ipctrylock}
+\alias{ipcunlock}
+\alias{ipcid}
+\alias{ipcremove}
+\alias{ipcyield}
+\alias{ipcvalue}
+\alias{ipcreset}
+\title{Inter-process locks and counters}
+
+\description{
+
+ Functions documented on this page enable locks and counters between
+ processes on the \emph{same} computer.
+
+ Use \code{ipcid()} to generate a unique mutex or counter identifier.
+ Mutexes or counters with the same \code{id}, including those in
+ different processes, share the same state.
+
+ \code{ipcremove()} removes external state associated with mutexes or
+ counters created with \code{id}.
+
+ \code{ipclock()} blocks until the lock is
+ obtained. \code{ipctrylock()} tries to obtain the lock, returning
+ immediately if it is not available. \code{ipcunlock()} releases the
+ lock. \code{ipclocked()} queries the lock to determine whether it is
+ currently held.
+
+ \code{ipcyield()} returns the current counter, and increments the
+ value for subsequent calls. \code{ipcvalue()} returns the current
+ counter without incrementing. \code{ipcreset()} sets the counter to
+ \code{n}, such that the next call to \code{ipcyield()} or
+ \code{ipcvalue()} returns \code{n}.
+
+}
+
+\usage{
+## Utilities
+
+ipcid(id)
+
+ipcremove(id)
+
+## Locks
+
+ipclock(id)
+
+ipctrylock(id)
+
+ipcunlock(id)
+
+ipclocked(id)
+
+## Counters
+
+ipcyield(id)
+
+ipcvalue(id)
+
+ipcreset(id, n = 1)
+
+}
+\arguments{
+
+ \item{id}{character(1) identifier string for mutex or
+ counter. \code{ipcid()} ensures that the identifier is universally
+ unique.}
+
+ \item{n}{integer(1) value from which \code{ipcyield()} will
+ increment.}
+
+}
+\value{
+ Locks:
+
+ \code{ipclock()} creates a named lock, returning \code{TRUE}
+ on success.
+
+ \code{ipctrylock()} returns \code{TRUE} if the lock is
+ obtained, \code{FALSE} otherwise.
+
+ \code{ipcunlock()} returns \code{TRUE} on success,
+ \code{FALSE} (e.g., because there is nothing to unlock)
+ otherwise.
+
+ \code{ipclocked()} returns \code{TRUE} when \code{id} is locked, and
+ \code{FALSE} otherwise.
+
+ Counters:
+
+ \code{ipcyield()} returns an integer(1) value representing the next
+ number in sequence. The first value returned is 1.
+
+ \code{ipcvalue()} returns the value to be returned by the next call to
+ \code{ipcyield()}, without incrementing the counter. If the counter is
+ no longer available, \code{ipcyield()} returns \code{NA}.
+
+ \code{ipcreset()} returns \code{n}, invisibly.
+
+ Utilities:
+
+ \code{ipcid()} returns a character(1) unique identifier, with
+ \code{id} (if not missing) prepended.
+
+ \code{ipcremove()} returns (invisibly) \code{TRUE} if external
+ resources were released or \code{FALSE} if not (e.g., because the
+ resources have already been released).
+
+}
+\examples{
+ipcid()
+
+## Locks
+
+id <- ipcid()
+
+ipclock(id)
+ipctrylock(id)
+ipcunlock(id)
+ipctrylock(id)
+ipclocked(id)
+
+ipcremove(id)
+
+id <- ipcid()
+result <- bplapply(1:5, function(i, id) {
+ BiocParallel::ipclock(id)
+ Sys.sleep(1)
+ time <- Sys.time()
+ BiocParallel::ipcunlock(id)
+ time
+}, id)
+ipcremove(id)
+diff(sort(unlist(result, use.names=FALSE)))
+
+## Counters
+
+id <- ipcid()
+
+ipcyield(id)
+ipcyield(id)
+
+ipcvalue(id)
+ipcyield(id)
+
+ipcreset(id, 10)
+ipcvalue(id)
+ipcyield(id)
+
+ipcremove(id)
+
+id <- ipcid()
+result <- bplapply(1:5, function(i, id) {
+ BiocParallel::ipcyield(id)
+}, id)
+ipcremove(id)
+sort(unlist(result, use.names=FALSE))
+}
diff --git a/src/ipcmutex.cpp b/src/ipcmutex.cpp
new file mode 100644
index 0000000..2fda999
--- /dev/null
+++ b/src/ipcmutex.cpp
@@ -0,0 +1,224 @@
+#include <boost/uuid/uuid_generators.hpp>
+#include <boost/uuid/uuid_io.hpp>
+
+static boost::uuids::random_generator uuid_generator;  // reused by every call
+
+std::string uuid_generate()  // string form of a freshly generated UUID
+{
+    const boost::uuids::uuid fresh = uuid_generator();
+    return boost::uuids::to_string(fresh);
+}
+
+#include <boost/interprocess/managed_shared_memory.hpp>
+#include <boost/interprocess/sync/interprocess_mutex.hpp>
+
+using namespace boost::interprocess;
+
+// Named inter-process mutex: a Boost interprocess_mutex and a "locked" flag
+// kept in the shared-memory segment 'id', shared by all users of that id.
+class IpcMutex
+{
+protected:
+    managed_shared_memory *shm;  // owned: new'd in ctor, deleted in dtor
+private:
+    interprocess_mutex *mtx;     // object "mtx" inside *shm
+    bool *locked;                // object "locked" inside *shm
+public:
+    IpcMutex(const char *id) {   // open or create the 1024-byte segment 'id'
+        shm = new managed_shared_memory{open_or_create, id, 1024};
+        mtx = shm->find_or_construct<interprocess_mutex>("mtx")();
+        locked = shm->find_or_construct<bool>("locked")();
+    }
+    ~IpcMutex() {                // frees this handle only; see ipc_remove()
+        delete shm;
+    }
+
+    bool is_locked() {           // reads the flag without taking the mutex
+        return *locked;
+    }
+
+    bool lock() {                // blocks until acquired; returns true
+        mtx->lock();
+        *locked = true;
+        return *locked;
+    }
+
+    // Non-blocking attempt. A failed attempt leaves the flag untouched: it
+    // belongs to the current holder, and clearing it misleads is_locked().
+    bool try_lock() {
+        const bool acquired = mtx->try_lock();
+        if (acquired) *locked = true;
+        return acquired;
+    }
+
+    // Clears the flag before releasing, so the write cannot land on top of
+    // the next holder's "true". Returns false (the flag value), as before.
+    // NOTE(review): man/ipcmutex.Rd promises TRUE on success -- confirm.
+    bool unlock() {
+        *locked = false;
+        mtx->unlock();
+        return false;
+    }
+};
+
+// Shared integer counter guarded by the inherited inter-process mutex. The
+// stored int is one less than the value the next yield() returns.
+class IpcCounter : IpcMutex
+{
+private:
+    int *count;  // object "i" inside the segment opened by IpcMutex
+
+public:
+    IpcCounter(const char *id) : IpcMutex(id) {
+        // presumably 0 when first created (Rd: first yield is 1) -- confirm
+        count = shm->find_or_construct<int>("i")();
+    }
+
+    ~IpcCounter() {}
+
+    int value() {  // next value to be yielded; read without locking
+        return 1 + *count;
+    }
+
+    int reset(int n) {  // store n - 1 under the lock so the next yield() is n
+        lock();
+        *count = n - 1;
+        unlock();
+        return n;
+    }
+
+    int yield() {  // increment under the lock and return the new value
+        lock();
+        *count += 1;
+        const int current = *count;
+        unlock();
+        return current;
+    }
+
+};
+
+#include <Rinternals.h>
+
+// internal
+
+const char *ipc_id(SEXP id_sexp)
+{
+    // reject anything but a single, non-NA string before using element 0
+    if (!IS_SCALAR(id_sexp, STRSXP) || STRING_ELT(id_sexp, 0) == R_NaString)
+        Rf_error("'id' must be character(1) and not NA");
+    // not a copy: the pointer refers to the bytes of the R string itself
+    return CHAR(STRING_ELT(id_sexp, 0));
+}
+
+int ipc_n(SEXP n_sexp)
+{
+    // coerce to integer first, and protect the possibly new vector
+    SEXP coerced = PROTECT(Rf_coerceVector(n_sexp, INTSXP));
+    if (!IS_SCALAR(coerced, INTSXP) || R_NaInt == Rf_asInteger(coerced))
+        Rf_error("'n' cannot be coerced to integer(1) and not NA");
+    const int value = INTEGER(coerced)[0];
+    UNPROTECT(1);
+    return value;
+}
+
+// utilities
+
+SEXP ipc_remove(SEXP id_sexp) {
+    // logical(1): result of removing the shared-memory object named 'id'
+    const bool removed = shared_memory_object::remove(ipc_id(id_sexp));
+    return Rf_ScalarLogical(removed);
+}
+
+// uuid
+
+// Result of uuid_generate() wrapped as an R character(1).
+SEXP ipc_uuid() {
+    const std::string text = uuid_generate();
+    return Rf_mkString(text.c_str());
+}
+
+// mutex
+
+// logical(1): value of the shared "locked" flag of mutex 'id'.
+SEXP ipc_locked(SEXP id_sexp) {
+    IpcMutex mtx(ipc_id(id_sexp));
+    const bool held = mtx.is_locked();
+    return Rf_ScalarLogical(held);
+}
+
+// Block until mutex 'id' is acquired, then report TRUE.
+SEXP ipc_lock(SEXP id_sexp) {
+    IpcMutex mtx(ipc_id(id_sexp));
+    mtx.lock();  // result not used; TRUE is returned unconditionally
+    return Rf_ScalarLogical(true);
+}
+
+// Try to acquire mutex 'id' without blocking; logical(1) result of the try.
+SEXP ipc_try_lock(SEXP id_sexp) {
+    IpcMutex mtx(ipc_id(id_sexp));
+    const bool acquired = mtx.try_lock();
+    return Rf_ScalarLogical(acquired);
+}
+
+// Release mutex 'id'; the result is whatever IpcMutex::unlock() reports.
+SEXP ipc_unlock(SEXP id_sexp) {
+    IpcMutex mtx(ipc_id(id_sexp));
+    const bool reported = mtx.unlock();
+    return Rf_ScalarLogical(reported);
+}
+
+// count
+
+// integer(1): IpcCounter::value() for counter 'id' (no increment).
+SEXP ipc_value(SEXP id_sexp) {
+    IpcCounter counter(ipc_id(id_sexp));
+    return Rf_ScalarInteger(counter.value());
+}
+
+// Set counter 'id' so that the next yield returns 'n'; returns 'n'.
+SEXP ipc_reset(SEXP id_sexp, SEXP n_sexp) {
+    const char *id = ipc_id(id_sexp);  // validate both arguments first:
+    int n = ipc_n(n_sexp);  // Rf_error() longjmps past C++ destructors
+    return Rf_ScalarInteger(IpcCounter(id).reset(n));
+}
+
+// integer(1): IpcCounter::yield() for counter 'id' (increments it).
+SEXP ipc_yield(SEXP id_sexp) {
+    IpcCounter counter(ipc_id(id_sexp));
+    return Rf_ScalarInteger(counter.yield());
+}
+
+// expose to R
+
+#include <R_ext/Rdynload.h>
+
+extern "C" {
+    // .Call routines: {R-visible name, entry point, number of arguments}
+    static const R_CallMethodDef callMethods[] = {
+        // uuid
+        {".ipc_uuid", reinterpret_cast<DL_FUNC>(&ipc_uuid), 0},
+        // lock
+        {".ipc_lock", reinterpret_cast<DL_FUNC>(&ipc_lock), 1},
+        {".ipc_try_lock", reinterpret_cast<DL_FUNC>(&ipc_try_lock), 1},
+        {".ipc_unlock", reinterpret_cast<DL_FUNC>(&ipc_unlock), 1},
+        {".ipc_locked", reinterpret_cast<DL_FUNC>(&ipc_locked), 1},
+        // counter
+        {".ipc_yield", reinterpret_cast<DL_FUNC>(&ipc_yield), 1},
+        {".ipc_value", reinterpret_cast<DL_FUNC>(&ipc_value), 1},
+        {".ipc_reset", reinterpret_cast<DL_FUNC>(&ipc_reset), 2},
+        // cleanup
+        {".ipc_remove", reinterpret_cast<DL_FUNC>(&ipc_remove), 1},
+        {NULL, NULL, 0}  // terminating entry
+    };
+
+    void R_init_BiocParallel(DllInfo *info)
+    {
+        R_registerRoutines(info, NULL, callMethods, NULL, NULL);
+    }
+
+    void R_unload_BiocParallel(DllInfo *info)
+    {
+        (void) info;  // nothing to release; the cast marks 'info' as used
+    }
+
+}
diff --git a/vignettes/Introduction_To_BiocParallel.Rnw b/vignettes/Introduction_To_BiocParallel.Rnw
index 2f76959..5b6c06b 100644
--- a/vignettes/Introduction_To_BiocParallel.Rnw
+++ b/vignettes/Introduction_To_BiocParallel.Rnw
@@ -10,6 +10,17 @@
BiocStyle::latex()
@
+<<setup, echo=FALSE>>=
+suppressPackageStartupMessages({
+ library(BiocParallel)
+ library(BatchJobs)
+ library(VariantAnnotation)
+ library(GenomicAlignments)
+ library(RNAseqData.HNRNPC.bam.chr14)
+ library(TxDb.Hsapiens.UCSC.hg19.knownGene)
+})
+@
+
\newcommand{\BiocParallel}{\Biocpkg{BiocParallel}}
\title{Introduction to \BiocParallel}
@@ -163,9 +174,22 @@ except \Rcode{MulticoreParam} which is Unix and Mac only.
default number of workers equals the value of the global option
\Rcode{mc.cores} (e.g., \Rcode{getOption("mc.cores")}) or, if that
is not set, the number of cores returned by
- \Rcode{parallel::detectCores() - 2}. Based on facilities originally
- implemented in the \CRANpkg{multicore} package and subsequently the
- \CRANpkg{parallel} package in base \R{}.
+ \Rcode{parallel::detectCores() - 2}.
+
+ \Rcode{MulticoreParam} uses 'forked' processes with 'copy-on-change'
+ semantics -- memory is only copied when it is changed. This makes it
+ very efficient to invoke compared to other back-ends. A subtle cost,
+ though, is that \R's garbage collector runs periodically, and
+ 'marks' memory as in use. This effectively triggers a copy of the
+ marked memory. \R's generational garbage collector is triggered at
+ difficult-to-predict times; the effect in a long-running forked
+ process is that the memory is eventually copied. See
+ \href{https://support.bioconductor.org/p/70196/#70509}{this post}
+ for additional details.
+
+ Based on facilities originally implemented in the
+ \CRANpkg{multicore} package and subsequently the \CRANpkg{parallel}
+ package in base \R{}.
\item{\Rcode{SnowParam}: }
@@ -234,6 +258,7 @@ bpparam()
Add a specialized instance with \Rcode{register}. When
\Rcode{default} is TRUE, the new instance becomes the default.
<<register_BatchJobsParam>>=
+default <- registered()
register(BatchJobsParam(workers = 10), default = TRUE)
@
@@ -244,6 +269,13 @@ names(registered())
bpparam()
@
+Restore the original registry
+
+<<register_restore>>=
+for (param in rev(default))
+ register(param)
+@
+
\subsection{Functions}
\subsubsection{Parallel looping, vectorized and aggregate operations}
@@ -321,6 +353,12 @@ Logging":
browseVignettes("BiocParallel")
@
+\subsubsection{Locks and counters}
+
+Inter-process (i.e., single machine) locks and counters are supported
+using \Rcode{ipclock()}, \Rcode{ipcyield()}, and friends. Use these to
+synchronize computation, e.g., allowing only a single process to write
+to a file at a time.
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
@@ -399,9 +437,13 @@ for use with the \Rpackage{foreach} package.
To re-run the counting example, FUN needs to be modified such that `gr` is
passed as a formal argument and required libraries are loaded on each worker.
+(In general, this is not necessary for functions defined in a package name
+space, see Section~\ref{sec:developers}.)
<<cluster_FUN>>=
FUN <- function(fl, param, gr) {
- library(GenomicAlignments)
+ suppressPackageStartupMessages({
+ library(GenomicAlignments)
+ })
gal <- readGAlignments(fl, param = param)
sum(countOverlaps(gr, gal))
}
@@ -885,6 +927,7 @@ genomics problems in \R{}.
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
\section{For developers}
+\label{sec:developers}
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
@@ -920,6 +963,12 @@ Developers wishing to invoke back-ends other than
required packages, data, and functions are available and loaded on the
remote nodes.
+In \Rcode{bplapply()}, the environment of \Rcode{FUN} (other than the
+global environment) is serialized to the workers. A consequence is
+that, when \Rcode{FUN} is inside a package name space, other functions
+available in the name space are available to \Rcode{FUN} on the
+workers.
+
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
\section{\Rcode{sessionInfo()}}
--
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/debian-med/r-bioc-biocparallel.git
More information about the debian-med-commit
mailing list