"""Dask cache utilities.
"""
import warnings
from contextlib import contextmanager
from distutils.version import LooseVersion
from typing import Callable, ContextManager, Optional
import dask
import dask.array as da
from dask.cache import Cache
from .. import utils
def create_dask_cache(
nbytes: Optional[int] = None, mem_fraction: float = 0.1
) -> Cache:
"""Create a dask cache at utils.dask_cache if one doesn't already exist.
Parameters
----------
nbytes : int, optional
The desired size of the cache, in bytes. If ``None``, the cache size
will autodetermined as fraction of the total memory in the system,
using ``mem_fraction``. If ``nbytes`` is 0, cache object will be
created, but not caching will occur. by default, cache size is
autodetermined using ``mem_fraction``.
mem_fraction : float, optional
The fraction (from 0 to 1) of total memory to use for the dask cache.
by default, 10% of total memory is used.
Returns
-------
dask_cache : dask.cache.Cache
An instance of a Dask Cache
"""
import psutil
if nbytes is None:
nbytes = psutil.virtual_memory().total * mem_fraction
if not (
hasattr(utils, 'dask_cache') and isinstance(utils.dask_cache, Cache)
):
utils.dask_cache = Cache(nbytes)
utils.dask_cache.register()
return utils.dask_cache
[docs]def resize_dask_cache(
nbytes: Optional[int] = None, mem_fraction: float = None
) -> Cache:
"""Create or resize the dask cache used for opportunistic caching.
The cache object is an instance of a :class:`Cache`, (which
wraps a :class:`cachey.Cache`), and is made available at
:attr:`napari.utils.dask_cache`.
See `Dask opportunistic caching
<https://docs.dask.org/en/latest/caching.html>`_
Parameters
----------
nbytes : int, optional
The desired size of the cache, in bytes. If ``None``, the cache size
will autodetermined as fraction of the total memory in the system,
using ``mem_fraction``. If ``nbytes`` is 0. The cache is turned off.
by default, cache size is autodetermined using ``mem_fraction``.
mem_fraction : float, optional
The fraction (from 0 to 1) of total memory to use for the dask cache.
Returns
-------
dask_cache : dask.cache.Cache
An instance of a Dask Cache
Examples
--------
>>> from napari.utils import resize_dask_cache
>>> cache = resize_dask_cache() # use 50% of total memory by default
>>> # dask.Cache wraps cachey.Cache
>>> assert isinstance(cache.cache, cachey.Cache)
>>> # useful attributes
>>> cache.cache.available_bytes # full size of cache
>>> cache.cache.total_bytes # currently used bytes
"""
import psutil
if nbytes is None and mem_fraction is not None:
nbytes = psutil.virtual_memory().total * mem_fraction
# if we don't have a cache already, create one. If neither nbytes nor
# mem_fraction was provided, it will use the default size as determined in
# create_cache.
if not (
hasattr(utils, 'dask_cache') and isinstance(utils.dask_cache, Cache)
):
return create_dask_cache(nbytes)
else: # we already have a cache
# if the cache has already been registered, then calling
# resize_dask_cache() without supplying either mem_fraction or nbytes
# is a no-op:
if (
nbytes is not None
and nbytes != utils.dask_cache.cache.available_bytes
):
utils.dask_cache.cache.resize(nbytes)
return utils.dask_cache
def _is_dask_data(data) -> bool:
"""Return True if data is a dask array or a list/tuple of dask arrays."""
return isinstance(data, da.Array) or (
isinstance(data, (list, tuple))
and any(isinstance(i, da.Array) for i in data)
)
def configure_dask(data) -> Callable[[], ContextManager[dict]]:
"""Spin up cache and return context manager that optimizes Dask indexing.
This function determines whether data is a dask array or list of dask
arrays and prepares some optimizations if so.
When a delayed dask array is given to napari, there are couple things that
need to be done to optimize performance.
1. Opportunistic caching needs to be enabled, such that we don't recompute
(or "re-read") data that has already been computed or read.
2. Dask task fusion must be turned off to prevent napari from triggering
new io on data that has already been read from disk. For example, with a
4D timelapse of 3D stacks, napari may actually *re-read* the entire 3D
tiff file every time the Z plane index is changed. Turning of Dask task
fusion with ``optimization.fuse.active == False`` prevents this.
.. note::
Turning off task fusion requires Dask version 2.15.0 or later.
For background and context, see `napari/napari#718
<https://github.com/napari/napari/issues/718>`_, `napari/napari#1124
<https://github.com/napari/napari/pull/1124>`_, and `dask/dask#6084
<https://github.com/dask/dask/pull/6084>`_.
For details on Dask task fusion, see the documentation on `Dask
Optimization <https://docs.dask.org/en/latest/optimize.html>`_.
Parameters
----------
data : Any
data, as passed to a ``Layer.__init__`` method.
Returns
-------
ContextManager
A context manager that can be used to optimize dask indexing
Examples
--------
>>> data = dask.array.ones((10,10,10))
>>> optimized_slicing = configure_dask(data)
>>> with optimized_slicing():
... data[0, 2].compute()
"""
if _is_dask_data(data):
create_dask_cache() # creates one if it doesn't exist
if dask.__version__ < LooseVersion('2.15.0'):
warnings.warn(
'For best performance with Dask arrays in napari, please '
'upgrade Dask to v2.15.0 or later. Current version is '
f'{dask.__version__}'
)
def dask_optimized_slicing():
with dask.config.set({"optimization.fuse.active": False}) as cfg:
yield cfg
else:
def dask_optimized_slicing():
yield {}
return contextmanager(dask_optimized_slicing)