Parallel execution
from __future__ import annotations
from pathlib import Path
from mxlpy.parallel import Cache, parallelise
from typing import Any
from collections.abc import Hashable
import pickle
Parallelization¶
mxlpy allows easy parallelisation of functions using the parallel module.
Its API is built to allow easy mapping, saving of results, and stopping execution of long-running functions.
Sharp edge: be aware that the usual issues of multiprocessing on Windows apply.
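The note above can be illustrated with plain multiprocessing (mxlpy is not required for this sketch): on Windows the spawn start method re-imports the script in every worker process, so any code that launches a pool must sit behind a main guard.

```python
# A minimal sketch of the guard needed on Windows (spawn start method).
# Without it, each worker re-imports the module and re-launches the pool.
import multiprocessing as mp


def square(x: int) -> int:
    return x**2


def main() -> None:
    with mp.Pool(2) as pool:
        print(pool.map(square, [1, 2, 3]))


if __name__ == "__main__":
    main()
```

The same guard applies when calling parallelise from a plain script rather than a notebook.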
Basic API¶
The most basic call to parallelise just requires

- A function that consumes an input and returns its result
- A list of `(key, input)` pairs
def fn(x: int) -> int:
return x**2
res = parallelise(fn, inputs=[("a", 1), ("b", 2)])
print(dict(res))
0%| | 0/2 [00:00<?, ?it/s]
100%|██████████| 2/2 [00:00<00:00, 13.39it/s]
{'a': 1, 'b': 4}
Restricting cores¶
By default parallelise will use all available CPU cores.
In case you want to limit that, you can use the max_workers parameter to set an explicit number.
res = parallelise(
fn,
inputs=[("a", 1), ("b", 2)],
max_workers=2,
)
print(dict(res))
0%| | 0/2 [00:00<?, ?it/s]
100%|██████████| 2/2 [00:00<00:00, 16.46it/s]
{'a': 1, 'b': 4}
Caching results¶
By supplying the Cache class, you can automatically save and retrieve your calculated values.
The default settings are

- Write results into a `.cache` folder
- Write and load a `pickle` file, using `{key}.p` of the inputs as the file name
res = parallelise(
fn,
inputs=[("a", 1), ("b", 2)],
cache=Cache(),
)
print(dict(res))
0%| | 0/2 [00:00<?, ?it/s]
100%|██████████| 2/2 [00:00<00:00, 14.37it/s]
{'a': 1, 'b': 4}
You can overwrite all of these defaults.
Shown here is how to set a custom temporary directory
res = parallelise(
fn,
inputs=[("a", 1), ("b", 2)],
cache=Cache(
tmp_dir=Path(".cache"),
),
)
print(dict(res))
0%| | 0/2 [00:00<?, ?it/s]
100%|██████████| 2/2 [00:00<00:00, 15.65it/s]
{'a': 1, 'b': 4}
And shown here is how to implement custom load and save functions.
Take care to always update both load_fn and save_fn together if you change either of them
def name_fn(k: Hashable) -> str:
return f"{k}.p"
def load_fn(file: Path) -> Any:
with file.open("rb") as fp:
return pickle.load(fp) # nosec
def save_fn(file: Path, data: Any) -> None:
with file.open("wb") as fp:
pickle.dump(data, fp)
res = parallelise(
fn,
inputs=[("a", 1), ("b", 2)],
cache=Cache(
name_fn=name_fn,
load_fn=load_fn,
save_fn=save_fn,
),
)
print(dict(res))
0%| | 0/2 [00:00<?, ?it/s]
100%|██████████| 2/2 [00:00<00:00, 15.11it/s]
{'a': 1, 'b': 4}
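As a variation on the pickle-based functions above, here is a sketch of a JSON-backed triple (assuming Cache accepts the same name_fn/load_fn/save_fn callables as shown in the example). Keeping all three functions next to each other makes it harder to update one without the others:

```python
import json
from collections.abc import Hashable
from pathlib import Path
from typing import Any


def json_name_fn(k: Hashable) -> str:
    # derive the cache file name from the key
    return f"{k}.json"


def json_load_fn(file: Path) -> Any:
    # read a previously cached result
    with file.open("r") as fp:
        return json.load(fp)


def json_save_fn(file: Path, data: Any) -> None:
    # write a freshly computed result
    with file.open("w") as fp:
        json.dump(data, fp)
```

Note that JSON only supports a subset of Python types, so this variant fits plain numbers, strings, lists, and dicts, while pickle handles arbitrary objects.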
Timeouts¶
You can set a timeout for long-running functions.
The timeout applies per function call, so all other runs are still recovered without any problems.
In the example below, a is retrieved, while b hit the timeout and is thus missing.
import time
def delay_for_seconds(t: int) -> int:
time.sleep(t)
return t
res = parallelise(
delay_for_seconds,
inputs=[("a", 1), ("b", 2)],
timeout=1,
)
res
0%| | 0/2 [00:00<?, ?it/s]
50%|█████ | 1/2 [00:01<00:01, 1.02s/it]
100%|██████████| 2/2 [00:01<00:00, 1.73it/s]
[('a', 1)]
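Since a timed-out key is simply absent from the results, a common follow-up (a sketch, not part of the mxlpy API) is to collect the missing inputs and re-run only those, e.g. with a larger timeout:

```python
all_inputs = [("a", 1), ("b", 2)]

# results recovered before the timeout (as in the run above)
done = dict([("a", 1)])

# inputs whose keys are missing from the results
remaining = [(k, v) for k, v in all_inputs if k not in done]
print(remaining)  # [('b', 2)]
```

The remaining list can then be passed straight back to parallelise.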
Progress bar¶
You can disable the progress bar entirely using disable_tqdm=True
res = parallelise(
fn,
inputs=[("a", 1), ("b", 2)],
disable_tqdm=True,
)
res
[('a', 1), ('b', 4)]
You can also customise the progress bar description using tqdm_desc
res = parallelise(
fn,
inputs=[("a", 1), ("b", 2)],
tqdm_desc="Loop name",
)
res
Loop name: 0%| | 0/2 [00:00<?, ?it/s]
Loop name: 100%|██████████| 2/2 [00:00<00:00, 14.50it/s]
[('a', 1), ('b', 4)]
Running sequentially¶
You can run a function sequentially using parallel=False.
While it might seem odd at first, the point is easy refactoring in cases where you nest analyses.
You generally don't want n processes to each spawn n new processes.
The parallel=False flag allows you to re-use analyses written with parallel=True with only a single change.
This can also occasionally be useful for debugging functions that run in parallel
def other_fn(x: int) -> dict[str, int]:
res = parallelise(
fn,
inputs=[("a1", x), ("b1", x**2)],
parallel=False, # this shouldn't be set to True!
)
return dict(res)
res = parallelise(
other_fn,
inputs=[("x1", 2), ("x2", 3)],
parallel=True,
)
res
0%| | 0/2 [00:00<?, ?it/s]
0%| | 0/2 [00:00<?, ?it/s]
100%|██████████| 2/2 [00:00<00:00, 13911.46it/s]
0%| | 0/2 [00:00<?, ?it/s]
100%|██████████| 2/2 [00:00<00:00, 22982.49it/s]
100%|██████████| 2/2 [00:00<00:00, 15.54it/s]
[('x1', {'a1': 4, 'b1': 16}), ('x2', {'a1': 9, 'b1': 81})]