Apache Airflow#
Airflow is a platform created by the community to programmatically author, schedule and monitor workflows. The Airflow/Lithops integration allows Airflow users to keep all of their Lithops code in Python functions and define task dependencies by moving data through Python functions.
Refer to the integration repository.
Examples#
Airflow/Lithops example:
Define a function in a separate file (my_functions.py). Note that the parameter names (a and b) must match the keys used later in iterdata_from_task and extra_args:

def add(a, b):
    return a + b
Import the Lithops operator and the function, and create the DAG to execute:

import random

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.lithops_airflow_plugin import LithopsMapOperator
from airflow.utils.dates import days_ago

from my_functions import add

args = {
    'owner': 'lithops',
    'start_date': days_ago(2),
}

dag = DAG(
    dag_id='LithopsTest',
    default_args=args,
    schedule_interval=None,
)

# Upstream task: generate a list of 10 random integers
gen_list = PythonOperator(
    task_id='gen_list',
    python_callable=lambda: [random.randint(1, 100) for _ in range(10)],
    dag=dag,
)

# Map task: apply add() to every element produced by gen_list,
# passing b=10 as a fixed extra argument to each invocation
add_num_map = LithopsMapOperator(
    task_id='add_num_map',
    map_function=add,
    iterdata_from_task={'a': 'gen_list'},
    extra_args={'b': 10},
    dag=dag,
)

gen_list >> add_num_map
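Conceptually, the map operator invokes the function once per element of the list produced by the upstream task, binding each element to the parameter named in iterdata_from_task and passing extra_args to every call. A minimal plain-Python sketch of that behavior (no Airflow or Lithops required, and purely illustrative of the data flow):

```python
import random

def add(a, b):
    return a + b

# What the 'gen_list' task produces
gen_list_output = [random.randint(1, 100) for _ in range(10)]

# What the map operator does with it: one call per element,
# with the extra argument b=10 applied to every call
results = [add(a=item, b=10) for item in gen_list_output]

print(results)
```

Each entry of results is the corresponding input plus 10; with Lithops, these calls would run as parallel serverless invocations instead of a local loop.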