众所周知,Spark是一个快速、通用的大规模数据处理平台,和Hadoop的MapReduce计算框架相似。可是相对于MapReduce,Spark凭借其可伸缩、基于内存计算等特色,以及能够直接读写Hadoop上任何格式数据的优点,使批处理更加高效,并有更低的延迟。实际上,Spark已经成为轻量级大数据快速处理的统一平台。
Spark做为一个数据计算平台和框架,更多的是关注Spark Application的管理,而底层实际的资源调度和管理更多的是依靠外部平台的支持:html
Spark官方支持四种Cluster Manager:Spark standalone cluster manager、Mesos、YARN和Kubernetes。因为咱们TalkingData是使用Kubernetes做为资源的调度和管理平台,因此Spark On Kubernetes对于咱们是最好的解决方案。node
目前市面上有不少搭建Kubernetes的方法,好比Scratch、Kubeadm、Minikube或者各类托管方案。由于咱们须要简单快速地搭建功能验证集群,因此选择了Kubeadm做为集群的部署工具。部署步骤很简单,在master上执行:git
kubeadm init
在node上执行:github
kubeadm join --token : --discovery-token-ca-cert-hash sha256:
具体配置可见官方文档:https://kubernetes.io/docs/setup/independent/create-cluster-kubeadm/。
须要注意的是因为国内网络限制,不少镜像没法从k8s.gcr.io获取,咱们须要将之替换为第三方提供的镜像,好比:https://hub.docker.com/u/mirrorgooglecontainers/。docker
Kubernetes网络默认是经过CNI实现,主流的CNI plugin有:Linux Bridge、MACVLAN、Flannel、Calico、Kube-router、Weave Net等。Flannel主要是使用VXLAN tunnel来解决pod间的网络通讯,Calico和Kube-router则是使用BGP。因为软VXLAN对宿主机的性能和网络有不小的损耗,BGP则对硬件交换机有必定的要求,且咱们的基础网络是VXLAN实现的大二层,因此咱们最终选择了MACVLAN。
CNI MACVLAN的配置示例以下:apache
{ "name": "mynet", "type": "macvlan", "master": "eth0", "ipam": { "type": "host-local", "subnet": "10.0.0.0/17", "rangeStart": "10.0.64.1", "rangeEnd": "10.0.64.126", "gateway": "10.0.127.254", "routes": [ { "dst": "0.0.0.0/0" }, { "dst": "10.0.80.0/24", "gw": "10.0.0.61" } ] } }
Pod subnet是10.0.0.0/17,实际pod ip pool是10.0.64.0/20。cluster cidr是10.0.80.0/24。咱们使用的IPAM是host-local,规则是在每一个Kubernetes node上创建/25的子网,能够提供126个IP。咱们还配置了一条到cluster cidr的静态路由10.0.80.0/24,网关是宿主机。这是由于容器在macvlan配置下egress并不会经过宿主机的iptables,这点和Linux Bridge有较大区别。在Linux Bridge模式下,只要指定内核参数net.bridge.bridge-nf-call-iptables = 1,全部进入bridge的流量都会经过宿主机的iptables。通过分析kube-proxy,咱们发现可使用KUBE-FORWARD这个chain来进行pod到service的网络转发:后端
-A FORWARD -m comment --comment "kubernetes forward rules" -j KUBE-FORWARD -A KUBE-FORWARD -m comment --comment "kubernetes forwarding rules" -m mark --mark 0x4000/0x4000 -j ACCEPT -A KUBE-FORWARD -s 10.0.0.0/17 -m comment --comment "kubernetes forwarding conntrack pod source rule" -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT -A KUBE-FORWARD -d 10.0.0.0/17 -m comment --comment "kubernetes forwarding conntrack pod destination rule" -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT
最后经过KUBE-SERVICES使用DNAT到后端的pod。pod访问其余网段的话,就经过物理网关10.0.127.254。
还有一个须要注意的地方是出于kernel security的考虑,link物理接口的macvlan是没法直接和物理接口通讯的,这就致使容器并不能将宿主机做为网关。咱们采用了一个小技巧,避开了这个限制。咱们从物理接口又建立了一个macvlan,将物理IP移到了这个接口上,物理接口只做为网络入口:api
$ cat /etc/sysconfig/network-scripts/ifcfg-eth0 DEVICE=eth0 IPV6INIT=no BOOTPROTO=none $ cat /etc/sysconfig/network-scripts/ifcfg-macvlan DEVICE=macvlan NAME=macvlan BOOTPROTO=none ONBOOT=yes TYPE=macvlan DEVICETYPE=macvlan DEFROUTE=yes PEERDNS=yes PEERROUTES=yes IPV4_FAILURE_FATAL=no IPADDR=10.0.0.61 PREFIX=17 GATEWAY=10.0.127.254 MACVLAN_PARENT=eth0 MACVLAN_MODE=bridge
这样两个macvlan是能够互相通讯的。网络
默认配置下,Kubernetes使用kube-dns进行DNS解析和服务发现。但在实际使用时,咱们发如今pod上经过service domain访问service老是有5秒的延迟。使用tcpdump抓包,发现延迟出如今DNS AAAA。进一步排查,发现问题是因为netfilter在conntrack和SNAT时的Race Condition致使。简言之,DNS A和AAAA记录请求报文是并行发出的,这会致使netfilter在_nf_conntrack_confirm时认为第二个包是重复的(由于有相同的五元组),从而丢包。具体可看我提的issue:https://github.com/kubernetes/kubernetes/issues/62628。一个简单的解决方案是在/etc/resolv.conf中增长options single-request-reopen,使DNS A和AAAA记录请求报文使用不一样的源端口。我提的PR在:https://github.com/kubernetes/kubernetes/issues/62628,你们能够参考。咱们的解决方法是不使用Kubernetes service,设置hostNetwork=true使用宿主机网络提供DNS服务。由于咱们的基础网络是大二层,因此pod和node能够直接通讯,这就避免了conntrack和SNAT。框架
因为Spark的抽象设计,咱们可使用第三方资源管理平台调度和管理Spark做业,好比Yarn、Mesos和Kubernetes。目前官方有一个experimental项目,能够将Spark运行在Kubernetes之上:https://spark.apache.org/docs/latest/running-on-kubernetes.html。
当咱们经过spark-submit将Spark做业提交到Kubernetes集群时,会执行如下流程:
因为Spark driver和executor都运行在Kubernetes pod中,而且咱们使用Docker做为container runtime enviroment,因此首先咱们须要创建Spark的Docker镜像。
在Spark distribution中已包含相应脚本和Dockerfile,能够经过如下命令构建镜像:
$ ./bin/docker-image-tool.sh -r <repo> -t my-tag build $ ./bin/docker-image-tool.sh -r <repo> -t my-tag push
在构建Spark镜像后,咱们能够经过如下命令提交做业:
$ bin/spark-submit \ --master k8s://https://: \ --deploy-mode cluster \ --name spark-pi \ --class org.apache.spark.examples.SparkPi \ --jars https://path/to/dependency1.jar,https://path/to/dependency2.jar --files hdfs://host:port/path/to/file1,hdfs://host:port/path/to/file2 --conf spark.executor.instances=5 \ --conf spark.kubernetes.container.image= \ https://path/to/examples.jar
其中,Spark master是Kubernetes api server的地址,能够经过如下命令获取:
$ kubectl cluster-info Kubernetes master is running at http://127.0.0.1:6443
Spark的做业代码和依赖,咱们能够在--jars、--files和最后位置指定,协议支持http、https和HDFS。
执行提交命令后,会有如下输出:
任务结束,会输出:
咱们能够在本地使用kubectl port-forward访问Driver UI:
$ kubectl port-forward <driver-pod-name> 4040:4040
执行完后经过http://localhost:4040访问。
Spark的全部日志均可以经过Kubernetes API和kubectl CLI进行访问:
$ kubectl -n=<namespace> logs -f <driver-pod-name>
在Kubernetes中,咱们可使用namespace在多用户间实现资源分配、隔离和配额。Spark On Kubernetes一样支持配置namespace建立Spark做业。
首先,建立一个Kubernetes namespace:
$ kubectl create namespace spark
因为咱们的Kubernetes集群使用了RBAC,因此还需建立serviceaccount和绑定role:
$ kubectl create serviceaccount spark -n spark $ kubectl create clusterrolebinding spark-role --clusterrole=edit --serviceaccount=spark:spark --namespace=spark
并在spark-submit中新增如下配置:
$ bin/spark-submit \ --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \ --conf spark.kubernetes.namespace=spark \ ...
考虑到咱们Spark做业的一些特色和计算资源隔离,前期咱们仍是选择了较稳妥的物理隔离方案。具体作法是为每一个组提供单独的Kubernetes namespace,计算任务都在各自namespace里提交。计算资源以物理机为单位,折算成cpu和内存,归入Kubernetes统一管理。在Kubernetes集群里,经过node label和PodNodeSelector将计算资源和namespace关联。从而实如今提交Spark做业时,计算资源老是选择namespace关联的node。
具体作法以下:
一、建立node label
$ kubectl label nodes <node_name> spark:spark
二、开启Kubernetes admission controller
咱们是使用kubeadm安装Kubernetes集群,因此修改/etc/kubernetes/manifests/kube-apiserver.yaml,在--admission-control后添加PodNodeSelector。
$ cat /etc/kubernetes/manifests/kube-apiserver.yaml apiVersion: v1 kind: Pod metadata: annotations: scheduler.alpha.kubernetes.io/critical-pod: "" creationTimestamp: null labels: component: kube-apiserver tier: control-plane name: kube-apiserver namespace: kube-system spec: containers: - command: - kube-apiserver - --secure-port=6443 - --proxy-client-cert-file=/etc/kubernetes/pki/front-proxy-client.crt - --admission-control=Initializers,NamespaceLifecycle,LimitRanger,ServiceAccount,DefaultStorageClass,DefaultTolerationSeconds,NodeRestriction,ResourceQuota,MutatingAdmissionWebhook,ValidatingAdmissionWebhook,PodNodeSelector ...
三、配置PodNodeSelector
在namespace的annotations中添加scheduler.alpha.kubernetes.io/node-selector: spark=spark。
apiVersion: v1 kind: Namespace metadata: annotations: scheduler.alpha.kubernetes.io/node-selector: spark=spark name: spark
完成以上配置后,能够经过spark-submit测试结果:
$ spark-submit --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark --conf spark.kubernetes.namespace=spark --master k8s://https://xxxx:6443 --deploy-mode cluster --name spark-pi --class org.apache.spark.examples.SparkPi --conf spark.executor.instances=5 --conf spark.kubernetes.container.image=xxxx/library/spark:v2.3 http://xxxx:81/spark-2.3.0-bin-hadoop2.7/examples/jars/spark-examples_2.11-2.3.0.jar
咱们能够看到,Spark做业全分配到了关联的hadooptest-001到003三个node上。
Kubernetes的集群状态基本都保存在etcd中,因此etcd是HA的关键所在。因为咱们目前还处在半生产状态,HA这方面未过多考虑。有兴趣的同窗能够查看:https://kubernetes.io/docs/setup/independent/high-availability/。
在Spark On Yarn下,能够开启yarn.log-aggregation-enable将日志收集聚合到HDFS中,以供查看。可是在Spark On Kubernetes中,则缺乏这种日志收集机制,咱们只能经过Kubernetes pod的日志输出,来查看Spark的日志:
$ kubectl -n=<namespace> logs -f <driver-pod-name>
收集和聚合日志,咱们后面会和ES结合。
监控
咱们TalkingData内部有本身的监控平台OWL[2](已开源),将来咱们计划编写metric plugin,将Kubernetes接入OWL中。
混合部署
为了保证Spark做业时刻有可用的计算资源,咱们前期采用了物理隔离的方案。显而易见,这种方式大幅下降了物理资源的使用率。下一步咱们计划采用混部方案,经过如下三种方式实现:
在采用如下两种方法增长资源使用率时,集群可能会面临资源短缺和可用性的问题:
这会致使运行资源大于实际物理资源的状况(我称之为资源挤兑)。一种作法是给资源划分等级,优先保证部分等级的资源供给。另外一种作法是实现资源的水平扩展,动态补充可用资源,并在峰值事后自动释放。
原文连接 本文为云栖社区原创内容,未经容许不得转载。