Error received sigterm terminating subprocesses

I am using the PythonOperator to call a function that parallelizes data engineering process as an Airflow task. This is done simply by wrapping a simple function with a callable wrapper function called by Airflow.

def wrapper(ds, **kwargs):
    process_data()

process_data achieves parallelization using the multiprocessing module, which spawns subprocesses. When I run process_data by itself from a Jupyter notebook, it runs to completion without problems. However, when I run it through Airflow, the task fails and the task log shows something like this:

[2019-01-22 17:16:46,966] {models.py:1610} ERROR - Received SIGTERM. Terminating subprocesses.
[2019-01-22 17:16:46,969] {logging_mixin.py:95} WARNING - Process ForkPoolWorker-129:

[2019-01-22 17:16:46,973] {logging_mixin.py:95} WARNING - Traceback (most recent call last):

[2019-01-22 17:16:46,973] {logging_mixin.py:95} WARNING -   File "/usr/lib/python3.5/multiprocessing/process.py", line 249, in _bootstrap
    self.run()

[2019-01-22 17:16:46,973] {logging_mixin.py:95} WARNING -   File "/home/airflow/.env/lib/python3.5/site-packages/airflow/models.py", line 1612, in signal_handler
    raise AirflowException("Task received SIGTERM signal")

[2019-01-22 17:16:46,973] {logging_mixin.py:95} WARNING - airflow.exceptions.AirflowException: Task received SIGTERM signal

[2019-01-22 17:16:46,993] {models.py:1610} ERROR - Received SIGTERM. Terminating subprocesses.
[2019-01-22 17:16:46,996] {logging_mixin.py:95} WARNING - Process ForkPoolWorker-133:

[2019-01-22 17:16:46,999] {logging_mixin.py:95} WARNING - Traceback (most recent call last):

[2019-01-22 17:16:46,999] {logging_mixin.py:95} WARNING -   File "/usr/lib/python3.5/multiprocessing/process.py", line 249, in _bootstrap
    self.run()

[2019-01-22 17:16:46,999] {logging_mixin.py:95} WARNING -   File "/usr/lib/python3.5/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)

[2019-01-22 17:16:46,999] {logging_mixin.py:95} WARNING -   File "/usr/lib/python3.5/multiprocessing/pool.py", line 108, in worker
    task = get()

[2019-01-22 17:16:46,999] {logging_mixin.py:95} WARNING -   File "/usr/lib/python3.5/multiprocessing/queues.py", line 343, in get
    res = self._reader.recv_bytes()

[2019-01-22 17:16:46,999] {logging_mixin.py:95} WARNING -   File "/usr/lib/python3.5/multiprocessing/synchronize.py", line 99, in __exit__
    return self._semlock.__exit__(*args)

[2019-01-22 17:16:46,999] {logging_mixin.py:95} WARNING -   File "/home/airflow/.env/lib/python3.5/site-packages/airflow/models.py", line 1612, in signal_handler
    raise AirflowException("Task received SIGTERM signal")

[2019-01-22 17:16:46,999] {logging_mixin.py:95} WARNING - airflow.exceptions.AirflowException: Task received SIGTERM signal

[2019-01-22 17:16:47,086] {logging_mixin.py:95} INFO - file parsing and processing 256.07

[2019-01-22 17:17:12,938] {logging_mixin.py:95} INFO - combining and sorting 25.85

I am not quite sure why the task receives SIGTERM. My guess is that some higher level process is sending those to the subprocesses. What should I do to debug this issue?

Just noticed that towards the end of the log for the task, it clearly states that

airflow.exceptions.AirflowException: Task received SIGTERM signal
[2019-01-22 12:31:39,196] {models.py:1764} INFO - Marking task as FAILED.
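
The tracebacks above show the exception being raised from a signal_handler function inside the pool workers themselves. The following is a minimal standard-library sketch of that general pattern, not Airflow's code: a handler registered for SIGTERM raises an exception in whichever process receives the signal, and on POSIX systems processes created by fork inherit the handler. The names TaskTerminated, on_sigterm, and run_and_signal_self are hypothetical. Printing the PID and parent PID in the handler is one way to see which process was actually signalled.

```python
import os
import signal
import time


class TaskTerminated(Exception):
    """Hypothetical stand-in for the exception a task runner raises on SIGTERM."""


def on_sigterm(signum, frame):
    # Report which process got the signal and who its parent is.
    print(f"pid={os.getpid()} ppid={os.getppid()} received signal {signum}")
    raise TaskTerminated("received SIGTERM")


def run_and_signal_self():
    """Install the handler, send SIGTERM to this process, report the outcome."""
    previous = signal.signal(signal.SIGTERM, on_sigterm)
    try:
        os.kill(os.getpid(), signal.SIGTERM)
        # The Python-level handler runs in the main thread shortly after
        # delivery; this short loop gives it a chance to fire.
        for _ in range(10):
            time.sleep(0.1)
        return "not interrupted"
    except TaskTerminated:
        return "interrupted"
    finally:
        # Restore whatever handler was installed before the experiment.
        if previous is not None:
            signal.signal(signal.SIGTERM, previous)


result = run_and_signal_self()
print(result)
```

This sketch must run in the main thread of a POSIX process, because Python only allows signal handlers to be installed there.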

Hi @potiuk , we have seen a similar issue since we upgraded from Airflow 2.0.2 to Airflow 2.2.3, for example in a BashOperator.

[2022-01-24, 16:18:18 UTC] {subprocess.py:93} INFO - Command exited with return code 0
[2022-01-24, 16:18:18 UTC] {taskinstance.py:1267} INFO - Marking task as SUCCESS. dag_id=XXX, task_id=XXX, execution_date=20220124T120500, start_date=20220124T161701, end_date=20220124T161818
[2022-01-24, 16:18:18 UTC] {local_task_job.py:211} WARNING - State of this instance has been externally set to success. Terminating instance.
[2022-01-24, 16:18:18 UTC] {process_utils.py:120} INFO - Sending Signals.SIGTERM to group 553436. PIDs of all processes in the group: [553436]
[2022-01-24, 16:18:18 UTC] {process_utils.py:75} INFO - Sending the signal Signals.SIGTERM to group 553436
[2022-01-24, 16:18:18 UTC] {taskinstance.py:1408} ERROR - Received SIGTERM. Terminating subprocesses.
[2022-01-24, 16:18:18 UTC] {subprocess.py:99} INFO - Sending SIGTERM signal to process group
[2022-01-24, 16:18:18 UTC] {process_utils.py:70} INFO - Process psutil.Process(pid=553436, status='terminated', exitcode=1, started='16:17:01') (553436) terminated with exit code 1

We have also seen a related error:

ProcessLookupError: [Errno 3] No such process
  File "airflow/executors/celery_executor.py", line 121, in _execute_in_fork
    args.func(args)
  File "airflow/cli/cli_parser.py", line 48, in command
    return func(*args, **kwargs)
  File "airflow/utils/cli.py", line 92, in wrapper
    return f(*args, **kwargs)
  File "airflow/cli/commands/task_command.py", line 298, in task_run
    _run_task_by_selected_method(args, dag, ti)
  File "airflow/cli/commands/task_command.py", line 105, in _run_task_by_selected_method
    _run_task_by_local_task_job(args, ti)
  File "airflow/cli/commands/task_command.py", line 163, in _run_task_by_local_task_job
    run_job.run()
  File "airflow/jobs/base_job.py", line 245, in run
    self._execute()
  File "airflow/jobs/local_task_job.py", line 103, in _execute
    self.task_runner.start()
  File "airflow/task/task_runner/standard_task_runner.py", line 41, in start
    self.process = self._start_by_fork()
  File "airflow/task/task_runner/standard_task_runner.py", line 96, in _start_by_fork
    Sentry.flush()
  File "airflow/sentry.py", line 188, in flush
    sentry_sdk.flush()
  File "threading.py", line 306, in wait
    gotit = waiter.acquire(True, timeout)
  File "airflow/models/taskinstance.py", line 1409, in signal_handler
    self.task.on_kill()
  File "airflow/operators/bash.py", line 193, in on_kill
    self.subprocess_hook.send_sigterm()
  File "airflow/hooks/subprocess.py", line 101, in send_sigterm
    os.killpg(os.getpgid(self.sub_process.pid), signal.SIGTERM)

This page provides troubleshooting steps and information for common workflow
issues.

Many DAG execution issues are caused by non-optimal environment performance.
You can optimize your Cloud Composer 2 environment by following the Optimize
environment performance and costs guide.

Some DAG execution issues might be caused by the Airflow scheduler
not working correctly or optimally. Follow the
Scheduler troubleshooting instructions
to solve these issues.

Troubleshooting workflow

To begin troubleshooting:

  1. Check the Airflow logs.

    You can increase the logging level of Airflow by overriding the
    following Airflow configuration option.

    Airflow 2

    Section  Key            Value
    logging  logging_level  The default value is INFO. Set to DEBUG to get more verbosity in log messages.

    Airflow 1

    Section  Key            Value
    core     logging_level  The default value is INFO. Set to DEBUG to get more verbosity in log messages.

  2. Check the Monitoring Dashboard.

  3. Review Cloud Monitoring.

  4. In Google Cloud console, check for errors on the pages for
    the components of your environment.

  5. In the Airflow web interface,
    check in the DAG’s Graph View for
    failed task instances.

    Section Key Value
    webserver dag_orientation LR, TB, RL, or BT

Debugging operator failures

To debug an operator failure:

  1. Check for task-specific errors.
  2. Check the Airflow logs.
  3. Review Cloud Monitoring.
  4. Check operator-specific logs.
  5. Fix the errors.
  6. Upload the DAG to the dags/ folder.
  7. In the Airflow web interface,
    clear the past states for
    the DAG.
  8. Resume or run the DAG.

Troubleshooting task execution

Airflow is a distributed system with many entities, such as the scheduler,
executor, and workers, that communicate with each other through a task queue
and the Airflow database and that send signals (like SIGTERM). The following
diagram shows an overview of interconnections between Airflow components.

Figure 1. Interaction between Airflow components

In a distributed system like Airflow, there might be network connectivity
issues, or the underlying infrastructure might experience intermittent issues.
This can lead to situations where tasks fail and are rescheduled for
execution, or where tasks do not complete successfully (for example, Zombie
tasks, or tasks that got stuck in execution). Airflow has mechanisms to deal
with such situations and automatically resume normal functioning. The following
sections explain common problems that occur during task execution by Airflow:
Zombie tasks, Poison Pills, and SIGTERM signals.

Troubleshooting Zombie tasks

Airflow detects two kinds of mismatch between a task and a process that
executes the task:

  • Zombie tasks are tasks that are supposed to be running but are not running.
    This might happen if the task’s process was terminated or is not
    responding, if the Airflow worker didn’t report a task status in time
    because it is overloaded, or if the VM where the task is executed was shut
    down. Airflow finds such tasks periodically, and either fails or retries
    the task, depending on the task’s settings.

  • Undead tasks are tasks that are not supposed to be running. Airflow finds
    such tasks periodically and terminates them.

The most common reason for Zombie tasks is the shortage of CPU and memory
resources in your environment’s cluster. As a result, an Airflow worker
might not be able to report the status of a task, or a sensor might be
interrupted abruptly. In this case, the scheduler marks a given task as
a Zombie task. To avoid Zombie tasks, assign more resources to your environment.

For more information about scaling your Cloud Composer environment,
see Scaling environment guide.

If you experience Zombie tasks, one possible solution is to increase the
timeout for Zombie tasks. As a result, the scheduler waits longer before it
considers a task as a Zombie. In this way, an Airflow worker has more time to
report the status of the task.

To increase the timeout for Zombie tasks, override the value
of the [scheduler]scheduler_zombie_task_threshold Airflow configuration
option:

Section    Key                              Value                     Notes
scheduler  scheduler_zombie_task_threshold  New timeout (in seconds)  The default value is 300
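
As a rough illustration of what the threshold controls, the sketch below classifies tasks by how long ago their worker last reported in. It is a simplified model, not Airflow's implementation; the function name find_zombies and the sample task IDs are hypothetical.

```python
from datetime import datetime, timedelta


def find_zombies(last_heartbeats, now, threshold_seconds=300):
    """Return IDs of running tasks whose last heartbeat is older than the threshold.

    last_heartbeats maps a task ID to the time its worker last reported in.
    """
    limit = timedelta(seconds=threshold_seconds)
    return sorted(
        task_id
        for task_id, seen in last_heartbeats.items()
        if now - seen > limit
    )


now = datetime(2022, 1, 24, 16, 20, 0)
heartbeats = {
    "extract": now - timedelta(seconds=30),     # reported recently
    "transform": now - timedelta(seconds=420),  # silent for 7 minutes
}
print(find_zombies(heartbeats, now))                         # ['transform']
print(find_zombies(heartbeats, now, threshold_seconds=600))  # []
```

With the default of 300 seconds the silent task is flagged; raising the threshold to 600 seconds gives the worker more time to report before the task is treated as a Zombie.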

Troubleshooting Poison Pill

Poison Pill is a mechanism used by Airflow to shut down Airflow tasks.

Airflow uses Poison Pill in these situations:

  • When a scheduler terminates a task that did not complete on time.
  • When a task times out or is executed for too long.

When Airflow uses Poison Pill, you can see the following log entries in the logs of an Airflow worker that executed the task:

  INFO - Subtask ... WARNING - State of this instance has been externally set
  to success. Taking the poison pill.
  INFO - Subtask ... INFO - Sending Signals.SIGTERM to GPID <X>
  INFO - Subtask ... ERROR - Received SIGTERM. Terminating subprocesses.

Possible solutions:

  • Check the task code for errors that might cause it to run for too long.
  • (Cloud Composer 2)
    Increase the CPU and memory for Airflow
    workers, so that tasks execute faster.
  • Increase the value of the
    [celery_broker_transport_options]visibility_timeout Airflow configuration
    option.

    As a result, the scheduler waits longer for a task to finish
    before considering it a Zombie task. This option is especially
    useful for time-consuming tasks that last many hours. If
    the value is too low (for example, 3 hours), then the scheduler considers
    tasks that run for 5 or 6 hours as hung (Zombie tasks).

  • Increase the value of the [core]killed_task_cleanup_time Airflow
    configuration option.

    A longer value provides more time to Airflow workers to finish their tasks
    gracefully. If the value is too low, Airflow tasks might be interrupted
    abruptly, without enough time to finish their work gracefully.
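
The idea of a cleanup period can be sketched with the standard library alone: send SIGTERM, wait for a grace period, and only then fall back to SIGKILL. This is an illustration of the general pattern under that assumption, not Airflow's code; stop_process and grace_seconds are hypothetical names.

```python
import subprocess
import sys


def stop_process(proc, grace_seconds):
    """Send SIGTERM, wait up to grace_seconds, then fall back to SIGKILL."""
    proc.terminate()  # SIGTERM on POSIX
    try:
        proc.wait(timeout=grace_seconds)
        return "terminated"
    except subprocess.TimeoutExpired:
        proc.kill()  # SIGKILL on POSIX
        proc.wait()
        return "killed"


# A child that only sleeps; it installs no SIGTERM handler, so the default
# action ends it well within the grace period.
child = subprocess.Popen([sys.executable, "-c", "import time; time.sleep(60)"])
outcome = stop_process(child, grace_seconds=5)
print(outcome)
```

A process that needs longer than the grace period to clean up would take the second branch and be killed abruptly, which is the situation a longer cleanup time is meant to avoid.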

Troubleshooting SIGTERM signals

SIGTERM signals are used by Linux,
Kubernetes, Airflow scheduler and Celery to terminate processes responsible for
running Airflow workers or Airflow tasks.

There might be several reasons why SIGTERM signals are sent in an environment:

  • A task became a Zombie task and must be stopped.

  • The scheduler discovered a duplicate of a task and sends Poison Pill and
    SIGTERM signals to the task to stop it.

  • In Horizontal Pod Autoscaling, the GKE
    Control Plane sends SIGTERM signals to remove Pods that are no longer
    needed.

  • The scheduler can send SIGTERM signals to the DagFileProcessorManager
    process. Such SIGTERM signals are used by the scheduler to manage the
    DagFileProcessorManager process lifecycle and can be safely ignored.

    Example:

    Launched DagFileProcessorManager with pid: 353002
    Sending Signals.SIGTERM to group 353002. PIDs of all processes in the group: []
    Sending the signal Signals.SIGTERM to group 353002
    Sending the signal Signals.SIGTERM to process 353002 as process group is missing.
    
  • Race condition between the heartbeat callback and exit callbacks in the
    local_task_job, which monitors the execution of the task. If the heartbeat
    detects that a task was marked as success, it cannot distinguish whether
    the task itself succeeded or that Airflow was told to consider the task
    successful. Nonetheless, it will terminate a task runner, without waiting
    for it to exit.

    Such SIGTERM signals can be safely ignored. The task is already in the
    successful state and the execution of the DAG run as a whole will not be
    affected.

    The log entry Received SIGTERM. is the only difference between the regular
    exit and the termination of task in the successful state.

    Figure 2. Race condition between the heartbeat and exit callbacks

  • An Airflow component uses more resources (CPU, memory) than permitted by the
    cluster node.

  • The GKE service performs maintenance operations and sends SIGTERM signals
    to Pods that run on a node that is about to be upgraded.

When a task instance is terminated with SIGTERM, you can see the following log
entries in the logs of an Airflow worker that executed the task:

{local_task_job.py:211} WARNING - State of this instance has been externally set to queued. Terminating instance.
{taskinstance.py:1411} ERROR - Received SIGTERM. Terminating subprocesses.
{taskinstance.py:1703} ERROR - Task failed with exception

Possible solutions:

This issue happens when a VM that runs the task is out of memory. This is not
related to Airflow configurations but to the amount of memory available to the
VM.

How you increase the memory depends on the Cloud Composer version
that you use. For example:

  • In Cloud Composer 2, you can assign more CPU and memory resources to Airflow
    workers.

  • In case of Cloud Composer 1, you can re-create your environment using a
    machine type with more performance.

  • In both versions of Cloud Composer, you can lower the value of
    the [celery]worker_concurrency Airflow configuration option.
    This option determines how many tasks are executed concurrently by a given
    Airflow worker.

For more information about optimizing your Cloud Composer 2 environment, see
Optimize environment performance and costs.

Cloud Logging queries to discover reasons for Pod restarts or evictions

Cloud Composer environments use GKE clusters as the compute infrastructure
layer. This section provides queries that can help you find the reasons for
Airflow worker or Airflow scheduler restarts or evictions.

You can tune the queries below in the following ways:

  • Specify the time range of interest in Cloud Logging, for example
    the last 6 hours or 3 days, or define a custom time range.

  • Specify the name of your Cloud Composer environment’s cluster in place of
    CLUSTER_NAME.

  • Optionally, limit the search to a specific Pod by adding
    POD_NAME.

Discover restarted containers

    resource.type="k8s_node"
    log_id("kubelet")
    jsonPayload.MESSAGE:"will be restarted"
    resource.labels.cluster_name="CLUSTER_NAME"
  

Alternative query to limit the results to a specific Pod:

    resource.type="k8s_node"
    log_id("kubelet")
    jsonPayload.MESSAGE:"will be restarted"
    resource.labels.cluster_name="CLUSTER_NAME"
    "POD_NAME"
  

Discover containers shut down as a result of an out-of-memory event

    resource.type="k8s_node"
    log_id("events")
    (jsonPayload.reason:("OOMKilling" OR "SystemOOM")
      OR jsonPayload.message:("OOM encountered" OR "out of memory"))
    severity=WARNING
    resource.labels.cluster_name="CLUSTER_NAME"
    

Alternative query to limit the results to a specific Pod:

    resource.type="k8s_node"
    log_id("events")
    (jsonPayload.reason:("OOMKilling" OR "SystemOOM")
      OR jsonPayload.message:("OOM encountered" OR "out of memory"))
    severity=WARNING
    resource.labels.cluster_name="CLUSTER_NAME"
    "POD_NAME"
    

Discover containers that stopped executing

    resource.type="k8s_node"
    log_id("kubelet")
    jsonPayload.MESSAGE:"ContainerDied"
    severity=DEFAULT
    resource.labels.cluster_name="CLUSTER_NAME"
    

Alternative query to limit the results to a specific Pod:

    resource.type="k8s_node"
    log_id("kubelet")
    jsonPayload.MESSAGE:"ContainerDied"
    severity=DEFAULT
    resource.labels.cluster_name="CLUSTER_NAME"
    "POD_NAME"
    

Common issues

The following sections describe symptoms and potential fixes for some common
DAG issues.

Airflow task was interrupted by Negsignal.SIGKILL

Sometimes your task might use more memory than the Airflow worker is allocated.
In this situation, it might be interrupted by Negsignal.SIGKILL. The system
sends this signal to avoid further memory consumption, which might impact
the execution of other Airflow tasks. In the Airflow worker’s log you might see
the following log entry:

{local_task_job.py:102} INFO - Task exited with return code Negsignal.SIGKILL

Possible solutions:

  • Lower the worker_concurrency of Airflow workers.

  • In Cloud Composer 2, increase the memory of Airflow workers.

  • In Cloud Composer 1, upgrade to a bigger machine type for the Composer cluster.

  • Optimize your tasks to use less memory.

Task fails without emitting logs due to DAG parsing errors

Sometimes subtle DAG errors lead to a situation where the Airflow scheduler
and DAG processor are able to schedule tasks for execution and to parse a DAG
file (respectively), but the Airflow worker fails to execute tasks from the
DAG because of programming errors in the Python DAG file. As a result, an
Airflow task might be marked as Failed with no log from its execution.

Solution: verify in Airflow worker logs that there are no errors raised by
Airflow worker related to missing DAG or DAG parsing errors.

Task fails without emitting logs due to resource pressure

Symptom: during execution of a task, Airflow worker’s subprocess responsible
for Airflow task execution is interrupted abruptly.
The error visible in Airflow worker’s log might look similar to the one below:

...
  File "/opt/python3.8/lib/python3.8/site-packages/celery/app/trace.py", line 412, in trace_task
    R = retval = fun(*args, **kwargs)
  File "/opt/python3.8/lib/python3.8/site-packages/celery/app/trace.py", line 704, in __protected_call__
    return self.run(*args, **kwargs)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/executors/celery_executor.py", line 88, in execute_command
    _execute_in_fork(command_to_exec)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/executors/celery_executor.py", line 99, in _execute_in_fork
    raise AirflowException('Celery command failed on host: ' + get_hostname())
airflow.exceptions.AirflowException: Celery command failed on host: airflow-worker-9qg9x
...

Solution:

  • In Cloud Composer 1, create a new environment with
    a larger machine type than the current machine
    type. Consider adding more nodes to your environment and lowering
    [celery]worker_concurrency for your workers.
  • In Cloud Composer 2, increase memory limits
    for Airflow workers.

Task fails without emitting logs due to Pod eviction

Google Kubernetes Engine pods are subject to the
Kubernetes Pod Lifecycle and pod eviction. Task
spikes and co-scheduling of workers are the two most common causes of pod
eviction in Cloud Composer.

Pod eviction can occur when a particular pod overuses resources of a node,
relative to the configured resource consumption expectations for the node. For
example, eviction might happen when several memory-heavy tasks run in a pod,
and their combined load causes the node where this pod runs to exceed the
memory consumption limit.

If an Airflow worker pod is evicted, all task instances running on that
pod are interrupted, and later marked as failed by Airflow.

Logs are buffered. If a worker pod is evicted before the buffer flushes, logs
are not emitted. Task failure without logs is an indication that the Airflow
workers are restarted due to out-of-memory (OOM). Some logs might be present
in Cloud Logging even though the Airflow logs were not emitted.

To view logs:

  1. In Google Cloud console, go to the Environments page.

    Go to Environments

  2. In the list of environments, click the name of your environment.
    The Environment details page opens.

  3. Go to the Logs tab.

  4. View logs of individual workers under All logs ->
    Airflow logs -> Workers -> (individual worker).

DAG execution is memory-limited. Each task execution starts with two Airflow
processes: task execution and monitoring. Each node can take up to 6
concurrent tasks (approximately 12 processes loaded with Airflow modules).
More memory can be consumed, depending on the nature of the DAG.
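
The process count above is simple arithmetic, sketched below. The per-process memory figure is a made-up placeholder for illustration only; measure your own workloads before sizing anything. Both function names are hypothetical.

```python
def airflow_processes(concurrent_tasks, processes_per_task=2):
    """Number of Airflow processes for a given number of concurrent tasks."""
    return concurrent_tasks * processes_per_task


def estimated_memory_mb(concurrent_tasks, mb_per_process):
    """Rough baseline memory; mb_per_process is a value you measure yourself."""
    return airflow_processes(concurrent_tasks) * mb_per_process


print(airflow_processes(6))  # 12
# 150 MB per process is an assumed figure, not a documented one.
print(estimated_memory_mb(6, mb_per_process=150))  # 1800
```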

Symptom:

  1. In Google Cloud console, go to the Workloads page.

    Go to Workloads

  2. If there are airflow-worker pods that show Evicted, click each evicted
    pod and look for the message "The node was low on resource: memory"
    at the top of the window.

Fix:

  • In Cloud Composer 1, create a new Cloud Composer environment with
    a larger machine type than the current machine
    type.
  • In Cloud Composer 2, increase memory limits
    for Airflow workers.
  • Check logs from airflow-worker pods for possible eviction causes. For more
    information about fetching logs from individual pods, see
    Troubleshooting issues with deployed workloads.
  • Make sure that the tasks in the DAG are idempotent and retriable.
  • Avoid downloading unnecessary files to the local file system of Airflow workers.

    Airflow workers have limited local file system capacity. For example, in
    Cloud Composer 2, a worker can have from 1 GB to 10 GB of storage. When the
    storage space runs out, the Airflow worker pod is evicted by
    GKE Control Plane. This fails all tasks that the evicted
    worker was executing.

    Examples of problematic operations:

    • Downloading files or objects and storing them locally in an Airflow
      worker. Instead, store these objects directly in a suitable service such as a Cloud Storage bucket.
    • Accessing big objects in the /data folder from an Airflow worker.
      The Airflow worker downloads the object into its local filesystem. Instead, implement your DAGs so that big files are processed outside of the Airflow worker Pod.

DAG load import timeout

Symptom:

  • In the Airflow web interface, at the top of the DAGs list page, a red alert
    box shows Broken DAG: [/path/to/dagfile] Timeout.
  • In Cloud Monitoring: The airflow-scheduler logs contain entries
    similar to:

    • ERROR - Process timed out
    • ERROR - Failed to import: /path/to/dagfile
    • AirflowTaskTimeout: Timeout

Fix:

Override the dag_file_processor_timeout Airflow
configuration option and allow more time for DAG parsing:

Section Key Value
core dag_file_processor_timeout New timeout value

Increased network traffic to and from the Airflow database

The amount of network traffic between your environment’s GKE
cluster and the Airflow database depends on the number of DAGs, the number of
tasks in DAGs, and the way DAGs access data in the Airflow database. The
following factors might influence the network usage:

  • Queries to the Airflow database. If your DAGs do a lot of queries, they
    generate large amounts of traffic. Examples: checking the status of tasks
    before proceeding with other tasks, querying the XCom table, dumping Airflow
    database content.

  • Large number of tasks. The more tasks there are to schedule, the more
    network traffic is generated. This consideration applies both to the total
    number of tasks in your DAGs and to the scheduling frequency. When the
    Airflow scheduler schedules DAG runs, it makes queries to the Airflow
    database and generates traffic.

  • Airflow web interface generates network traffic because it makes queries to
    the Airflow database. Intensively using pages with graphs, tasks, and
    diagrams can generate large volumes of network traffic.

DAG crashes the Airflow web server or causes it to return a 502 gateway timeout error

Web server failures can occur for several different reasons. Check
the airflow-webserver logs in
Cloud Logging to determine the cause of the
502 gateway timeout error.

Heavyweight computation

This section applies only to Cloud Composer 1.

Avoid running heavyweight computation at DAG parse time.

Unlike the worker and scheduler nodes, whose machine types can be customized to
have greater CPU and memory capacity, the web server uses a fixed machine type,
which can lead to DAG parsing failures if the parse-time computation is too
heavyweight.

Note that the web server has 2 vCPUs and 2 GB of memory.
The default value for [core]dagbag_import_timeout is 30 seconds. This timeout
value defines the upper limit for how long Airflow spends loading a
Python module in the dags/ folder.
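
One way to keep parse time low is to call expensive code only from inside the function that a task runs, never at the top level of the DAG file. The sketch below uses plain Python to show the difference; expensive_lookup and process_rows are hypothetical names, and the CALLS list exists only to make the timing of the call visible.

```python
import time

CALLS = []


def expensive_lookup():
    """Stand-in for slow work such as a large query or an API call."""
    CALLS.append(time.monotonic())
    return list(range(5))


# Avoid: a top-level statement such as `ROWS = expensive_lookup()` would run
# every time the file is parsed.


def process_rows():
    """Task callable: the slow work happens only when the task runs."""
    rows = expensive_lookup()
    return sum(rows)


print(len(CALLS))      # 0: nothing ran while the module was loaded
print(process_rows())  # 10
print(len(CALLS))      # 1
```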

Incorrect permissions

This section applies only to Cloud Composer 1.

The web server does not run under the same service account as the workers and
scheduler. As such, the workers and scheduler might be able to access
user-managed resources that the web server cannot access.

We recommend that you avoid accessing non-public resources during
DAG parsing. Sometimes, this is unavoidable, and you will need to grant
permissions to the web server’s service account. The service
account name is derived from your web server domain. For example, if the domain
is example-tp.appspot.com, the service account is
example-tp@appspot.gserviceaccount.com.

DAG errors

This section applies only to Cloud Composer 1.

The web server runs on App Engine and is separate from
your environment’s GKE cluster. The web server parses the DAG
definition files, and a 502 gateway timeout can occur if there are errors
in the DAG. Airflow works normally without a functional web server if the
problematic DAG is not breaking any processes running in GKE.
In this case, you can use gcloud composer environments run to retrieve
details from your environment and as a workaround if the web server becomes
unavailable.

In other cases, you can run DAG parsing in GKE and look for
DAGs that throw fatal Python exceptions or that time out (default 30 seconds).
To troubleshoot, connect to a remote shell in an Airflow worker container and
test for syntax errors. For more information,
see Testing DAGs.

Handling a large number of DAGs and plugins in dags and plugins folders

Contents of /dags and /plugins folders are synchronized from
your environment’s bucket to local file systems of Airflow workers and
schedulers.

The more data stored in these folders, the longer it takes to perform the
synchronization. To address such situations:

  • Limit the number of files in /dags and /plugins folders. Store only the
    minimum of required files.

  • If possible, increase the disk space available to Airflow schedulers and
    workers.

  • If possible, increase CPU and memory of Airflow schedulers and workers, so
    that the sync operation is performed faster.

  • In case of a very large number of DAGs, divide DAGs into batches, compress
    them into zip archives and deploy these archives into the /dags folder.
    This approach speeds up the DAGs syncing process. Airflow components
    uncompress zip archives before processing DAGs.

  • Generating DAGs programmatically might also be a method for limiting
    the number of DAG files stored in the /dags folder.
    See the section about Programmatic DAGs to avoid
    problems with scheduling and executing DAGs generated programmatically.

Do not schedule programmatically generated DAGs at the same time

Generating DAG objects programmatically from a DAG file is an efficient method
to author many similar DAGs that only have small differences.

It’s important to not schedule all such DAGs for execution immediately. There
is a high chance that Airflow workers do not have enough CPU and memory
resources to execute all tasks that are scheduled at the same time.

To avoid issues with scheduling programmatic DAGs:

  • Increase worker concurrency and scale up your environment, so that it can
    execute more tasks simultaneously.
  • Generate DAGs in a way to distribute their schedules evenly over time, to
    avoid scheduling hundreds of tasks at the same time, so that Airflow workers
    have time to execute all scheduled tasks.
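
A minimal sketch of spreading schedules evenly, assuming hourly DAGs and standard five-field cron expressions: each generated DAG gets its own minute offset within the hour. The function name staggered_schedules and the report_N DAG IDs are hypothetical.

```python
def staggered_schedules(dag_ids):
    """Map each DAG ID to an hourly cron expression, offsets spread over the hour."""
    count = len(dag_ids)
    return {
        dag_id: f"{(index * 60) // count} * * * *"
        for index, dag_id in enumerate(dag_ids)
    }


schedules = staggered_schedules([f"report_{i}" for i in range(4)])
for dag_id, cron in schedules.items():
    print(dag_id, cron)
# report_0 0 * * * *
# report_1 15 * * * *
# report_2 30 * * * *
# report_3 45 * * * *
```

With more than 60 DAGs some offsets repeat, but the load is still spread across the hour instead of landing on a single minute.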

Error 504 when accessing the Airflow web server

See Error 504 when accessing the Airflow UI.

Lost connection to MySQL server during query exception is thrown during the task execution or right after it

Lost connection to MySQL / PostgreSQL server during query exceptions often
happen when the following conditions are met:

  • Your DAG uses PythonOperator or a custom operator.
  • Your DAG makes queries to the Airflow database.

If several queries are made from a callable function, tracebacks might
incorrectly point to self.refresh_from_db(lock_for_update=True) line in the
Airflow code; it is the first database query after the task execution. The
actual cause of the exception happens before this, when an SQLAlchemy session
is not properly closed.

SQLAlchemy sessions are scoped to a thread, so a session created in a callable
function can later be continued inside the Airflow code. If there are
significant delays between queries within one session, the connection might
already be closed by the MySQL or PostgreSQL server. The connection timeout in
Cloud Composer environments is set to approximately 10 minutes.

Fix:

  • Use the airflow.utils.db.provide_session decorator. This decorator
    provides a valid session to the Airflow database in the session
    parameter and correctly closes the session at the end of the function.
  • Do not use a single long-running function. Instead, move all database
    queries to separate functions, so that there are multiple functions with
    the airflow.utils.db.provide_session decorator. In this case, sessions
    are automatically closed after retrieving query results.
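
The pattern behind this advice can be sketched without Airflow: a decorator opens a connection, passes it to the function, and always closes it when the function returns. The example below is a stand-in that uses sqlite3 from the standard library; provide_connection and count_rows are hypothetical names, and this is not how airflow.utils.db.provide_session is implemented.

```python
import functools
import sqlite3


def provide_connection(func):
    """Hypothetical decorator: open a connection, pass it in, always close it."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        conn = sqlite3.connect(":memory:")
        try:
            return func(*args, conn=conn, **kwargs)
        finally:
            conn.close()
    return wrapper


@provide_connection
def count_rows(values, conn=None):
    # One short unit of database work per decorated function.
    conn.execute("CREATE TABLE t (v INTEGER)")
    conn.executemany("INSERT INTO t VALUES (?)", [(v,) for v in values])
    return conn.execute("SELECT COUNT(*) FROM t").fetchone()[0]


print(count_rows([1, 2, 3]))  # 3
```

Because each decorated function gets a fresh connection and releases it on return, no connection sits idle across a long computation.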

Controlling execution time of DAGs, tasks and parallel executions of the same DAG

If you want to control how long a single DAG run for a particular DAG
lasts, use the dagrun_timeout DAG parameter. For example, if a single DAG run
(regardless of whether it finishes with success or failure) must not last
longer than 1 hour, set this parameter to 3600 seconds.

You can also control how long you allow for a single Airflow task to last. To do
so, you can use execution_timeout.

If you want to control how many active DAG runs you want to have for a
particular DAG then you can use the [core]max-active-runs-per-dag
Airflow configuration option to do so.

If you want to have only a single instance of a DAG run at a given moment, set
the max-active-runs-per-dag parameter to 1.
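
The settings in this section might look as follows when written out as keyword
arguments. This is a minimal sketch that does not import Airflow: the
dictionary names are hypothetical, the 15-minute task timeout is an
illustrative value that is not from the text, and max_active_runs is assumed
here as the per-DAG counterpart of the configuration option. Airflow expects
the timeouts as datetime.timedelta objects.

```python
from datetime import timedelta

# Keyword arguments one might pass to the DAG constructor.
dag_kwargs = {
    "dagrun_timeout": timedelta(hours=1),  # a DAG run may last at most 1 hour
    "max_active_runs": 1,                  # one active run of this DAG at a time
}

# Keyword arguments one might pass to an individual operator.
task_kwargs = {
    "execution_timeout": timedelta(minutes=15),  # illustrative value
}

# One hour expressed in seconds, matching the example in the text.
print(dag_kwargs["dagrun_timeout"].total_seconds())  # 3600.0
```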

Issues impacting DAGs and plugins syncing to schedulers, workers and web servers

Cloud Composer syncs the content of /dags and /plugins folders
to scheduler(s) and workers. Certain objects in /dags and /plugins folders
might prevent this synchronization from working correctly, or at least slow it down.

  • /dags folder is synced to schedulers and workers. This folder is not synced
    to web servers in Cloud Composer 2 or if you turn on DAG Serialization in Cloud Composer 1.

  • /plugins folder is synced to schedulers, workers and web servers.

You might encounter the following issues:

  • You uploaded gzip-compressed files that use
    compression transcoding to /dags and /plugins
    folders. It usually happens if you use the gsutil cp -Z command to upload
    data to the bucket.

    Solution: Delete the object that used compression transcoding and re-upload
    it to the bucket.

  • One of the objects is named ‘.’. Such an object is not synced to schedulers
    and workers, and it might stop syncing altogether.

    Solution: Rename a problematic object.

  • One of the objects in /dags or /plugins folders contains a / symbol
    at the end of the object’s name. Such objects can mislead the syncing process
    because the / symbol means that an object is a folder, not a file.

    Solution: Remove the / symbol from the name of the problematic object.

  • Don’t store unnecessary files in /dags and /plugins folders.

    Sometimes DAGs and plugins that you implement are accompanied by
    additional files, such as files storing tests for these components. These
    files are also synced, which increases the time needed to copy content to
    schedulers, workers and web servers.

    Solution: Don’t store any additional and unnecessary files in /dags and
    /plugins folders.
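
Two of the naming problems listed above can be checked mechanically before
uploading. The following is a minimal sketch that works on a plain list of
object names; find_sync_problems and the sample names are hypothetical, and
obtaining the list from the bucket is left out. The trailing-slash check is
deliberately simple and flags every name that ends with /.

```python
def find_sync_problems(object_names):
    """Return (name, reason) pairs for object names that may break syncing."""
    problems = []
    for name in object_names:
        # Last path component, ignoring any trailing slash.
        last_part = name.rstrip("/").rsplit("/", 1)[-1]
        if last_part == ".":
            problems.append((name, "object is named '.'"))
        elif name.endswith("/"):
            problems.append((name, "name ends with '/'"))
    return problems


names = [
    "dags/example_dag.py",
    "dags/.",
    "plugins/helper.py/",
    "dags/utils/io.py",
]
for name, reason in find_sync_problems(names):
    print(f"{name}: {reason}")
```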

Done [Errno 21] Is a directory: '/home/airflow/gcs/dags/...' error is generated by schedulers and workers

This problem happens because objects can have
overlapping namespace in Cloud Storage, while at the same time
schedulers and workers use traditional file systems. For example, it is possible
to add both a folder and an object with the same name to an environment’s
bucket. When the bucket is synced to the environment’s schedulers and workers,
this error is generated, which can lead to task failures.

To fix this problem, make sure that there are no overlapping namespaces in the
environment’s bucket. For example, if both /dags/misc (a file) and
/dags/misc/example_file.txt (another file) are in a bucket, an error is
generated by the scheduler.
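
An overlap like the one in this example can be detected from a list of object
names alone. A minimal sketch, assuming the names have already been listed
from the bucket; find_namespace_overlaps is a hypothetical helper, not part of
any Google Cloud library.

```python
def find_namespace_overlaps(object_names):
    """Return names that are used both as a file and as a folder prefix."""
    files = {name for name in object_names if not name.endswith("/")}
    overlaps = set()
    for name in files:
        parts = name.split("/")
        # Every proper prefix of a path must be a folder on a file system,
        # so it cannot also exist as a file.
        for i in range(1, len(parts)):
            prefix = "/".join(parts[:i])
            if prefix in files:
                overlaps.add(prefix)
    return sorted(overlaps)


names = ["dags/misc", "dags/misc/example_file.txt", "dags/other.py"]
print(find_namespace_overlaps(names))  # ['dags/misc']
```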

Transient interruptions when connecting to Airflow Metadata DB

Cloud Composer runs on top of distributed cloud infrastructure.
It means that from time to time some transient issues may appear and they might
interrupt execution of your Airflow tasks.

In such situations you might see the following error messages in Airflow workers’ logs:

"Can't connect to MySQL server on 'airflow-sqlproxy-service.default.svc.cluster.local' (111)"

or

"Can't connect to MySQL server on 'airflow-sqlproxy-service.default.svc.cluster.local' (104)"

Such intermittent issues might also be caused by maintenance operations
performed for your Cloud Composer environments.

Usually such errors are intermittent and if your Airflow tasks are idempotent
and you have retries configured, you should be immune to them. You can also
consider defining maintenance windows.
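
Retries are usually configured through the arguments shared by all tasks of a
DAG. A minimal sketch, assuming the conventional default_args dictionary; the
values 3 and 5 minutes are illustrative and not recommendations from this page.

```python
from datetime import timedelta

# Arguments applied to every task in a DAG when passed as default_args.
default_args = {
    "retries": 3,                         # re-run a failed task up to 3 times
    "retry_delay": timedelta(minutes=5),  # wait between attempts
}

print(default_args["retries"], default_args["retry_delay"])
```

Retries only help if the task is idempotent, that is, if running it a second
time after a partial failure produces the same end result.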

One additional reason for such errors might be the lack of resources in your
environment’s cluster. In such cases, you might scale up or optimize your
environment as described in
Scaling environments or
Optimizing your environment instructions.

A DAG is not visible in Airflow UI or DAG UI and the scheduler does not schedule it

The DAG processor parses each DAG before it can be scheduled by the scheduler
and before a DAG becomes visible in
the Airflow UI or DAG UI. The
[core]dag_file_processor_timeout
Airflow configuration option defines how much time the DAG processor has to
parse a single DAG.

If a DAG is not visible in the Airflow UI or DAG UI:

  • Check the DAG processor logs to see whether the DAG processor is able to
    correctly process your DAG. In case of problems, you might see the following
    log entries in the DAG processor or scheduler logs:
[2020-12-03 03:06:45,672] {dag_processing.py:1334} ERROR - Processor for
/usr/local/airflow/dags/example_dag.py with PID 21903 started at
2020-12-03T03:05:55.442709+00:00 has timed out, killing it.

This error might be caused by one of the following reasons:

  • Your DAG is not correctly implemented and the DAG processor is not able to
    parse it. In this case, correct your DAG.

  • Parsing your DAG takes more than the number of seconds defined in
    [core]dag_file_processor_timeout.

    In this case you can increase this timeout.

  • If your DAG takes a long time to parse, it can also mean that it is not
    implemented in an optimal way. For example, it might read many
    environment variables or perform calls to external services. If this is
    the case, then optimize your DAG so the DAG processor can process it
    quickly.
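
One common way to reduce parse time is to move expensive work out of module
level and into the function that the task calls, so that it runs when the task
executes rather than on every parse. A minimal sketch; load_settings,
transform and the MYAPP_ prefix are hypothetical names chosen for illustration.

```python
import os


def load_settings(prefix="MYAPP_"):
    """Read matching environment variables only when called."""
    return {k: v for k, v in os.environ.items() if k.startswith(prefix)}


def transform(**context):
    # This body runs when the task executes, not when the DAG file is parsed,
    # so the environment lookup adds nothing to the parse time.
    settings = load_settings()
    return sorted(settings)
```

The same idea applies to calls to external services: keep them inside the
callable instead of at the top of the DAG file.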

Symptoms of Airflow database being under heavy load

Sometimes in the Airflow worker logs you might see the following warning log
entries.

For MySQL:

(_mysql_exceptions.OperationalError) (2006, "Lost connection to MySQL server at
'reading initial communication packet', system error: 0")"

For PostgreSQL:

psycopg2.OperationalError: connection to server at ... failed

Such an error or warning might indicate that the Airflow database is
overwhelmed by the number of queries that it must handle.

Possible solutions:

  • Perform the maintenance of the Airflow database.
  • (Cloud Composer 1)
    Change the machine type of the Cloud SQL instance
    that stores the Airflow database of your environment.
  • (Cloud Composer 2)
    Adjust the size of your environment.

What’s next

  • Troubleshooting PyPI package installation
  • Troubleshooting environment updates and upgrades

#airflow #airflow-scheduler #airflow-2.x

Question:

I have an Airflow task whose log shows that it completed successfully, but the same log also shows a SIGTERM.

[2021-11-30 00:52:25,481] {taskinstance.py:1087} INFO - --------------------------------------------------------------------------------
[2021-11-30 00:52:25,481] {taskinstance.py:1088} INFO - Starting attempt 1 of 11
[2021-11-30 00:52:25,482] {taskinstance.py:1089} INFO - --------------------------------------------------------------------------------
[2021-11-30 00:52:25,492] {taskinstance.py:1107} INFO - Executing <Task(PythonOperator): > on 2021-11-29T00:30:00+00:00
[2021-11-30 00:52:25,500] {standard_task_runner.py:52} INFO - Started process 11633 to run task
[2021-11-30 00:52:25,509] {standard_task_runner.py:77} INFO - Job 1580251: Subtask
[2021-11-30 00:52:25,609] {logging_mixin.py:104} INFO - Running <TaskInstance: DWH_D_V1.1.0.transform 2021-11-29T00:30:00+00:00 [running]> on host
[2021-11-30 00:52:25,718] {taskinstance.py:1300} INFO - Exporting the following env vars:
AIRFLOW_CTX_EXECUTION_DATE=2021-11-29T00:30:00+00:00
AIRFLOW_CTX_DAG_RUN_ID=scheduled__2021-11-29T00:30:00+00:00
[2021-11-30 00:52:26,105] {transform.py:55} INFO - Empty
[2021-11-30 00:52:26,755] {transform.py:58} INFO - Inserting values
[2021-11-30 01:07:54,098] {db_table.py:561} INFO - Datadog metrics push
[2021-11-30 01:07:54,099] {hostname.py:60} INFO - No agent or invalid configuration file found
[2021-11-30 01:07:54,113] {db_table.py:573} INFO - Datadog push: Number of rows: 47951835
[2021-11-30 01:07:54,405] {api_client.py:138} INFO - 202 POST
[2021-11-30 01:07:54,405] {datadog_logger.py:65} INFO - Datadog push: {'status': 'ok'}
[2021-11-30 01:07:54,405] {db_table.py:586} INFO - Datadog push: Execution time: 928.146014213562 s
[2021-11-30 01:07:54,477] {api_client.py:138} INFO - 202 POST https://api.datadoghq.com/api/v1/series (71.0046ms)
[2021-11-30 01:07:54,477] {datadog_logger.py:65} INFO - Datadog push: {'status': 'ok'}
[2021-11-30 01:07:54,477] {transform_dim_simple.py:66} INFO - Done
[2021-11-30 01:07:54,478] {python.py:151} INFO - Done. Returned value was: None
[2021-11-30 01:07:54,501] {taskinstance.py:1204} INFO - Marking task as SUCCESS. dag_id=DWH_D_V1.1.0, task_id=trasnform, execution_date=20211129T003000, start_date=20211130T005225, end_date=20211130T010754
[2021-11-30 01:07:54,705] {local_task_job.py:197} WARNING - State of this instance has been externally set to success. Terminating instance.
[2021-11-30 01:07:54,707] {process_utils.py:100} INFO - Sending Signals.SIGTERM to GPID 11633
[2021-11-30 01:07:54,753] {taskinstance.py:1284} ERROR - Received SIGTERM. Terminating subprocesses.
[2021-11-30 01:07:54,753] {logging_mixin.py:104} WARNING - Exception ignored in: <function _collection_gced at 0x7f4e1ca08f70>
[2021-11-30 01:07:54,753] {logging_mixin.py:104} WARNING - Traceback (most recent call last):
[2021-11-30 01:07:54,753] {logging_mixin.py:104} WARNING - File "/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/event/registry.py", line 53, in _collection_gced
[2021-11-30 01:07:54,754] {logging_mixin.py:104} WARNING - def _collection_gced(ref):
[2021-11-30 01:07:54,754] {logging_mixin.py:104} WARNING - File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1286, in signal_handler
[2021-11-30 01:07:54,755] {logging_mixin.py:104} WARNING - raise AirflowException("Task received SIGTERM signal")
[2021-11-30 01:07:54,755] {logging_mixin.py:104} WARNING - airflow.exceptions.AirflowException: Task received SIGTERM signal
[2021-11-30 01:07:54,802] {taskinstance.py:1265} INFO - 1 downstream tasks scheduled from follow-on schedule check
[2021-11-30 01:07:54,840] {process_utils.py:66} INFO - Process psutil.Process(pid=11633, status='terminated', exitcode=0, started='00:52:25') (11633) terminated with exit code 0

I know the task completed successfully because Datadog shows the metric push for this process on that day. After that, Airflow receives an external trigger or something similar, but I am sure that nobody marks the task as successful manually, and we have no process that changes the state of this task.

This has happened only twice, at random.

Is this a known behaviour, or is there any hint on how to investigate it? I thought about increasing AIRFLOW_CORE_KILLED_TASK_CLEANUP_TIME, but since the problem is random and has happened only a couple of times, I do not know how to test whether that would actually be the solution.

Thuy Le

Aug 25, 2019, 6:34:44 PM

to cloud-composer-discuss

Hi all, I'm running into this issue: ERROR - Task received SIGTERM signal

[2019-08-25 15:15:36,136] {base_task_runner.py:101} INFO - Job 293: Subtask wait_for_staging_bq_act_card_acct_map [2019-08-25 15:15:36,136] {models.py:1641} ERROR - Received SIGTERM. Terminating subprocesses.
[2019-08-25 15:15:36,269] {models.py:1796} ERROR - Task received SIGTERM signal
Traceback (most recent call last):
  File "/usr/local/lib/airflow/airflow/models.py", line 1664, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/airflow/airflow/sensors/base_sensor_operator.py", line 112, in execute
    sleep(self.poke_interval)
  File "/usr/local/lib/airflow/airflow/models.py", line 1643, in signal_handler
    raise AirflowException("Task received SIGTERM signal")
airflow.exceptions.AirflowException: Task received SIGTERM signal
[2019-08-25 15:15:36,270] {base_task_runner.py:101} INFO - Job 293: Subtask wait_for_staging_bq_act_card_acct_map [2019-08-25 15:15:36,269] {models.py:1796} ERROR - Task received SIGTERM signal

Do I need some special configuration?
Any suggestions to solve this issue?

many thanks

Ethan Lyon

Aug 25, 2019, 6:41:54 PM

to Thuy Le, cloud-composer-discuss

I ran into the same issue, increased the Composer environment and it was resolved. Have you tried that?

Thuy Le

Aug 25, 2019, 6:46:08 PM

to Ethan Lyon, cloud-composer-discuss

Hi Ethan,

thanks for your suggestion, can you specify how did you increase the composer environment?

many thanks

Ethan Lyon

Aug 25, 2019, 7:13:14 PM

to Thuy Le, cloud-composer-discuss

I created a new environment with a higher memory and nodes and killed the old one. Does that make sense?

Thuy Le

Aug 25, 2019, 7:21:56 PM

to cloud-composer-discuss

My environment has 3 nodes (each node 30GB), so 90GB in total.

Machine type: n1-standard-8

Do you think it is enough?

Kiran Arshewar

Aug 30, 2019, 7:59:46 AM

to Ethan Lyon, Thuy Le, cloud-composer-discuss

Getting the same error on three node, my config are as below

Jiawei Zhang

Jun 24, 2020, 10:54:12 PM

to cloud-composer-discuss

Hi,

I’m getting the same error ‘ERROR — Received SIGTERM. Terminating subprocesses.’ when running an auto-sklearn model on composer. I’m using the following configurations:

Screen Shot 2020-06-24 at 15.51.41.png

Anyone knows how to solve this? How should I update my environment?

Thanks!

Ethan Lyon

Jun 25, 2020, 5:08:48 PM

to Jiawei Zhang, cloud-composer-discuss

We ran into this issue because of a single process consuming all the resources. We moved to an autoscaling environment with 5 max nodes and haven’t had an issue yet.

Have you tried autoscaling? 

Ethan Lyon

Jun 25, 2020, 5:20:42 PM

to Jiawei Zhang, cloud-composer-discuss

Sorry for the double email. You mentioned using auto-sklearn. Have you tried moving your ML to another service? Something like ML Engine.

That might prevent your Composer environment from doing the heavy lifting. 

I am using the PythonOperator to call a function that parallelizes a data engineering process as an Airflow task. This is done simply by wrapping a simple function with a callable wrapper function called by Airflow.

def wrapper(ds, **kwargs):
    process_data()

process_data achieves parallelization using the multiprocessing module, which spawns subprocesses. When I run process_data by itself from a Jupyter notebook, it runs to the end with no problem. However, when I run it using Airflow, the task fails and the log for the task shows something like this.

[2019-01-22 17:16:46,966] {models.py:1610} ERROR - Received SIGTERM. Terminating subprocesses.
[2019-01-22 17:16:46,969] {logging_mixin.py:95} WARNING - Process ForkPoolWorker-129:

[2019-01-22 17:16:46,973] {logging_mixin.py:95} WARNING - Traceback (most recent call last):

[2019-01-22 17:16:46,973] {logging_mixin.py:95} WARNING -   File "/usr/lib/python3.5/multiprocessing/process.py", line 249, in _bootstrap
    self.run()

[2019-01-22 17:16:46,973] {logging_mixin.py:95} WARNING -   File "/home/airflow/.env/lib/python3.5/site-packages/airflow/models.py", line 1612, in signal_handler
    raise AirflowException("Task received SIGTERM signal")

[2019-01-22 17:16:46,973] {logging_mixin.py:95} WARNING - airflow.exceptions.AirflowException: Task received SIGTERM signal

[2019-01-22 17:16:46,993] {models.py:1610} ERROR - Received SIGTERM. Terminating subprocesses.
[2019-01-22 17:16:46,996] {logging_mixin.py:95} WARNING - Process ForkPoolWorker-133:

[2019-01-22 17:16:46,999] {logging_mixin.py:95} WARNING - Traceback (most recent call last):

[2019-01-22 17:16:46,999] {logging_mixin.py:95} WARNING -   File "/usr/lib/python3.5/multiprocessing/process.py", line 249, in _bootstrap
    self.run()

[2019-01-22 17:16:46,999] {logging_mixin.py:95} WARNING -   File "/usr/lib/python3.5/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)

[2019-01-22 17:16:46,999] {logging_mixin.py:95} WARNING -   File "/usr/lib/python3.5/multiprocessing/pool.py", line 108, in worker
    task = get()

[2019-01-22 17:16:46,999] {logging_mixin.py:95} WARNING -   File "/usr/lib/python3.5/multiprocessing/queues.py", line 343, in get
    res = self._reader.recv_bytes()

[2019-01-22 17:16:46,999] {logging_mixin.py:95} WARNING -   File "/usr/lib/python3.5/multiprocessing/synchronize.py", line 99, in __exit__
    return self._semlock.__exit__(*args)

[2019-01-22 17:16:46,999] {logging_mixin.py:95} WARNING -   File "/home/airflow/.env/lib/python3.5/site-packages/airflow/models.py", line 1612, in signal_handler
    raise AirflowException("Task received SIGTERM signal")

[2019-01-22 17:16:46,999] {logging_mixin.py:95} WARNING - airflow.exceptions.AirflowException: Task received SIGTERM signal

[2019-01-22 17:16:47,086] {logging_mixin.py:95} INFO - file parsing and processing 256.07

[2019-01-22 17:17:12,938] {logging_mixin.py:95} INFO - combining and sorting 25.85

I am not quite sure why the task receives SIGTERM. My guess is that some higher-level process is sending it to the subprocesses. What should I do to resolve this issue?

I just noticed that towards the end of the log for the task, it clearly states that

airflow.exceptions.AirflowException: Task received SIGTERM signal
[2019-01-22 12:31:39,196] {models.py:1764} INFO - Marking task as FAILED.
