| Title: | Simulate with 'mrgsolve' in Parallel |
|---|---|
| Description: | Simulation from an 'mrgsolve' <https://cran.r-project.org/package=mrgsolve> model using a parallel backend. Input data sets are split (chunked) and simulated in parallel using mclapply() or future_lapply() <https://cran.r-project.org/package=future.apply>. |
| Authors: | Kyle Baron [aut, cre] |
| Maintainer: | Kyle Baron |
| License: | GPL (>=2) |
| Version: | 0.3.0.9000 |
| Built: | 2026-05-08 07:11:46 UTC |
| Source: | https://github.com/kylebaron/mrgsim.parallel |
Multicore lapply in the background
bg_mclapply(X, FUN, mc.cores = 1, ..., .wait = TRUE, .seed = NULL)
| Argument | Description |
|---|---|
| X | A list. |
| FUN | The function to be applied to each element of X. |
| mc.cores | Passed to |
| ... | Arguments passed to |
| .wait | If |
| .seed | A |
A list of output data.
ans <- bg_mclapply(seq(10), sqrt, mc.cores = 2)
This function uses callr::r_bg() to simulate a dataset in the background,
optionally in parallel and optionally saving the results directly to
disk in fst, arrow (feather or parquet) or rds format. Parallelization can be
mediated by the parallel package on Unix or macOS, or by future on any OS.
bg_mrgsim_d( mod, data, nchunk = 1, ..., .locker = NULL, .tag = NULL, .format = c("fst", "feather", "parquet", "rds"), .wait = TRUE, .seed = FALSE, .cores = 1, .plan = NULL )
| Argument | Description |
|---|---|
| mod | A model object. |
| data | Data set to simulate; see |
| nchunk | Number of chunks in which to split the data set; chunking will be based on the |
| ... | Arguments passed to |
| .locker | A directory for saving simulated data; use this to collect results from several different runs in a single folder. |
| .tag | A name to use for the current run; results are saved under |
| .format | The output format for saving simulations; using format |
| .wait | If |
| .seed | A |
| .cores | The number of cores to parallelize across; pass 1 to run the simulation sequentially. |
| .plan | The name of a |
bg_mrgsim_d() returns a processx::process object; see the processx
documentation for a list of methods. You will have to call
process$get_result() to retrieve the result. When an output .locker is not
specified, simulated data are returned; when an output .locker is specified,
the paths to the output files on disk are returned. fst files should be read
with fst::read_fst(). When the results are not saved to .locker, you will
get a single data frame when nchunk is 1 or a list of data frames when
nchunk is greater than 1. It is safest to call dplyr::bind_rows() or
something equivalent on the result if you are expecting a data frame.
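Because the result can be either one data frame or a list of data frames, a small normalizing step keeps downstream code simple. The sketch below uses base R's do.call(rbind, ...) as one "equivalent" of dplyr::bind_rows(); the helper name as_sim_df() is hypothetical, and the chunked input is a stand-in for what process$get_result() might return, not real simulation output.

```r
# Hypothetical helper (not part of mrgsim.parallel): normalize a simulation
# result that may be one data frame or a list of data frames.
as_sim_df <- function(res) {
  # nchunk == 1: the result is already a single data frame
  if (is.data.frame(res)) return(res)
  # nchunk > 1: stack the chunks in order and reset the row names
  out <- do.call(rbind, unname(res))
  rownames(out) <- NULL
  out
}

# Stand-in for a chunked result; real output would come from get_result()
chunked <- list(
  data.frame(ID = 1:2, CP = c(1.5, 2.5)),
  data.frame(ID = 3:4, CP = c(3.5, 4.5))
)
sims <- as_sim_df(chunked)
sims
```

Unlike dplyr::bind_rows(), rbind() requires every chunk to have the same columns, which should hold when all chunks come from the same model and settings.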
An r_process object; see callr::r_bg(). Call process$get_result() to
get the actual result (see details). If a .locker path is supplied,
the simulated data are saved to disk and a list of file names is returned.
future_mrgsim_d(), internalize_fst(), list_fst(),
head_fst(), setup_locker()
mod <- mrgsolve::house(delta = 24, end = 168)
data <- mrgsolve::expand.ev(
  amt = c(100, 300, 450),
  ID = 1:100,
  ii = 24,
  addl = 6
)
data <- dplyr::mutate(data, dose = amt)
process <- bg_mrgsim_d(
  mod,
  data,
  carry_out = "dose",
  outvars = "CP",
  .wait = TRUE
)
process$get_result()
ds <- file.path(tempdir(), "sims")
files <- bg_mrgsim_d(
  mod,
  data,
  carry_out = "dose",
  .wait = TRUE,
  .locker = ds,
  .format = "fst"
)
files$get_result()
sims <- internalize_fst(ds)
head(sims)
Use chunk_by_id() to split up a data set by the ID column; use
chunk_by_cols() to split by a combination of columns; use chunk_by_row()
to split a data set by rows.
chunk_by_id(data, nchunk, id_col = "ID", mark = NULL)
chunk_by_cols(data, nchunk, cols, mark = NULL)
chunk_by_row(data, nchunk, mark = NULL)
| Argument | Description |
|---|---|
| data | A data frame. |
| nchunk | The number of chunks. |
| id_col | Character name specifying the column containing the |
| mark | When populated as a character label, adds a column to the chunked data frames with that name and with value the integer group number. |
| cols | A character vector of columns to use for deriving |
A list of data frames.
x <- expand.grid(ID = 1:10, B = rev(1:10))
chunk_by_id(x, nchunk = 3)
chunk_by_row(x, nchunk = 4)
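The essential property of splitting by ID is that all rows for one ID end up in the same chunk, so no individual is simulated across two workers. The following is a base-R sketch of that idea, assuming IDs are assigned to chunks in order of first appearance; chunk_ids_sketch() is a hypothetical name and this is not the package's chunk_by_id() implementation.

```r
# Base-R sketch of ID-based chunking (hypothetical helper, not the
# mrgsim.parallel source): every row of an ID goes to the same chunk.
chunk_ids_sketch <- function(data, nchunk, id_col = "ID") {
  ids <- unique(data[[id_col]])
  # assign the unique IDs, in order of first appearance, to nchunk groups
  grp <- cut(seq_along(ids), breaks = nchunk, labels = FALSE)
  # look up the group for every row through its ID
  row_grp <- grp[match(data[[id_col]], ids)]
  unname(split(data, row_grp))
}

x <- expand.grid(ID = 1:10, B = rev(1:10))
chunks <- chunk_ids_sketch(x, nchunk = 3)
# number of distinct IDs in each chunk
sapply(chunks, function(d) length(unique(d$ID)))
```

Chunking by rows instead would be simpler but could cut one ID's records in half, which matters whenever the simulation carries state from one record to the next within an individual.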
Add or update the file extension for items in a file_stream object.
If a file extension exists, it is removed first.
ext_stream(x, ext)
| Argument | Description |
|---|---|
| x | A |
| ext | The new extension. |
format_stream(), locate_stream(), new_stream(), file_stream(),
file_set()
x <- new_stream(3)
x <- ext_stream(x, "feather")
x[[1]]$file
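The "remove any existing extension, then add the new one" rule for ext_stream() can be pictured with base R's tools::file_path_sans_ext(). This is a sketch on plain character paths, not on file_stream objects; swap_ext() is a hypothetical helper, and accepting the extension with or without a leading dot is an assumption made for convenience here.

```r
# Hypothetical helper (not the ext_stream() source): drop any existing
# extension, then append the new one; a leading dot in ext is tolerated.
swap_ext <- function(files, ext) {
  ext <- sub("^\\.", "", ext)
  paste0(tools::file_path_sans_ext(files), ".", ext)
}

swap_ext(c("out/1-3", "out/2-3.fst"), "feather")
```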
File names have a numbered core that communicates the current file number
as well as the total number of files in the set. For example, 02-20 would
indicate the second file in a set of 20. Other customizations can be added.
file_set(n, where = NULL, prefix = NULL, pad = TRUE, sep = "-", ext = "")
| Argument | Description |
|---|---|
| n | The number of file names to create. |
| where | An optional output file path. |
| prefix | A character prefix for the file name. |
| pad | If |
| sep | Separator character. |
| ext | A file extension, including the dot. |
By default, a list of length n whose elements are lists of length 2; each
sublist contains the integer file number as i and the file name as file.
x <- file_set(3, where = "foo/bar")
length(x)
x[2]
x <- file_set(25, ext = ".feather")
x[17]
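The "file number, then total" naming scheme described for file_set() (02-20 for the second file of 20) can be reproduced with base R's formatC(). This is a sketch of the naming rule only; numbered_names() is a hypothetical helper, it ignores the prefix and where arguments, and zero-padding to the width of the total is an assumption read off the 02-20 example.

```r
# Sketch of the "file number - total" naming scheme (e.g. 02-20); the helper
# is hypothetical and is not the file_set() source.
numbered_names <- function(n, sep = "-", ext = "", pad = TRUE) {
  n <- as.integer(n)
  i <- seq_len(n)
  # zero-pad the file number to the width of the total when pad = TRUE
  num <- if (pad) {
    formatC(i, width = nchar(as.character(n)), flag = "0")
  } else {
    as.character(i)
  }
  paste0(num, sep, n, ext)
}

numbered_names(20)[2]
numbered_names(25, ext = ".feather")[17]
```

Padding keeps the names sorting in numeric order when listed alphabetically, which is convenient when reading a file set back from disk.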
Optionally, set up a locker storage space on disk with a specific file
format (e.g. fst or feather).
file_stream(n, locker = NULL, format = NULL, where = NULL, ...)
| Argument | Description |
|---|---|
| n | The number of file names to generate; must be a single numeric value greater than or equal to 1. |
| locker | Passed to |
| format | Passed to |
| where | An optional file path; this is replaced by |
| ... | Additional arguments passed to |
Pass locker to set up locker space for saving outputs; this involves
clearing the locker directory (see setup_locker() for details). Passing
locker also sets the path for output files. If you want to set up the path
for output files without setting up locker space, pass where.
format_stream(), locate_stream(), ext_stream(), new_stream(),
file_set()
x <- file_stream(3, locker = temp_ds("foo"), format = "fst")
x[[1]]
This can be used to check if a file set item has been assigned an output
format (e.g. fst, feather, parquet, qs or rds). If the check returns
FALSE it would signal that data should be returned rather than calling
write_stream().
format_is_set(x)
is.stream_format(x)
| Argument | Description |
|---|---|
| x | An object, usually a |
Logical indicating if x inherits from one of the stream format classes.
The format is set on the file objects inside the list so that the file
object can be used to call a write method. See write_stream().
format_stream( x, type = c("fst", "feather", "parquet", "qs", "rds"), set_ext = TRUE, warn = FALSE )
| Argument | Description |
|---|---|
| x | A |
| type | The file format type; if |
| set_ext | If |
| warn | If |
x is returned with a new class attribute reflecting the expected output
format (fst, feather (arrow), parquet (arrow), qs or rds).
format_is_set(), locate_stream(), ext_stream(),
new_stream(), file_stream(), file_set()
fs <- new_stream(2)
fs <- format_stream(fs, "fst")
fs[[1]]
format_is_set(fs[[1]])
Get the head of an fst file set
head_fst(path, n = 5, i = 1)
| Argument | Description |
|---|---|
| path | The directory to search. |
| n | Number of rows to show. |
| i | Which output chunk to show. |
Get the contents of an fst file set
internalize_fst(path, .as_list = FALSE, ...)
get_fst(path, .as_list = FALSE, ...)
| Argument | Description |
|---|---|
| path | The directory to search. |
| .as_list | Should the results be returned as a list ( |
| ... | Not used. |
Check if a directory is dedicated locker space
is_locker_dir(where)
| Argument | Description |
|---|---|
| where | The locker location. |
Check if an object is a file_set_item
is.file_set_item(x)
| Argument | Description |
|---|---|
| x | An object. |
Logical value indicating if x has the file_set_item attribute set.
x <- new_stream(2)
is.file_set_item(x[[2]])
Check if an object inherits from file_stream
is.file_stream(x)
| Argument | Description |
|---|---|
| x | An object. |
Logical value indicating if x inherits from file_stream.
x <- new_stream(2)
is.file_stream(x)
Check if an object inherits from locker_stream
is.locker_stream(x)
| Argument | Description |
|---|---|
| x | An object. |
Logical value indicating if x inherits from locker_stream.
x <- new_stream(2, locker = temp_ds("locker-stream-example"))
is.locker_stream(x)
Use this function to find all of the .fst files that were saved when
bg_mrgsim_d() was called with .locker and .format = "fst".
list_fst(path)
| Argument | Description |
|---|---|
| path | The (full) directory path to search. |
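Assuming list_fst() returns the paths of the .fst files found in one output directory, the lookup amounts to a filtered directory listing. The sketch below shows that idea in base R with list.files(); list_fst_sketch() is a hypothetical helper and the example files are empty placeholders, not real simulation output.

```r
# Base-R approximation (hypothetical helper): collect the full paths of the
# .fst files saved in one output directory.
list_fst_sketch <- function(path) {
  list.files(path, pattern = "\\.fst$", full.names = TRUE)
}

d <- file.path(tempdir(), "list-fst-example")
dir.create(d, showWarnings = FALSE)
invisible(file.create(file.path(d, c("1-2.fst", "2-2.fst", "notes.txt"))))
basename(list_fst_sketch(d))
```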
Add or update the directory location for items in a file_stream object.
If a directory path already exists, it is removed first.
locate_stream(x, where, initialize = FALSE)
| Argument | Description |
|---|---|
| x | A |
| where | The new location. |
| initialize | If |
When initialize is set to TRUE, the locker space is initialized or
reset. In order to initialize, where must not exist or it must have been
previously set up as locker space. See setup_locker() for details.
format_stream(), ext_stream(), new_stream(), file_stream(),
file_set()
x <- new_stream(5)
x <- locate_stream(x, file.path(tempdir(), "foo"))
x[[1]]$file
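The "remove any existing directory, then apply the new one" rule for locate_stream() can be pictured on plain character paths with basename() and file.path(). relocate() is a hypothetical helper written for illustration; it does not touch the disk and does not model the initialize step, which in the package involves setting up locker space.

```r
# Hypothetical helper (not the locate_stream() source): keep only the file
# name and attach it to the new directory.
relocate <- function(files, where) {
  file.path(where, basename(files))
}

relocate(c("old/dir/1-2.fst", "2-2.fst"), "new")
```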
Use this function when running mrgsolve while parallelizing on a multisession worker node where the model DLL might not be loaded.
mrgsim_ms(mod, ...)
mrgsim_worker(mod, ...)
| Argument | Description |
|---|---|
| mod | a model object |
| ... | passed to |
mrgsim_worker(mrgsolve::house())
Simulate with 'mrgsolve' in Parallel
Package option mrgsim.parallel.mc.able: if TRUE, multicore will be used where appropriate.
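mrgsim.parallel.mc.able is set like any other R option, assuming (as its name suggests) that the package reads it with getOption() when a simulation function is called. Setting it to FALSE presumably keeps multicore from being used for the rest of the session; that effect is an assumption, while the options()/getOption() calls are plain base R.

```r
# Presumably keeps multicore from being used for the rest of the session;
# options() returns the previous value so it can be restored later with
# options(op).
op <- options(mrgsim.parallel.mc.able = FALSE)
getOption("mrgsim.parallel.mc.able")
```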
By stream we mean a list that pre-specifies the output file names,
replicate numbers and possibly input objects for a simulation. Passing
locker initiates a call to setup_locker(), which sets up or resets
the output directories.
For the data.frame method, the data are chunked into a list by columns
listed in cols. Ideally, this is a single column that operates as
a unique ID across the data set and is used by chunk_by_id() to
form the chunks. Alternatively, cols can be multiple column names which
are pasted together to form a unique ID that is used for splitting
via chunk_by_cols().
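Pasting several columns into one key matters when no single column is unique across the data set, for example when ID numbering restarts in each study. The base-R sketch below builds such a key and then keeps every key within one chunk. paste_key() and the STUDY column are hypothetical illustrations, the "-" separator is an assumption, and this is not the package's chunk_by_cols() source.

```r
# Hypothetical helper: paste several columns into one key, as described for
# the multi-column case; this is not the package's chunk_by_cols() source.
paste_key <- function(data, cols, sep = "-") {
  do.call(paste, c(unname(as.list(data[cols])), sep = sep))
}

df <- data.frame(STUDY = c(1, 1, 2, 2, 2), ID = c(1, 2, 1, 1, 2))
key <- paste_key(df, c("STUDY", "ID"))
length(unique(df$ID))  # ID alone repeats across studies
length(unique(key))    # each STUDY-ID combination is distinct

# split rows into 2 chunks by key, keeping each key within one chunk
grp <- cut(match(key, unique(key)), breaks = 2, labels = FALSE)
chunks <- unname(split(df, grp))
```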
new_stream(x, ...)
## S3 method for class 'list'
new_stream(x, locker = NULL, format = NULL, ...)
## S3 method for class 'data.frame'
new_stream(x, nchunk, cols = "ID", locker = NULL, format = NULL, ...)
## S3 method for class 'numeric'
new_stream(x, ...)
## S3 method for class 'character'
new_stream(x, ...)
| Argument | Description |
|---|---|
| x | A list or vector to template the stream; for the |
| ... | Additional arguments passed to |
| locker | Passed to |
| format | Passed to |
| nchunk | The number of chunks. |
| cols | The name(s) of the column(s) specifying unique IDs to use to split the |
A list; each element is itself a list with the following items:
i, the position number; file, the output file name; and x, the input object.
The list has class file_stream as well as locker_stream (if locker was
passed) and a class attribute for the output if format was passed.
format_stream(), locate_stream(), ext_stream(), file_stream(),
file_set()
x <- new_stream(3)
x[[1]]
new_stream(2, locker = file.path(tempdir(), "foo"))
df <- data.frame(ID = c(1,2,3,4))
x <- new_stream(df, nchunk = 2)
x[[2]]
format_is_set(x[[2]])
x <- new_stream(3, format = "fst")
format_is_set(x[[2]])
This function removes the hidden locker file that designates a directory
as a locker. Once the locker is modified this way, it cannot be reset again
by calling setup_locker() or new_stream().
noreset_locker(where)
| Argument | Description |
|---|---|
| where | The locker location. |
A logical value indicating if write ability was successfully revoked.
setup_locker(), reset_locker(), version_locker()
Use future_mrgsim_d() to simulate with the future package. Use
mc_mrgsim_d() to simulate with parallel::mclapply().
future_mrgsim_d(mod, data, nchunk = 4, ..., .as_list = FALSE, .p = NULL, .dry = FALSE, .seed = TRUE, .parallel = TRUE)
mc_mrgsim_d(mod, data, nchunk = 4, ..., .as_list = FALSE, .p = NULL, .dry = FALSE, .seed = NULL, .parallel = TRUE)
fu_mrgsim_d(mod, data, nchunk = 4, ..., .as_list = FALSE, .p = NULL, .dry = FALSE, .seed = TRUE, .parallel = TRUE)
fu_mrgsim_d0(..., .dry = TRUE)
| Argument | Description |
|---|---|
| mod | The mrgsolve model object; see mrgsolve::mrgmod. |
| data | Data set to simulate; see |
| nchunk | Number of chunks in which to split the data set; chunking will be based on the |
| ... | Passed to |
| .as_list | If |
| .p | Post-processing function executed on the worker; arguments should be (1) the simulated output and (2) the model object. |
| .dry | If |
| .seed | Passed to |
| .parallel | if |
A data frame or list of simulated data.
mod <- mrgsolve::house()
data <- mrgsolve::expand.ev(amt = seq(10))
out <- future_mrgsim_d(mod, data, nchunk = 2)
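The overall pattern behind mc_mrgsim_d() (split the data, simulate each chunk on a worker, optionally post-process there, then bind) can be sketched with parallel::mclapply() from base R. Everything model-related here is a stand-in: fake_sim() is a hypothetical replacement for an mrgsolve call, and post() only mimics the documented .p signature (simulated output, then model object), assuming the output arrives as a data frame.

```r
library(parallel)

# Hypothetical stand-in for a model simulation: a real workflow would call
# mrgsolve on each chunk instead.
fake_sim <- function(chunk) {
  data.frame(ID = chunk$ID, CP = chunk$amt / 10)
}
# Post-processing hook in the spirit of .p: it receives the simulated output
# and a model object (unused here) and runs on the worker.
post <- function(out, mod) out[out$CP > 0.25, , drop = FALSE]

data <- data.frame(ID = 1:10, amt = seq(10))
chunks <- split(data, rep(1:2, each = 5))

# mclapply() relies on forking, which is not available on Windows
cores <- if (.Platform$OS.type == "windows") 1L else 2L
res <- mclapply(chunks, function(ch) post(fake_sim(ch), mod = NULL),
                mc.cores = cores)
sims <- do.call(rbind, unname(res))
```

Doing the post-processing on the worker means only the reduced output travels back to the main process, which is the usual motivation for a hook like .p.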
Use future_mrgsim_ei() to simulate with the future package. Use
mc_mrgsim_ei() to simulate with parallel::mclapply().
future_mrgsim_ei(mod, events, idata, nchunk = 4, ..., .as_list = FALSE, .p = NULL, .dry = FALSE, .seed = TRUE, .parallel = TRUE)
fu_mrgsim_ei(mod, events, idata, nchunk = 4, ..., .as_list = FALSE, .p = NULL, .dry = FALSE, .seed = TRUE, .parallel = TRUE)
fu_mrgsim_ei0(..., .dry = TRUE)
mc_mrgsim_ei(mod, events, idata, nchunk = 4, ..., .as_list = FALSE, .p = NULL, .dry = FALSE, .seed = NULL, .parallel = TRUE)
| Argument | Description |
|---|---|
| mod | The mrgsolve model object; see mrgsolve::mrgmod. |
| events | An event object from mrgsolve; see |
| idata | An idata set of parameters, one per simulation unit (individual); see |
| nchunk | Number of chunks in which to split the data set; chunking will be based on the |
| ... | Passed to |
| .as_list | If |
| .p | Post-processing function executed on the worker; arguments should be (1) the simulated output and (2) the model object. |
| .dry | If |
| .seed | Passed to |
| .parallel | if |
A data frame or list of simulated data.
mod <- mrgsolve::house()
events <- mrgsolve::ev(amt = 100)
idata <- data.frame(CL = runif(10, 0.5, 1.5))
out <- future_mrgsim_ei(mod, events, idata)
This function is called by setup_locker() to initialize and
re-initialize a locker directory. We call it reset_locker because the
locker space is expected to be created once and then reset repeatedly
as simulations are run and re-run.
reset_locker(where, pattern = NULL)
| Argument | Description |
|---|---|
| where | The full path to the locker. |
| pattern | A regular expression for finding files to clear from the locker directory. |
For the locker space to be initialized, the where directory must either
not exist or already contain the particular hidden locker file that marks
it as established locker space; if where exists without that file, there
will be an error.
NOTE: when the locker is reset, all contents are cleared according
to the files matched by pattern. If any un-matched files exist after
clearing the directory, a warning will be issued.
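The clearing rule (delete what pattern matches, warn about anything left) can be sketched in base R. clear_by_pattern() and the ".marker" file are hypothetical; the sketch relies on the fact that list.files() skips dot-files by default, which is one plausible way a hidden locker file could survive a reset, but it is not the reset_locker() source.

```r
# Hypothetical helper (not the reset_locker() source): remove files that
# match a pattern, then warn about anything left behind.
clear_by_pattern <- function(where, pattern = NULL) {
  # list.files() skips dot-files by default, so a hidden marker survives
  files <- list.files(where, pattern = pattern, full.names = TRUE)
  unlink(files)
  left <- list.files(where)
  if (length(left) > 0) {
    warning("files not cleared: ", paste(left, collapse = ", "))
  }
  invisible(left)
}

where <- file.path(tempdir(), "clear-example")
dir.create(where, showWarnings = FALSE)
invisible(file.create(file.path(where, c("1-2.fst", "2-2.fst", "notes.txt", ".marker"))))
left <- clear_by_pattern(where, pattern = "\\.fst$")  # warns about notes.txt
```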
setup_locker(), noreset_locker(), version_locker()
A locker is a directory structure where an enclosing folder contains subfolders that in turn contain the results of different simulation runs. When the number of simulation result sets is known, a stream of file names is returned. This function is mainly called by other functions; an exported function and documentation are provided in order to better communicate how the locker works.
setup_locker(where, tag = locker_tag(where))
| Argument | Description |
|---|---|
| where | The directory that contains tagged directories of run results. |
| tag | The name of a folder under |
where must exist when setting up the locker. The directory tag will be
created under where and must not exist except if it had previously been
set up using setup_locker. Existing tag directories will have a
hidden file in them indicating that they are established simulation output
folders.
When the tag directory is recreated, it is unlinked and created anew.
Do not try to set up a locker directory that already contains outputs that
need to be preserved; you can call noreset_locker() on that directory
to prevent future resets.
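The guard described for lockers (a directory may be reset only if it does not exist yet or carries the hidden marker, and removing the marker revokes that permission) can be sketched with base R file tests. The marker name ".locker-marker" and the can_reset() helper are hypothetical; the actual hidden file name used by mrgsim.parallel is not given in this manual.

```r
# Hypothetical marker name; the actual hidden file name that mrgsim.parallel
# uses is not given in this manual.
marker <- ".locker-marker"

# Reset is allowed only when the directory does not exist yet or carries the
# marker that identifies it as established locker space.
can_reset <- function(where) {
  !dir.exists(where) || file.exists(file.path(where, marker))
}

where <- file.path(tempdir(), "guard-example")
unlink(where, recursive = TRUE)
before <- can_reset(where)            # does not exist yet
dir.create(where)
unmarked <- can_reset(where)          # exists without the marker
invisible(file.create(file.path(where, marker)))
marked <- can_reset(where)            # established locker space
unlink(file.path(where, marker))      # analogous to noreset_locker()
revoked <- can_reset(where)
```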
The locker location.
reset_locker(), noreset_locker(), version_locker()
x <- setup_locker(tempdir(), tag = "my-sims")
x
Create a path to a dataset in tempdir
temp_ds(tag)
| Argument | Description |
|---|---|
| tag | The dataset subdirectory. |
Version locker contents
version_locker(where, version = "save", overwrite = FALSE, noreset = FALSE)
| Argument | Description |
|---|---|
| where | The locker location. |
| version | A tag to be appended to |
| overwrite | If |
| noreset | If |
A logical value indicating whether or not all files were successfully copied to the backup, invisibly.
reset_locker(), noreset_locker(), setup_locker()
locker <- file.path(tempdir(), "version-locker-example")
if(dir.exists(locker)) unlink(locker, recursive = TRUE)
x <- new_stream(1, locker = locker)
cat("test", file = file.path(locker, "1-1"))
dir.exists(locker)
list.files(locker, all.files = TRUE)
y <- version_locker(locker, version = "y")
y
list.files(y, all.files = TRUE)
This function will write out objects that have been assigned a format
with either format_stream() or the format argument to new_stream().
See examples.
write_stream(x, ...)
## Default S3 method:
write_stream(x, data, ...)
## S3 method for class 'stream_format_fst'
write_stream(x, data, dir = NULL, ...)
## S3 method for class 'stream_format_feather'
write_stream(x, data, dir = NULL, ...)
## S3 method for class 'stream_format_parquet'
write_stream(x, data, dir = NULL, ...)
## S3 method for class 'stream_format_qs'
write_stream(x, data, dir = NULL, ...)
## S3 method for class 'stream_format_rds'
write_stream(x, data, dir = NULL, ...)
| Argument | Description |
|---|---|
| x | A |
| ... | Not used. |
| data | An object to write. |
| dir | An optional directory location to be used if not already in the |
The default method always returns FALSE; other methods, which get invoked
if a format was set, return TRUE. So, the user can always call
write_stream() and check the return value: if TRUE, the file was written
to disk and the data do not need to be returned; a FALSE return value
indicates that no format was set and the data should be returned.
Note the write methods can be invoked directly for a specific format
if no format was set (see examples).
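The return-value contract above (default method returns FALSE, format-specific methods write and return TRUE) lends itself to a simple worker pattern: return the data only when nothing was written. The sketch below reproduces that contract with a hypothetical base-R S3 generic, save_or_skip(), and a made-up "rds_item" class so it runs without the package; with mrgsim.parallel loaded, write_stream() on a stream item would take the generic's place.

```r
# Hypothetical generic mirroring the documented contract: the default method
# returns FALSE, a format-specific method writes the data and returns TRUE.
save_or_skip <- function(x, data, ...) UseMethod("save_or_skip")
save_or_skip.default <- function(x, data, ...) FALSE
save_or_skip.rds_item <- function(x, data, ...) {
  saveRDS(data, x$file)
  TRUE
}

# Worker pattern: return the data only when nothing was written to disk
worker <- function(item, data) {
  if (save_or_skip(item, data)) invisible(NULL) else data
}

data <- data.frame(x = 1:3)
plain <- list(i = 1, file = "1-2")
rds <- structure(
  list(i = 2, file = file.path(tempdir(), "2-2.rds")),
  class = "rds_item"
)
a <- worker(plain, data)  # no format: data come back
b <- worker(rds, data)    # format set: written, nothing returned
```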
A logical value indicating if the output was written or not.
format_stream(), ext_stream(), locate_stream(), new_stream(),
file_stream()
ds <- temp_ds("example")
fs <- new_stream(2, locker = ds, format = "fst")
data <- data.frame(x = rnorm(10))
x <- lapply(fs, write_stream, data = data)
list.files(ds)
reset_locker(ds)
fs <- format_stream(fs, "rds")
x <- lapply(fs, write_stream, data = data)
list.files(ds)