Parallelisation with nessai
One benefit of the proposal method used in nessai
is it allows for simple parallelisation of the likelihood evaluation since new live points are drawn in batches and then stored. The likelihood can therefore be precomputed and stored later use.
Enabling parallelisation
There are two keyword arguments that must be set to enable parallelisation:
n_pool
: The number of threads to use for evaluating the likelihoodmax_threads
: The maximum number of threads to use, this should be at least 1 larger thann_pool
. Extra threads are allocated to PyTorch’s CPU parallelisation.
Note
If running nessai
via a job scheduler such as HTCondor, remember to set the number of requested threads accordingly. This should match max_threads
.
Specifying a pool
Alternatively, nessai
can use a user-defined pool. This is specified by setting the pool
argument in NestedSampler
or FlowSampler
. Some variables must be initialised when creating the pool, this is done using initialise_pool_variables()
:
from multiprocessing import Pool
from nessai.utils.multiprocessing import initialise_pool_variables
model = GaussianModel()
pool = Pool(
processes=2,
initializer=initialise_pool_variables,
initargs=(model,),
)
pool
can then passed to the pool
keyword argument when setting up the sampler.
Using ray
ray
includes a distributed multiprocessing pool that can also be used with nessai
. Simply import ray.util.multiprocessing.Pool
instead of the standard pool and initialise using the method described above.
Example usage
#!/usr/bin/env python
"""
Example of parallelising the likelihood evaluation in nessai.
Shows the two methods supported in nessai: setting n_pool or using a
user-defined pool.
"""
import numpy as np
from multiprocessing import Pool
from nessai.flowsampler import FlowSampler
from nessai.model import Model
from nessai.utils import setup_logger
from nessai.utils.multiprocessing import initialise_pool_variables
output = './outdir/parallelisation_example/'
logger = setup_logger(output=output)
# Generate the data
truth = {'mu': 1.7, 'sigma': 0.7}
bounds = {'mu': [-3, 3], 'sigma': [0.01, 3]}
n_points = 1000
data = np.random.normal(truth['mu'], truth['sigma'], size=n_points)
class GaussianLikelihood(Model):
"""
Gaussian likelihood with the mean and standard deviation as the parameters
to infer.
Parameters
----------
data : :obj:`numpy.ndarray`
Array of data.
bounds : dict
The prior bounds.
"""
def __init__(self, data, bounds):
self.names = list(bounds.keys())
self.bounds = bounds
self.data = data
def log_prior(self, x):
"""Uniform prior on both parameters."""
log_p = np.log(self.in_bounds(x))
for bounds in self.bounds.values():
log_p -= np.log(bounds[1] - bounds[0])
return log_p
def log_likelihood(self, x):
"""Gaussian likelihood."""
log_l = np.sum(
- np.log(x['sigma']) -
0.5 * ((self.data - x['mu']) / x['sigma']) ** 2
)
return log_l
# Using n_pool
logger.warning('Running nessai with n_pool')
# Configure the sampler with 3 total threads, 2 of which are used for
# evaluating the likelihood.
fs = FlowSampler(
GaussianLikelihood(data, bounds),
output=output,
resume=False,
seed=1234,
max_threads=3, # Maximum number of threads
n_pool=2, # Threads for evaluating the likelihood
)
# Run the sampler
fs.run()
# Using a user-defined pool
logger.warning('Running nessai with a user-defined pool')
# Must initialise the global variables for the pool prior to starting it
model = GaussianLikelihood(data, bounds)
initialise_pool_variables(model)
# Define the pool
pool = Pool(2)
fs = FlowSampler(
model,
output=output,
resume=False,
seed=1234,
pool=pool, # User-defined pool
)
# Run the sampler
# The pool will automatically be closed. This can be disabled by passing
# `close_pool=False` to the sampler.
fs.run()