rekall.runtime module

An Extensible Parallel Runtime Library for Rekall.

Runtime: the entrypoint for running large tasks.
Its constructor takes a factory function for a worker pool; the available factories are described in the following section.
WorkerPool Factories:
inline_pool_factory
A factory for a pool that executes tasks in sequence in the main process.
get_spawned_process_pool_factory
Returns a factory function for a pool that creates worker processes by spawning new python interpreters. The worker processes do not inherit any context from the main process.
get_forked_process_pool_factory
Returns a factory function for a pool that creates worker processes by forking. The worker processes thus inherit all the global context from the main process, such as global variables. However, safely forking a multithreaded program is problematic.
Combiners for Runtime:

Runtime.run() is a MapReduce routine where a large task is divided into smaller chunks and each chunk is fed to a worker (the map step). A combiner is responsible for merging the results of each chunk, i.e. the reduction step. The provided combiners are the following.

union_combiner
The default combiner. It calls the union method on the per-chunk results to merge them, assuming the results have type IntervalSet or IntervalSetMapping.
disjoint_domain_combiner
A faster combiner than union_combiner which assumes that the results are of type IntervalSetMapping and every chunk produces its own set of domain keys that are disjoint from results of other chunks.
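
The map and reduce steps described above can be pictured with a small standard-library sketch. It does not use Rekall: chunked, toy_query, set_union_combiner and map_reduce are hypothetical names, and plain Python sets stand in for IntervalSet results.

```python
from functools import reduce

def chunked(args, chunksize):
    """Split args into consecutive chunks of at most chunksize items."""
    return [args[i:i + chunksize] for i in range(0, len(args), chunksize)]

def toy_query(video_ids):
    """Stand-in query: returns the even ids in the batch as a set."""
    return {v for v in video_ids if v % 2 == 0}

def set_union_combiner(result1, result2):
    """Stand-in for a union-based combiner: merges two partial results."""
    return result1.union(result2)

def map_reduce(query, args, combiner, chunksize):
    # Map step: run the query on every chunk (sequentially in this sketch).
    partial_results = [query(chunk) for chunk in chunked(args, chunksize)]
    # Reduce step: fold the partial results together with the combiner.
    return reduce(combiner, partial_results)

combined = map_reduce(toy_query, list(range(10)), set_union_combiner, chunksize=3)
```

In this sketch the chunks run one after another; a worker pool would run the map step in parallel while the reduce step stays the same.
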
WorkerPool classes:

Besides using the provided factory functions, one can create a custom factory from the provided WorkerPool classes, or even write a new implementation of the WorkerPool interface. Both are described below.

InlineSingleProcessPool
This worker pool uses the main process to execute tasks in sequence.
ForkedProcessPool
This pool uses forking to create worker processes.
SpawnedProcessPool
This pool spawns fresh python interpreter processes. It can run custom initializers when creating the child processes.
AbstractWorkerPool
The abstract interface for all WorkerPool implementations.
AsyncTaskResult interface:
This represents a Future-like object. Custom implementations of the WorkerPool interface need to return objects with this interface from their map method.
Exception classes:
TaskException
Raised when a worker throws during task execution.
RekallRuntimeException
Raised when there is an error in the Runtime.
class rekall.runtime.AbstractAsyncTaskResult

Bases: object

Definition of the AsyncTaskResult interface

This represents a Future-like object. Custom implementations of the WorkerPool interface need to return objects with this interface from their map method.

done()

Returns whether the value is ready

get()

Returns the value inside the AsyncTaskResult.

This blocks until the value is ready.

Raises:TaskException – if the AsyncTaskResult contains an error.
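
As an illustration of the done()/get() contract, here is a minimal sketch of a result object whose outcome is already known when it is created, as would be the case for a pool that runs tasks inline. CompletedTaskResult and ToyTaskException are hypothetical names, not part of rekall.runtime; a real implementation would raise TaskException instead.

```python
class ToyTaskException(Exception):
    """Hypothetical stand-in for a task-level exception."""

class CompletedTaskResult:
    """Result whose value (or error) is known at construction time."""

    def __init__(self, value=None, error=None):
        self._value = value
        self._error = error

    def done(self):
        # The task already ran, so the value is always ready.
        return True

    def get(self):
        # Never blocks here; re-raises a stored error wrapped in the
        # task-level exception, keeping the original as __cause__.
        if self._error is not None:
            raise ToyTaskException("task failed") from self._error
        return self._value

ok = CompletedTaskResult(value=42)
bad = CompletedTaskResult(error=ValueError("bad input"))
```
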
class rekall.runtime.AbstractWorkerPool

Bases: object

Definition of the WorkerPool interface

Notes

A WorkerPool instance is specialized to running one function, which is why the function to execute is not here in the interface but is instead passed to the worker pool factory function.

map(tasks, done)

Maps the tasks over the available workers in the pool

Parameters:
  • tasks – A list of tasks to execute. Each task is a set of arguments to run the function with.
  • done – A callback function that is called when any task finishes. It takes the set of arguments for the finished task, and optionally an error that the task encountered if there is one.
Returns:

A list of AsyncTaskResults, one for each task in tasks.

shut_down()

Clean up the worker pool after all tasks have finished.

Implementations should release any resources used by the worker pool.
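
A custom pool following this interface might look like the thread-backed sketch below. It is only an illustration under stated assumptions: ThreadWorkerPool and FutureTaskResult are hypothetical names, the function is assumed to be called as fn(task_args), and done is assumed to be called as done(task_args, error) with error set to None on success. For brevity, get() re-raises the worker's own exception rather than wrapping it.

```python
from concurrent.futures import ThreadPoolExecutor

class FutureTaskResult:
    """Adapts a concurrent.futures.Future to the done()/get() shape."""

    def __init__(self, future):
        self._future = future

    def done(self):
        return self._future.done()

    def get(self):
        # Blocks until the value is ready; re-raises the worker's exception.
        return self._future.result()

class ThreadWorkerPool:
    """Hypothetical thread-backed pool specialized to one function fn."""

    def __init__(self, fn, num_workers=2):
        self._fn = fn
        self._executor = ThreadPoolExecutor(max_workers=num_workers)

    def map(self, tasks, done):
        results = []
        for task_args in tasks:
            future = self._executor.submit(self._fn, task_args)
            # Report completion with the task's arguments and any error.
            future.add_done_callback(
                lambda f, a=task_args: done(a, f.exception()))
            results.append(FutureTaskResult(future))
        return results

    def shut_down(self):
        # Wait for outstanding work and release the threads.
        self._executor.shutdown(wait=True)

finished = []
pool = ThreadWorkerPool(lambda batch: sum(batch))
results = pool.map([[1, 2], [3, 4]],
                   lambda args, err: finished.append((args, err)))
values = [r.get() for r in results]
pool.shut_down()
```
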

class rekall.runtime.ForkedProcessPool(fn, num_workers)

Bases: rekall.runtime.AbstractWorkerPool

A WorkerPool implementation using forking.

The worker processes will inherit the global context from the main process such as global variables. However, forking a multithreaded program safely is very tricky. In particular, any global thread pool object in the parent process is forked but the actual threads are not available in the forked child processes.
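
The inheritance of global state can be seen with the standard multiprocessing module alone. The sketch below does not use ForkedProcessPool; STATE, report_state and value_seen_by_forked_child are hypothetical names, and the fork start method is assumed to be available (it is not on Windows).

```python
import multiprocessing as mp

# Module-level state that the parent mutates at runtime.
STATE = {"value": "initial"}

def report_state(conn):
    """Runs in the child: send back what the child sees in STATE."""
    conn.send(STATE["value"])
    conn.close()

def value_seen_by_forked_child():
    # Mutate the global before the child process is created.
    STATE["value"] = "modified in parent"
    ctx = mp.get_context("fork")  # raises ValueError where fork is unavailable
    receiver, sender = ctx.Pipe(duplex=False)
    child = ctx.Process(target=report_state, args=(sender,))
    child.start()
    seen = receiver.recv()
    child.join()
    return seen
```

A forked child is a copy of the parent's memory at the moment of the fork, so it reports the mutated value. A spawned interpreter would re-import the module and see the initial value instead.
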

map(tasks, done_callback)

Maps the tasks over the available workers in the pool

Parameters:
  • tasks – A list of tasks to execute. Each task is a set of arguments to run the function with.
  • done_callback – A callback function that is called when any task finishes. It takes the set of arguments for the finished task, and optionally an error that the task encountered if there is one.
Returns:

A list of AsyncTaskResults, one for each task in tasks.

shut_down()

Clean up the worker pool after all tasks have finished.

Implementations should release any resources used by the worker pool.

class rekall.runtime.InlineSingleProcessPool(fn)

Bases: rekall.runtime.AbstractWorkerPool

A single-process implementation of the WorkerPool interface.

map(tasks, done)

Maps the tasks over the available workers in the pool

Parameters:
  • tasks – A list of tasks to execute. Each task is a set of arguments to run the function with.
  • done – A callback function that is called when any task finishes. It takes the set of arguments for the finished task, and optionally an error that the task encountered if there is one.
Returns:

A list of AsyncTaskResults, one for each task in tasks.

shut_down()

Clean up the worker pool after all tasks have finished.

Implementations should release any resources used by the worker pool.

exception rekall.runtime.RekallRuntimeException

Bases: Exception

Exception raised when Runtime encounters an error.

class rekall.runtime.Runtime(worker_pool_factory)

Bases: object

Manages execution of a function on a large number of inputs.

Given a function that can return results for a batch of inputs, and a potentially long list of inputs to run the function with, Runtime helps to divide the inputs into small chunks, also called tasks, and dispatches the tasks to a pool of workers created by worker_pool_factory. It also gracefully handles exceptions in workers and can assemble the partial results.

An example function:

def query(video_ids):
    # Gets the intervals in the input batch of videos
    frames_with_opposing_faces = ...
    # Returns an IntervalSetMapping with video_id as domain key.
    return frames_with_opposing_faces

# A list of 100K video_ids
ALL_VIDEO_IDS = ...

In the example, query(ALL_VIDEO_IDS) is not practical to run in one go. To get the same results, one can use Runtime in one of two ways.

The first way is to dispatch all tasks and wait:

# Running the query on all videos, in chunks of 5 on 16 processes.
rt = Runtime(get_forked_process_pool_factory(num_workers=16))
# Will block until everything finishes
# results is an IntervalSetMapping with all intervals found.
results, failed_video_ids = rt.run(
    query, ALL_VIDEO_IDS, combiner=disjoint_domain_combiner,
    chunksize=5)

The second way is to use an iterator:

# Get an iterator that yields partial results from each chunk of 5.
rt = Runtime(get_forked_process_pool_factory(num_workers=16))
gen = rt.get_result_iterator(query, ALL_VIDEO_IDS, chunksize=5)
# Blocks until the first batch is done.
# results_from_one_batch is an IntervalSetMapping with intervals
# found in one task (a chunk of 5 videos).
results_from_one_batch = next(gen)
get_result_iterator(query, args, randomize=True, chunksize=1, print_error=True, dispatch_size=2)

Incrementally dispatches tasks as partial results are consumed.

See the class documentation for an example of how to use get_result_iterator(). Exceptions raised in query are suppressed while results are being yielded; if any task failed, a RekallRuntimeException is raised after the results of all successful tasks have been yielded. However, such errors can be printed as soon as they occur.

Parameters:
  • query, args, randomize, chunksize, print_error – Same as in run().
  • dispatch_size (int, optional) – Number of tasks to dispatch at a time. In this mode, tasks are incrementally dispatched as partial results from previous tasks are yielded. If not positive, will dispatch all tasks at once. Defaults to the number of CPU cores.
Yields:

Partial results from each task.

Raises:

RekallRuntimeException – Raised after all successful task results have been yielded if there have been failed tasks.
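
The error behaviour described above (yield every successful partial result first, then raise once at the end) can be sketched with a plain generator. This is not the Runtime implementation: iterate_partial_results and ToyRuntimeException are hypothetical names, and the sketch runs chunks sequentially without any dispatch window.

```python
class ToyRuntimeException(Exception):
    """Hypothetical stand-in for a runtime-level exception."""

def iterate_partial_results(query, args, chunksize=1):
    failed = []
    for start in range(0, len(args), chunksize):
        chunk = args[start:start + chunksize]
        try:
            result = query(chunk)
        except Exception as error:
            # Suppress the error for now and keep going with other chunks.
            failed.append((chunk, error))
            continue
        yield result
    # Only after all successful results were yielded, report the failures.
    if failed:
        raise ToyRuntimeException(f"{len(failed)} task(s) failed")

def flaky_query(batch):
    if 3 in batch:
        raise ValueError("cannot process 3")
    return sum(batch)

seen = []
failure_message = None
try:
    for partial in iterate_partial_results(flaky_query, [1, 2, 3, 4, 5, 6], chunksize=2):
        seen.append(partial)
except ToyRuntimeException as error:
    failure_message = str(error)
```

A consumer that wants all available results should therefore wrap the whole loop in a try block, as above, rather than expect an exception at the point where a task fails.
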

classmethod inline()

Inline Runtime executes each chunk in sequence in one process.

run(query, args, combiner=<function union_combiner>, randomize=True, chunksize=1, progress=False, profile=False, print_error=True)

Dispatches all tasks to workers and waits until everything finishes.

See the class documentation for an example of how to use run(). Exceptions raised in query are suppressed, and the subset of args that failed is returned at the end. However, such errors can be printed as soon as they occur.

Parameters:
  • query – A function that can return partial results for any batch of input arguments.
  • args – A potentially long list of input arguments to execute the query with.
  • combiner (optional) – A function that takes two partial results and returns the combination of the two. Defaults to union_combiner which assumes the partial results have a union method.
  • randomize (optional) – Whether to create and dispatch tasks in random order. Defaults to True.
  • chunksize (optional) – The size of the input batch for each task. Defaults to 1.
  • progress (optional) – Whether to display a progress bar. Defaults to False.
  • profile (optional) – Whether to output wall time of various internal stages to stdout. Defaults to False.
  • print_error (optional) – Whether to output task errors to stdout. Defaults to True.
Returns:

A pair (query_output, args_with_err) where query_output is the combined results from successful tasks, and args_with_err is the list of those args for which execution failed.
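
The shape of the returned pair can be illustrated with a sequential sketch. It is not how Runtime.run() is implemented: run_all and flaky_sum are hypothetical names, and returning None when no task succeeds is an assumption of this sketch only.

```python
from functools import reduce

def run_all(query, args, combiner, chunksize=1):
    """Returns (combined_output, args_with_err) in the style of run()."""
    successes, args_with_err = [], []
    for start in range(0, len(args), chunksize):
        chunk = args[start:start + chunksize]
        try:
            successes.append(query(chunk))
        except Exception:
            # Every argument of a failed chunk is reported as failed.
            args_with_err.extend(chunk)
    # Assumption: no successful task means there is nothing to combine.
    output = reduce(combiner, successes) if successes else None
    return output, args_with_err

def flaky_sum(batch):
    if 3 in batch:
        raise ValueError("cannot process 3")
    return {sum(batch)}

output, failed_args = run_all(
    flaky_sum, [1, 2, 3, 4, 5, 6], lambda a, b: a | b, chunksize=2)
```

Because the failed arguments come back as a list, they can be passed to a second call, for example with a smaller chunksize, to isolate the inputs that cause the error.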

class rekall.runtime.SpawnedProcessPool(fn, num_workers, initializer=None)

Bases: rekall.runtime.AbstractWorkerPool

A WorkerPool implementation using spawning.

It creates worker processes by spawning new python interpreters. The worker processes do not inherit any context from the main process. In particular, they have no access to the global variables and imported modules in the main process.

map(tasks, done_callback)

Maps the tasks over the available workers in the pool

Parameters:
  • tasks – A list of tasks to execute. Each task is a set of arguments to run the function with.
  • done_callback – A callback function that is called when any task finishes. It takes the set of arguments for the finished task, and optionally an error that the task encountered if there is one.
Returns:

A list of AsyncTaskResults, one for each task in tasks.

shut_down()

Clean up the worker pool after all tasks have finished.

Implementations should release any resources used by the worker pool.

exception rekall.runtime.TaskException

Bases: Exception

Exception to throw when a worker encounters error during task.

Use the raise ... from syntax to wrap the exception around the real error.

Example:

try:
    ...
except Exception as e:
    raise TaskException() from e
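
The benefit of raise ... from is that the original error stays reachable through the __cause__ attribute of the wrapper. The sketch below shows this with a hypothetical WorkerTaskError class standing in for TaskException; run_task is likewise an illustrative helper.

```python
class WorkerTaskError(Exception):
    """Hypothetical stand-in for a task-level wrapper exception."""

def run_task(fn, task_args):
    try:
        return fn(task_args)
    except Exception as error:
        # Chain the real error so callers can still inspect it.
        raise WorkerTaskError(f"task {task_args!r} failed") from error

original = None
try:
    run_task(lambda batch: 1 / len(batch), [])
except WorkerTaskError as wrapped:
    # The underlying ZeroDivisionError is preserved on the wrapper.
    original = wrapped.__cause__
```
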
rekall.runtime.disjoint_domain_combiner(result1, result2)

A faster combiner than union_combiner for IntervalSetMapping.

Assumes that the results are of type IntervalSetMapping and every chunk produces its own set of domain keys that are disjoint from results of other chunks.

Parameters:result1, result2 – partial results from some chunks of total work.
Returns:An IntervalSetMapping that is the union of the two.
Raises:RekallRuntimeException – Raised if the results share a domain key.
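
The idea behind a disjoint-domain merge can be sketched with plain dictionaries standing in for IntervalSetMapping. disjoint_dict_combiner is a hypothetical name, and ValueError replaces RekallRuntimeException here. When no key appears on both sides, no per-key union is needed and the two mappings can simply be merged.

```python
def disjoint_dict_combiner(result1, result2):
    """Merge two mappings whose key sets must not overlap."""
    common = result1.keys() & result2.keys()
    if common:
        raise ValueError(f"results share domain keys: {common!r}")
    # No overlapping keys, so a plain merge is the union of the two.
    merged = dict(result1)
    merged.update(result2)
    return merged

merged = disjoint_dict_combiner({"video_1": [(0, 5)]}, {"video_2": [(3, 9)]})
```
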
rekall.runtime.get_forked_process_pool_factory(num_workers=2)

Returns a factory for ForkedProcessPool.

Parameters:num_workers (optional) – Number of child processes to fork. Defaults to the number of CPU cores on the machine.
Returns:A factory for ForkedProcessPool.
rekall.runtime.get_spawned_process_pool_factory(num_workers=2)

Returns a factory for SpawnedProcessPool.

Parameters:num_workers (optional) – Number of child processes to spawn. Defaults to the number of CPU cores on the machine.
Returns:A factory for SpawnedProcessPool.
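
Since a pool is specialized to one function, the get_*_pool_factory functions fix the pool configuration first and leave the function to be supplied later. A minimal sketch of that closure pattern follows, with the hypothetical names ToyPool and get_toy_pool_factory; it assumes, as the inline_pool_factory(fn) signature suggests, that a factory is called with the function to execute.

```python
class ToyPool:
    """Hypothetical pool that only records how it was configured."""

    def __init__(self, fn, num_workers):
        self.fn = fn
        self.num_workers = num_workers

def get_toy_pool_factory(num_workers=2):
    # Capture the configuration now; the function is provided later.
    def factory(fn):
        return ToyPool(fn, num_workers)
    return factory

factory = get_toy_pool_factory(num_workers=4)
pool = factory(len)
```
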
rekall.runtime.inline_pool_factory(fn)

Creates an InlineSingleProcessPool.

rekall.runtime.union_combiner(result1, result2)

Combiner that calls the union method on the results.