Title: | Simulate with 'mrgsolve' in Parallel |
---|---|
Description: | Simulation from an 'mrgsolve' <https://cran.r-project.org/package=mrgsolve> model using a parallel backend. Input data sets are split (chunked) and simulated in parallel using mclapply() or future_lapply() <https://cran.r-project.org/package=future.apply>. |
Authors: | Kyle Baron |
Maintainer: | Kyle Baron <[email protected]> |
License: | GPL (>=2) |
Version: | 0.2.0.9002 |
Built: | 2024-11-05 04:43:18 UTC |
Source: | https://github.com/kylebaron/mrgsim.parallel |
Multicore lapply in the background
bg_mclapply(X, FUN, mc.cores = 1, ..., .wait = TRUE, .seed = NULL)
bg_mclapply(X, FUN, mc.cores = 1, ..., .wait = TRUE, .seed = NULL)
X |
A list. |
FUN |
The function to be applied to each element of |
mc.cores |
Passed to |
... |
Arguments passed to |
.wait |
If |
.seed |
A |
A list of output data.
ans <- bg_mclapply(seq(10), sqrt, mc.cores = 2)
ans <- bg_mclapply(seq(10), sqrt, mc.cores = 2)
This function uses callr::r_bg()
to simulate a dataset in the background,
optionally in parallel and optionally saving the results directly to
disk in fst
, arrow
or rds
format. Parallelization can be mediated
by the parallel
package on unix or macos or future
on any os.
bg_mrgsim_d( mod, data, nchunk = 1, ..., .locker = NULL, .tag = NULL, .format = c("fst", "feather", "parquet", "rds"), .wait = TRUE, .seed = FALSE, .cores = 1, .plan = NULL )
bg_mrgsim_d( mod, data, nchunk = 1, ..., .locker = NULL, .tag = NULL, .format = c("fst", "feather", "parquet", "rds"), .wait = TRUE, .seed = FALSE, .cores = 1, .plan = NULL )
mod |
A model object. |
data |
Data set to simulate; see |
nchunk |
Number of chunks in which to split the data set; chunking will
be based on the |
... |
Arguments passed to |
.locker |
A directory for saving simulated data; use this to collect results from several different runs in a single folder. |
.tag |
A name to use for the current run; results are saved under
|
.format |
The output format for saving simulations; using format
|
.wait |
If |
.seed |
A |
.cores |
The number of cores to parallelize across; pass 1 to run the simulation sequentially. |
.plan |
The name of a |
bg_mrgsim_d()
returns a processx::process object (follow that link to
see a list of methods). You will have to call process$get_result()
to
retrieve the result. When an output .locker
is not specified, simulated
data are returned; when an output .locker
is specified, the path to
the fst
file on disk is returned. The fst
files should be read with
fst::read_fst()
. When the results are not saved to .locker
, you will
get a single data frame when nchunk
is 1 or a list of data frames when
nchunk
is greater than 1. It is safest to call dplyr::bind_rows()
or
something equivalent on the result if you are expecting a data frame.
An r_process
object; see callr::r_bg()
. Call process$get_result()
to
get the actual result (see details
). If a .locker
path is supplied,
the simulated data is saved to disk and a list of file names is returned.
future_mrgsim_d()
, internalize_fst()
, list_fst()
,
head_fst()
, setup_locker()
mod <- mrgsolve::house(delta = 24, end = 168) data <- mrgsolve::expand.ev( amt = c(100, 300, 450), ID = 1:100, ii = 24, addl = 6 ) data <- dplyr::mutate(data, dose = amt) process <- bg_mrgsim_d( mod, data, carry_out = "dose", outvars = "CP", .wait = TRUE ) process$get_result() ds <- file.path(tempdir(), "sims") files <- bg_mrgsim_d( mod, data, carry_out = "dose", .wait = TRUE, .locker = ds, .format = "fst" ) files sims <- internalize_fst(ds) head(sims)
mod <- mrgsolve::house(delta = 24, end = 168) data <- mrgsolve::expand.ev( amt = c(100, 300, 450), ID = 1:100, ii = 24, addl = 6 ) data <- dplyr::mutate(data, dose = amt) process <- bg_mrgsim_d( mod, data, carry_out = "dose", outvars = "CP", .wait = TRUE ) process$get_result() ds <- file.path(tempdir(), "sims") files <- bg_mrgsim_d( mod, data, carry_out = "dose", .wait = TRUE, .locker = ds, .format = "fst" ) files sims <- internalize_fst(ds) head(sims)
Use chunk_by_id to split up a data set by the ID
column; use
chunk_by_row to split a data set by rows.
chunk_by_id(data, nchunk, id_col = "ID", mark = NULL) chunk_by_cols(data, nchunk, cols, mark = NULL) chunk_by_row(data, nchunk, mark = NULL)
chunk_by_id(data, nchunk, id_col = "ID", mark = NULL) chunk_by_cols(data, nchunk, cols, mark = NULL) chunk_by_row(data, nchunk, mark = NULL)
data |
A data frame. |
nchunk |
The number of chunks. |
id_col |
Character name specifying the column containing the |
mark |
When populated as a character label, adds a column to the chunked data frames with that name and with value the integer group number. |
cols |
A character vector of columns to use for deriving |
A list of data frames.
x <- expand.grid(ID = 1:10, B = rev(1:10)) chunk_by_id(x, nchunk = 3) chunk_by_row(x, nchunk = 4)
x <- expand.grid(ID = 1:10, B = rev(1:10)) chunk_by_id(x, nchunk = 3) chunk_by_row(x, nchunk = 4)
Add or update the file extension for items in a file_stream
object.
If a file extension exists, it is removed first.
ext_stream(x, ext)
ext_stream(x, ext)
x |
A |
ext |
The new extension. |
format_stream()
, locate_stream()
, new_stream()
, file_stream()
,
file_set()
x <- new_stream(3) x <- ext_stream(x, "feather") x[[1]]$file
x <- new_stream(3) x <- ext_stream(x, "feather") x[[1]]$file
File names have a numbered core that communicates the current file number
as well as the total number of files in the set. For example, 02-20
would
indicate the second file in a set of 20. Other customizations can be added.
file_set(n, where = NULL, prefix = NULL, pad = TRUE, sep = "-", ext = "")
file_set(n, where = NULL, prefix = NULL, pad = TRUE, sep = "-", ext = "")
n |
The number of file names to create. |
where |
An optional output file path. |
prefix |
A character prefix for the file name. |
pad |
If |
sep |
Separator character. |
ext |
A file extension, including the dot. |
By default, a list of length n
of lists of length 2; each sublist contains the
integer file number as i
and the file name as file
.
x <- file_set(3, where = "foo/bar") length(x) x[2] x <- file_set(25, ext = ".feather") x[17]
x <- file_set(3, where = "foo/bar") length(x) x[2] x <- file_set(25, ext = ".feather") x[17]
Optionally, set up a locker storage space on disk with a specific file
format (e.g. fst
or feather
).
file_stream(n, locker = NULL, format = NULL, where = NULL, ...)
file_stream(n, locker = NULL, format = NULL, where = NULL, ...)
n |
The number of file names to generate; must be a single numeric value greater than or equal to 1. |
locker |
Passed to |
format |
Passed to |
where |
An optional file path; this is replaced by |
... |
Additional arguments passed to |
Pass locker
to set up locker space for saving outputs; this involves
clearing the locker
directory (see setup_locker()
for details). Passing
locker
also sets the path for output files. If you want to set up the path
for output files without setting up locker
space, pass where
.
format_stream()
, locate_stream()
, ext_stream()
, new_stream()
,
file_set()
x <- file_stream(3, locker = temp_ds("foo"), format = "fst") x[[1]]
x <- file_stream(3, locker = temp_ds("foo"), format = "fst") x[[1]]
This can be used to check if a file set item has been assigned an output
format (e.g. fst
, feather
, parquet
, qs
or rds
). If the check returns
FALSE
it would signal that data should be returned rather than calling
write_stream()
.
format_is_set(x) is.stream_format(x)
format_is_set(x) is.stream_format(x)
x |
An object, usually a |
Logical indicating if x
inherits from one of the stream format classes.
The format is set on the file objects inside the list so that the file
object can be used to call a write method. See write_stream()
.
format_stream( x, type = c("fst", "feather", "parquet", "qs", "rds"), set_ext = TRUE, warn = FALSE )
format_stream( x, type = c("fst", "feather", "parquet", "qs", "rds"), set_ext = TRUE, warn = FALSE )
x |
A |
type |
The file format type; if |
set_ext |
If |
warn |
If |
x
is returned with a new class attribute reflecting the expected output
format (fst
, feather
(arrow), parquet
(arrow), qs
or rds
).
format_is_set()
, locate_stream()
, ext_stream()
,
new_stream()
, file_stream()
, file_set()
fs <- new_stream(2) fs <- format_stream(fs, "fst") fs[[1]] format_is_set(fs[[1]])
fs <- new_stream(2) fs <- format_stream(fs, "fst") fs[[1]] format_is_set(fs[[1]])
Get the head of an fst file set
head_fst(path, n = 5, i = 1)
head_fst(path, n = 5, i = 1)
path |
The directory to search. |
n |
Number of rows to show. |
i |
Which output chunk to show. |
Get the contents of an fst file set
internalize_fst(path, .as_list = FALSE, ...) get_fst(path, .as_list = FALSE, ...)
internalize_fst(path, .as_list = FALSE, ...) get_fst(path, .as_list = FALSE, ...)
path |
The directory to search. |
.as_list |
Should the results be returned as a list ( |
... |
Not used. |
Check if a directory is dedicated locker space
is_locker_dir(where)
is_locker_dir(where)
where |
The locker location. |
Check if an object is a file_set_item
is.file_set_item(x)
is.file_set_item(x)
x |
An object. |
Logical value indicating if x
has the file_set_item
attribute set.
x <- new_stream(2) is.file_set_item(x[[2]])
x <- new_stream(2) is.file_set_item(x[[2]])
Check if an object inherits from file_stream
is.file_stream(x)
is.file_stream(x)
x |
An object. |
Logical value indicating if x
inherits from file_stream
.
x <- new_stream(2) is.file_stream(x)
x <- new_stream(2) is.file_stream(x)
Check if an object inherits from locker_stream
is.locker_stream(x)
is.locker_stream(x)
x |
An object. |
Logical value indicating if x
inherits from locker_stream
.
x <- new_stream(2, locker = temp_ds("locker-stream-example")) is.locker_stream(x)
x <- new_stream(2, locker = temp_ds("locker-stream-example")) is.locker_stream(x)
Use the function to read all of the .fst
files that were saved when
bg_mrgsim_d
was called and .locker
was passed along with .format = "fst"
.
list_fst(path)
list_fst(path)
path |
The (full) directory path to search. |
Add or update the directory location for items in a file_stream
object.
If a directory path already exists, it is removed first.
locate_stream(x, where, initialize = FALSE)
locate_stream(x, where, initialize = FALSE)
x |
A |
where |
The new location. |
initialize |
If |
When initialize
is set to TRUE
, the locker space is initialized or
reset. In order to initialize, where
must not exist or it must have been
previously set up as locker space. See setup_locker()
for details.
format_stream()
, ext_stream()
, new_stream()
, file_stream()
,
file_set()
x <- new_stream(5) x <- locate_stream(x, file.path(tempdir(), "foo")) x[[1]]$file
x <- new_stream(5) x <- locate_stream(x, file.path(tempdir(), "foo")) x[[1]]$file
Use this function when running mrgsolve while parallelizing on a multisession worker node where the model dll might not be loaded.
mrgsim_ms(mod, ...) mrgsim_worker(mod, ...)
mrgsim_ms(mod, ...) mrgsim_worker(mod, ...)
mod |
a model object |
... |
passed to |
mrgsim_worker(mrgsolve:::house())
mrgsim_worker(mrgsolve:::house())
Simulate with 'mrgsolve' in Parallel
mrgsim.parallel.mc.able
: if TRUE
, multicore will be used if appropriate.
By stream we mean a list that pre-specifies the output file names,
replicate numbers and possibly input objects for a simulation. Passing
locker
initiates a call to setup_locker()
, which sets up or resets
the output directories.
For the data.frame
method, the data are chunked into a list by columns
listed in cols
. Ideally, this is a single column that operates as
a unique ID
across the data set and is used by chunk_by_id()
to
form the chunks. Alternatively, cols
can be multiple column names which
are pasted together to form a unique ID
that is used for splitting
via chunk_by_cols()
.
new_stream(x, ...) ## S3 method for class 'list' new_stream(x, locker = NULL, format = NULL, ...) ## S3 method for class 'data.frame' new_stream(x, nchunk, cols = "ID", locker = NULL, format = NULL, ...) ## S3 method for class 'numeric' new_stream(x, ...) ## S3 method for class 'character' new_stream(x, ...)
new_stream(x, ...) ## S3 method for class 'list' new_stream(x, locker = NULL, format = NULL, ...) ## S3 method for class 'data.frame' new_stream(x, nchunk, cols = "ID", locker = NULL, format = NULL, ...) ## S3 method for class 'numeric' new_stream(x, ...) ## S3 method for class 'character' new_stream(x, ...)
x |
A list or vector to template the stream; for the |
... |
Additional arguments passed to |
locker |
Passed to |
format |
Passed to |
nchunk |
The number of chunks. |
cols |
The name(s) of the column(s) specifying unique IDs to use to
split the |
A list with the following elements:
i
the position number
file
the output file name
x
the input object.
The list has class file_stream
as well as locker_stream
(if locker
was
passed) and a class attribute for the output if format
was passed.
format_stream()
, locate_stream()
, ext_stream()
, file_stream()
,
file_set()
x <- new_stream(3) x[[1]] new_stream(2, locker = file.path(tempdir(), "foo")) df <- data.frame(ID = c(1,2,3,4)) x <- new_stream(df, nchunk = 2) x[[2]] format_is_set(x[[2]]) x <- new_stream(3, format = "fst") format_is_set(x[[2]])
x <- new_stream(3) x[[1]] new_stream(2, locker = file.path(tempdir(), "foo")) df <- data.frame(ID = c(1,2,3,4)) x <- new_stream(df, nchunk = 2) x[[2]] format_is_set(x[[2]]) x <- new_stream(3, format = "fst") format_is_set(x[[2]])
This function removes the hidden locker file which designates a directory
as a locker. Once the locker is modified this way, it cannot be reset again
by calling setup_locker()
or new_stream()
.
noreset_locker(where)
noreset_locker(where)
where |
The locker location. |
A logical value indicating if write ability was successfully revoked.
setup_locker()
, reset_locker()
, version_locker()
Use future_mrgsim_d()
to simulate with the future
package. Use
mc_mrgsim_d()
to simulate with parallel::mclapply
.
future_mrgsim_d( mod, data, nchunk = 4, ..., .as_list = FALSE, .p = NULL, .dry = FALSE, .seed = TRUE, .parallel = TRUE ) mc_mrgsim_d( mod, data, nchunk = 4, ..., .as_list = FALSE, .p = NULL, .dry = FALSE, .seed = NULL, .parallel = TRUE ) fu_mrgsim_d( mod, data, nchunk = 4, ..., .as_list = FALSE, .p = NULL, .dry = FALSE, .seed = TRUE, .parallel = TRUE ) fu_mrgsim_d0(..., .dry = TRUE)
future_mrgsim_d( mod, data, nchunk = 4, ..., .as_list = FALSE, .p = NULL, .dry = FALSE, .seed = TRUE, .parallel = TRUE ) mc_mrgsim_d( mod, data, nchunk = 4, ..., .as_list = FALSE, .p = NULL, .dry = FALSE, .seed = NULL, .parallel = TRUE ) fu_mrgsim_d( mod, data, nchunk = 4, ..., .as_list = FALSE, .p = NULL, .dry = FALSE, .seed = TRUE, .parallel = TRUE ) fu_mrgsim_d0(..., .dry = TRUE)
mod |
The mrgsolve model object; see mrgsolve::mrgmod. |
data |
Data set to simulate; see |
nchunk |
Number of chunks in which to split the data set; chunking will
be based on the |
... |
Passed to |
.as_list |
If |
.p |
Post processing function executed on the worker; arguments should be (1) the simulated output (2) the model object. |
.dry |
If |
.seed |
Passed to |
.parallel |
if |
A data frame or list of simulated data.
mod <- mrgsolve::house() data <- mrgsolve::expand.ev(amt = seq(10)) out <- future_mrgsim_d(mod, data, nchunk = 2)
mod <- mrgsolve::house() data <- mrgsolve::expand.ev(amt = seq(10)) out <- future_mrgsim_d(mod, data, nchunk = 2)
Use future_mrgsim_ei to simulate with the future
package. Use
mc_mrgsim_ei to simulate with parallel::mclapply
.
future_mrgsim_ei( mod, events, idata, nchunk = 4, ..., .as_list = FALSE, .p = NULL, .dry = FALSE, .seed = TRUE, .parallel = TRUE ) fu_mrgsim_ei( mod, events, idata, nchunk = 4, ..., .as_list = FALSE, .p = NULL, .dry = FALSE, .seed = TRUE, .parallel = TRUE ) fu_mrgsim_ei0(..., .dry = TRUE) mc_mrgsim_ei( mod, events, idata, nchunk = 4, ..., .as_list = FALSE, .p = NULL, .dry = FALSE, .seed = NULL, .parallel = TRUE )
future_mrgsim_ei( mod, events, idata, nchunk = 4, ..., .as_list = FALSE, .p = NULL, .dry = FALSE, .seed = TRUE, .parallel = TRUE ) fu_mrgsim_ei( mod, events, idata, nchunk = 4, ..., .as_list = FALSE, .p = NULL, .dry = FALSE, .seed = TRUE, .parallel = TRUE ) fu_mrgsim_ei0(..., .dry = TRUE) mc_mrgsim_ei( mod, events, idata, nchunk = 4, ..., .as_list = FALSE, .p = NULL, .dry = FALSE, .seed = NULL, .parallel = TRUE )
mod |
The mrgsolve model object; see mrgsolve::mrgmod. |
events |
An event object from mrgsolve; see |
idata |
An idata set of parameters, one per simulation unit (individual);
see |
nchunk |
Number of chunks in which to split the data set; chunking will
be based on the |
... |
Passed to |
.as_list |
If |
.p |
Post processing function executed on the worker; arguments should be (1) the simulated output (2) the model object. |
.dry |
If |
.seed |
Passed to |
.parallel |
if |
A data frame or list of simulated data.
mod <- mrgsolve::house() events <- mrgsolve::ev(amt = 100) idata <- data.frame(CL = runif(10, 0.5, 1.5)) out <- future_mrgsim_ei(mod, events, idata)
mod <- mrgsolve::house() events <- mrgsolve::ev(amt = 100) idata <- data.frame(CL = runif(10, 0.5, 1.5)) out <- future_mrgsim_ei(mod, events, idata)
This function is called by setup_locker()
to initialize and
re-initialize a locker directory. We call it reset_locker
because it is
expected that the locker space is created once and then repeatedly
reset as simulations are run and re-run.
reset_locker(where, pattern = NULL)
reset_locker(where, pattern = NULL)
where |
The full path to the locker. |
pattern |
A regular expression for finding files to clear from the locker directory. |
For the locker space to be initialized, the where
directory must not
exist; if it exists, there will be an error. It is also an error for
where
to exist and not contain a particular hidden locker file name
that marks the directory as established locker space.
NOTE: when the locker is reset, all contents are cleared according
to the files matched by pattern
. If any un-matched files exist after
clearing the directory, a warning will be issued.
setup_locker()
, noreset_locker()
, version_locker()
A locker is a directory structure where an enclosing folder contains subfolders that in turn contain the results of different simulation runs. When the number of simulation result sets is known, a stream of file names is returned. This function is mainly called by other functions; an exported function and documentation is provided in order to better communicate how the locker works.
setup_locker(where, tag = locker_tag(where))
setup_locker(where, tag = locker_tag(where))
where |
The directory that contains tagged directories of run results. |
tag |
The name of a folder under |
where
must exist when setting up the locker. The directory tag
will be
created under where
and must not exist except if it had previously been
set up using setup_locker
. Existing tag
directories will have a
hidden file in them indicating that they are established simulation output
folders.
When recreating the tag
directory, it will be unlinked and created new.
Do not try to set up a locker directory that already contains outputs that
need to be preserved. You can call noreset_locker()
on that directory
to prevent future resets.
The locker location.
reset_locker()
, noreset_locker()
, version_locker()
x <- setup_locker(tempdir(), tag = "my-sims") x
x <- setup_locker(tempdir(), tag = "my-sims") x
Create a path to a dataset in tempdir
temp_ds(tag)
temp_ds(tag)
tag |
The dataset subdirectory. |
Version locker contents
version_locker(where, version = "save", overwrite = FALSE, noreset = FALSE)
version_locker(where, version = "save", overwrite = FALSE, noreset = FALSE)
where |
The locker location. |
version |
A tag to be appended to |
overwrite |
If |
noreset |
If |
A logical value indicating whether or not all files were successfully copied to the backup, invisibly.
reset_locker()
, noreset_locker()
, setup_locker()
locker <- file.path(tempdir(), "version-locker-example") if(dir.exists(locker)) unlink(locker, recursive = TRUE) x <- new_stream(1, locker = locker) cat("test", file = file.path(locker, "1-1")) dir.exists(locker) list.files(locker, all.files = TRUE) y <- version_locker(locker, version = "y") y list.files(y, all.files = TRUE)
locker <- file.path(tempdir(), "version-locker-example") if(dir.exists(locker)) unlink(locker, recursive = TRUE) x <- new_stream(1, locker = locker) cat("test", file = file.path(locker, "1-1")) dir.exists(locker) list.files(locker, all.files = TRUE) y <- version_locker(locker, version = "y") y list.files(y, all.files = TRUE)
This function will write out objects that have been assigned a format
with either format_stream()
or the format
argument to new_stream()
.
See examples.
write_stream(x, ...) ## Default S3 method: write_stream(x, data, ...) ## S3 method for class 'stream_format_fst' write_stream(x, data, dir = NULL, ...) ## S3 method for class 'stream_format_feather' write_stream(x, data, dir = NULL, ...) ## S3 method for class 'stream_format_parquet' write_stream(x, data, dir = NULL, ...) ## S3 method for class 'stream_format_qs' write_stream(x, data, dir = NULL, ...) ## S3 method for class 'stream_format_rds' write_stream(x, data, dir = NULL, ...)
write_stream(x, ...) ## Default S3 method: write_stream(x, data, ...) ## S3 method for class 'stream_format_fst' write_stream(x, data, dir = NULL, ...) ## S3 method for class 'stream_format_feather' write_stream(x, data, dir = NULL, ...) ## S3 method for class 'stream_format_parquet' write_stream(x, data, dir = NULL, ...) ## S3 method for class 'stream_format_qs' write_stream(x, data, dir = NULL, ...) ## S3 method for class 'stream_format_rds' write_stream(x, data, dir = NULL, ...)
x |
A |
... |
Not used. |
data |
An object to write. |
dir |
An optional directory location to be used if not already in
the |
The default method always returns FALSE
; other methods which get invoked
if a format
was set will return TRUE
. So, the user can always call
write_stream()
and check the return value: if TRUE
, the file was written
to disk and the data do not need to be returned; a FALSE
return value
indicates that no format was set and the data should be returned.
Note the write methods can be invoked directly for a specific format
if no format
was set (see examples).
A logical value indicating if the output was written or not.
format_stream()
, ext_stream()
, locate_stream()
, new_stream()
,
file_stream()
ds <- temp_ds("example") fs <- new_stream(2, locker = ds, format = "fst") data <- data.frame(x = rnorm(10)) x <- lapply(fs, write_stream, data = data) list.files(ds) reset_locker(ds) fs <- format_stream(fs, "rds") x <- lapply(fs, write_stream, data = data) list.files(ds)
ds <- temp_ds("example") fs <- new_stream(2, locker = ds, format = "fst") data <- data.frame(x = rnorm(10)) x <- lapply(fs, write_stream, data = data) list.files(ds) reset_locker(ds) fs <- format_stream(fs, "rds") x <- lapply(fs, write_stream, data = data) list.files(ds)