Advanced Features

Progress Tracking

Track progress for long-running operations:

from pyarallel import parallel_map

def my_progress(done, total):
    print(f"\r{done}/{total} ({100*done//total}%)", end="", flush=True)

results = parallel_map(process, items, workers=8, on_progress=my_progress)

If items has a known length, total is the final input size. If items is an unsized iterable (for example a generator) and you also set batch_size, Pyarallel keeps input consumption lazy; in that mode total is the number of items discovered so far.
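
For instance, a minimal sketch of the unsized case, reusing process and my_progress from above (the generator and its size are made up for illustration):

def items():
    # no __len__, so Pyarallel cannot know the total up front
    for i in range(10_000):
        yield i

# with batch_size set, the total passed to my_progress is the count discovered so far
results = parallel_map(process, items(), workers=8, batch_size=500,
                       on_progress=my_progress)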

Works with both parallel_map and .map():

from pyarallel import parallel

@parallel(workers=4)
def fetch(url): ...

results = fetch.map(urls, on_progress=lambda d, t: print(f"{d}/{t}"))

tqdm Integration

Wire on_progress to a tqdm progress bar:

import requests
from tqdm import tqdm
from pyarallel import parallel_map

def fetch(url):
    return requests.get(url, timeout=10).json()

with tqdm(total=len(urls)) as pbar:
    result = parallel_map(
        fetch, urls, workers=10,
        on_progress=lambda done, total: (setattr(pbar, 'n', done), pbar.refresh()),
    )
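
If the lambda feels dense, a named callback defined inside the with block does the same job via tqdm's standard update method (a sketch):

with tqdm(total=len(urls)) as pbar:
    def advance(done, total):
        pbar.update(done - pbar.n)  # update() adds a delta; pbar.n is the bar's current count

    result = parallel_map(fetch, urls, workers=10, on_progress=advance)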

Note

tqdm works best with known-size inputs (lists, ranges). For generators with batch_size, the total changes as batches are consumed, which makes the progress bar jump — use a plain on_progress callback instead.

Timeouts

Total Timeout (sync)

Set a wall-clock limit for the entire operation:

results = parallel_map(fetch, urls, workers=10, timeout=30.0)

# Tasks that didn't complete are marked as failures
if not results.ok:
    for idx, exc in results.failures():
        if isinstance(exc, TimeoutError):
            print(f"Item {idx} timed out")

Per-Task Timeout (async)

The async API supports per-task timeouts via asyncio.wait_for:

from pyarallel import async_parallel_map

results = await async_parallel_map(fetch, urls, concurrency=10, task_timeout=5.0)
# Each individual task gets 5 seconds before timing out

Method Support

The @parallel decorator works with instance methods via the descriptor protocol:

Instance Methods

class Scraper:
    def __init__(self, session):
        self.session = session

    @parallel(workers=4)
    def fetch(self, url):
        return self.session.get(url).text

s = Scraper(requests.Session())
s.fetch("http://example.com")       # normal — returns str
s.fetch.map(urls)                   # parallel — returns ParallelResult

Static Methods

class MathUtils:
    @staticmethod
    @parallel(workers=4)
    def square(x):
        return x ** 2

MathUtils.square(5)                 # 25
MathUtils.square.map([1, 2, 3])    # ParallelResult([1, 4, 9])

Using parallel_map with Methods

You can always use parallel_map directly with bound methods:

scraper = Scraper(session)
results = parallel_map(scraper.fetch, urls, workers=8)

Overriding Decorator Defaults

Per-call overrides on .map():

from pyarallel import parallel, RateLimit

@parallel(workers=2, rate_limit=RateLimit(10, "second"))
def process(item): ...

# Override workers and rate limit for this specific call
results = process.map(big_list, workers=16, rate_limit=RateLimit(1000, "minute"))

Async Decorator

Same pattern as sync, with async/await:

import httpx
from pyarallel import async_parallel

@async_parallel(concurrency=10)
async def fetch(url):
    async with httpx.AsyncClient() as c:
        return (await c.get(url)).json()

data = await fetch("http://example.com")       # single call
results = await fetch.map(urls)                 # parallel
results = await fetch.map(urls, task_timeout=5.0)  # with per-task timeout

Retry

Built-in per-item retry with exponential backoff and jitter:

from pyarallel import parallel_map, Retry

# Retry up to 3 times with 1s base exponential backoff
# Jitter is ON by default — randomizes delay ±50% to prevent thundering herd
results = parallel_map(fetch, urls, workers=10, retry=Retry(attempts=3, backoff=1.0))

# Only retry transient network errors — fail immediately on bad input
results = parallel_map(fetch, urls, workers=10,
                       retry=Retry(on=(ConnectionError, TimeoutError)))

# Cap max delay and disable jitter (useful for testing)
results = parallel_map(fetch, urls, workers=10,
                       retry=Retry(attempts=5, backoff=2.0, max_delay=30.0, jitter=False))

How backoff works: delay = backoff * 2^attempt, capped at max_delay. With jitter=True (default), the delay is multiplied by a random factor between 0.5 and 1.5 — this prevents all workers from retrying at the exact same moment when a service recovers.
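
As a quick illustration, the schedule can be written out in plain Python (a sketch of the formula above, not Pyarallel's internals; zero-indexed attempts are an assumption):

import random

def backoff_delay(attempt, backoff=1.0, max_delay=30.0, jitter=True):
    delay = min(backoff * (2 ** attempt), max_delay)
    if jitter:
        delay *= random.uniform(0.5, 1.5)  # randomize by +/-50%
    return delay

# backoff=1.0, jitter off: 1s, 2s, 4s, 8s, ... capped at max_delay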

Retries happen inside the worker — only the failing item is retried, not the entire batch. This composes cleanly with rate limiting and batching.
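
For instance, all three can be combined in one call (the numbers are illustrative, and this assumes parallel_map accepts the same rate_limit keyword shown for .map() above):

results = parallel_map(fetch, urls, workers=10,
                       batch_size=500,
                       rate_limit=RateLimit(100, "second"),
                       retry=Retry(attempts=3, backoff=1.0, on=(ConnectionError,)))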

For the full Retry API, see API Reference.

Batching

Control memory for large datasets by processing in chunks:

# Submit 500 items at a time instead of 500,000 at once
results = parallel_map(process, huge_list, workers=8, batch_size=500)

With batch_size set, unsized iterables are consumed lazily one batch at a time instead of being materialized up front.
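
For example, lines can stream straight from a file handle without loading the whole file (parse_line is a hypothetical per-line worker):

# the generator expression has no len(), so lines are pulled 500 at a time
with open("events.log") as f:
    results = parallel_map(parse_line, (line.rstrip("\n") for line in f),
                           workers=8, batch_size=500)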

Errors in one batch don't prevent subsequent batches from running.

Starmap — Multi-Argument Functions

For functions that take multiple arguments, use parallel_starmap or .starmap():

from pyarallel import parallel_starmap

def fetch_with_auth(url, token):
    return requests.get(url, headers={"Authorization": token}).json()

results = parallel_starmap(fetch_with_auth,
                           [(url, token) for url in urls],
                           workers=10)

# Or with the decorator
@parallel(workers=10)
def fetch_with_auth(url, token): ...

results = fetch_with_auth.starmap([(url1, token), (url2, token)])

Streaming — Constant Memory

For large-scale processing where results shouldn't accumulate in memory, use parallel_iter or .stream():

from pyarallel import parallel_iter

# Process 10M items — only one batch of results in memory at a time
for item in parallel_iter(process, ten_million_items,
                          workers=8, batch_size=1000):
    if item.ok:
        db.save(item.value)
    else:
        log_error(item.index, item.error)

# Or with the decorator
@parallel(workers=8)
def process(item): ...

for item in process.stream(huge_list, batch_size=1000):
    if item.ok:
        db.save(item.value)
    else:
        log_error(item.index, item.error)

Results arrive in completion order (fastest tasks first), not input order. Each ItemResult includes the original .index so you can match results to inputs.
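
If you need input order back at the end, .index makes it a one-liner per item. A sketch that buffers every result (note this gives up the constant-memory benefit):

ordered = [None] * len(huge_list)
for item in process.stream(huge_list, batch_size=1000):
    ordered[item.index] = item.value if item.ok else item.error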

When to use which:

API                        Memory                  Use case
.map() / parallel_map      All results in memory   Results fit in memory; need .ok / .failures()
.stream() / parallel_iter  Constant (one batch)    ETL, streaming to DB, 10M+ items

Structured Error Handling

ParallelResult never silently drops errors:

result = parallel_map(process, items, workers=4)

# Inspect successes and failures separately
for idx, value in result.successes():
    save(idx, value)

for idx, error in result.failures():
    log_error(idx, error)

# Retry just the failed items
failed_items = [items[idx] for idx, _ in result.failures()]
retry_result = parallel_map(process, failed_items, workers=2)

When you iterate the result or call .values(), failures are raised together as an ExceptionGroup (built into Python 3.11 and later):

try:
    values = list(result)
except ExceptionGroup as eg:
    print(f"{len(eg.exceptions)} tasks failed")
    for exc in eg.exceptions:
        print(f"  {type(exc).__name__}: {exc}")
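
On Python 3.11+, except* can split the group by error type instead of looping over it (a sketch; exceptions that match no clause re-raise as a smaller group):

try:
    values = list(result)
except* TimeoutError as eg:
    print(f"{len(eg.exceptions)} tasks timed out")
except* ConnectionError as eg:
    print(f"{len(eg.exceptions)} connection failures")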