
Best Practices

Choosing the Right Executor

Thread vs Process

  • Use threads for I/O-bound tasks:
      • Network requests
      • File operations
      • Database queries
  • Use processes for CPU-bound tasks:
      • Data processing
      • Image manipulation
      • Complex calculations

Threads share one interpreter and are constrained by the GIL, so they shine when tasks spend most of their time waiting on external resources; processes sidestep the GIL and can spread computation across multiple cores.

import requests

# I/O-bound example: workers spend most of their time waiting on the network
@parallel(executor_type="thread")
def fetch_data(urls: list) -> list:
    return [requests.get(url).json() for url in urls]

# CPU-bound example
@parallel(executor_type="process")
def process_images(images: list) -> list:
    return [heavy_image_processing(img) for img in images]

Worker Pool Management

Optimal Worker Count

For CPU-bound tasks:

  • Use multiprocessing.cpu_count() as a baseline
  • Consider leaving 1-2 cores free for system tasks

For I/O-bound tasks:

  • Can use more workers than CPU cores
  • Monitor system resources to find optimal number

import multiprocessing

# CPU-bound optimization
@parallel(
    max_workers=max(1, multiprocessing.cpu_count() - 1),
    executor_type="process"
)
def cpu_intensive_task(): ...

# I/O-bound optimization
@parallel(
    max_workers=32,  # Higher count for I/O tasks
    executor_type="thread"
)
def io_intensive_task(): ...
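
If you are unsure where to start for thread counts, a reasonable baseline is the heuristic the standard library uses for ThreadPoolExecutor's default pool size, min(32, cpu_count + 4); a minimal sketch to tune from:

import os

# Baseline borrowed from concurrent.futures.ThreadPoolExecutor's default sizing
io_workers = min(32, (os.cpu_count() or 1) + 4)

@parallel(max_workers=io_workers, executor_type="thread")
def tuned_io_task(url: str) -> dict: ...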

Memory Management

Batch Processing

  • Use batching for large datasets
  • Adjust batch size based on memory constraints
  • Monitor memory usage during processing

# Memory-efficient processing
@parallel(
    max_workers=4,
    batch_size=100,  # Adjust based on item size
    executor_type="process"
)
def process_large_dataset(items: list) -> list:
    return [process_item(item) for item in items]

Resource Cleanup

  • Let Pyarallel handle the worker lifecycle
  • Avoid manual executor management (contrasted in the sketch below)
  • Use context managers when needed
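
For contrast, a minimal sketch of the manual pool management the decorator takes off your hands, using the stdlib concurrent.futures directly (process_item is a placeholder for your own work):

from concurrent.futures import ThreadPoolExecutor

# Manual management: you own pool creation, submission, and shutdown
def process_manually(items: list) -> list:
    with ThreadPoolExecutor(max_workers=4) as executor:  # context manager guarantees cleanup
        return list(executor.map(process_item, items))

# Decorator-managed: Pyarallel owns the pool lifecycle instead
@parallel(max_workers=4)
def process_managed(item):
    return process_item(item)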

Rate Limiting

API Considerations

  • Respect API rate limits
  • Add buffer to prevent limit breaches
  • Use appropriate time intervals

# Safe API usage
@parallel(
    max_workers=4,
    rate_limit=(90, "minute")  # 90% of 100/minute limit
)
def api_call(item_id: str) -> dict:
    return api.get_item(item_id)

Error Handling

Graceful Failure

  • Implement proper error handling
  • Log errors for debugging
  • Consider retry mechanisms

import logging
from tenacity import retry, stop_after_attempt

@parallel(max_workers=4)
@retry(stop=stop_after_attempt(3))  # innermost, so each item retries independently
def resilient_process(item):
    try:
        result = process_item(item)
        if not validate_result(result):
            raise ValueError(f"Invalid result for {item}")
        return result
    except Exception as e:
        logging.error(f"Error processing {item}: {e}")
        raise

Performance Optimization

Prewarming

  • Use prewarming for latency-sensitive applications
  • Consider startup cost vs benefit

@parallel(
    max_workers=4,
    prewarm=True,  # Prewarm for faster initial response
    executor_type="process"
)
def latency_sensitive_task(): ...

Batch Size Optimization

  • Small batches for low latency
  • Larger batches for high throughput
  • Balance based on use case

# Low latency needs
@parallel(batch_size=5)
def realtime_processing(): ...

# High throughput needs
@parallel(batch_size=100)
def bulk_processing(): ...

Testing and Monitoring

Unit Testing

  • Test with different worker counts
  • Verify error handling
  • Check resource cleanup

import pytest

def test_parallel_processing():
    @parallel(max_workers=2)
    def test_func(x):
        return int(x) * 2  # int() raises ValueError on non-numeric input

    # Test with various inputs
    assert test_func([1, 2, 3]) == [2, 4, 6]

    # Test error handling
    with pytest.raises(ValueError):
        test_func(['invalid'])

Production Monitoring

  • Monitor worker pool health
  • Track memory usage
  • Log performance metrics

import logging
import time

@parallel(max_workers=4)
def monitored_task(item):
    start_time = time.time()
    try:
        result = process_item(item)
        duration = time.time() - start_time
        logging.info(f"Processed {item} in {duration:.2f}s")
        return result
    except Exception as e:
        logging.error(f"Failed to process {item}: {e}")
        raise
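
To cover the memory-tracking bullet as well, a minimal Unix-only sketch using the standard library's resource module (process_item is again a placeholder; note that ru_maxrss is reported in KiB on Linux but bytes on macOS):

import logging
import resource

@parallel(max_workers=4)
def memory_tracked_task(item):
    result = process_item(item)
    peak = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss  # process-wide peak RSS
    logging.info(f"Processed {item}; peak RSS so far: {peak}")
    return result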