Parallelisation with nessai

One benefit of the proposal method used in nessai is that it allows for simple parallelisation of the likelihood evaluation: new live points are drawn in batches and then stored, so the likelihood can be precomputed and stored for later use.

Enabling parallelisation

There are two keyword arguments that must be set to enable parallelisation:

  • n_pool: The number of threads to use for evaluating the likelihood.

  • max_threads: The maximum number of threads to use; this should be at least 1 larger than n_pool, since the extra threads are allocated to PyTorch’s CPU parallelisation. Both are set when creating the sampler, as shown in the sketch below.
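
For example, a minimal sketch, assuming model is an instance of a user-defined nessai Model and output is an output directory:

from nessai.flowsampler import FlowSampler

fs = FlowSampler(
    model,
    output=output,
    n_pool=2,       # Threads for evaluating the likelihood
    max_threads=3,  # n_pool plus one extra thread for PyTorch
)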

Note

If running nessai via a job scheduler such as HTCondor, remember to set the number of requested threads (or CPUs) accordingly. This should match max_threads.
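
For example, a minimal HTCondor submit-file sketch; the executable name is a placeholder and request_cpus matches max_threads=3 from the sketch above:

# Minimal HTCondor submit file (sketch); executable name is a placeholder
executable = run_nessai.sh
# request_cpus should match max_threads
request_cpus = 3
queue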

Specifying a pool

Alternatively, nessai can use a user-defined pool, specified by setting the pool argument in NestedSampler or FlowSampler. Some global variables must be initialised when creating the pool; this is done using initialise_pool_variables():

from multiprocessing import Pool
from nessai.utils.multiprocessing import initialise_pool_variables

# GaussianModel is a stand-in for a user-defined nessai model
model = GaussianModel()
pool = Pool(
    processes=2,
    initializer=initialise_pool_variables,
    initargs=(model,),
)

The pool can then be passed to the pool keyword argument when setting up the sampler.
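
For example, a minimal sketch reusing the model and pool created above; the output path is a placeholder:

fs = FlowSampler(
    model,
    output='./outdir/',
    pool=pool,  # User-defined pool
)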

Using ray

ray includes a distributed multiprocessing pool that can also be used with nessai: simply import ray.util.multiprocessing.Pool instead of the standard multiprocessing pool and initialise it using the method described above.
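
As a sketch, the previous pool setup with ray’s pool swapped in (assuming ray is installed and GaussianModel is defined as before):

from ray.util.multiprocessing import Pool

from nessai.utils.multiprocessing import initialise_pool_variables

model = GaussianModel()
pool = Pool(
    processes=2,
    initializer=initialise_pool_variables,
    initargs=(model,),
)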

Example usage

#!/usr/bin/env python

"""
Example of parallelising the likelihood evaluation in nessai.

Shows the two methods supported in nessai: setting n_pool or using a
user-defined pool.
"""

from multiprocessing import Pool

import numpy as np

from nessai.flowsampler import FlowSampler
from nessai.model import Model
from nessai.utils import setup_logger
from nessai.utils.multiprocessing import initialise_pool_variables


output = './outdir/parallelisation_example/'
logger = setup_logger(output=output)


# Generate the data
truth = {'mu': 1.7, 'sigma': 0.7}
bounds = {'mu': [-3, 3], 'sigma': [0.01, 3]}
n_points = 1000
data = np.random.normal(truth['mu'], truth['sigma'], size=n_points)


class GaussianLikelihood(Model):
    """
    Gaussian likelihood with the mean and standard deviation as the parameters
    to infer.

    Parameters
    ----------
    data : :obj:`numpy.ndarray`
        Array of data.
    bounds : dict
        The prior bounds.
    """
    def __init__(self, data, bounds):
        self.names = list(bounds.keys())
        self.bounds = bounds
        self.data = data

    def log_prior(self, x):
        """Uniform prior on both parameters."""
        log_p = np.log(self.in_bounds(x))
        for bounds in self.bounds.values():
            log_p -= np.log(bounds[1] - bounds[0])
        return log_p

    def log_likelihood(self, x):
        """Gaussian likelihood."""
        log_l = np.sum(
            - np.log(x['sigma']) -
            0.5 * ((self.data - x['mu']) / x['sigma']) ** 2
        )
        return log_l


# Using n_pool
logger.warning('Running nessai with n_pool')
# Configure the sampler with 3 total threads, 2 of which are used for
# evaluating the likelihood.
fs = FlowSampler(
    GaussianLikelihood(data, bounds),
    output=output,
    resume=False,
    seed=1234,
    max_threads=3,               # Maximum number of threads
    n_pool=2,                    # Threads for evaluating the likelihood
)

# Run the sampler
fs.run()

# Using a user-defined pool
logger.warning('Running nessai with a user-defined pool')

# Must initialise the global variables for the pool prior to starting it.
# Note: this relies on the child processes inheriting the variables (e.g. via
# the 'fork' start method); alternatively, pass initializer and initargs to
# the pool as shown earlier.
model = GaussianLikelihood(data, bounds)
initialise_pool_variables(model)
# Define the pool
pool = Pool(2)

fs = FlowSampler(
    model,
    output=output,
    resume=False,
    seed=1234,
    pool=pool,                    # User-defined pool
)

# Run the sampler
# The pool will automatically be closed. This can be disabled by passing
# `close_pool=False` to the sampler.
fs.run()
