Parallel execution
from __future__ import annotations
import pickle
from pathlib import Path
from typing import TYPE_CHECKING, Any
from mxlpy.parallel import Cache, parallelise
if TYPE_CHECKING:
from collections.abc import Hashable
Parallelization¶
mxlpy allows easy parallelisation of functions using the parallel module.
The API is built to allow easy mapping, saving of results, and stopping execution of long-running functions.
Sharp edge: be aware that the usual caveats of multiprocessing on Windows apply.
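On Windows (and on macOS with the default spawn start method) worker processes re-import the main module, so the parallel call must live behind an import guard. A minimal stdlib sketch of the pattern, using multiprocessing.Pool for illustration; the same guard applies when calling parallelise from a script:

```python
import multiprocessing as mp

def fn(x: int) -> int:
    return x**2

if __name__ == "__main__":
    # Without this guard, spawned workers would re-execute the module's
    # top level on import and try to create pools of their own.
    with mp.Pool(processes=2) as pool:
        print(pool.map(fn, [1, 2]))  # [1, 4]
```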
Basic API¶
The most basic call to parallelise just requires
- A function that consumes an input and returns its result
- An iterable of (key, input) pairs
def fn(x: int) -> int:
return x**2
res = parallelise(fn, inputs=[("a", 1), ("b", 2)])
print(dict(res))
0%| | 0/2 [00:00<?, ?it/s]
50%|█████ | 1/2 [00:03<00:03, 3.48s/it]
100%|██████████| 2/2 [00:03<00:00, 1.74s/it]
{'a': 1, 'b': 4}
Restricting cores¶
By default, parallelise will use all available CPU cores.
If you want to limit that, use the max_workers parameter to set an explicit number.
res = parallelise(
fn,
inputs=[("a", 1), ("b", 2)],
max_workers=2,
)
print(dict(res))
0%| | 0/2 [00:00<?, ?it/s]
50%|█████ | 1/2 [00:03<00:03, 3.31s/it]
100%|██████████| 2/2 [00:03<00:00, 1.67s/it]
{'a': 1, 'b': 4}
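A common heuristic for choosing an explicit worker count (plain stdlib, nothing mxlpy mandates) is to leave one core free for the main process:

```python
import os

# os.cpu_count() can return None on some platforms, hence the fallback;
# never go below one worker.
n_workers = max(1, (os.cpu_count() or 1) - 1)
print(n_workers)
```

The resulting number can then be passed as max_workers.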
Caching results¶
By supplying the Cache class, you can automatically save and retrieve your calculated values.
The default settings are
- Write results into the .cache folder
- Write and load pickle files, using {key}.p as the file name for each key of inputs
res = parallelise(
fn,
inputs=[("a", 1), ("b", 2)],
cache=Cache(),
)
print(dict(res))
0%| | 0/2 [00:00<?, ?it/s]
50%|█████ | 1/2 [00:03<00:03, 3.31s/it]
100%|██████████| 2/2 [00:03<00:00, 1.44s/it]
{'a': 1, 'b': 4}
You can override all of these defaults.
Shown here is how to set a custom temporary directory
res = parallelise(
fn,
inputs=[("a", 1), ("b", 2)],
cache=Cache(
tmp_dir=Path(".cache"),
),
)
print(dict(res))
0%| | 0/2 [00:00<?, ?it/s]
50%|█████ | 1/2 [00:03<00:03, 3.46s/it]
100%|██████████| 2/2 [00:03<00:00, 1.73s/it]
{'a': 1, 'b': 4}
And here is how you can implement custom naming, load, and save functions.
Be careful to always update both load_fn and save_fn together if you change one of them.
def name_fn(k: Hashable) -> str:
return f"{k}.p"
def load_fn(file: Path) -> Any:
with file.open("rb") as fp:
return pickle.load(fp) # nosec
def save_fn(file: Path, data: Any) -> None:
with file.open("wb") as fp:
pickle.dump(data, fp)
res = parallelise(
fn,
inputs=[("a", 1), ("b", 2)],
cache=Cache(
name_fn=name_fn,
load_fn=load_fn,
save_fn=save_fn,
),
)
print(dict(res))
0%| | 0/2 [00:00<?, ?it/s]
50%|█████ | 1/2 [00:03<00:03, 3.54s/it]
100%|██████████| 2/2 [00:03<00:00, 1.79s/it]
{'a': 1, 'b': 4}
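The same trio can, for example, be swapped out to store results as human-readable JSON instead of pickle. This is a stdlib-only sketch of the three functions; plugging them into Cache works the same way as above, assuming the results are JSON-serialisable:

```python
import json
from collections.abc import Hashable
from pathlib import Path
from typing import Any

def name_fn(k: Hashable) -> str:
    return f"{k}.json"

def load_fn(file: Path) -> Any:
    with file.open("r") as fp:
        return json.load(fp)

def save_fn(file: Path, data: Any) -> None:
    with file.open("w") as fp:
        json.dump(data, fp)

# Round-trip check: saving then loading returns the original data.
target = Path(name_fn("a"))
save_fn(target, {"value": 4})
print(load_fn(target))  # {'value': 4}
target.unlink()
```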
Timeouts¶
You can set a timeout for long-running functions.
This timeout runs per function call, so you can recover all other runs without any problems.
For example, in the call below a completes within the timeout, while b causes a timeout and is thus missing from the results.
import time
def delay_for_seconds(t: int) -> int:
time.sleep(t)
return t
res = parallelise(
delay_for_seconds,
inputs=[("a", 1), ("b", 2)],
timeout=1,
)
res
0%| | 0/2 [00:00<?, ?it/s]
50%|█████ | 1/2 [00:01<00:01, 1.02s/it]
100%|██████████| 2/2 [00:02<00:00, 1.01s/it]
[]
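The per-call timeout semantics can be approximated with the stdlib's concurrent.futures. This is an illustrative thread-based sketch, not mxlpy's actual implementation: each future is given its own deadline, and timed-out calls are dropped while completed ones are kept.

```python
import time
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FuturesTimeout

def delay_for_seconds(t: float) -> float:
    time.sleep(t)
    return t

def run_with_timeout(fn, inputs, timeout):
    """Collect (key, result) pairs, skipping calls that exceed `timeout`."""
    results = []
    with ThreadPoolExecutor() as pool:
        futures = {k: pool.submit(fn, v) for k, v in inputs}
        for k, fut in futures.items():
            try:
                results.append((k, fut.result(timeout=timeout)))
            except FuturesTimeout:
                pass  # drop the timed-out call instead of failing the whole run
    return results

print(run_with_timeout(delay_for_seconds, [("a", 0.05), ("b", 0.5)], timeout=0.2))
# [('a', 0.05)]
```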
Progress bar¶
You can disable the tqdm progress bar entirely using disable_tqdm=True
res = parallelise(
fn,
inputs=[("a", 1), ("b", 2)],
disable_tqdm=True,
)
res
[('a', 1), ('b', 4)]
You can also customise the progress bar description using tqdm_desc
res = parallelise(
fn,
inputs=[("a", 1), ("b", 2)],
tqdm_desc="Loop name",
)
res
Loop name: 0%| | 0/2 [00:00<?, ?it/s]
Loop name: 50%|█████ | 1/2 [00:03<00:03, 3.42s/it]
Loop name: 100%|██████████| 2/2 [00:03<00:00, 1.71s/it]
[('a', 1), ('b', 4)]
Running sequentially¶
You can run a function sequentially using parallel=False.
While it might seem odd at first, the point is easy refactoring in cases where you nest analyses.
You generally don't want n processes to spawn n new processes each.
The parallel=False flag allows you to re-use analyses written with parallel=True with only a single change.
This can also occasionally be useful for debugging functions that run in parallel.
def other_fn(x: int) -> dict[str, int]:
res = parallelise(
fn,
inputs=[("a1", x), ("b1", x**2)],
parallel=False, # this shouldn't be set to True!
)
return dict(res)
res = parallelise(
other_fn,
inputs=[("x1", 2), ("x2", 3)],
parallel=True,
)
res
0%| | 0/2 [00:00<?, ?it/s]
50%|█████     | 1/2 [00:03<00:03, 3.48s/it]
100%|██████████| 2/2 [00:03<00:00, 1.74s/it]
[('x1', {'a1': 4, 'b1': 16}), ('x2', {'a1': 9, 'b1': 81})]