做者 | 阿里云智能事业群高级开发工程师 元毅git
基于事件驱动是Serveless的核心功能之一,经过事件驱动服务,知足了用户按需付费(Pay-as-you-go)的需求。在以前的文章中咱们介绍过 Knative Eventing 由事件源、事件处理模型和事件消费 3 个主要部分构成,那么事件如何经过这 3 个组件产生、处理以及消费呢? 本文经过 Kubernetes Event Source 示例介绍一下 Knative Eventing 中如何获取事件,而且将事件传递给 Serving 进行消费。其中事件处理基于 Broker/Trigger 模型。github
先了解一下Broker/Trigger 事件处理模型。从 v0.5 开始,Knative Eventing 定义 Broker 和 Trigger 对象,从而能方便的对事件进行过滤。web
Broker/Tiggger 模型流程处理如图所示:
json
先看一下 Kubernetes Event Source 示例处理流程,如图所示:api
接下来介绍一下各个阶段如何进行操做处理。app
为 ApiServerSource
建立 Service Account, 用于受权 ApiServerSource 获取 Kubernetes Events 。
serviceaccount.yaml 以下:less
apiVersion: v1 kind: ServiceAccount metadata: name: events-sa namespace: default --- apiVersion: rbac.authorization.k8s.io/v1 kind: ClusterRole metadata: name: event-watcher rules: - apiGroups: - "" resources: - events verbs: - get - list - watch --- apiVersion: rbac.authorization.k8s.io/v1 kind: ClusterRoleBinding metadata: name: k8s-ra-event-watcher roleRef: apiGroup: rbac.authorization.k8s.io kind: ClusterRole name: event-watcher subjects: - kind: ServiceAccount name: events-sa namespace: default
执行以下操做:ui
kubectl apply --filename serviceaccount.yaml
建立 Event Source阿里云
Knative Eventing 中 经过 Event Source 对接第三方系统产生统一的事件类型。当前支持 ApiServerSource,GitHub 等多种数据源。这里咱们建立一个 ApiServerSource 事件源用于接收 Kubernetes Events 事件并进行转发。k8s-events.yaml 以下:url
apiVersion: sources.eventing.knative.dev/v1alpha1 kind: ApiServerSource metadata: name: testevents namespace: default spec: serviceAccountName: events-sa mode: Resource resources: - apiVersion: v1 kind: Event sink: apiVersion: eventing.knative.dev/v1alpha1 kind: Broker name: default
这里经过 sink 参数指定事件接收方,支持 Broker 和 k8s service。
执行命令:
kubectl apply --filename k8s-events.yaml
首先构建你的事件处理服务,能够参考 knative-sample/event-display开源项目。
这里的 Service 服务仅把接收到的事件打印出来,处理逻辑以下:
package main import ( "context" "fmt" "log" cloudevents "github.com/cloudevents/sdk-go" "github.com/knative-sample/event-display/pkg/kncloudevents" ) /* Example Output: cloudevents.Event: Validation: valid Context Attributes, SpecVersion: 0.2 Type: dev.knative.eventing.samples.heartbeat Source: https://github.com/knative/eventing-sources/cmd/heartbeats/#local/demo ID: 3d2b5a1f-10ca-437b-a374-9c49e43c02fb Time: 2019-03-14T21:21:29.366002Z ContentType: application/json Extensions: the: 42 beats: true heart: yes Transport Context, URI: / Host: localhost:8080 Method: POST Data { "id":162, "label":"" } */ func display(event cloudevents.Event) { fmt.Printf("Hello World: \n") fmt.Printf("cloudevents.Event\n%s", event.String()) } func main() { c, err := kncloudevents.NewDefaultClient() if err != nil { log.Fatal("Failed to create client, ", err) } log.Fatal(c.StartReceiver(context.Background(), display)) }
经过上面的代码,能够轻松构建你本身的镜像。镜像构建完成以后,接下来能够建立一个简单的 Knative Service, 用于消费 ApiServerSource
产生的事件。
service.yaml 示例以下:
apiVersion: serving.knative.dev/v1alpha1 kind: Service metadata: name: event-display namespace: default spec: template: spec: containers: - image: {yourrepo}/{yournamespace}/event-display:latest
执行命令:
kubectl apply --filename service.yaml
在所选命名空间下,建立 default
Broker。假如选择 default
命名空间, 执行操做以下。
kubectl label namespace default knative-eventing-injection=enabled
这里 Eventing Controller 会根据设置knative-eventing-injection=enabled
标签的 namepace, 自动建立 Broker。而且使用在webhook中默认配置的 ClusterChannelProvisioner(in-memory)。
Trigger 能够理解为 Broker 和Service 之间的过滤器,能够设置一些事件的过滤规则。这里为默认的 Broker 建立一个最简单的 Trigger,而且使用 Service 进行订阅。trigger.yaml 示例以下:
apiVersion: eventing.knative.dev/v1alpha1 kind: Trigger metadata: name: testevents-trigger namespace: default spec: subscriber: ref: apiVersion: serving.knative.dev/v1alpha1 kind: Service name: event-display
执行命令:
kubectl apply --filename trigger.yaml
注意:若是没有使用默认的 Broker, 在 Trigger 中能够经过 spec.broker 指定 Broker 名称。
执行以下命令,生成 k8s events。
kubectl run busybox --image=busybox --restart=Never -- ls kubectl delete pod busybox
能够经过下述方式查看 Knative Service 是否接收到事件。
kubectl get pods kubectl logs -l serving.knative.dev/service=event-display -c user-container
日志输出相似下面,说明已经成功接收事件。
Hello World: ️ CloudEvent: valid Context Attributes, SpecVersion: 0.2 Type: dev.knative.apiserver.resource.add Source: https://10.39.240.1:443 ID: 716d4536-3b92-4fbb-98d9-14bfcf94683f Time: 2019-05-10T23:27:06.695575294Z ContentType: application/json Extensions: knativehistory: default-broker-b7k2p-channel-z7mqq.default.svc.cluster.local subject: /apis/v1/namespaces/default/events/busybox.159d7608e3a3572c Transport Context, URI: / Host: auto-event-display.default.svc.cluster.local Method: POST Data, { "apiVersion": "v1", "count": 1, "eventTime": null, "firstTimestamp": "2019-05-10T23:27:06Z", "involvedObject": { "apiVersion": "v1", "fieldPath": "spec.containers{busybox}", "kind": "Pod", "name": "busybox", "namespace": "default", "resourceVersion": "28987493", "uid": "1efb342a-737b-11e9-a6c5-42010a8a00ed" }, "kind": "Event", "lastTimestamp": "2019-05-10T23:27:06Z", "message": "Started container", "metadata": { "creationTimestamp": "2019-05-10T23:27:06Z", "name": "busybox.159d7608e3a3572c", "namespace": "default", "resourceVersion": "506088", "selfLink": "/api/v1/namespaces/default/events/busybox.159d7608e3a3572c", "uid": "2005af47-737b-11e9-a6c5-42010a8a00ed" }, "reason": "Started", "reportingComponent": "", "reportingInstance": "", "source": { "component": "kubelet", "host": "gke-knative-auto-cluster-default-pool-23c23c4f-xdj0" }, "type": "Normal" }
相信经过上面的例子你已经了解了 Knative Eventing 如何产生事件、处理事件以及消费事件。对 Eventing 中的事件处理模型也有了初步的了解。固然你能够本身定义一个事件消费服务,来处理事件。
你是否已对 Knative Eventing 产生了兴趣,是否有点意犹未尽的样子? 别急,接下来咱们会继续深刻分析 Knative Eventing。包括:
让咱们后续一块儿探索 Knative Eventing,欢迎持续关注。
本文做者:jessie筱姜
本文为云栖社区原创内容,未经容许不得转载。