Lithops Futures API
The primary object in Lithops is the executor. The standard way to get everything set up is to import lithops and create an instance of one of the available executors.
Lithops ships with three modes of execution: Localhost, Serverless and Standalone. Each mode of execution has its own executor class:
lithops.LocalhostExecutor(): Executor that uses local processes to run jobs on the local machine.
lithops.ServerlessExecutor(): Executor to run jobs in one of the available serverless compute backends.
lithops.StandaloneExecutor(): Executor to run jobs in one of the available standalone compute backends.
Additionally, Lithops includes a top-level function executor, which encompasses all three previous executors:
lithops.FunctionExecutor(): Generic executor that will use the configuration to determine its mode of execution, i.e., based on the configuration it will be localhost, serverless or standalone.
By default, the executor loads the configuration from the config file. Alternatively, you can pass the configuration as a Python dictionary. In either case, any parameter set directly in the executor overrides the value set in the configuration.
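The precedence rule above can be pictured in plain Python. This is a minimal sketch, not Lithops internals: the helper `resolve_settings` is hypothetical, and it assumes the dictionary mirrors the config file with a `lithops` section holding keys such as `backend` and `storage`. Parameters passed explicitly (that is, not `None`) replace the configured values.

```python
from typing import Any, Dict


def resolve_settings(config: Dict[str, Any], **executor_params: Any) -> Dict[str, Any]:
    """Hypothetical helper: configured 'lithops' settings overridden by executor parameters."""
    effective = dict(config.get("lithops", {}))  # copy so the caller's dict is untouched
    for key, value in executor_params.items():
        if value is not None:  # only parameters that were actually passed override the config
            effective[key] = value
    return effective


# Assumed layout of a configuration dictionary
config = {"lithops": {"backend": "aws_lambda", "storage": "aws_s3", "log_level": "INFO"}}
settings = resolve_settings(config, backend="localhost", storage=None)
print(settings)  # {'backend': 'localhost', 'storage': 'aws_s3', 'log_level': 'INFO'}
```

With the real executor, the dictionary is passed directly, for example `lithops.FunctionExecutor(config=config, backend='localhost')`, and the executor applies the override itself.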
Futures API Reference
- class lithops.executors.FunctionExecutor(mode=None, config=None, config_file=None, backend=None, storage=None, monitoring=None, log_level=False, **kwargs)
Bases:
object
Executor abstract class that contains the common logic for the Localhost, Serverless and Standalone executors
- Parameters:
mode (str | None) – Execution mode. One of: localhost, serverless or standalone
config (Dict[str, Any] | None) – Settings passed in here will override those in lithops_config
config_file (str | None) – Path to the lithops config file
backend (str | None) – Compute backend to run the functions
storage (str | None) – Storage backend to store Lithops data
monitoring (str | None) – Monitoring system implementation. One of: storage, rabbitmq
log_level (str | None) – Log level printing (INFO, DEBUG, …). Set it to None to hide all logs. If this param is set, all logging params in the config are disabled
kwargs (Dict[str, Any] | None) – Any parameter that can be set in the compute backend section of the config file, can be set here
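The kwargs rule can be pictured as overlaying keyword arguments onto the compute backend's section of the configuration. This is a hypothetical sketch: `apply_backend_kwargs` is not a Lithops function, and the `aws_lambda` section with its `runtime_memory` and `runtime_timeout` keys is an assumed example; check your backend's documentation for the keys it actually accepts.

```python
import copy
from typing import Any, Dict


def apply_backend_kwargs(config: Dict[str, Any], backend: str, **kwargs: Any) -> Dict[str, Any]:
    """Hypothetical helper: return a copy of config with kwargs merged into the backend section."""
    merged = copy.deepcopy(config)  # leave the caller's configuration untouched
    section = merged.setdefault(backend, {})  # create the section if the config lacks it
    section.update(kwargs)  # keyword arguments win over configured values
    return merged


# Assumed example configuration; real keys depend on the chosen backend
config = {
    "lithops": {"backend": "aws_lambda", "storage": "aws_s3"},
    "aws_lambda": {"runtime_memory": 256, "runtime_timeout": 180},
}
merged = apply_backend_kwargs(config, "aws_lambda", runtime_memory=1024)
print(merged["aws_lambda"])  # {'runtime_memory': 1024, 'runtime_timeout': 180}
```

Under this model, passing `runtime_memory=1024` to the executor would take precedence over the value in the backend section of the config file, while unrelated keys keep their configured values.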
- call_async(func, data, extra_env=None, runtime_memory=None, timeout=None, include_modules=[], exclude_modules=[])
Runs a single function execution asynchronously.
- Parameters:
func (Callable) – The function to execute.
data (List[Any] | Tuple[Any, ...] | Dict[str, Any]) – Input data. Arguments can be passed as a list or tuple, or as a dictionary for keyword arguments.
extra_env (Dict | None) – Additional environment variables for the function environment.
runtime_memory (int | None) – Memory (in MB) to use to run the function.
timeout (int | None) – Time (in seconds) that the function has to complete its execution before raising a timeout.
include_modules (List | None) – Explicitly pickle these dependencies.
exclude_modules (List | None) – Explicitly keep these modules from pickled dependencies.
- Returns:
Response future.
- Return type:
ResponseFuture
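The documented rule for `data` (a list or tuple for positional arguments, a dictionary for keyword arguments) can be modeled locally with the standard library. This sketch uses `concurrent.futures.ThreadPoolExecutor` as a stand-in for a Lithops executor, so `submit()` plays the role of `call_async()` and the returned `Future` the role of `ResponseFuture`; the helper `call_with_data` is hypothetical.

```python
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Any, Callable, Dict, List, Tuple, Union

# The three shapes the docs allow for `data`
Data = Union[List[Any], Tuple[Any, ...], Dict[str, Any]]


def call_with_data(func: Callable[..., Any], data: Data) -> Any:
    """Hypothetical helper: unpack `data` into func's arguments."""
    if isinstance(data, dict):
        return func(**data)  # dictionary -> keyword arguments
    return func(*data)  # list or tuple -> positional arguments


def add(x: int, y: int) -> int:
    return x + y


with ThreadPoolExecutor(max_workers=2) as pool:
    # submit() returns a Future immediately, much as call_async returns a ResponseFuture
    f1: Future = pool.submit(call_with_data, add, [3, 4])
    f2: Future = pool.submit(call_with_data, add, {"x": 10, "y": 5})

print(f1.result(), f2.result())  # 7 15
```

With Lithops itself, the corresponding calls would be `fexec.call_async(add, [3, 4])` followed by `fexec.get_result()`.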
- clean(fs=None, cs=None, clean_cloudobjects=True, clean_fn=False, force=False, on_exit=False)
Deletes all the temporary files from storage: the function, the serialized data and the function invocation results. It can also clean cloudobjects.
- Parameters:
fs (ResponseFuture | List[ResponseFuture] | None) – List of futures to clean
cs (List[CloudObject] | None) – List of cloudobjects to clean
clean_cloudobjects (bool | None) – Delete all cloudobjects created with this executor
clean_fn (bool | None) – Delete cached functions in this executor
force (bool | None) – Clean all future objects even if they have not been completed
on_exit (bool | None) – Do not print logs on exit
- get_result(fs=None, throw_except=True, timeout=None, threadpool_size=64, wait_dur_sec=None, show_progressbar=True)
Gets the results from all function activations.
- Parameters:
fs (ResponseFuture | FuturesList | List[ResponseFuture] | None) – Futures list. Default None
throw_except (bool | None) – Re-raise exception if call raised. Default True.
timeout (int | None) – Timeout for waiting for results.
threadpool_size (int | None) – Number of threads to use. Default 64
wait_dur_sec (int | None) – Time interval between each check. Default 1 second
show_progressbar (bool | None) – Whether to show the progress bar.
- Returns:
The result of the future/s
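The interplay of `timeout` and `wait_dur_sec` amounts to a polling loop. The sketch below reproduces it with standard-library futures rather than Lithops `ResponseFuture` objects; `poll_results` is a hypothetical helper, and it always re-raises failures, so it mirrors only the `throw_except=True` case.

```python
import time
from concurrent.futures import Future, ThreadPoolExecutor, wait
from typing import Any, List, Optional


def poll_results(fs: List[Future], timeout: Optional[float] = None,
                 wait_dur_sec: float = 1.0) -> List[Any]:
    """Hypothetical helper: check futures every wait_dur_sec seconds, return results in order."""
    deadline = None if timeout is None else time.monotonic() + timeout
    pending = set(fs)
    while pending:
        if deadline is not None and time.monotonic() >= deadline:
            raise TimeoutError(f"{len(pending)} future(s) still pending after {timeout} s")
        # Block for at most one check interval, then keep only the futures still running
        _, pending = wait(pending, timeout=wait_dur_sec)
    # result() re-raises any exception raised by the call, like throw_except=True
    return [f.result() for f in fs]


with ThreadPoolExecutor(max_workers=4) as pool:
    futures = [pool.submit(pow, n, 2) for n in range(5)]
    results = poll_results(futures, timeout=10, wait_dur_sec=0.1)

print(results)  # [0, 1, 4, 9, 16]
```

A shorter `wait_dur_sec` notices completions sooner at the cost of more frequent checks.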
- job_summary(cloud_objects_n=0)
Logs information about a job executed by the calling function executor. Currently supports: code_engine, ibm_vpc and ibm_cf.
- Parameters:
cloud_objects_n (int | None) – Number of cloud objects used in COS, as declared by the user.
- map(map_function, map_iterdata, chunksize=None, extra_args=None, extra_env=None, runtime_memory=None, obj_chunk_size=None, obj_chunk_number=None, obj_newline='\n', timeout=None, include_modules=[], exclude_modules=[])
Spawn multiple function activations based on the items of an input list.
- Parameters:
map_function (Callable) – The function to map over the data
map_iterdata (List[List[Any] | Tuple[Any, ...] | Dict[str, Any]]) – An iterable of input data (e.g., a Python list).
chunksize (int | None) – Split map_iterdata in chunks of this size. Lithops spawns 1 worker per resulting chunk
extra_args (List[Any] | Tuple[Any, ...] | Dict[str, Any] | None) – Additional arguments to pass to each map_function activation
extra_env (Dict[str, str] | None) – Additional environment variables for function environment
runtime_memory (int | None) – Memory (in MB) to use to run the functions
obj_chunk_size (int | None) – Used for data processing. Chunk size to split each object in bytes. Must be >= 1MiB. ‘None’ for processing the whole file in one function activation
obj_chunk_number (int | None) – Used for data processing. Number of chunks to split each object. ‘None’ for processing the whole file in one function activation. obj_chunk_number takes precedence over obj_chunk_size if both parameters are set
obj_newline (str | None) – Newline character used to keep line integrity across partitions. ‘None’ disables the line-integrity logic, so that the functions get partitions of exactly the same size
timeout (int | None) – Max time per function activation (seconds)
include_modules (List[str] | None) – Explicitly pickle these dependencies. All required dependencies are pickled if left as the default empty list. No dependency is pickled if it is explicitly set to None
exclude_modules (List[str] | None) – Explicitly keep these modules from pickled dependencies. It is not taken into account if you set include_modules.
- Returns:
A list with size len(map_iterdata) of futures for each job (Futures are also internally stored by Lithops).
- Return type:
FuturesList
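A rough model of how `obj_chunk_size`, `obj_chunk_number` and `obj_newline` interact when one object is split. This is a hypothetical sketch, not the Lithops partitioner: `partition` computes byte ranges over an in-memory `bytes` object and keeps lines whole by moving each cut forward to just after the next newline, which is one common approach. The tiny chunk sizes are for illustration only; the real `obj_chunk_size` must be at least 1 MiB.

```python
from typing import List, Optional, Tuple


def partition(data: bytes, chunk_size: Optional[int] = None,
              chunk_number: Optional[int] = None,
              newline: Optional[bytes] = b"\n") -> List[Tuple[int, int]]:
    """Hypothetical model: split data into (start, end) byte ranges."""
    size = len(data)
    if chunk_number is not None:
        # chunk_number takes precedence: derive the chunk size from it (ceiling division)
        chunk_size = -(-size // chunk_number)
    if chunk_size is None:
        return [(0, size)]  # None: the whole object goes to a single activation
    ranges: List[Tuple[int, int]] = []
    start = 0
    while start < size:
        end = min(start + chunk_size, size)
        if newline is not None and end < size:
            # Move the cut forward to just after the next newline so no line is split
            pos = data.find(newline, end - len(newline))
            end = size if pos == -1 else pos + len(newline)
        ranges.append((start, end))
        start = end
    return ranges


data = b"alpha\nbeta\ngamma\ndelta\n"
print([data[s:e] for s, e in partition(data, chunk_size=8)])
# [b'alpha\nbeta\n', b'gamma\ndelta\n']
print(partition(data, chunk_size=8, newline=None))  # [(0, 8), (8, 16), (16, 23)]
```

With `newline=None` every range has exactly `chunk_size` bytes (except the last), which is the trade-off the `obj_newline` description refers to.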
- map_reduce(map_function, map_iterdata, reduce_function, chunksize=None, extra_args=None, extra_args_reduce=None, extra_env=None, map_runtime_memory=None, reduce_runtime_memory=None, timeout=None, obj_chunk_size=None, obj_chunk_number=None, obj_newline='\n', obj_reduce_by_key=False, spawn_reducer=20, include_modules=[], exclude_modules=[])
Map the map_function over the data and apply the reduce_function across all futures.
- Parameters:
map_function (Callable) – The function to map over the data
map_iterdata (List[List[Any] | Tuple[Any, ...] | Dict[str, Any]]) – An iterable of input data
reduce_function (Callable) – The function to reduce over the futures
chunksize (int | None) – Split map_iterdata in chunks of this size. Lithops spawns 1 worker per resulting chunk. Default 1
extra_args (List[Any] | Tuple[Any, ...] | Dict[str, Any] | None) – Additional arguments to pass to function activation. Default None
extra_args_reduce (List[Any] | Tuple[Any, ...] | Dict[str, Any] | None) – Additional arguments to pass to the reduce function activation. Default None
extra_env (Dict[str, str] | None) – Additional environment variables for the function environment. Default None
map_runtime_memory (int | None) – Memory to use to run the map function. Default None (loaded from config)
reduce_runtime_memory (int | None) – Memory to use to run the reduce function. Default None (loaded from config)
timeout (int | None) – Time that the functions have to complete their execution before raising a timeout
obj_chunk_size (int | None) – The size of the data chunks to split each object. ‘None’ for processing the whole file in one function activation
obj_chunk_number (int | None) – Number of chunks to split each object. ‘None’ for processing the whole file in one function activation
obj_newline (str | None) – Newline character used to keep line integrity across partitions. ‘None’ disables the line-integrity logic, so that the functions get partitions of exactly the same size
obj_reduce_by_key (bool | None) – Set one reducer per object after running the partitioner. By default there is one reducer for all the objects
spawn_reducer (int | None) – Percentage of map functions that must be done before the reduce function is spawned. Default 20
include_modules (List[str] | None) – Explicitly pickle these dependencies.
exclude_modules (List[str] | None) – Explicitly keep these modules from pickled dependencies.
- Returns:
A list with size len(map_iterdata) of futures.
- Return type:
FuturesList
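The data flow of `map_reduce` can be mimicked locally: every item of `map_iterdata` is mapped in parallel, and the list of map results is handed to the reduce function. In this sketch `ThreadPoolExecutor` stands in for the compute backend and all helper names are hypothetical. It also shows the arithmetic behind `chunksize` (one worker per chunk) and `spawn_reducer` (a percentage of finished map calls); rounding that threshold up is an assumption.

```python
import math
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, List


def chunk(iterdata: List[Any], chunksize: int) -> List[List[Any]]:
    """Group iterdata into chunks of chunksize items; one worker per chunk."""
    return [iterdata[i:i + chunksize] for i in range(0, len(iterdata), chunksize)]


def reducer_threshold(n_maps: int, spawn_reducer: int = 20) -> int:
    """Finished map calls needed before the reducer is spawned (rounding up is assumed)."""
    return math.ceil(n_maps * spawn_reducer / 100)


def local_map_reduce(map_function: Callable[[Any], Any], iterdata: List[Any],
                     reduce_function: Callable[[List[Any]], Any]) -> Any:
    """Map every item in parallel, then pass the list of map results to the reducer."""
    with ThreadPoolExecutor() as pool:
        results = list(pool.map(map_function, iterdata))  # keeps input order
    return reduce_function(results)


def square(x: int) -> int:
    return x * x


def total(results: List[int]) -> int:
    return sum(results)


print(local_map_reduce(square, [1, 2, 3, 4], total))  # 30
print(len(chunk(list(range(10)), 3)))  # 4 (chunks of 3, 3, 3 and 1 items)
print(reducer_threshold(50))  # 10 (20% of 50 map calls)
```

Spawning the reducer early, before every map call has finished, lets it start while the remaining map results are still arriving.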
- plot(fs=None, dst=None, figsize=(10, 6))
Creates a timeline and a histogram of the current execution in dst.
- Parameters:
fs (ResponseFuture | FuturesList | List[ResponseFuture] | None) – List of futures.
dst (str | None) – Destination path to save the .png plots.
figsize (tuple | None) – Figure size. Default (10, 6)
- wait(fs=None, throw_except=True, return_when=100, download_results=False, timeout=None, threadpool_size=64, wait_dur_sec=None, show_progressbar=True)
Wait for the Future instances (possibly created by different Executor instances) given by fs to complete. Returns a 2-tuple of futures lists. The first, done, contains the futures that completed (finished or cancelled futures) before the wait completed. The second, not_done, contains the futures that did not complete (pending or running futures). timeout can be used to control the maximum number of seconds to wait before returning.
- Parameters:
fs (ResponseFuture | FuturesList | List[ResponseFuture] | None) – Futures list. Default None
throw_except (bool | None) – Re-raise exception if call raised. Default True
return_when (Any | None) – Percentage of futures that must be done before returning. Default 100 (all of them)
download_results (bool | None) – Download results. Default False (only get statuses)
timeout (int | None) – Timeout of waiting for results
threadpool_size (int | None) – Number of threads to use. Default 64
wait_dur_sec (int | None) – Time interval between each check. Default 1 second
show_progressbar (bool | None) – Whether to show the progress bar.
- Returns:
(fs_done, fs_notdone) where fs_done is a list of futures that have completed and fs_notdone is a list of futures that have not completed.
- Return type:
Tuple[FuturesList, FuturesList]
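The percentage semantics of `return_when` can be sketched with standard-library futures: the call returns once at least `return_when` percent of the futures are done, split into done and not-done lists. `wait_percentage` is hypothetical, and rounding the required count up is an assumption about how fractional percentages are handled.

```python
import math
import time
from concurrent.futures import Future, ThreadPoolExecutor
from typing import List, Tuple


def wait_percentage(fs: List[Future], return_when: int = 100,
                    wait_dur_sec: float = 0.05) -> Tuple[List[Future], List[Future]]:
    """Hypothetical helper: return (done, not_done) once return_when percent of fs are done."""
    needed = math.ceil(len(fs) * return_when / 100)  # e.g. 50% of 4 futures -> 2
    while True:
        done: List[Future] = []
        not_done: List[Future] = []
        for f in fs:  # classify each future exactly once per check
            (done if f.done() else not_done).append(f)
        if len(done) >= needed:
            return done, not_done
        time.sleep(wait_dur_sec)  # check again after the polling interval


with ThreadPoolExecutor(max_workers=4) as pool:
    fs = [pool.submit(time.sleep, d) for d in (0.0, 0.0, 1.0, 1.0)]
    done, not_done = wait_percentage(fs, return_when=50)
    print(len(done), len(not_done))
```

Typically the two instant calls are reported as done while the two one-second calls are still pending, so the call returns without waiting for the slow half.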
- class lithops.executors.LocalhostExecutor(config=None, config_file=None, storage=None, monitoring=None, log_level=False, **kwargs)
Bases:
FunctionExecutor
Initialize a LocalhostExecutor class.
- Parameters:
config (Dict[str, Any] | None) – Settings passed in here will override those in config file.
config_file (str | None) – Path to the lithops config file
storage (str | None) – Name of the storage backend to use.
monitoring (str | None) – monitoring system.
log_level (str | None) – log level to use during the execution.
kwargs (Dict[str, Any] | None) – Any parameter that can be set in the compute backend section of the config file, can be set here
- class lithops.executors.ServerlessExecutor(config=None, config_file=None, backend=None, storage=None, monitoring=None, log_level=False, **kwargs)
Bases:
FunctionExecutor
Initialize a ServerlessExecutor class.
- Parameters:
config (Dict[str, Any] | None) – Settings passed in here will override those in config file
config_file (str | None) – Path to the lithops config file
backend (str | None) – Name of the serverless compute backend to use
storage (str | None) – Name of the storage backend to use
monitoring (str | None) – monitoring system
log_level (str | None) – log level to use during the execution
kwargs (Dict[str, Any] | None) – Any parameter that can be set in the compute backend section of the config file, can be set here
- class lithops.executors.StandaloneExecutor(config=None, config_file=None, backend=None, storage=None, monitoring=None, log_level=False, **kwargs)
Bases:
FunctionExecutor
Initialize a StandaloneExecutor class.
- Parameters:
config (Dict[str, Any] | None) – Settings passed in here will override those in config file
config_file (str | None) – Path to the lithops config file
backend (str | None) – Name of the standalone compute backend to use
storage (str | None) – Name of the storage backend to use
monitoring (str | None) – monitoring system
log_level (str | None) – log level to use during the execution
kwargs (Dict[str, Any] | None) – Any parameter that can be set in the compute backend section of the config file, can be set here