Lithops Futures API

The primary object in Lithops is the executor. The standard way to get everything set up is to import lithops and create an instance of the executor for one of the available modes of execution.

Lithops ships with three modes of execution: Localhost, Serverless and Standalone. Each mode of execution has its own executor class:

  • lithops.LocalhostExecutor(): Executor that uses local processes to run jobs on the local machine.

  • lithops.ServerlessExecutor(): Executor to run jobs in one of the available serverless compute backends.

  • lithops.StandaloneExecutor(): Executor to run jobs in one of the available standalone compute backends.

Additionally, Lithops includes a top-level function executor, which encompasses all three previous executors:

  • lithops.FunctionExecutor(): Generic executor that will use the configuration to determine its mode of execution, i.e., based on the configuration it will be localhost, serverless or standalone.

By default, the executor loads the configuration from the config file. Alternatively, you can pass the configuration as a Python dictionary. In either case, any parameter set directly on the executor overrides the corresponding value in the configuration.

Futures API Reference

class lithops.executors.FunctionExecutor(mode=None, config=None, config_file=None, backend=None, storage=None, monitoring=None, log_level=False, **kwargs)

Bases: object

Executor abstract class that contains the common logic for the Localhost, Serverless and Standalone executors

Parameters:
  • mode (str | None) – Execution mode. One of: localhost, serverless or standalone

  • config (Dict[str, Any] | None) – Settings passed in here will override those in lithops_config

  • config_file (str | None) – Path to the lithops config file

  • backend (str | None) – Compute backend to run the functions

  • storage (str | None) – Storage backend to store Lithops data

  • monitoring (str | None) – Monitoring system implementation. One of: storage, rabbitmq

  • log_level (str | None) – Log level for printing (INFO, DEBUG, …). Set it to None to hide all logs. If this param is set, all logging params in the config are disabled

  • kwargs (Dict[str, Any] | None) – Any parameter that can be set in the compute backend section of the config file can be set here

call_async(func, data, extra_env=None, runtime_memory=None, timeout=None, include_modules=[], exclude_modules=[])

Runs a single function execution asynchronously.

Parameters:
  • func (Callable) – The function to run.

  • data (List[Any] | Tuple[Any, ...] | Dict[str, Any]) – Input data. Arguments can be passed as a list or tuple, or as a dictionary for keyword arguments.

  • extra_env (Dict | None) – Additional environment variables for the function environment.

  • runtime_memory (int | None) – Memory (in MB) to use to run the function.

  • timeout (int | None) – Maximum time (in seconds) the function has to complete its execution before raising a timeout.

  • include_modules (List | None) – Explicitly pickle these dependencies.

  • exclude_modules (List | None) – Explicitly exclude these modules from the pickled dependencies.

Returns:

Response future.

Return type:

ResponseFuture

clean(fs=None, cs=None, clean_cloudobjects=True, clean_fn=False, force=False, on_exit=False)

Deletes all the temporary files from storage: the serialized function, the serialized data and the function invocation results. It can also delete cloudobjects.

Parameters:
  • fs (ResponseFuture | List[ResponseFuture] | None) – List of futures to clean

  • cs (List[CloudObject] | None) – List of cloudobjects to clean

  • clean_cloudobjects (bool | None) – Delete all cloudobjects created with this executor

  • clean_fn (bool | None) – Delete cached functions in this executor

  • force (bool | None) – Clean all future objects even if they have not been completed

  • on_exit (bool | None) – Do not print logs on exit

get_result(fs=None, throw_except=True, timeout=None, threadpool_size=64, wait_dur_sec=None, show_progressbar=True)

Gets the results from all function activations.

Parameters:
  • fs (ResponseFuture | FuturesList | List[ResponseFuture] | None) – Futures list. Default None

  • throw_except (bool | None) – Re-raise the exception if a call raised one. Default True.

  • timeout (int | None) – Timeout for waiting for results.

  • threadpool_size (int | None) – Number of threads to use. Default 64

  • wait_dur_sec (int | None) – Time interval between each check. Default 1 second

  • show_progressbar (bool | None) – whether or not to show the progress bar.

Returns:

The result of the future/s

job_summary(cloud_objects_n=0)

Logs information about a job executed by the calling function executor. Currently supports: code_engine, ibm_vpc and ibm_cf.

Parameters:

cloud_objects_n (int | None) – Number of cloud objects used in COS, as declared by the user.

map(map_function, map_iterdata, chunksize=None, extra_args=None, extra_env=None, runtime_memory=None, obj_chunk_size=None, obj_chunk_number=None, obj_newline='\n', timeout=None, include_modules=[], exclude_modules=[])

Spawn multiple function activations based on the items of an input list.

Parameters:
  • map_function (Callable) – The function to map over the data

  • map_iterdata (List[List[Any] | Tuple[Any, ...] | Dict[str, Any]]) – An iterable of input data (e.g., a Python list).

  • chunksize (int | None) – Split map_iterdata into chunks of this size. Lithops spawns 1 worker per resulting chunk

  • extra_args (List[Any] | Tuple[Any, ...] | Dict[str, Any] | None) – Additional arguments to pass to each map_function activation

  • extra_env (Dict[str, str] | None) – Additional environment variables for function environment

  • runtime_memory (int | None) – Memory (in MB) to use to run the functions

  • obj_chunk_size (int | None) – Used for data processing. Chunk size to split each object in bytes. Must be >= 1MiB. ‘None’ for processing the whole file in one function activation

  • obj_chunk_number (int | None) – Used for data processing. Number of chunks to split each object into. ‘None’ for processing the whole file in one function activation. obj_chunk_number takes precedence over obj_chunk_size if both parameters are set

  • obj_newline (str | None) – Newline character used to keep the line integrity of partitions. ‘None’ disables the line integrity logic, so the functions get partitions of exactly the same size

  • timeout (int | None) – Max time per function activation (seconds)

  • include_modules (List[str] | None) – Explicitly pickle these dependencies. With the default empty list, all required dependencies are pickled. If explicitly set to None, no dependency is pickled

  • exclude_modules (List[str] | None) – Explicitly exclude these modules from the pickled dependencies. Ignored if include_modules is set.

Returns:

A list with size len(map_iterdata) of futures for each job (Futures are also internally stored by Lithops).

Return type:

FuturesList
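The object-partitioning options above (obj_chunk_size, obj_chunk_number and obj_newline) can be illustrated with the following pure-Python sketch. It is a simplified model of the documented behavior, not Lithops's actual partitioner, which works on byte ranges of objects in the storage backend; the helper `partition_object` is hypothetical and does not enforce the 1 MiB minimum. Each returned partition stands for the data one function activation would receive.

```python
import math
from typing import List, Optional


def partition_object(
    data: bytes,
    chunk_size: Optional[int] = None,
    chunk_number: Optional[int] = None,
    newline: Optional[bytes] = b"\n",
) -> List[bytes]:
    """Hypothetical helper: split one object into per-activation partitions."""
    if chunk_number is not None:
        # chunk_number takes precedence over chunk_size when both are given
        chunk_size = max(1, math.ceil(len(data) / chunk_number))
    if chunk_size is None:
        # Neither option set: one activation processes the whole object
        return [data]
    if chunk_size < 1:
        raise ValueError("chunk_size must be a positive number of bytes")

    partitions = []
    start = 0
    while start < len(data):
        end = min(start + chunk_size, len(data))
        if newline is not None and end < len(data):
            # Move the boundary forward to just past the next newline so
            # that no line is split between two partitions
            idx = data.find(newline, max(start, end - len(newline)))
            end = len(data) if idx == -1 else idx + len(newline)
        partitions.append(data[start:end])
        start = end
    return partitions


data = b"aaa\nbb\ncccc\nd\n"
print(partition_object(data, chunk_size=5))
# [b'aaa\nbb\n', b'cccc\n', b'd\n']
print(partition_object(data, chunk_size=5, newline=None))
# [b'aaa\nb', b'b\nccc', b'c\nd\n']
```

With line integrity enabled, partitions may be slightly larger than the requested size; with `newline=None` every partition except the last has exactly `chunk_size` bytes, at the cost of cutting lines.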

map_reduce(map_function, map_iterdata, reduce_function, chunksize=None, extra_args=None, extra_args_reduce=None, extra_env=None, map_runtime_memory=None, reduce_runtime_memory=None, timeout=None, obj_chunk_size=None, obj_chunk_number=None, obj_newline='\n', obj_reduce_by_key=False, spawn_reducer=20, include_modules=[], exclude_modules=[])

Map the map_function over the data and apply the reduce_function across all futures.

Parameters:
  • map_function (Callable) – The function to map over the data

  • map_iterdata (List[List[Any] | Tuple[Any, ...] | Dict[str, Any]]) – An iterable of input data

  • reduce_function (Callable) – The function to reduce over the futures

  • chunksize (int | None) – Split map_iterdata into chunks of this size. Lithops spawns 1 worker per resulting chunk. Default 1

  • extra_args (List[Any] | Tuple[Any, ...] | Dict[str, Any] | None) – Additional arguments to pass to each map function activation. Default None

  • extra_args_reduce (List[Any] | Tuple[Any, ...] | Dict[str, Any] | None) – Additional arguments to pass to the reduce function activation. Default None

  • extra_env (Dict[str, str] | None) – Additional environment variables for the function environment. Default None

  • map_runtime_memory (int | None) – Memory to use to run the map function. Default None (loaded from config)

  • reduce_runtime_memory (int | None) – Memory to use to run the reduce function. Default None (loaded from config)

  • timeout (int | None) – Time that the functions have to complete their execution before raising a timeout

  • obj_chunk_size (int | None) – Chunk size, in bytes, to split each object. ‘None’ for processing the whole file in one function activation

  • obj_chunk_number (int | None) – Number of chunks to split each object into. ‘None’ for processing the whole file in one function activation

  • obj_newline (str | None) – Newline character used to keep the line integrity of partitions. ‘None’ disables the line integrity logic, so the functions get partitions of exactly the same size

  • obj_reduce_by_key (bool | None) – Set one reducer per object after running the partitioner. By default there is one reducer for all the objects

  • spawn_reducer (int | None) – Percentage of done map functions before spawning the reduce function

  • include_modules (List[str] | None) – Explicitly pickle these dependencies.

  • exclude_modules (List[str] | None) – Explicitly exclude these modules from the pickled dependencies.

Returns:

A list with size len(map_iterdata) of futures.

Return type:

FuturesList

plot(fs=None, dst=None, figsize=(10, 6))

Creates a timeline and a histogram of the current execution and saves them in dst.

Parameters:
  • fs (ResponseFuture | FuturesList | List[ResponseFuture] | None) – list of futures.

  • dst (str | None) – destination path to save .png plots.

  • figsize (tuple | None) – Size of the figures as (width, height). Default (10, 6)

wait(fs=None, throw_except=True, return_when=100, download_results=False, timeout=None, threadpool_size=64, wait_dur_sec=None, show_progressbar=True)

Waits for the Future instances (possibly created by different Executor instances) given by fs to complete. Returns a 2-tuple of futures lists. The first, done, contains the futures that completed (finished or cancelled futures) before the wait completed. The second, not_done, contains the futures that did not complete (pending or running futures). timeout can be used to control the maximum number of seconds to wait before returning.

Parameters:
  • fs (ResponseFuture | FuturesList | List[ResponseFuture] | None) – Futures list. Default None

  • throw_except (bool | None) – Re-raise exception if call raised. Default True

  • return_when (Any | None) – Percentage of futures that must be done before returning. Default 100 (all futures)

  • download_results (bool | None) – Download results. Default False (only get statuses)

  • timeout (int | None) – Timeout for waiting for results

  • threadpool_size (int | None) – Number of threads to use. Default 64

  • wait_dur_sec (int | None) – Time interval between each check. Default 1 second

  • show_progressbar (bool | None) – whether or not to show the progress bar.

Returns:

(fs_done, fs_notdone) where fs_done is a list of futures that have completed and fs_notdone is a list of futures that have not completed.

Return type:

Tuple[FuturesList, FuturesList]

class lithops.executors.LocalhostExecutor(config=None, config_file=None, storage=None, monitoring=None, log_level=False, **kwargs)

Bases: FunctionExecutor

Initialize a LocalhostExecutor class.

Parameters:
  • config (Dict[str, Any] | None) – Settings passed in here will override those in config file.

  • config_file (str | None) – Path to the lithops config file

  • storage (str | None) – Name of the storage backend to use.

  • monitoring (str | None) – monitoring system.

  • log_level (str | None) – log level to use during the execution.

  • kwargs (Dict[str, Any] | None) – Any parameter that can be set in the compute backend section of the config file can be set here

class lithops.executors.ServerlessExecutor(config=None, config_file=None, backend=None, storage=None, monitoring=None, log_level=False, **kwargs)

Bases: FunctionExecutor

Initialize a ServerlessExecutor class.

Parameters:
  • config (Dict[str, Any] | None) – Settings passed in here will override those in config file

  • config_file (str | None) – Path to the lithops config file

  • backend (str | None) – Name of the serverless compute backend to use

  • storage (str | None) – Name of the storage backend to use

  • monitoring (str | None) – monitoring system

  • log_level (str | None) – log level to use during the execution

  • kwargs (Dict[str, Any] | None) – Any parameter that can be set in the compute backend section of the config file can be set here

class lithops.executors.StandaloneExecutor(config=None, config_file=None, backend=None, storage=None, monitoring=None, log_level=False, **kwargs)

Bases: FunctionExecutor

Initialize a StandaloneExecutor class.

Parameters:
  • config (Dict[str, Any] | None) – Settings passed in here will override those in config file

  • config_file (str | None) – Path to the lithops config file

  • backend (str | None) – Name of the standalone compute backend to use

  • storage (str | None) – Name of the storage backend to use

  • monitoring (str | None) – monitoring system

  • log_level (str | None) – log level to use during the execution

  • kwargs (Dict[str, Any] | None) – Any parameter that can be set in the compute backend section of the config file can be set here