Lithops high-level architecture design:#
The chart below presents the main components of the architecture of Lithops. The components are largely divided into three sets: * Components inside the dashed purple frame comprise the client code (running e.g., on your laptop). * Components inside the dashed orange frame comprise a Lithops worker. Each worker executes a “call”, one unit of computation (e.g., processing one dataset record, or one object) within a larger map or reduce job of Lithops. Workers execute in the compute backend of choice, such as IBM Cloud Functions, Knative Serving or even your local laptop. * Outside both frames are various external facilities or services with which Lithops components interact. These external components are marked in ellipses, each one with its name and logo.
Note that the sets of Lithops components in both dashed frames are partially overlapping. In particular, the storage components are shared since Lithops’s main communication between the client and the workers relies on storage.
The Lithops components themselves consist of key classes and modules. Both classes and modules are shown using UML class symbols, but class names start with uppercase letters and have an
__init__() method, while modules do not.
The top-level client API module of Lithops,
lithops.__init__, is shown as a green module. It provides the API functions described in the documentation. Each of its functions creates an instance of
FunctionExecutor, which is the main class implementing Lithops logic, shown in with a gray background.
A worker is deployed by the client code, but independently of the client - separate process, container, etc. Therefore, it requires a separate entry point to its logic, defined in the
entry_point module, with a yellow background. As part of its execution, it eventually creates an instance of
JobRunner, which is the main class implementing the Lithops worker logic, also shown with a gray background.
The last components worth special mentioning are
StorageBackend. Each of these components has a light blue background and is surrounded by a compute or storage symbol, respectively, in dashed dark blue.
ComputeBackend pseudo-interface represents a backend medium in which Lithops workers are deployed for executing computation - such as IBM Cloud Functions, Knative Serving, a Docker cloud, or your local laptop. A specific compute backend is chosen by using the specific API executor function.
StorageBackend pseudo-interface represents a backend medium which Lithops uses for communicating between the client and the workers. When a client invokes a job, which is then passed to multiple workers for execution, the job data and specific call data is stored in this medium. Upon a worker completing a call, results are also stored in this medium. A specific storage backend is selected through Lithops configuration - IBM Cloud Object Storage, OpenStack Swift or local laptop’s file-system. All storage usage in Lithops adheres to object storage semantics, including objects, buckets, writing each object once, etc.
In Lithops, each map or reduce computation is executed as a separate compute job. This means that calling a
FunctionExecutor.map() results in one job, and calling
FunctionExecutor.map_reduce() results in two jobs, one of
map() and one of
reduce(), executed one after the other.
As mentioned above, the
FunctionExecutor class is responsible for orchestrating the computation in Lithops. One
FunctionExecutor object is instantiated prior to any use of Lithops. Its initialization includes these important steps: 1. It sets up the workers (depending on the specific compute backend), such as constructing docker images, defining IBM Cloud Functions, etc. This step may not include actually creating the workers, as this may be done automatically by the backend on-demand. 2. It defines a bucket in object storage (depending on the storage backend) in which each job will store job and call data (prior to computation) and results (when computation is complete). 3. It creates a
FunctionInvoker object, which is responsible for executing a job as a set of independent per-worker calls.
Compute jobs are created in the functions of the
job module (see chart above), invoked from the respective API method of
FunctionExecutor. Map jobs are created in
create_map_job() and reduce jobs in
create_reduce_job(). The flow in both functions is quite similar. First, data is partitioned, with the intention of each partition be processed by one worker. For map jobs, this is done by invoking the
create_partitions() function of the
partitioner module, yielding a partition map.
For reduce jobs, Lithops currently supports two modes: reduce per object, where each object is processed by a reduce function, and global (default) reduce, where all data is processed by a single reduce function. Respectively, data is partitioned as either one partition per storage object, or one global partition with all data. This process yields a partition map similar to map jobs. Additionally,
create_reduce_job() wraps the reduce function in a special wrapper function that forces waiting for data before the actual reduce function is invoked. This is because reduce jobs follow map jobs, so the output of the map jobs needs to finish before reduce can run.
Eventually, both functions of
create_reduce_job() end up calling
_create_job() which is the main flow of creating a job, described in high-level below: 1. A
job_description record is defined for the job (and is eventually returned from all job creation functions) 2. The partition map and the data processing function (that processes a single partition in either map or reduce jobs) are each pickled (serialized) into a byte sequence. 3. The pickled partition map is stored in the object storage bucket, under
agg_data_key object 4. The pickled processing function and its module dependencies are stored in the same bucket under
Once job creation is done and the
job_description record for the new job is returned to the
FunctionExecutor object, it proceeds to execute the job by calling
run() method of its
FunctionInvoker instance. This triggers the following flow: 1. The job is executed as a set of independent calls (invocations) that are submitted to a
ThreadPoolExecutor object (thread pool size is defined by configuration). This means call invocation is concurrent from the start. 2. Each call executes first a call to an internal
invoke() function defined inside
FunctionInvoker.run(), which builds a
payload (parameter) as a single dictionary with all the data the call needs. The call data includes copy of some of the
job_description data as well as some specific data for the call such as: *
call_id (integer ranging from 0 to
total_calls - 1) *
data_byte_range - defines the specific partition for this call, as defined by the partitioner during job creation *
output_key - specific storage object (in the bucket) for computation output *
status_key - specific storage object (in the bucket) for computation logs 3. Invocation proceeds to
Compute.invoke(), which adds a retry mechanism for the current call, with random delays between retries (all configurable). 4. Invocation proceeds to
ComputeBackend.invoke(). Further execution depends on the compute backend: * On IBM Cloud Functions,
invoke() is performed as a standard non-blocking action invocation, with the payload being included as a single JSON parameter. * On Knative Serving,
invoke() is performed as an HTTP POST request delivered over a connection that lasts for the entire time of the computation. * On a localhost (your laptop),
invoke() is performed as posting the call on a queue. A master process continuously pulls calls from the queue and dispatches them onto processes from a pool of configurable size. * On a Docker cloud,
invoke() is performed similar to localhost above, except the processes in the pool controlled by the master further delegate execution to a Docker container they create. 5. When computation completes, each call commits the result to object storage in the configured bucket under
output_key object 6. Each
invoke() returns a
ResponseFuture object, which is a future object to wait on for the computed result of each call 7. A list of
ResponseFuture objects returned by
FunctionInvoker.run() is stored in the
FunctionExecutor object and also returned by its respective method for map [+reduce] job. Later calls to
get_result() can be used to wait for job completion and retrieve the results, respectively.
Detecting Completion of Job#
Completion of a computation job in Lithops is detected in one of two techniques: using RabbitMQ or polling object storage. The choice of either technique is configurable. A waiting part is implemented in
FunctionExecutor.wait(). A notification part is implemented in the worker code, depending on the chosen technique. This way, waiting in
FunctionExecutor completes when all calls have notified completion, or a pre-configured timeout has expired.
RabbitMQ: A unique RabbitMQ topic is defined for each job. combining the executor id and job id. Each worker, once completes a call, posts a notification message on that topic (code in
handler module, called from
entry_point module of the worker). The
wait_rabbitmq() function from
wait_rabbitmq module, which is called from
FunctionExecutor.wait(), consumes a number of messages on that topic equal to
total_calls and determines completion.
Object Storage: As explained above, each call persists its computation results in a specific object. Determining completion of a job is by the
FunctionExecutor.wait() invoking the
wait_storage() function from the
wait_storage module. This function repeatedly, once per fixed period (controllable), polls the executor’s bucket for status objects of a subset of calls that have still not completed. This allows control of resource usage and eventual detection of all calls.