Airflow FAQ

Published: 2020-03-04 17:14:53 | Source: web | Views: 1391 | Author: 強蕓羽翼 | Category: System Operations


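### Installation issues

#### 1. Installation fails with ERROR "python setup.py xxx"
##### Problem:

There are two causes. First, pip is out of date and must be upgraded with the `pip install --upgrade pip` command.

Second, setuptools is too old, which produces the error `Command "python setup.py egg_info" failed with error code 1 in /tmp/pip-build-G9yO9Z/tldr/`; it must be upgraded as well. The relevant output: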


File "/tmp/pip-build-G9yO9Z/tldr/setuptools_scm-3.3.3-py2.7.egg/setuptools_scm/integration.py", line 9, in version_keyword
File "/tmp/pip-build-G9yO9Z/tldr/setuptools_scm-3.3.3-py2.7.egg/setuptools_scm/version.py", line 66, in _warn_if_setuptools_outdated
setuptools_scm.version.SetuptoolsOutdatedWarning: your setuptools is too old (<12)
----------------------------------------

Command "python setup.py egg_info" failed with error code 1 in /tmp/pip-build-G9yO9Z/tldr/
You are using pip version 8.1.2, however version 19.2.1 is available.
You should consider upgrading via the 'pip install --upgrade pip' command.

##### Solution:
> (1) Upgrade pip with the `pip install --upgrade pip` command.
>          [xiaokang@localhost ~]$ sudo pip install --upgrade pip
> (2) Upgrade setuptools with the `pip install --upgrade setuptools` command.
>          [xiaokang@localhost ~]$ sudo pip install --upgrade setuptools
>  Once both are upgraded, the package that failed earlier should install successfully.
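
Before upgrading, it can help to confirm that an installed version really is older than the one required. The following is a minimal sketch of such a comparison; the helper names `parse_version` and `needs_upgrade` are hypothetical, and the parsing is deliberately crude (it only looks at the leading digits of each dotted component).

```python
import re


def parse_version(text):
    """Turn a dotted version such as '19.2.1' into a tuple of ints."""
    parts = []
    for piece in text.split("."):
        match = re.match(r"\d+", piece)
        parts.append(int(match.group()) if match else 0)
    return tuple(parts)


def needs_upgrade(installed, minimum):
    """Return True when `installed` is older than `minimum`."""
    a, b = parse_version(installed), parse_version(minimum)
    # Pad the shorter tuple with zeros so that '12' and '12.0' compare equal.
    width = max(len(a), len(b))
    a += (0,) * (width - len(a))
    b += (0,) * (width - len(b))
    return a < b


# Versions taken from the log above: pip 8.1.2 is installed, 19.2.1 is available.
print(needs_upgrade("8.1.2", "19.2.1"))
```

For real packaging work a dedicated version parser is the better choice; this sketch only illustrates the check.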

#### 2. ERROR: Cannot uninstall 'enum34'
##### Problem:
The following error appears while installing Airflow:
```text
ERROR: Cannot uninstall 'enum34'. It is a distutils installed project and thus we cannot accurately determine which files belong to it which would lead to only a partial uninstall.
```
##### Solution:

    sudo pip install --ignore-installed enum34

When another package cannot be upgraded for the same reason, the same form forces the install (replace the placeholder with the module name):

    sudo pip install --ignore-installed <module name>
#### 3. Package installation fails because a package cannot be found
##### Problem: ERROR: Command errored out with exit status 1:

    ERROR: Command errored out with exit status 1:
     command: /usr/bin/python -c 'import sys, setuptools, tokenize; sys.argv[0] = '"'"'/tmp/pip-install-oZ2zgF/flask-appbuilder/setup.py'"'"'; __file__='"'"'/tmp/pip-install-oZ2zgF/flask-appbuilder/setup.py'"'"';f=getattr(tokenize, '"'"'open'"'"', open)(__file__);code=f.read().replace('"'"'\r\n'"'"', '"'"'\n'"'"');f.close();exec(compile(code, __file__, '"'"'exec'"'"'))' egg_info --egg-base /tmp/pip-install-oZ2zgF/flask-appbuilder/pip-egg-info
         cwd: /tmp/pip-install-oZ2zgF/flask-appbuilder/
    Complete output (3 lines):
    /usr/lib64/python2.7/distutils/dist.py:267: UserWarning: Unknown distribution option: 'long_description_content_type'
      warnings.warn(msg)
    error in Flask-AppBuilder setup command: 'install_requires' must be a string or list of strings containing valid project/version requirement specifiers
    ----------------------------------------
ERROR: Command errored out with exit status 1: python setup.py egg_info Check the logs for full command output.
##### Solution:

Check the install command. This kind of error generally occurs because the package being installed cannot be found.

#### 4. Python.h: No such file or directory
##### Problem: src/spt_python.h:14:20: fatal error: Python.h: No such file or directory

    ERROR: Command errored out with exit status 1:
     command: /usr/bin/python -u -c 'import sys, setuptools, tokenize; sys.argv[0] = '"'"'/tmp/pip-install-YmiKzY/setproctitle/setup.py'"'"'; __file__='"'"'/tmp/pip-install-YmiKzY/setproctitle/setup.py'"'"';f=getattr(tokenize, '"'"'open'"'"', open)(__file__);code=f.read().replace('"'"'\r\n'"'"', '"'"'\n'"'"');f.close();exec(compile(code, __file__, '"'"'exec'"'"'))' install --record /tmp/pip-record-XTav9_/install-record.txt --single-version-externally-managed --compile
         cwd: /tmp/pip-install-YmiKzY/setproctitle/
    Complete output (15 lines):
    running install
    running build
    running build_ext
    building 'setproctitle' extension
    creating build
    creating build/temp.linux-x86_64-2.7
    creating build/temp.linux-x86_64-2.7/src
    gcc -pthread -fno-strict-aliasing -O2 -g -pipe -Wall -Wp,-D_FORTIFY_SOURCE=2 -fexceptions -fstack-protector-strong --param=ssp-buffer-size=4 -grecord-gcc-switches -m64 -mtune=generic -D_GNU_SOURCE -fPIC -fwrapv -DNDEBUG -O2 -g -pipe -Wall -Wp,-D_FORTIFY_SOURCE=2 -fexceptions -fstack-protector-strong --param=ssp-buffer-size=4 -grecord-gcc-switches -m64 -mtune=generic -D_GNU_SOURCE -fPIC -fwrapv -fPIC -DHAVE_SYS_PRCTL_H=1 -DSPT_VERSION=1.1.10 -I/usr/include/python2.7 -c src/setproctitle.c -o build/temp.linux-x86_64-2.7/src/setproctitle.o
    In file included from src/spt.h:15:0,
                     from src/setproctitle.c:14:
    src/spt_python.h:14:20: fatal error: Python.h: No such file or directory
     #include <Python.h>
                        ^
    compilation terminated.
    error: command 'gcc' failed with exit status 1
    ----------------------------------------
ERROR: Command errored out with exit status 1: /usr/bin/python -u -c 'import sys, setuptools, tokenize; sys.argv[0] = '"'"'/tmp/pip-install-YmiKzY/setproctitle/setup.py'"'"'; __file__='"'"'/tmp/pip-install-YmiKzY/setproctitle/setup.py'"'"';f=getattr(tokenize, '"'"'open'"'"', open)(__file__);code=f.read().replace('"'"'\r\n'"'"', '"'"'\n'"'"');f.close();exec(compile(code, __file__, '"'"'exec'"'"'))' install --record /tmp/pip-record-XTav9_/install-record.txt --single-version-externally-managed --compile Check the logs for full command output.
##### Solution:

The Python development package is missing. Installing it with `yum install python-devel` resolves the error.
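
Whether the headers are present can also be checked from Python itself. This is a small sketch that uses the standard `sysconfig` module to locate the include directory of the running interpreter; the function names are hypothetical, and the result describes the interpreter that runs the script, which may differ from the one pip uses.

```python
import os
import sysconfig


def python_header_path():
    """Return the path where this interpreter expects Python.h."""
    include_dir = sysconfig.get_paths()["include"]
    return os.path.join(include_dir, "Python.h")


def has_python_headers():
    """Return True when Python.h is present, so C extensions can compile."""
    return os.path.isfile(python_header_path())


print(python_header_path())
print("headers installed:", has_python_headers())
```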

 
 

### DAG issues

#### 1. bash_command='/root/touch.sh' fails to execute
##### Problem:

[2019-12-19 15:15:15,523] {taskinstance.py:1058} ERROR - bash /root/touch.sh
Traceback (most recent call last):
  File "/usr/lib/python2.7/site-packages/airflow/models/taskinstance.py", line 915, in _run_raw_task
    self.render_templates(context=context)
  File "/usr/lib/python2.7/site-packages/airflow/models/taskinstance.py", line 1267, in render_templates
    self.task.render_template_fields(context)
  File "/usr/lib/python2.7/site-packages/airflow/models/baseoperator.py", line 689, in render_template_fields
    self._do_render_template_fields(self, self.template_fields, context, jinja_env, set())
  File "/usr/lib/python2.7/site-packages/airflow/models/baseoperator.py", line 696, in _do_render_template_fields
    rendered_content = self.render_template(content, context, jinja_env, seen_oids)
  File "/usr/lib/python2.7/site-packages/airflow/models/baseoperator.py", line 723, in render_template
    return jinja_env.get_template(content).render(**context)
  File "/usr/lib64/python2.7/site-packages/jinja2/environment.py", line 830, in get_template
    return self._load_template(name, self.make_globals(globals))
  File "/usr/lib64/python2.7/site-packages/jinja2/environment.py", line 804, in _load_template
    template = self.loader.load(self, name, globals)
  File "/usr/lib64/python2.7/site-packages/jinja2/loaders.py", line 113, in load
    source, filename, uptodate = self.get_source(environment, name)
  File "/usr/lib64/python2.7/site-packages/jinja2/loaders.py", line 187, in get_source
    raise TemplateNotFound(template)
TemplateNotFound: bash /root/touch.sh
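##### Solution: add a trailing space after the command

This is a pitfall of Airflow using Jinja2 as its template engine: when `bash_command` calls a script directly, the string must end with a space. Otherwise Airflow tries to load the value as a Jinja template and fails with TemplateNotFound.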
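
The workaround can be applied mechanically before a command is handed to the operator. Below is a minimal sketch; `safe_bash_command` is a hypothetical helper, and the extension list is an assumption about which suffixes BashOperator treats as template files.

```python
# Assumption: the suffixes that make BashOperator treat a command as a template file.
TEMPLATE_EXTENSIONS = (".sh", ".bash")


def safe_bash_command(command):
    """Append a space when the command ends in a template extension."""
    if command.endswith(TEMPLATE_EXTENSIONS):
        return command + " "
    return command


print(repr(safe_bash_command("/root/touch.sh")))
print(repr(safe_bash_command("bash /root/touch.sh ")))
```

The returned string would then be passed as `bash_command`; commands that already end with a space or with arguments are left unchanged.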

### Airflow

#### 1. Error when starting a worker
##### Problem:
Running a worker with superuser privileges when the
worker accepts messages serialized with pickle is a very bad idea!

If you really want to continue then you have to set the C_FORCE_ROOT
environment variable (but please think about this before you do).

##### Solution:

Add `export C_FORCE_ROOT="True"` to /etc/profile.

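#### 2. How to unpause a large number of DAGs in bulk

A small number of DAGs can be started with the `airflow unpause dag_id` command or by clicking the toggle in the web UI, but doing this one by one becomes tedious when there are many. DAG metadata is stored in the database, so DAGs can be unpaused in bulk by updating that data directly. If MySQL is configured as `sql_alchemy_conn`, log in to the airflow database and set the `is_paused` column of the `dag` table to 0.

Example: update dag set is_paused = 0 where dag_id like "benchmark%";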
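
The same update can be issued from a script with a parameterized query. The sketch below uses an in-memory SQLite database as a stand-in for the MySQL metadata database, with a table that has only the two columns named above; the sample rows and the `unpause_by_prefix` helper are hypothetical.

```python
import sqlite3

# Stand-in schema: only the two columns named above, in an in-memory SQLite
# database rather than the real MySQL metadata database.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE dag (dag_id TEXT PRIMARY KEY, is_paused INTEGER)")
conn.executemany(
    "INSERT INTO dag VALUES (?, ?)",
    [("benchmark_a", 1), ("benchmark_b", 1), ("other_dag", 1)],
)


def unpause_by_prefix(connection, prefix):
    """Unpause every DAG whose dag_id starts with `prefix`; return the row count."""
    cursor = connection.execute(
        "UPDATE dag SET is_paused = 0 WHERE dag_id LIKE ?", (prefix + "%",)
    )
    connection.commit()
    return cursor.rowcount


changed = unpause_by_prefix(conn, "benchmark")
print(changed)
print(conn.execute("SELECT dag_id, is_paused FROM dag ORDER BY dag_id").fetchall())
```

Passing the pattern as a bound parameter avoids building SQL by string concatenation. Against a production database, taking a backup before a bulk update is a sensible precaution.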

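#### 3. The scheduler process hangs after running one task

This usually happens because the scheduler has created a task but cannot publish it, while the logs show no error.

A likely cause is that the client library for the broker is not installed:
If Redis is the broker, run pip install apache-airflow[redis]
If RabbitMQ is the broker, run pip install apache-airflow[rabbitmq]
Also check that the scheduler node can reach RabbitMQ.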
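
A quick way to check reachability from the scheduler node is a plain TCP connection attempt. This is a minimal sketch, not an AMQP handshake: it only shows that the port accepts connections. The host is hypothetical, and 5672 is RabbitMQ's default AMQP port.

```python
import socket


def can_reach(host, port, timeout=3.0):
    """Return True when a TCP connection to host:port succeeds within `timeout`."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


# Hypothetical broker address; 5672 is RabbitMQ's default AMQP port.
print(can_reach("localhost", 5672))
```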

#### 4. The scheduler runs slowly when there are many DAG files

By default the scheduler starts two threads. This can be raised in the airflow.cfg configuration file:

[scheduler]
# The scheduler can run multiple threads in parallel to schedule dags.
# This defines how many threads will run.
# The default is 2; raised to 100 here
max_threads = 100
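
The setting can also be read or changed programmatically, since airflow.cfg uses INI syntax. The sketch below works on a sample string that mirrors the fragment above rather than on a real file; `SAMPLE_CFG` and `set_max_threads` are hypothetical names.

```python
import configparser

# Sample text mirroring the fragment above; a real airflow.cfg has many more keys.
SAMPLE_CFG = """\
[scheduler]
# This defines how many threads will run.
max_threads = 2
"""


def set_max_threads(cfg_text, threads):
    """Return the parsed config with [scheduler] max_threads set to `threads`."""
    # interpolation=None: config values may contain literal '%' characters.
    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(cfg_text)
    parser.set("scheduler", "max_threads", str(threads))
    return parser


parser = set_max_threads(SAMPLE_CFG, 100)
print(parser.getint("scheduler", "max_threads"))
```

Note that writing a parsed config back with `parser.write` drops the comments, so editing the file by hand may be preferable when the comments matter.
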
#### 5. Changing the Airflow log level
vi airflow.cfg

[core]
#logging_level = INFO
logging_level = WARNING

NOTSET < DEBUG < INFO < WARNING < ERROR < CRITICAL
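If the log level is set to INFO, messages below INFO are not output and messages at INFO or above are output. In other words, the higher the level, the less detailed the logs. Python's logging module defaults to WARNING, while the commented-out line above shows that this airflow.cfg originally used INFO.
Note: if logging_level is set to WARNING or higher, not only the logs but also the detail printed on the command line is affected, and only messages at or above that level are shown. So if command-line output is incomplete and no error log is produced, the log level is probably too high.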
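
The filtering described above can be observed with the standard `logging` module on its own. This sketch is independent of Airflow; the logger name `level_demo` and the `ListHandler` class are hypothetical and exist only to collect what gets through.

```python
import logging

# Numeric values behind the ordering NOTSET < DEBUG < INFO < WARNING < ERROR < CRITICAL.
LEVELS = ["NOTSET", "DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"]
print([(name, getattr(logging, name)) for name in LEVELS])


class ListHandler(logging.Handler):
    """Collect emitted messages in a list so the filtering is easy to inspect."""

    def __init__(self):
        super().__init__()
        self.messages = []

    def emit(self, record):
        self.messages.append(record.getMessage())


logger = logging.getLogger("level_demo")  # hypothetical logger name
logger.propagate = False
handler = ListHandler()
logger.addHandler(handler)

logger.setLevel(logging.WARNING)
logger.info("dropped: below WARNING")
logger.warning("kept: at WARNING")
logger.error("kept: above WARNING")
print(handler.messages)
```

Only the WARNING and ERROR messages reach the handler, which matches the behavior described for `logging_level = WARNING`.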

#### 6. Airflow: jinja2.exceptions.TemplateNotFound

This is the same pitfall caused by Airflow using Jinja2 as its template engine: when `bash_command` calls a script directly, the string must end with a space:

  • You need to add a space after the script name when you call a bash script directly in the bash_command attribute of BashOperator. Otherwise Airflow tries to apply a Jinja template to it, which fails.

    t2 = BashOperator(
        task_id='sleep',
        # Fails with a `Jinja template not found` error:
        # bash_command="/home/batcher/test.sh",
        # Works (note the space after the script name):
        bash_command="/home/batcher/test.sh ",
        dag=dag)

#### 7. Airflow: Task is not able to be run

After running for a while, a task suddenly can no longer run, and the worker log shows:

[2018-05-25 17:22:05,068] {jobs.py:2508} INFO - Task is not able to be run

Check the corresponding task log:

cat /home/py/airflow-home/logs/testBashOperator/print_date/2018-05-25T00:00:00/6.log
...
[2018-05-25 17:22:05,067] {models.py:1190} INFO - Dependencies not met for <TaskInstance: testBashOperator.print_date 2018-05-25 00:00:00 [success]>,
dependency 'Task Instance State' FAILED: Task is in the 'success' state which is not a valid state for execution. The task must be cleared in order to be run.

The message shows that the 'Task Instance State' dependency check failed because the task is already in the 'success' state. There are two ways to resolve this:

Tell airflow run to ignore the task's dependencies:

$ airflow run -A dag_id task_id execution_date

Clear the task with airflow clear dag_id:

$ airflow clear -u testBashOperator

#### 8. CELERY: PRECONDITION_FAILED - inequivalent arg 'x-expires' for queue 'celery@xxxx.celery.pidbox' in vhost ''

After upgrading to Celery 4.x, running tasks with RabbitMQ as the broker raises the following exception:

[2018-06-29 09:32:14,622: CRITICAL/MainProcess] Unrecoverable error: PreconditionFailed(406, "PRECONDITION_FAILED - inequivalent arg 'x-expires' for queue 'celery@PQ
SZ-L01395.celery.pidbox' in vhost '/': received the value '10000' of type 'signedint' but current is none", (50, 10), 'Queue.declare')
Traceback (most recent call last):
    File "c:\programdata\anaconda3\lib\site-packages\celery\worker\worker.py", line 205, in start
self.blueprint.start(self)
.......
    File "c:\programdata\anaconda3\lib\site-packages\amqp\channel.py", line 277, in _on_close
        reply_code, reply_text, (class_id, method_id), ChannelError,
amqp.exceptions.PreconditionFailed: Queue.declare: (406) PRECONDITION_FAILED - inequivalent arg 'x-expires' for queue 'celery@PQSZ-L01395.celery.pidbox' in vhost '/'
: received the value '10000' of type 'signedint' but current is none

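This error generally occurs when the RabbitMQ client and server use different arguments for the queue; making the arguments match resolves it.
Here the message points at x-expires, which corresponds to the Celery setting control_queue_expires, so adding control_queue_expires = None to the configuration file is enough.

Celery 3.x does not have these two settings. In 4.x they must be kept consistent, otherwise the exception above is raised.

The two RabbitMQ arguments I ran into map to Celery settings as follows:

| RabbitMQ | Celery 4.x |
| --- | --- |
| x-expires | control_queue_expires |
| x-message-ttl | control_queue_ttl |
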
#### 9. CELERY: The AMQP result backend is scheduled for deprecation in version 4.0 and removal in version v5.0. Please use RPC backend or a persistent backend

After upgrading Celery to 4.x, the following warning appears at runtime:

/anaconda/anaconda3/lib/python3.6/site-packages/celery/backends/amqp.py:67: CPendingDeprecationWarning:
        The AMQP result backend is scheduled for deprecation in version 4.0 and removal in version v5.0. Please use RPC backend or a persistent backend.
    alternative='Please use RPC backend or a persistent backend.')

Cause:
In Celery 4.0 the way to configure result_backend for RabbitMQ changed:
Previously it matched the broker: result_backend = 'amqp://guest:guest@localhost:5672//'
Now it uses the rpc backend: result_backend = 'rpc://'

Reference: http://docs.celeryproject.org/en/latest/userguide/configuration.html#std:setting-event_queue_prefix

#### 10. CELERY: ValueError('not enough values to unpack (expected 3, got 0)',)

Running Celery 4.x on Windows raises the following error:

[2018-07-02 10:54:17,516: ERROR/MainProcess] Task handler raised error: ValueError('not enough values to unpack (expected 3, got 0)',)
Traceback (most recent call last):
    ......
    tasks, accept, hostname = _loc
ValueError: not enough values to unpack (expected 3, got 0)

Celery 4.x does not currently support Windows. For debugging purposes, it can still be run on Windows by swapping the implementation of Celery's worker pool:

pip install eventlet
celery -A <module> worker -l info -P eventlet

References:
https://stackoverflow.com/questions/45744992/celery-raises-valueerror-not-enough-values-to-unpack
https://blog.csdn.net/qq_30242609/article/details/79047660

#### 11. Airflow: ERROR - 'DisabledBackend' object has no attribute '_get_task_meta_for'

Airflow raises the following exception at runtime:

Traceback (most recent call last):
File "/anaconda/anaconda3/lib/python3.6/site-packages/airflow/executors/celery_executor.py", line 83, in sync
......
return self._maybe_set_cache(self.backend.get_task_meta(self.id))
File "/anaconda/anaconda3/lib/python3.6/site-packages/celery/backends/base.py", line 307, in get_task_meta
meta = self._get_task_meta_for(task_id)
AttributeError: 'DisabledBackend' object has no attribute '_get_task_meta_for'
[2018-07-04 10:52:14,746] {celery_executor.py:101} ERROR - Error syncing the celery executor, ignoring it:
[2018-07-04 10:52:14,746] {celery_executor.py:102} ERROR - 'DisabledBackend' object has no attribute '_get_task_meta_for'

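There are two possible causes:

  1. The CELERY_RESULT_BACKEND setting is missing or misconfigured;
  2. The Celery version is too old. For example, Airflow 1.9.0 requires Celery 4.x, so check the Celery version and keep the versions compatible.
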
#### 12. airflow.exceptions.AirflowException dag_id could not be found xxxx. Either the dag did not exist or it failed to parse

Check the worker log airflow-worker.err:

airflow.exceptions.AirflowException: dag_id could not be found: bmhttp. Either the dag did not exist or it failed to parse.
[2018-07-31 17:37:34,191: ERROR/ForkPoolWorker-6] Task airflow.executors.celery_executor.execute_command[181c78d0-242c-4265-aabe-11d04887f44a] raised unexpected: AirflowException('Celery command failed',)
Traceback (most recent call last):
File "/anaconda/anaconda3/lib/python3.6/site-packages/airflow/executors/celery_executor.py", line 52, in execute_command
subprocess.check_call(command, shell=True)
File "/anaconda/anaconda3/lib/python3.6/subprocess.py", line 291, in check_call
raise CalledProcessError(retcode, cmd)
subprocess.CalledProcessError: Command 'airflow run bmhttp get_op1 2018-07-26T06:28:00 --local -sd /home/ignite/airflow/dags/BenchMark01.py' returned non-zero exit status 1.

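The Command in the exception log shows that when the scheduler generates a task message, it also specifies the path of the script to execute (through the -sd argument). In other words, the DAG script files on the scheduler node and on the worker nodes must sit under the same path, otherwise the error above occurs.

Reference: https://stackoverflow.com/questions/43235130/airflow-dag-id-could-not-be-found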
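
To confirm this diagnosis on a worker, the path can be pulled out of the logged command and checked locally. This is a minimal sketch; `dag_file_from_command` is a hypothetical helper, and the command string is copied from the worker log above.

```python
import os
import shlex


def dag_file_from_command(command):
    """Return the path that follows -sd in an `airflow run` command, or None."""
    tokens = shlex.split(command)
    for flag, value in zip(tokens, tokens[1:]):
        if flag == "-sd":
            return value
    return None


# Command copied from the worker log above.
COMMAND = (
    "airflow run bmhttp get_op1 2018-07-26T06:28:00 "
    "--local -sd /home/ignite/airflow/dags/BenchMark01.py"
)

path = dag_file_from_command(COMMAND)
print(path)
print("present on this node:", os.path.isfile(path))
```

If the file is reported as missing on the worker, the DAG directory layout differs from the scheduler's.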

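#### 13. An Airflow REST API call returns "Airflow 404 = lots of circles"

This error occurs because the URL does not provide the origin parameter, which is used for the redirect. For example, a working call to Airflow's /run endpoint looks like this: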

http://localhost:8080/admin/airflow/run?dag_id=example_hello_world_dag&task_id=sleep_task&execution_date=20180807&ignore_all_deps=true&origin=/admin
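
Building the URL in code makes it harder to forget the parameter. The sketch below reproduces the example URL above with `urllib.parse.urlencode`; the function name `build_run_url` and the default value for `origin` are assumptions made for illustration.

```python
from urllib.parse import urlencode


def build_run_url(base, dag_id, task_id, execution_date, origin="/admin"):
    """Build a /run URL that always carries the `origin` redirect parameter."""
    query = urlencode(
        {
            "dag_id": dag_id,
            "task_id": task_id,
            "execution_date": execution_date,
            "ignore_all_deps": "true",
            "origin": origin,
        },
        safe="/",  # keep the slash in origin=/admin unescaped
    )
    return base + "/admin/airflow/run?" + query


url = build_run_url(
    "http://localhost:8080", "example_hello_world_dag", "sleep_task", "20180807"
)
print(url)
```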

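#### 14. Choosing a broker and executor

Be sure to use RabbitMQ + CeleryExecutor, which is also what Celery officially recommends. This enables some very useful features, such as clicking a failed task in the web UI and rerunning it.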

#### 15. pkg_resources.DistributionNotFound: The 'setuptools==0.9.8' distribution was not found and is required by the application

pip install distribution

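#### 16. Supervisor

When using supervisor to start the worker, server, and scheduler, be sure to add the following to each configured supervisor program:

environment=AIRFLOW_HOME=xxxxxxxxxx

The main reason: if supervisor runs by calling a custom script, starting the worker also starts a separate serve_log service. Without the correct environment variable, serve_log looks for logs in the default AIRFLOW_HOME, so the logs cannot be viewed in the web UI.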

#### 17. Serve_log

If workers are deployed on multiple machines, open port 8793 on those machines in iptables so that the web UI can view task logs from workers on other machines.

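#### 18. AMQP library

Celery offers two libraries that implement AMQP: the default kombu, and librabbitmq, which is a binding to the C module. In version 1.8.1, using kombu caused the scheduler to drop out on its own, which appears to be a problem in the corresponding Celery version 4.0.2. After switching to librabbitmq, the server and scheduler ran normally but the worker never consumed tasks. The cause turned out to be that the protocol changed in Celery 4.0.2 and librabbitmq had not been updated to match. The fix is to edit executors/celery_executor.py in the source and add the setting: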

CELERY_TASK_PROTOCOL = 1
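
#### 19. RabbitMQ connections hang

After running for a while, network problems left every task stuck in the queued state until the worker was restarted. Some sources attribute this to Celery's broker pool, so one more setting was added to celery_executor.py:

BROKER_POOL_LIMIT=0 # do not use a connection pool

This only reduces the chance of hanging; it is still best to restart the worker periodically with crontab.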

#### 20. Running specific tasks only on specific machines

Assign a queue to the task in the DAG, then run airflow worker -q=QUEUE_NAME on the chosen machine.

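#### 21. Too many queues in RabbitMQ

So that the scheduler can learn each task's result in O(1) time, Celery creates a UUID-named queue for every task. By default such a queue expires after 1 day; the expiry can be adjusted by changing a setting in celery_executor.py:

CELERY_TASK_RESULT_EXPIRES = time in seconds

#### 22. The Airflow worker role cannot be started as the root user

Cause: Airflow's worker uses Celery directly, and the Celery source contains a check that by default refuses to start as root and reports an error: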

C_FORCE_ROOT = os.environ.get('C_FORCE_ROOT', False)
ROOT_DISALLOWED = """\
Running a worker with superuser privileges when the
worker accepts messages serialized with pickle is a very bad idea!

If you really want to continue then you have to set the C_FORCE_ROOT
environment variable (but please think about this before you do).

User information: uid={uid} euid={euid} gid={gid} egid={egid}
"""

ROOT_DISCOURAGED = """\
You're running the worker with superuser privileges: this is
absolutely not recommended!

Please specify a different user using the --uid option.

User information: uid={uid} euid={euid} gid={gid} egid={egid}
"""

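Solution 1: modify the Airflow source and force C_FORCE_ROOT in celery_executor.py

from celery import Celery, platforms
# add after app = Celery(…):
platforms.C_FORCE_ROOT = True
# then restart

Solution 2: set C_FORCE_ROOT when the container initializes its environment variables, which solves the problem without touching the source

# force the Celery worker to run as root
export C_FORCE_ROOT=True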
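
When the worker is launched from a Python wrapper instead of a shell, the same variable can be placed in the child process environment. This is a minimal sketch under that assumption; `worker_environment` is a hypothetical helper and the sample `PATH` value is only for illustration.

```python
import os


def worker_environment(base_env=None):
    """Return a copy of the environment with C_FORCE_ROOT set for the worker."""
    env = dict(os.environ if base_env is None else base_env)
    env["C_FORCE_ROOT"] = "True"
    return env


base = {"PATH": "/usr/bin"}
env = worker_environment(base)
print(env)
# A launcher could pass this to subprocess.Popen([...], env=env); the worker
# command itself is left out because it depends on the deployment.
```

Working on a copy keeps the parent process environment unchanged, so only the worker runs with the override.
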
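#### 23. Docker in Docker

When tasks in DAGs are scheduled as Docker containers, we keep the container lightweight and avoid heavy operations such as docker pull by relying on Docker's client-server architecture: mounting the host's /var/run/docker.sock file into the container is enough. Docker-in-Docker reference: https://link.zhihu.com/?target=http://wangbaiyuan.cn/docker-in-docker.html#prettyPhoto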

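#### 24. "module not found" errors when multiple worker nodes deserialize and run a scheduled DAG

To keep file updates consistent, we had all workers execute the serialized DAG sent by the master instead of relying on the DAG files actually present on each worker. The feature is enabled as follows:

On the worker nodes: airflow worker -cn=ip@ip -p  (-p is a switch meaning that the DAG serialized by the master is executed, not the file in the local DAG directory)
On the master node: airflow scheduler -p

Cause: the actual DAG file does not exist on the remote worker node, so during deserialization the module_name of functions or objects defined in the DAG cannot be found.
Solution 1: publish the dags directory to all worker nodes at the same time. The drawback is that keeping the DAGs consistent becomes a problem.
Solution 2: modify the serialization and deserialization logic in the source. The main idea is to replace the nonexistent module with __main__. The changes are as follows: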


    # models.py: changes to the class DagPickle(Base) definition
    import re

    import dill

    class DagPickle(Base):
        id = Column(Integer, primary_key=True)
        # before: pickle = Column(PickleType(pickler=dill))
        pickle = Column(LargeBinary)
        created_dttm = Column(UtcDateTime, default=timezone.utcnow)
        pickle_hash = Column(Text)

        __tablename__ = "dag_pickle"

        def __init__(self, dag):
            self.dag_id = dag.dag_id
            if hasattr(dag, 'template_env'):
                dag.template_env = None
            self.pickle_hash = hash(dag)
            raw = dill.dumps(dag)
            # before: self.pickle = dag
            # Replace the generated module name with __main__ in the raw bytes
            reg_str = r'unusual_prefix_\w*{0}'.format(dag.dag_id)
            result = re.sub(str.encode(reg_str), b'__main__', raw)
            self.pickle = result

    # cli.py: deserialization logic in the run(args, dag=None) function
    # Deserialize the binary data directly with dill instead of going
    # through PickleType's result_processor.
    # before: dag = dag_pickle.pickle
    # after:
    dag = dill.loads(dag_pickle.pickle)

>  Solution 3: leave the source untouched and use Python's types.FunctionType to re-create the function without a module, so that serialization and deserialization no longer fail

    new_func = types.FunctionType((lambda df: df.iloc[:, 0].size == xx).__code__, {})
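
The effect of rebuilding a function this way can be seen in isolation. The following is a small sketch with a hypothetical helper `detach_from_module` and a toy function; it only works for functions that use no module-level names, because the rebuilt function gets an empty globals dictionary.

```python
import types


def detach_from_module(func):
    """Re-create `func` from its code object with empty globals.

    Only suitable for functions that use no module-level names.
    """
    return types.FunctionType(func.__code__, {}, func.__name__)


def bigger_than_three(value):
    return value > 3


detached = detach_from_module(bigger_than_three)
print(detached(5), detached(1))
print(bigger_than_three.__module__, detached.__module__)
```

The rebuilt function behaves the same but no longer carries the name of the module it was defined in, which is the property Solution 3 relies on.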


#### 25. Logs of remotely executed tasks cannot be viewed through the webserver on the master node
>  Cause: on the master, Airflow fetches task logs through each node's HTTP service, but the host_name stored in the task_instance table is not an IP address, so the way the hostname is obtained is at fault.
>  Solution: modify the get_hostname function in airflow/utils/net.py so that it first looks for a hostname set in an environment variable
```python
# models.py, TaskInstance
self.hostname = get_hostname()

# net.py: get_hostname with an added environment-variable lookup
import importlib
import os
import socket


def get_hostname():
    """
    Fetch the hostname using the callable from the config or using
    `socket.getfqdn` as a fallback.
    """
    # Try the environment variable first
    if 'AIRFLOW_HOST_NAME' in os.environ:
        return os.environ['AIRFLOW_HOST_NAME']
    # First we attempt to fetch the callable path from the config.
    try:
        callable_path = conf.get('core', 'hostname_callable')
    except AirflowConfigException:
        callable_path = None

    # Then we handle the case when the config is missing or empty. This is the
    # default behavior.
    if not callable_path:
        return socket.getfqdn()

    # Since we have a callable path, we try to import and run it next.
    module_path, attr_name = callable_path.split(':')
    module = importlib.import_module(module_path)
    callable = getattr(module, attr_name)
    return callable()
```