Combining hourly and daily tasks in the same Airflow DAG

from datetime import datetime
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import ShortCircuitOperator
# Maps a refresh hour (int) to the ShortCircuitOperator that gates the
# daily tasks scheduled for that hour; filled in further down the file.
daily_tasks = {}

def check_execution_date(task_refresh_hour, **kwargs):
    """Return True when ``task_refresh_hour`` falls inside the current run's interval.

    Used as the ``python_callable`` of a ShortCircuitOperator, so the
    return value decides whether the tasks downstream of that operator run.

    Args:
        task_refresh_hour: Hour of the day (0-23) at which the gated
            daily tasks should run.
        **kwargs: Task context supplied by Airflow. Must contain
            ``execution_date`` and ``next_execution_date``, both exposing
            an ``hour`` attribute.

    Returns:
        bool: True if ``execution_date.hour <= task_refresh_hour <
        next_execution_date.hour``, treating the interval as wrapping
        around midnight when the next run's hour is smaller than the
        current one (e.g. 23:00 -> 00:00).
    """
    window_start = kwargs['execution_date'].hour
    window_end = kwargs['next_execution_date'].hour

    if window_start <= window_end:
        return window_start <= task_refresh_hour < window_end

    # The interval crosses midnight, so the hour wraps from 23 back to 0.
    # A plain chained comparison could never match here and a task gated
    # on hour 23 would be skipped forever.
    return task_refresh_hour >= window_start or task_refresh_hour < window_end


# Arguments handed to the DAG as its default_args.
args = {
    'owner': 'airflow',
    # Using a fixed (static) start date is the recommended practice.
    'start_date': datetime(2022, 1, 1),
}

# Single DAG, scheduled hourly, that hosts both the hourly task and the
# tasks that should only run once per day.
# NOTE(review): catchup is left at its default; with a start date in the
# past the scheduler may backfill every missed hourly run. Confirm that is
# intended.
dag = DAG(
    dag_id='hourly_dag',
    default_args=args,
    schedule_interval="@hourly",
)
# t1 has no gate in front of it, so it runs on every hourly DAG run.
t1 = DummyOperator(dag=dag, task_id='hourly_task')

# t2 and t3 are meant to run only once per day; each is placed
# downstream of a ShortCircuitOperator gate later in the file.
t2 = DummyOperator(dag=dag, task_id='daily_task')
t3 = DummyOperator(dag=dag, task_id='daily_task2')


def _add_daily_gate(refresh_hour, downstream):
    """Gate ``downstream`` so it only runs in the DAG run covering ``refresh_hour``.

    Creates a ShortCircuitOperator that calls ``check_execution_date`` with
    ``refresh_hour``, stores it in ``daily_tasks[refresh_hour]``, and chains
    it to a DummyOperator grouping node followed by ``downstream``.

    Args:
        refresh_hour: Hour of the day at which ``downstream`` should run.
        downstream: The task to place behind the gate. A list of tasks
            should also work for fan-out via ``>>`` (confirm against the
            Airflow version in use).

    Returns:
        The DummyOperator grouping node created for this hour.
    """
    daily_tasks[refresh_hour] = ShortCircuitOperator(
        task_id=f'task_runs_daily_{refresh_hour}',
        python_callable=check_execution_date,
        provide_context=True,
        op_kwargs={'task_refresh_hour': refresh_hour},
        dag=dag,
    )
    group_node = DummyOperator(
        task_id=f'group_hour_{refresh_hour}',
        dag=dag,
    )
    daily_tasks[refresh_hour] >> group_node >> downstream
    return group_node


# Run t2 at 10:00 daily and t3 at 11:00 daily.
group2 = _add_daily_gate(10, t2)
group3 = _add_daily_gate(11, t3)

# The former trailing statement chained the hour-11 gate to `group` and
# `[t3, t4, t5, t6]`, but `group`, `t4`, `t5` and `t6` are never defined in
# this file, so it raised NameError when the DAG was parsed. To fan out to
# several daily tasks, define them and pass a list to _add_daily_gate.

--

--

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store