Kubernetes 博客

Airflow在Kubernetes中的使用(第一部分):一种不同的操作器

作者: Daniel Imberman (Bloomberg LP)

介绍

作为Bloomberg [继续致力于开发Kubernetes生态系统]的一部分(https://www.techatbloomberg.com/blog/bloomberg-awarded-first-cncf-end-user-award-contributions-kubernetes/),我们很高兴能够宣布Kubernetes Airflow Operator的发布; Apache Airflow的机制,一种流行的工作流程编排框架,使用Kubernetes API可以在本机启动任意的Kubernetes Pod。

什么是Airflow?

Apache Airflow是DevOps“Configuration As Code”理念的一种实现。 Airflow允许用户使用简单的Python对象DAG(有向无环图)启动多步骤流水线。 您可以在易于阅读的UI中定义依赖关系,以编程方式构建复杂的工作流,并监视调度的作业。

为什么在Kubernetes上使用Airflow?

自成立以来,Airflow的最大优势在于其灵活性。 Airflow提供广泛的服务集成,包括Spark和HBase,以及各种云提供商的服务。 Airflow还通过其插件框架提供轻松的可扩展性。但是,该项目的一个限制是Airflow用户仅限于执行时Airflow站点上存在的框架和客户端。单个组织可以拥有各种Airflow工作流程,范围从数据科学流到应用程序部署。用例中的这种差异会在依赖关系管理中产生问题,因为两个团队可能会在其工作流程使用截然不同的库。

为了解决这个问题,我们使Kubernetes允许用户启动任意Kubernetes pod和配置。 Airflow用户现在可以在其运行时环境,资源和机密上拥有全部权限,基本上将Airflow转变为“您想要的任何工作”工作流程协调器。

Kubernetes运营商

在进一步讨论之前,我们应该澄清Airflow中的Operator是一个任务定义。 当用户创建DAG时,他们将使用像“SparkSubmitOperator”或“PythonOperator”这样的operator分别提交/监视Spark作业或Python函数。 Airflow附带了Apache Spark,BigQuery,Hive和EMR等框架的内置运算符。 它还提供了一个插件入口点,允许DevOps工程师开发自己的连接器。

Airflow用户一直在寻找更易于管理部署和ETL流的方法。 在增加监控的同时,任何解耦流程的机会都可以减少未来的停机等问题。 以下是Airflow Kubernetes Operator提供的好处:

  • 提高部署灵活性:

Airflow的插件API一直为希望在其DAG中测试新功能的工程师提供了重要的福利。 不利的一面是,每当开发人员想要创建一个新的operator时,他们就必须开发一个全新的插件。 现在,任何可以在Docker容器中运行的任务都可以通过完全相同的运算符访问,而无需维护额外的Airflow代码。

  • 配置和依赖的灵活性:

对于在静态Airflow工作程序中运行的operator,依赖关系管理可能变得非常困难。 如果开发人员想要运行一个需要SciPy的任务和另一个需要NumPy的任务,开发人员必须维护所有Airflow节点中的依赖关系或将任务卸载到其他计算机(如果外部计算机以未跟踪的方式更改,则可能导致错误)。 自定义Docker镜像允许用户确保任务环境,配置和依赖关系完全是幂等的。

  • 使用kubernetes Secret以增加安全性:

处理敏感数据是任何开发工程师的核心职责。 Airflow用户总有机会在严格条款的基础上隔离任何API密钥,数据库密码和登录凭据。 使用Kubernetes运算符,用户可以利用Kubernetes Vault技术存储所有敏感数据。 这意味着Airflow工作人员将永远无法访问此信息,并且可以容易地请求仅使用他们需要的密码信息构建pod。

#架构

Kubernetes Operator使用Kubernetes Python客户端生成由APIServer处理的请求(1)。 然后,Kubernetes将使用您定义的需求启动您的pod(2)。映像文件中将加载环境变量,Secret和依赖项,执行单个命令。 一旦启动作业,operator只需要监视跟踪日志的状况(3)。 用户可以选择将日志本地收集到调度程序或当前位于其Kubernetes集群中的任何分布式日志记录服务。

#使用Kubernetes Operator

##一个基本的例子

以下DAG可能是我们可以编写的最简单的示例,以显示Kubernetes Operator的工作原理。 这个DAG在Kubernetes上创建了两个pod:一个带有Python的Linux发行版和一个没有它的基本Ubuntu发行版。 Python pod将正确运行Python请求,而没有Python的那个将向用户报告失败。 如果Operator正常工作,则应该完成“passing-task”pod,而“falling-task”pod则向Airflow网络服务器返回失败。

from airflow import DAG

from datetime import datetime, timedelta

from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator

from airflow.operators.dummy_operator import DummyOperator


default_args = {

    'owner': 'airflow',

    'depends_on_past': False,

    'start_date': datetime.utcnow(),

    'email': ['airflow@example.com'],

    'email_on_failure': False,

    'email_on_retry': False,

    'retries': 1,

    'retry_delay': timedelta(minutes=5)

}



dag = DAG(

    'kubernetes_sample', default_args=default_args, schedule_interval=timedelta(minutes=10))

start = DummyOperator(task_id='run_this_first', dag=dag)
passing = KubernetesPodOperator(namespace='default',

                          image="Python:3.6",

                          cmds=["Python","-c"],

                          arguments=["print('hello world')"],

                          labels={"foo": "bar"},

                          name="passing-test",

                          task_id="passing-task",

                          get_logs=True,

                          dag=dag

                          )

 failing = KubernetesPodOperator(namespace='default',

                          image="ubuntu:1604",

                          cmds=["Python","-c"],

                          arguments=["print('hello world')"],

                          labels={"foo": "bar"},

                          name="fail",

                          task_id="failing-task",

                          get_logs=True,

                          dag=dag

                          )

passing.set_upstream(start)

failing.set_upstream(start)

##但这与我的工作流程有什么关系?

虽然这个例子只使用基本映像,但Docker的神奇之处在于,这个相同的DAG可以用于您想要的任何图像/命令配对。 以下是推荐的CI / CD管道,用于在Airflow DAG上运行生产就绪代码。

1:github中的PR

使用Travis或Jenkins运行单元和集成测试,请您的朋友PR您的代码,并合并到主分支以触发自动CI构建。

2:CI / CD构建Jenkins - > Docker Image

在Jenkins构建中生成Docker镜像和缓冲版本

3:Airflow启动任务

最后,更新您的DAG以反映新版本,您应该准备好了!

production_task = KubernetesPodOperator(namespace='default',

                          # image="my-production-job:release-1.0.1", <-- old release

                          image="my-production-job:release-1.0.2",

                          cmds=["Python","-c"],

                          arguments=["print('hello world')"],

                          name="fail",

                          task_id="failing-task",

                          get_logs=True,

                          dag=dag

                          )

#启动测试部署

由于Kubernetes运营商尚未发布,我们尚未发布官方helm 图表或operator(但两者目前都在进行中)。 但是,我们在下面列出了基本部署的说明,并且正在积极寻找测试人员来尝试这一新功能。 要试用此系统,请按以下步骤操作:

##步骤1:将kubeconfig设置为指向kubernetes集群

##步骤2:clone Airflow 仓库:

运行git clone https:// github.com / apache / incubator-airflow.git来clone官方Airflow仓库。

##步骤3:运行

为了运行这个基本Deployment,我们正在选择我们目前用于Kubernetes Executor的集成测试脚本(将在本系列的下一篇文章中对此进行解释)。 要启动此部署,请运行以下三个命令:


sed -ie "s/KubernetesExecutor/LocalExecutor/g" scripts/ci/kubernetes/kube/configmaps.yaml

./scripts/ci/kubernetes/Docker/build.sh

./scripts/ci/kubernetes/kube/deploy.sh

在我们继续之前,让我们讨论这些命令正在做什么:

sed -ie“s / KubernetesExecutor / LocalExecutor / g”scripts / ci / kubernetes / kube / configmaps.yaml

Kubernetes Executor是另一种Airflow功能,允许动态分配任务已解决幂等pod的问题。我们将其切换到LocalExecutor的原因只是一次引入一个功能。如果您想尝试Kubernetes Executor,欢迎您跳过此步骤,但我们将在以后的文章中详细介绍。

./scripts/ci/kubernetes/Docker/build.sh

此脚本将对Airflow主分支代码进行打包,以根据Airflow的发行文件构建Docker容器

./scripts/ci/kubernetes/kube/deploy.sh

最后,我们在您的群集上创建完整的Airflow部署。这包括Airflow配置,postgres后端,webserver +调度程序以及之间的所有必要服务。需要注意的一点是,提供的角色绑定是集群管理员,因此如果您没有该集群的权限级别,可以在scripts / ci / kubernetes / kube / airflow.yaml中进行修改。

##步骤4:登录您的网络服务器

现在您的Airflow实例正在运行,让我们来看看UI!用户界面位于Airflow pod的8080端口,因此只需运行即可


WEB=$(kubectl get pods -o go-template --template '{{range .items}}{{.metadata.name}}{{"\n"}}{{end}}' | grep "airflow" | head -1)

kubectl port-forward $WEB 8080:8080

现在,Airflow UI将存在于http://localhost:8080上。 要登录,只需输入airflow /airflow,您就可以完全访问Airflow Web UI。

##步骤5:上传测试文档

要修改/添加自己的DAG,可以使用kubectl cp将本地文件上传到Airflow调度程序的DAG文件夹中。 然后,Airflow将读取新的DAG并自动将其上传到其系统。 以下命令将任何本地文件上载到正确的目录中:

kubectl cp /:/root/airflow/dags -c scheduler

##步骤6:使用它!

#那么我什么时候可以使用它?

虽然此功能仍处于早期阶段,但我们希望在未来几个月内发布该功能以进行广泛发布。

#参与其中

此功能只是将Apache Airflow集成到Kubernetes中的多项主要工作的开始。 Kubernetes Operator已合并到Airflow的1.10发布分支(实验模式中的执行模块),以及完整的k8s本地调度程序称为Kubernetes Executor(即将发布文章)。这些功能仍处于早期采用者/贡献者可能对这些功能的未来产生巨大影响的阶段。

对于有兴趣加入这些工作的人,我建议按照以下步骤:

*加入airflow-dev邮件列表dev@airflow.apache.org。

*在[Apache Airflow JIRA]中提出问题(https://issues.apache.org/jira/projects/AIRFLOW/issues/)

*周三上午10点太平洋标准时间加入我们的SIG-BigData会议。

*在kubernetes.slack.com上的#sig-big-data找到我们。

特别感谢Apache Airflow和Kubernetes社区,特别是Grant Nicholas,Ben Goldberg,Anirudh Ramanathan,Fokko Dreisprong和Bolke de Bruin,感谢您对这些功能的巨大帮助以及我们未来的努力。

Welcome to the Kubernetes Blog!

Welcome to the new Kubernetes Blog. Follow this blog to learn about the Kubernetes Open Source project. We plan to post release notes, how-to articles, events, and maybe even some off topic fun here from time to time.

If you are using Kubernetes or contributing to the project and would like to do a guest post, please let me know.

To start things off, here’s a roundup of recent Kubernetes posts from other sites:

Happy cloud computing!

  • Kit Merker - Product Manager, Google Cloud Platform