Combining hourly and daily tasks in the same Airflow DAG

Angelos Alexopoulos
2 min read · Mar 1, 2022

This is my attempt to combine daily and hourly tasks in the same Airflow DAG. Of course, the first question that comes to mind is: why would we ever need such a DAG? The truth is that this pattern should be used sparingly and is not the preferred approach! The best practice is to keep tasks with different schedules in separate DAGs, so that it is clear to everyone what is happening.

However, in this short article I describe how I built a DAG with 3 tasks: the task t1, which executes every hour, and the tasks t2 and t3, which execute once per day, at 10 and 11 o’clock respectively.

To accomplish that, we can use the ShortCircuitOperator, which allows a workflow to continue only if a condition is met. Otherwise, the workflow “short-circuits” and downstream tasks are skipped.

The ShortCircuitOperator calls check_execution_date. If the run falls in the desired execution hour, the function returns True and the downstream daily task executes. Otherwise, it returns False and the downstream tasks are short-circuited.

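from datetime import datetime

from airflow import DAG
# On Airflow 2 these operators are also importable from
# airflow.operators.dummy and airflow.operators.python.
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import ShortCircuitOperator

# Maps each refresh hour to the ShortCircuitOperator that gates it
daily_tasks = {}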

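def check_execution_date(task_refresh_hour, **kwargs):
    # True only for the run whose interval contains task_refresh_hour.
    # Note: never True for hour 23, because the next run's hour wraps to 0.
    return kwargs['execution_date'].hour <= task_refresh_hour < kwargs['next_execution_date'].hour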


args = {
    'owner': 'airflow',
    'start_date': datetime(2022, 1, 1)  # a static start date is best practice
}

dag = DAG(
    'hourly_dag',
    schedule_interval="@hourly",
    default_args=args
)

# t1 Task executes each hour
t1 = DummyOperator(
    task_id='hourly_task',
    dag=dag
)

# But t2 and t3 Tasks execute only once per day
t2 = DummyOperator(
    task_id='daily_task',
    dag=dag
)

t3 = DummyOperator(
    task_id='daily_task2',
    dag=dag
)


# Run daily for execution hour 10 (the run starts once that hour's interval ends)
refresh_hour = 10
daily_tasks[refresh_hour] = ShortCircuitOperator(
    task_id=f'task_runs_daily_{refresh_hour}',
    python_callable=check_execution_date,
    provide_context=True,  # needed on Airflow 1.10 only; Airflow 2 passes the context automatically
    op_kwargs={'task_refresh_hour': refresh_hour},
    dag=dag
)
group2 = DummyOperator(
    task_id=f'group_hour_{refresh_hour}',
    dag=dag
)
daily_tasks[refresh_hour] >> group2 >> t2

# Run daily for execution hour 11
refresh_hour = 11
daily_tasks[refresh_hour] = ShortCircuitOperator(
    task_id=f'task_runs_daily_{refresh_hour}',
    python_callable=check_execution_date,
    provide_context=True,  # needed on Airflow 1.10 only; Airflow 2 passes the context automatically
    op_kwargs={'task_refresh_hour': refresh_hour},
    dag=dag
)
group3 = DummyOperator(
    task_id=f'group_hour_{refresh_hour}',
    dag=dag
)
daily_tasks[refresh_hour] >> group3 >> t3
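
The hour check can also be tried outside Airflow by feeding it plain datetime pairs. The sketch below is a stand-alone illustration, not part of the DAG: hour_check mirrors the comparison in check_execution_date, while window_check and simulate_day are hypothetical helpers introduced here. It assumes an hourly schedule, so each run's next_execution_date is one hour after its execution_date.

```python
from datetime import datetime, timedelta


def hour_check(task_refresh_hour, execution_date, next_execution_date):
    # Same comparison as check_execution_date, on explicit arguments.
    return execution_date.hour <= task_refresh_hour < next_execution_date.hour


def window_check(task_refresh_hour, execution_date, next_execution_date):
    # Hypothetical variant: compare full datetimes so the 23:00 -> 00:00
    # rollover does not break the comparison.
    target = execution_date.replace(
        hour=task_refresh_hour, minute=0, second=0, microsecond=0
    )
    return execution_date <= target < next_execution_date


def simulate_day(check, task_refresh_hour, day=datetime(2022, 1, 1)):
    # Return the execution hours (0-23) of the hourly runs that pass the check.
    passing = []
    for hour in range(24):
        start = day + timedelta(hours=hour)
        end = start + timedelta(hours=1)
        if check(task_refresh_hour, start, end):
            passing.append(hour)
    return passing


print(simulate_day(hour_check, 10))    # [10]
print(simulate_day(hour_check, 23))    # []
print(simulate_day(window_check, 23))  # [23]
```

With an hourly schedule, the hour-based comparison lets exactly one run per day through for hours 0 to 22, but none for hour 23, because the next run's hour is 0. Comparing full datetimes is one possible way around that edge case.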

As a bonus, we have also added a grouping task, which is useful when we need to run many daily tasks in parallel.

For example, we can do something like the following:

daily_tasks[refresh_hour] >> group >> [t3,t4,t5,t6]
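
This fan-out works because a short-circuit skips everything downstream of the gate, not just its direct child. The following is a simplified pure-Python model of that propagation, not Airflow's actual implementation; the skipped_tasks helper and the graph dictionary are hypothetical, and t4 to t6 are the placeholder tasks from the line above.

```python
from collections import deque

# Toy dependency graph mirroring: gate >> group >> [t3, t4, t5, t6].
# hourly_task has no upstream gate, so nothing can skip it.
downstream = {
    "task_runs_daily_11": ["group_hour_11"],
    "group_hour_11": ["t3", "t4", "t5", "t6"],
    "hourly_task": [],
}


def skipped_tasks(gate, condition_met, graph):
    # When the gate's condition is False, every task reachable from it is skipped.
    if condition_met:
        return set()
    skipped, queue = set(), deque(graph.get(gate, []))
    while queue:
        task = queue.popleft()
        if task not in skipped:
            skipped.add(task)
            queue.extend(graph.get(task, []))
    return skipped


print(sorted(skipped_tasks("task_runs_daily_11", False, downstream)))
# ['group_hour_11', 't3', 't4', 't5', 't6']
print(sorted(skipped_tasks("task_runs_daily_11", True, downstream)))
# []
```

In this model a single gate decides the fate of the whole group, which is why only one ShortCircuitOperator is needed per refresh hour, however many daily tasks hang off the grouping task.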
