Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

parallelization in filterAndTrim for Window OS #1812

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Update filter.R
Added parallelization in filterAndTrim for Window OS via PSOCK sockets.
  • Loading branch information
cylyau committed Sep 8, 2023
commit ebe8296dc087716ed125c85e8a9d10bfec1d0d5d
67 changes: 47 additions & 20 deletions R/filter.R
Original file line number Diff line number Diff line change
Expand Up @@ -444,37 444,64 @@ filterAndTrim <- function(fwd, filt, rev=NULL, filt.rev=NULL, compress=TRUE,
if(any(c(filt,filt.rev) %in% c(fwd, rev))) stop("Output files must be distinct from the input files.")
}
# Parse multithreading
if(multithread && .Platform$OS.type=="unix") {
if(multithread){
OMP <- FALSE
ncores <- detectCores()
if(is.numeric(multithread)) ncores <- multithread
if(is.na(ncores)) ncores <- 1
if(ncores>1) verbose <- FALSE
} else {
}else{
ncores <- 1
if (multithread && .Platform$OS.type=="windows") {
message("Multithreading has been DISABLED, as forking is not supported on .Platform$OS.type 'windows'")
}
}

# Filter and Trim
# Switch between using mcmapply or clusterMap depending on availability of forking
if(PAIRED) {
rval <- mcmapply(fastqPairedFilter,
mapply(c, fwd, rev, SIMPLIFY=FALSE), mapply(c, filt, filt.rev, SIMPLIFY=FALSE),
MoreArgs = list(truncQ=truncQ, truncLen=truncLen, trimLeft=trimLeft, trimRight=trimRight,
maxLen=maxLen, minLen=minLen, maxN=maxN, minQ=minQ, maxEE=maxEE,
rm.phix=rm.phix, rm.lowcomplex=rm.lowcomplex, orient.fwd=orient.fwd,
matchIDs=matchIDs, id.sep=id.sep, id.field=id.field, n=n, OMP=OMP,
qualityType=qualityType, compress=compress, verbose=verbose),
mc.cores=ncores, mc.silent=TRUE)
if(!multithread | .Platform$OS.type == "unix"){
rval <- mcmapply(fastqPairedFilter,
mapply(c, fwd, rev, SIMPLIFY=FALSE), mapply(c, filt, filt.rev, SIMPLIFY=FALSE),
MoreArgs = list(truncQ=truncQ, truncLen=truncLen, trimLeft=trimLeft, trimRight=trimRight,
maxLen=maxLen, minLen=minLen, maxN=maxN, minQ=minQ, maxEE=maxEE,
rm.phix=rm.phix, rm.lowcomplex=rm.lowcomplex, orient.fwd=orient.fwd,
matchIDs=matchIDs, id.sep=id.sep, id.field=id.field, n=n, OMP=OMP,
qualityType=qualityType, compress=compress, verbose=verbose),
mc.cores=ncores, mc.silent=TRUE)
}else if(.Platform$OS.type == "windows"){
cl <- makeCluster(ncores, type = "PSOCK")
rval <- clusterMap(cl = cl,
fun = fastqPairedFilter,
mapply(c, fwd, rev, SIMPLIFY=FALSE), mapply(c, filt, filt.rev, SIMPLIFY=FALSE),
MoreArgs = list(truncQ=truncQ, truncLen=truncLen, trimLeft=trimLeft, trimRight=trimRight,
maxLen=maxLen, minLen=minLen, maxN=maxN, minQ=minQ, maxEE=maxEE,
rm.phix=rm.phix, rm.lowcomplex=rm.lowcomplex, orient.fwd=orient.fwd,
matchIDs=matchIDs, id.sep=id.sep, id.field=id.field, n=n, OMP=OMP,
qualityType=qualityType, compress=compress, verbose=verbose),
.scheduling = "dynamic")
stopCluster(cl)
}
} else {
rval <- mcmapply(fastqFilter,
fwd, filt,
MoreArgs = list(truncQ=truncQ, truncLen=truncLen, trimLeft=trimLeft, trimRight=trimRight,
maxLen=maxLen, minLen=minLen, maxN=maxN, minQ=minQ, maxEE=maxEE,
rm.phix=rm.phix, rm.lowcomplex=rm.lowcomplex, orient.fwd=orient.fwd,
n=n, OMP=OMP, qualityType=qualityType, compress=compress, verbose=verbose),
mc.cores=ncores, mc.silent=TRUE)
if(!multithread | .Platform$OS.type == "unix"){
rval <- mcmapply(fastqFilter,
fwd, filt,
MoreArgs = list(truncQ=truncQ, truncLen=truncLen, trimLeft=trimLeft, trimRight=trimRight,
maxLen=maxLen, minLen=minLen, maxN=maxN, minQ=minQ, maxEE=maxEE,
rm.phix=rm.phix, rm.lowcomplex=rm.lowcomplex, orient.fwd=orient.fwd,
n=n, OMP=OMP, qualityType=qualityType, compress=compress, verbose=verbose),
mc.cores=ncores, mc.silent=TRUE)
}else if(.Platform$OS.type == "windows"){
cl <- makeCluster(ncores, type = "PSOCK")
rval <- clusterMap(cl = cl,
fun = fastqFilter,
fwd, filt,
MoreArgs = list(truncQ=truncQ, truncLen=truncLen, trimLeft=trimLeft, trimRight=trimRight,
maxLen=maxLen, minLen=minLen, maxN=maxN, minQ=minQ, maxEE=maxEE,
rm.phix=rm.phix, rm.lowcomplex=rm.lowcomplex, orient.fwd=orient.fwd,
n=n, OMP=OMP, qualityType=qualityType, compress=compress, verbose=verbose),
mc.cores=ncores, mc.silent=TRUE)
stopCluster(cl)
}
}

# Check if expected matrix was returned, if not there are errors
if(!is(rval, "matrix")) {
if(is(rval, "list")) { # Mix of errors and not
Expand Down