JobSink,在事件发生时触发长时间运行的后台 Job¶
通常,与 Knative Service 结合的事件处理被期望在相对较短的时间内(几分钟)完成,因为它需要保持 HTTP 连接打开,否则服务会被缩减。
保持长时间运行的连接会增加失败的可能性,因此处理需要重试,因为请求会被重试。
这个限制并不理想,JobSink 是一个可用于创建长时间运行的异步 Job 和任务的资源。
JobSink 支持完整的 Kubernetes batch/v1 Job 资源和功能以及 Kubernetes Job 排队系统,如 Kueue。
先决条件¶
您必须有权访问已安装 Knative Eventing 的 Kubernetes 集群。
用法¶
当事件发送到 JobSink 时,Eventing 会创建一个 Job,并将收到的事件作为 JSON 文件挂载到 /etc/jobsink-event/event。
- 创建
JobSinkapiVersion: sinks.knative.dev/v1alpha1 kind: JobSink metadata: name: job-sink-logger spec: job: spec: completions: 1 parallelism: 1 template: spec: restartPolicy: Never containers: - name: main image: docker.io/library/bash:5 command: [ "cat" ] args: - "/etc/jobsink-event/event" - 应用
JobSink资源kubectl apply -f <job-sink-file.yaml> - 验证
JobSink是否就绪示例输出kubectl get jobsinks.sinks.knative.devNAME URL AGE READY REASON job-sink-logger http://job-sink.knative-eventing.svc.cluster.local/default/job-sink-logger 5s True - 触发
JobSinkkubectl run curl --image=curlimages/curl --rm=true --restart=Never -ti -- -X POST -v \ -H "content-type: application/json" \ -H "ce-specversion: 1.0" \ -H "ce-source: my/curl/command" \ -H "ce-type: my.demo.event" \ -H "ce-id: 123" \ -d '{"details":"JobSinkDemo"}' \ http://job-sink.knative-eventing.svc.cluster.local/default/job-sink-logger - 验证
Job是否已创建并打印事件示例输出kubectl logs job-sink-loggerszoi6-dqbtq{"specversion":"1.0","id":"123","source":"my/curl/command","type":"my.demo.event","datacontenttype":"application/json","data":{"details":"JobSinkDemo"}}
JobSink 的幂等性¶
JobSink 会为每一个不同的收到的事件创建一个 Job。
事件由事件的 source 和 id 属性的组合唯一标识。
如果收到具有相同 source 和 id 属性的事件,并且已经存在 Job,则不会创建另一个 Job。
读取事件文件¶
您可以使用任何 CloudEvents JSON 反序列化器读取文件并进行反序列化。
例如,以下代码片段使用 CloudEvents Go SDK 读取事件并进行处理。
package mytask
import (
"encoding/json"
"fmt"
"os"
cloudevents "github.com/cloudevents/sdk-go/v2"
)
func handleEvent() error {
eventBytes, err := os.ReadFile("/etc/jobsink-event/event")
if err != nil {
return err
}
event := &cloudevents.Event{}
if err := json.Unmarshal(eventBytes, event); err != nil {
return err
}
// Process event ...
fmt.Println(event)
return nil
}
从不同的事件源触发 Job¶
例如,您可以使用 KafkaSource 在 Kafka 记录发送到 Kafka 主题时触发一个 Job
apiVersion: sources.knative.dev/v1
kind: KafkaSource
metadata:
name: kafka-source
spec:
bootstrapServers:
- my-cluster-kafka-bootstrap.kafka:9092
topics:
- knative-demo-topic
sink:
ref:
apiVersion: sinks.knative.dev/v1alpha1
kind: JobSink
name: job-sink-logger
或者当 Knative Broker 收到一个事件时使用 Trigger
apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
name: my-job-sink-trigger
spec:
broker: my-broker
filter:
attributes:
type: dev.knative.foo.bar
myextension: my-extension-value
subscriber:
ref:
apiVersion: sinks.knative.dev/v1alpha1
kind: JobSink
name: job-sink-logger
甚至作为 Knative Broker 的死信接收器
apiVersion: eventing.knative.dev/v1
kind: Broker
metadata:
name: my-broker
spec:
# ...
delivery:
deadLetterSink:
ref:
apiVersion: sinks.knative.dev/v1alpha1
kind: JobSink
name: job-sink-logger
retry: 5
backoffPolicy: exponential
backoffDelay: "PT1S"
自定义事件文件目录¶
apiVersion: sinks.knative.dev/v1alpha1
kind: JobSink
metadata:
name: job-sink-custom-mount-path
spec:
job:
spec:
completions: 1
parallelism: 1
template:
spec:
restartPolicy: Never
containers:
- name: main
image: docker.io/library/bash:5
command: [ "bash" ]
args:
- -c
- echo "Hello world!" && sleep 5
# The event will be available in a file at `/etc/custom-path/event`
volumeMounts:
- name: "jobsink-event"
mountPath: "/etc/custom-path"
readOnly: true
清理已完成的 Job¶
要清理已完成的 Job,您可以设置 spec.job.spec.ttlSecondsAfterFinished: 600 字段,Kubernetes 将在 600 秒(10 分钟)后删除已完成的 Job。
JobSink 示例¶
JobSink 成功示例¶
apiVersion: sinks.knative.dev/v1alpha1
kind: JobSink
metadata:
name: job-sink-success
spec:
job:
metadata:
labels:
my-label: my-value
spec:
completions: 12
parallelism: 3
template:
spec:
restartPolicy: Never
containers:
- name: main
image: docker.io/library/bash:5
command: [ "bash" ]
args:
- -c
- echo "Hello world!" && sleep 5
backoffLimit: 6
podFailurePolicy:
rules:
- action: FailJob
onExitCodes:
containerName: main # optional
operator: In # one of: In, NotIn
values: [ 42 ]
- action: Ignore # one of: Ignore, FailJob, Count
onPodConditions:
- type: DisruptionTarget # indicates Pod disruption
JobSink 失败示例¶
apiVersion: sinks.knative.dev/v1alpha1
kind: JobSink
metadata:
name: job-sink-failure
spec:
job:
metadata:
labels:
my-label: my-value
spec:
completions: 12
parallelism: 3
template:
spec:
restartPolicy: Never
containers:
- name: main
image: docker.io/library/bash:5
command: [ "bash" ] # example command simulating a bug which triggers the FailJob action
args:
- -c
- echo "Hello world!" && sleep 5 && exit 42
backoffLimit: 6
podFailurePolicy:
rules:
- action: FailJob
onExitCodes:
containerName: main # optional
operator: In # one of: In, NotIn
values: [ 42 ]
- action: Ignore # one of: Ignore, FailJob, Count
onPodConditions:
- type: DisruptionTarget # indicates Pod disruption