Lithops Futures API

The primary object in Lithops is the executor. The standard way to get everything set up is to import lithops and create an executor instance for one of the available execution modes.

Lithops ships with three execution modes: Localhost, Serverless and Standalone. Each mode has its own executor class:

  • lithops.LocalhostExecutor(): Executor that uses local processes to run jobs on the local machine.

  • lithops.ServerlessExecutor(): Executor to run jobs in one of the available serverless compute backends.

  • lithops.StandaloneExecutor(): Executor to run jobs in one of the available standalone compute backends.

Additionally, Lithops includes a top-level function executor, which encompasses all three previous executors:

  • lithops.FunctionExecutor(): Generic executor that will use the configuration to determine its mode of execution, i.e., based on the configuration it will be localhost, serverless or standalone.

By default, the executor loads the configuration from the config file. Alternatively, you can pass the configuration as a Python dictionary. In either case, parameters set directly in the executor override those set in the configuration.
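The configuration routes described above can be sketched as follows. This is a minimal illustration, not a reference: the dictionary keys in EXAMPLE_CONFIG (`lithops`, `mode`) and the helper name `make_executors` are assumptions and should be checked against the configuration guide for your Lithops version. Lithops is imported lazily, so the module loads even where it is not installed.

```python
# Hypothetical in-code configuration. The key names are an assumption
# and may differ between Lithops versions.
EXAMPLE_CONFIG = {"lithops": {"mode": "localhost"}}


def make_executors():
    """Create executors using the three configuration routes.

    Requires Lithops to be installed and, for the first route,
    a config file to be present.
    """
    import lithops  # imported lazily; only needed when this function runs

    # 1. No arguments: the configuration is loaded from the config file.
    from_file = lithops.FunctionExecutor()

    # 2. Configuration passed as a Python dictionary.
    from_dict = lithops.FunctionExecutor(config=EXAMPLE_CONFIG)

    # 3. A parameter set in the executor overrides the configuration;
    #    here log_level replaces any logging settings in the config.
    overridden = lithops.FunctionExecutor(config=EXAMPLE_CONFIG, log_level="DEBUG")

    return from_file, from_dict, overridden
```

In a real script you would normally create only one of these executors; the function creates all three purely to show the alternatives side by side.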

Futures API Reference

class lithops.executors.FunctionExecutor(mode=None, config=None, backend=None, storage=None, runtime=None, runtime_memory=None, monitoring=None, max_workers=None, worker_processes=None, remote_invoker=None, log_level=False)

Bases: object

Executor abstract class that contains the common logic for the Localhost, Serverless and Standalone executors

Parameters
  • mode (Optional[str]) – Execution mode. One of: localhost, serverless or standalone

  • config (Optional[Dict[str, Any]]) – Settings passed in here will override those in lithops_config

  • backend (Optional[str]) – Compute backend to run the functions

  • storage (Optional[str]) – Storage backend to store Lithops data

  • runtime (Optional[str]) – Name of the runtime to run the functions

  • runtime_memory (Optional[int]) – Memory (in MB) to use to run the functions

  • monitoring (Optional[str]) – Monitoring system implementation. One of: storage, rabbitmq

  • max_workers (Optional[int]) – Max number of parallel workers

  • worker_processes (Optional[int]) – Worker granularity, number of concurrent/parallel processes in each worker

  • remote_invoker (Optional[bool]) – Spawn a function that will perform the actual job invocation (True/False)

  • log_level (Optional[str]) – Log level to print (INFO, DEBUG, …). Set it to None to hide all logs. If this parameter is set, all logging parameters in the config are ignored

call_async(func, data, extra_env=None, runtime_memory=None, timeout=None, include_modules=[], exclude_modules=[])

Runs a single function invocation asynchronously.

Parameters
  • func (collections.abc.Callable) – The function to execute.

  • data (Union[List[Any], Tuple[Any, ...], Dict[str, Any]]) – Input data. Arguments can be passed as a list or tuple, or as a dictionary for keyword arguments.

  • extra_env (Optional[Dict]) – Additional env variables for function environment.

  • runtime_memory (Optional[int]) – Memory to use to run the function.

  • timeout (Optional[int]) – Time that the function has to complete its execution before raising a timeout.

  • include_modules (Optional[List]) – Explicitly pickle these dependencies.

  • exclude_modules (Optional[List]) – Explicitly exclude these modules from the pickled dependencies.

Returns

Response future.

Return type

lithops.future.ResponseFuture
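As a hedged sketch of the two ways to pass data to call_async, the example below sends the same arguments once as a tuple and once as a dictionary. The function `add` and the wrapper `run_single_calls` are illustrative names, and the wrapper assumes a working Lithops installation and configuration. It reads each value through the future's result() method.

```python
def add(x, y):
    """Toy function that runs as a single asynchronous invocation."""
    return x + y


def run_single_calls():
    """Invoke add() twice; requires Lithops to be installed and configured."""
    import lithops  # imported lazily; only needed when this function runs

    fexec = lithops.FunctionExecutor()

    # Positional arguments passed as a tuple.
    future_args = fexec.call_async(add, (3, 4))

    # Keyword arguments passed as a dictionary.
    future_kwargs = fexec.call_async(add, {"x": 3, "y": 4})

    # Each call returns one ResponseFuture; result() blocks until it is ready.
    return future_args.result(), future_kwargs.result()
```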

clean(fs=None, cs=None, clean_cloudobjects=True, clean_fn=False, force=False)

Deletes all the temp files from storage. These files include the function, the data serialization and the function invocation results. It can also clean cloudobjects.

Parameters
  • fs (Optional[Union[lithops.future.ResponseFuture, List[lithops.future.ResponseFuture]]]) – List of futures to clean

  • cs (Optional[List[lithops.storage.utils.CloudObject]]) – List of cloudobjects to clean

  • clean_cloudobjects (Optional[bool]) – Delete all cloudobjects created with this executor

  • clean_fn (Optional[bool]) – Delete cached functions in this executor

  • force (Optional[bool]) – Clean all future objects even if they have not been completed
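A typical pattern, sketched below under the assumption of a configured Lithops installation, is to collect the results first and then clean the temporary data belonging to those futures. The names `increment` and `run_and_clean` are illustrative only.

```python
def increment(x):
    """Toy function used to generate some temporary job data."""
    return x + 1


def run_and_clean(values):
    """Run a map job, fetch its results, then delete its temporary files.

    Requires Lithops to be installed and configured.
    """
    import lithops  # imported lazily; only needed when this function runs

    fexec = lithops.FunctionExecutor()
    futures = fexec.map(increment, values)

    # Fetch the results before cleaning, since cleaning removes the
    # stored invocation results.
    results = fexec.get_result(futures)

    # Restrict the cleanup to the futures of this job.
    fexec.clean(fs=futures)
    return results
```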

get_result(fs=None, throw_except=True, timeout=None, threadpool_size=64, wait_dur_sec=1)

Gets the results from all function activations.

Parameters
  • fs (Optional[Union[lithops.future.ResponseFuture, lithops.utils.FuturesList, List[lithops.future.ResponseFuture]]]) – Futures list. Default None

  • throw_except (Optional[bool]) – Reraise exception if call raised. Default True.

  • timeout (Optional[int]) – Timeout for waiting for results.

  • threadpool_size (Optional[int]) – Number of threads to use. Default 64

  • wait_dur_sec (Optional[int]) – Time interval between each check.

Returns

The result of the future/s
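The following sketch shows get_result used with an explicit futures list and a timeout. The names `square` and `collect_squares` and the 60-second value are illustrative assumptions, and the wrapper needs a configured Lithops installation to run.

```python
def square(x):
    """Toy function applied to every input item."""
    return x * x


def collect_squares(values, timeout=60):
    """Map square() over values and gather the results.

    Requires Lithops to be installed and configured.
    """
    import lithops  # imported lazily; only needed when this function runs

    fexec = lithops.FunctionExecutor()
    futures = fexec.map(square, values)

    # throw_except=True re-raises any exception raised inside a call;
    # timeout bounds how long to wait for the results.
    return fexec.get_result(futures, throw_except=True, timeout=timeout)
```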

job_summary(cloud_objects_n=0)

Logs information about a job executed by the calling function executor. Currently supports: code_engine, ibm_vpc and ibm_cf.

Parameters
  • cloud_objects_n (Optional[int]) – Number of cloud objects used in COS, declared by the user.

map(map_function, map_iterdata, chunksize=None, extra_args=None, extra_env=None, runtime_memory=None, obj_chunk_size=None, obj_chunk_number=None, timeout=None, include_modules=[], exclude_modules=[])

Spawn multiple function activations based on the items of an input list.

Parameters
  • map_function (collections.abc.Callable) – The function to map over the data

  • map_iterdata (List[Union[List[Any], Tuple[Any, ...], Dict[str, Any]]]) – An iterable of input data (e.g., a Python list).

  • chunksize (Optional[int]) – Split map_iterdata in chunks of this size. Lithops spawns 1 worker per resulting chunk

  • extra_args (Optional[Union[List[Any], Tuple[Any, ...], Dict[str, Any]]]) – Additional arguments to pass to each map_function activation

  • extra_env (Optional[Dict[str, str]]) – Additional environment variables for function environment

  • runtime_memory (Optional[int]) – Memory (in MB) to use to run the functions

  • obj_chunk_size (Optional[int]) – Used for data processing. Chunk size to split each object in bytes. Must be >= 1MiB. ‘None’ for processing the whole file in one function activation

  • obj_chunk_number (Optional[int]) – Used for data processing. Number of chunks to split each object. ‘None’ for processing the whole file in one function activation. obj_chunk_number takes precedence over obj_chunk_size if both parameters are set

  • timeout (Optional[int]) – Max time per function activation (seconds)

  • include_modules (Optional[List[str]]) – Explicitly pickle these dependencies. If left as the default empty list, all required dependencies are pickled. If explicitly set to None, no dependencies are pickled

  • exclude_modules (Optional[List[str]]) – Explicitly exclude these modules from the pickled dependencies. Ignored if include_modules is set.

Returns

A list with size len(map_iterdata) of futures for each job (Futures are also internally stored by Lithops).

Return type

lithops.utils.FuturesList
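The sketch below illustrates two of the map parameters. `workers_for` works out the "1 worker per resulting chunk" rule, assuming chunksize=None behaves like a chunk size of 1. `run_map` shows extra_args, assuming the extra arguments are passed to each activation after the input item. All names here are illustrative, and `run_map` needs a configured Lithops installation.

```python
import math


def scale(x, factor):
    """Multiply one input item by a factor supplied via extra_args."""
    return x * factor


def workers_for(n_items, chunksize=None):
    """Number of workers expected under the '1 worker per chunk' rule.

    Assumption: chunksize=None is treated as a chunk size of 1.
    """
    size = chunksize or 1
    return math.ceil(n_items / size)


def run_map(values, factor=10):
    """Map scale() over values; requires Lithops installed and configured."""
    import lithops  # imported lazily; only needed when this function runs

    fexec = lithops.FunctionExecutor()

    # Every activation receives one item from values plus the extra argument.
    futures = fexec.map(scale, values, extra_args=(factor,))
    return fexec.get_result(futures)
```

For example, ten input items with chunksize=4 give chunks of 4, 4 and 2 items, so `workers_for(10, 4)` evaluates to 3.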

map_reduce(map_function, map_iterdata, reduce_function, chunksize=None, extra_args=None, extra_env=None, map_runtime_memory=None, reduce_runtime_memory=None, obj_chunk_size=None, obj_chunk_number=None, timeout=None, reducer_one_per_object=False, spawn_reducer=20, include_modules=[], exclude_modules=[])

Map the map_function over the data and apply the reduce_function across all futures.

Parameters
  • map_function (collections.abc.Callable) – The function to map over the data

  • map_iterdata (List[Union[List[Any], Tuple[Any, ...], Dict[str, Any]]]) – An iterable of input data

  • reduce_function (collections.abc.Callable) – The function to reduce over the futures

  • chunksize (Optional[int]) – Split map_iterdata in chunks of this size. Lithops spawns 1 worker per resulting chunk. Default 1

  • extra_args (Optional[Union[List[Any], Tuple[Any, ...], Dict[str, Any]]]) – Additional arguments to pass to function activation. Default None

  • extra_env (Optional[Dict[str, str]]) – Additional environment variables for action environment. Default None

  • map_runtime_memory (Optional[int]) – Memory to use to run the map function. Default None (loaded from config)

  • reduce_runtime_memory (Optional[int]) – Memory to use to run the reduce function. Default None (loaded from config)

  • obj_chunk_size (Optional[int]) – Size of the data chunks to split each object into. ‘None’ for processing the whole file in one function activation

  • obj_chunk_number (Optional[int]) – Number of chunks to split each object. ‘None’ for processing the whole file in one function activation

  • timeout (Optional[int]) – Time that the functions have to complete their execution before raising a timeout

  • reducer_one_per_object (Optional[bool]) – Set one reducer per object after running the partitioner

  • spawn_reducer (Optional[int]) – Percentage of done map functions before spawning the reduce function

  • include_modules (Optional[List[str]]) – Explicitly pickle these dependencies.

  • exclude_modules (Optional[List[str]]) – Explicitly exclude these modules from the pickled dependencies.

Returns

A list with size len(map_iterdata) of futures.

Return type

lithops.utils.FuturesList
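A small word-count job illustrates the map and reduce steps. This is a sketch under assumptions: the reduce function is assumed to receive the list of map results in a parameter named `results`, and the helper names are illustrative. `local_word_count` reproduces the same computation without Lithops so the logic can be checked locally; `run_word_count` needs a configured Lithops installation.

```python
def count_words(line):
    """Map step: count the words in one line of text."""
    return len(line.split())


def total(results):
    """Reduce step: combine the outputs of all map activations."""
    return sum(results)


def local_word_count(lines):
    """Same computation done locally, useful for checking the logic."""
    return total([count_words(line) for line in lines])


def run_word_count(lines):
    """Run the job with Lithops; requires it to be installed and configured."""
    import lithops  # imported lazily; only needed when this function runs

    fexec = lithops.FunctionExecutor()
    futures = fexec.map_reduce(count_words, lines, total)
    return fexec.get_result(futures)
```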

plot(fs=None, dst=None)

Creates a timeline and a histogram of the current execution and saves them to dst.

Parameters
  • fs (Optional[Union[lithops.future.ResponseFuture, lithops.utils.FuturesList, List[lithops.future.ResponseFuture]]]) – list of futures.

  • dst (Optional[str]) – destination path to save .png plots.

wait(fs=None, throw_except=True, return_when=100, download_results=False, timeout=None, threadpool_size=64, wait_dur_sec=1)

Wait for the Future instances (possibly created by different Executor instances) given by fs to complete. Returns a 2-tuple of futures lists. The first, fs_done, contains the futures that completed (finished or cancelled futures) before the wait completed. The second, fs_notdone, contains the futures that did not complete (pending or running futures). timeout can be used to control the maximum number of seconds to wait before returning.

Parameters
  • fs (Optional[Union[lithops.future.ResponseFuture, lithops.utils.FuturesList, List[lithops.future.ResponseFuture]]]) – Futures list. Default None

  • throw_except (Optional[bool]) – Re-raise exception if call raised. Default True

  • return_when (Optional[Any]) – Percentage of done futures

  • download_results (Optional[bool]) – Download results. Default false (Only get statuses)

  • timeout (Optional[int]) – Timeout of waiting for results

  • threadpool_size (Optional[int]) – Number of threads to use. Default 64

  • wait_dur_sec (Optional[int]) – Time interval between each check

Returns

(fs_done, fs_notdone) where fs_done is a list of futures that have completed and fs_notdone is a list of futures that have not completed.

Return type

Tuple[lithops.utils.FuturesList, lithops.utils.FuturesList]
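The sketch below uses return_when as a percentage to return early, then waits again for the remaining futures. The names `slow_square` and `wait_in_two_steps` and the 50% threshold are illustrative, and the wrapper needs a configured Lithops installation.

```python
import time


def slow_square(x):
    """Sleep for x seconds, then return x squared, so calls finish at different times."""
    time.sleep(x)
    return x * x


def wait_in_two_steps(values):
    """Wait for half of the futures, then for all of them.

    Requires Lithops to be installed and configured.
    """
    import lithops  # imported lazily; only needed when this function runs

    fexec = lithops.FunctionExecutor()
    futures = fexec.map(slow_square, values)

    # Return as soon as 50% of the futures are done.
    done, not_done = fexec.wait(futures, return_when=50)
    early_counts = (len(done), len(not_done))

    # return_when defaults to 100, so this call waits for every future.
    done, not_done = fexec.wait(futures)
    final_counts = (len(done), len(not_done))

    return early_counts, final_counts
```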

class lithops.executors.LocalhostExecutor(config=None, runtime=None, storage=None, worker_processes=None, monitoring=None, log_level=False)

Bases: lithops.executors.FunctionExecutor

Initialize a LocalhostExecutor class.

Parameters
  • config (Optional[Dict[str, Any]]) – Settings passed in here will override those in config file.

  • runtime (Optional[str]) – Runtime name to use.

  • storage (Optional[str]) – Name of the storage backend to use.

  • worker_processes (Optional[int]) – Worker granularity, number of concurrent/parallel processes in each worker

  • monitoring (Optional[str]) – monitoring system.

  • log_level (Optional[str]) – log level to use during the execution.
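As a minimal sketch, a LocalhostExecutor could be created with one worker process per CPU core. That sizing rule is an illustrative assumption, not a Lithops recommendation, and the helper names are hypothetical.

```python
import os


def local_worker_processes():
    """One process per CPU core, falling back to 1 if the count is unknown."""
    return os.cpu_count() or 1


def make_local_executor():
    """Create a LocalhostExecutor; requires Lithops to be installed."""
    import lithops  # imported lazily; only needed when this function runs

    return lithops.LocalhostExecutor(
        worker_processes=local_worker_processes(),
        log_level="INFO",
    )
```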

class lithops.executors.ServerlessExecutor(config=None, runtime=None, runtime_memory=None, backend=None, storage=None, max_workers=None, worker_processes=None, monitoring=None, remote_invoker=None, log_level=False)

Bases: lithops.executors.FunctionExecutor

Initialize a ServerlessExecutor class.

Parameters
  • config (Optional[Dict[str, Any]]) – Settings passed in here will override those in config file

  • runtime (Optional[str]) – Runtime name to use

  • runtime_memory (Optional[int]) – memory to use in the runtime

  • backend (Optional[str]) – Name of the serverless compute backend to use

  • storage (Optional[str]) – Name of the storage backend to use

  • max_workers (Optional[int]) – Max number of concurrent workers

  • worker_processes (Optional[int]) – Worker granularity, number of concurrent/parallel processes in each worker

  • monitoring (Optional[str]) – monitoring system

  • remote_invoker (Optional[bool]) – Spawn a function that will perform the actual job invocation (True/False)

  • log_level (Optional[str]) – log level to use during the execution
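The parameters above could be combined as in the sketch below. The backend name ibm_cf is taken from the job_summary description, while the memory and worker values are purely illustrative. The helper names are hypothetical, and creating the executor requires valid credentials for the chosen backend in the Lithops configuration.

```python
# Illustrative defaults; the numeric values are assumptions, not recommendations.
SERVERLESS_PARAMS = {
    "backend": "ibm_cf",      # serverless compute backend
    "runtime_memory": 512,    # memory for the runtime
    "max_workers": 100,       # cap on concurrent workers
}


def merged_params(**overrides):
    """Return the default parameters with any caller overrides applied."""
    return {**SERVERLESS_PARAMS, **overrides}


def make_serverless_executor(**overrides):
    """Create a ServerlessExecutor; requires Lithops and backend credentials."""
    import lithops  # imported lazily; only needed when this function runs

    return lithops.ServerlessExecutor(**merged_params(**overrides))
```

Calling `make_serverless_executor(max_workers=10)` would keep the backend and memory defaults and lower only the worker cap.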

class lithops.executors.StandaloneExecutor(config=None, runtime=None, backend=None, storage=None, max_workers=None, worker_processes=None, monitoring=None, log_level=False)

Bases: lithops.executors.FunctionExecutor

Initialize a StandaloneExecutor class.

Parameters
  • config (Optional[Dict[str, Any]]) – Settings passed in here will override those in config file

  • runtime (Optional[str]) – Runtime name to use

  • backend (Optional[str]) – Name of the standalone compute backend to use

  • storage (Optional[str]) – Name of the storage backend to use

  • max_workers (Optional[int]) – Max number of concurrent workers

  • worker_processes (Optional[int]) – Worker granularity, number of concurrent/parallel processes in each worker

  • monitoring (Optional[str]) – monitoring system

  • log_level (Optional[str]) – log level to use during the execution